Filtering inside a FOREACH block in Pig using a value computed in the same block

I have a log dataset and, for each piece of equipment, I need to drop all the log entries that come after its first failure (Action = 2). Equipment that never failed should keep all of its entries.

In this example:

EquipId, SvcId, Action, TimeStamp
Ag,01,1,14-01-01 0:00:01
Ag,01,1,14-01-02 0:00:01
Ag,01,2,14-01-03 0:00:01
Ag,01,1,14-01-04 0:00:01
Ag,01,1,14-01-05 0:00:01
Ag,01,2,14-01-06 0:00:01
Ag,01,1,14-01-07 0:00:01
Ra,01,1,14-01-01 0:00:01
Ra,01,1,14-01-02 0:00:01
Ra,01,1,14-01-03 0:00:01
Ra,01,2,14-01-04 0:00:01
Fe,01,2,14-01-03 0:00:01
Fe,01,1,14-01-03 0:00:02
Fe,01,1,14-01-04 0:00:01
Lu,01,1,14-01-05 0:00:01
Lu,01,1,14-01-04 0:00:01
Lu,01,1,14-01-05 0:00:01

The expected result is:

Ag,01,1,14-01-01 0:00:01
Ag,01,1,14-01-02 0:00:01
Ag,01,2,14-01-03 0:00:01
Ra,01,1,14-01-01 0:00:01
Ra,01,1,14-01-02 0:00:01
Ra,01,1,14-01-03 0:00:01
Ra,01,2,14-01-04 0:00:01
Fe,01,2,14-01-03 0:00:01
Lu,01,1,14-01-05 0:00:01
Lu,01,1,14-01-04 0:00:01
Lu,01,1,14-01-05 0:00:01

I tried to program it in one FOREACH block as follows:

rawData = LOAD './test.csv'  USING PigStorage(',') AS (equipId:chararray, svcId:chararray, action:chararray, date:chararray);

equipDataGrp = GROUP rawData BY equipId;

minFail = FOREACH equipDataGrp {

    actionFail = FILTER rawData BY action == '2';
    minFailDate = MIN(actionFail.date);
    prevActionsFail = FILTER rawData BY date <= minFailDate;


    GENERATE group as equipId, FLATTEN(prevActionsFail.date);

};

and I get the following error:

2014-03-05 11:08:11,720 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1000: 
<line 36, column 28> Invalid field reference. Referenced field [date] does not exist in schema: .

If I hard-code the date instead:

minFail = FOREACH equipDataGrp {

    actionFail = FILTER rawData BY action == '2';
    minFailDate = MIN(actionFail.date);
    prevActionsFail = FILTER rawData BY date == '14-01-03 0:00:01';


    GENERATE group as equipId, FLATTEN(prevActionsFail.date);

};

the script runs and I get this output:

(Ag,14-01-03 0:00:01)
(Fe,14-01-03 0:00:01)
(Ra,14-01-03 0:00:01)

Any suggestions?

Thanks in advance!

1 answer

You need to compute the time of the first failure for each equipment ID and attach it to every record for that equipment. Then you can drop the records whose timestamps are later than that failure time:

rawData = LOAD './test.csv'  USING PigStorage(',') AS (equipId:chararray, svcId:chararray, action:chararray, date:chararray);

equipDataGrp = GROUP rawData BY equipId;

/* Expand out into all records again, appending the earliest failure time */
minFail = FOREACH equipDataGrp {
    actionFail = FILTER rawData BY action == '2';
    GENERATE FLATTEN(rawData), MIN(actionFail.date) AS failTime;
};

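/* Keep records up to and including the first failure. failTime is null for
   equipment that never failed, so those records are kept explicitly. */
notYetFailed = FOREACH (FILTER minFail BY failTime IS NULL OR date <= failTime) GENERATE equipId .. date;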
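
The same idea can also be written as a join. This is only a rough sketch, not tested against your data: it assumes the file path and schema from the question, and the aliases failures, firstFail, joined, kept and result, as well as the field name failEquipId, are made up for the example. It computes the first failure per equipment in its own relation and joins it back with LEFT OUTER, so equipment without any failure (Lu in the sample) gets a null failTime and is kept by the explicit IS NULL check.

```pig
-- Sketch only: path and schema are taken from the question; all alias names
-- below (failures, firstFail, joined, kept, result) are hypothetical.
rawData = LOAD './test.csv' USING PigStorage(',')
          AS (equipId:chararray, svcId:chararray, action:chararray, date:chararray);

-- Keep only the failure records and find the earliest one per equipment.
failures  = FILTER rawData BY action == '2';
firstFail = FOREACH (GROUP failures BY equipId)
            GENERATE group AS failEquipId, MIN(failures.date) AS failTime;

-- LEFT OUTER keeps equipment that never failed; failTime is null on those rows.
joined = JOIN rawData BY equipId LEFT OUTER, firstFail BY failEquipId;

-- Keep rows up to and including the first failure, plus every row of
-- equipment that has no failure at all.
kept = FILTER joined BY failTime IS NULL OR date <= failTime;

-- Project back to the original four columns.
result = FOREACH kept GENERATE rawData::equipId AS equipId, svcId, action, date;
DUMP result;
```

Note that both versions compare date as a chararray, so the ordering is lexicographic. That works for the sample because every timestamp has the same layout, but hours are not zero-padded, so real data with two-digit hours would likely need the strings converted to a proper datetime before comparing.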

Source: https://habr.com/ru/post/1530231/

