I'm using Cascading 2 to create Hadoop jobs and am trying to build a flow that starts from a single source. After several functions are applied to the data, I need to split the flow so that the data feeds two separate reports (in two separate sinks).
// SOURCE
Scheme sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, input );

// REPORT1 SINK
Scheme report1SinkScheme = new TextDelimited( Fields.ALL, ",", "\"" );
Tap report1Sink = new Hfs( report1SinkScheme, output1, SinkMode.REPLACE );

// REPORT2 SINK
Scheme report2SinkScheme = new TextDelimited( Fields.ALL, ",", "\"" );
Tap report2Sink = new Hfs( report2SinkScheme, output2, SinkMode.REPLACE );

// INITIAL FUNCTIONS
Pipe firstPipe = new Pipe( "firstPipe" );
firstPipe = new Each( firstPipe, new Fields( "line" ), functionA );
firstPipe = new Each( firstPipe, functionB, Fields.ALL );

// REPORT1 FUNCTION
Pipe report1Pipe = new Each( firstPipe, Fields.ALL, function1, Fields.RESULTS );

// REPORT2 FUNCTION
Pipe report2Pipe = new Each( firstPipe, Fields.ALL, function2, Fields.RESULTS );

// CONNECT FLOW PARTS
FlowDef flowDef = new FlowDef()
    .setName( "report-flow" )
    .addSource( firstPipe, source )
    .addSink( report1Pipe, report1Sink )
    .addSink( report2Pipe, report2Sink );

new HadoopFlowConnector( properties ).connect( flowDef ).complete();
This currently gives me the error "java.lang.IllegalArgumentException: cannot add duplicate sink: firstPipe", and even after I fiddled with it for a while, I keep running into other problems related to setting up the flow.
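One thing I experimented with, guessing from the error message that both Each branches inherit the name "firstPipe" and therefore collide when registered as sinks, was giving each branch its own named Pipe via the Pipe( String name, Pipe previous ) constructor. Roughly this (the branch names are just my own choices; I'm not sure this is the idiomatic way to split a stream):

// Give each branch its own named Pipe off the shared head, so the
// planner can associate each tail with its own sink by name.
Pipe report1Pipe = new Pipe( "report1Pipe", firstPipe );
report1Pipe = new Each( report1Pipe, Fields.ALL, function1, Fields.RESULTS );

Pipe report2Pipe = new Pipe( "report2Pipe", firstPipe );
report2Pipe = new Each( report2Pipe, Fields.ALL, function2, Fields.RESULTS );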
Can someone explain how to build a flow of this shape (one source, two sinks)? Do I need to create a Cascade? Or do I need an intermediate tap to store the data before the split?
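For the Cascade variant, what I have in mind is something like the sketch below: persist the shared stage to a temporary tap, then fan out with two downstream flows joined in a Cascade (tmpPath and the head pipe names are placeholders I made up):

// Hypothetical sketch: write the shared stage once, then read it back
// in two independent flows. tmpPath is a placeholder path.
Tap intermediate = new Hfs( new SequenceFile( Fields.ALL ), tmpPath );

Flow sharedFlow = new HadoopFlowConnector( properties )
    .connect( source, intermediate, firstPipe );

Pipe report1Head = new Pipe( "report1" );
report1Head = new Each( report1Head, Fields.ALL, function1, Fields.RESULTS );
Flow report1Flow = new HadoopFlowConnector( properties )
    .connect( intermediate, report1Sink, report1Head );

Pipe report2Head = new Pipe( "report2" );
report2Head = new Each( report2Head, Fields.ALL, function2, Fields.RESULTS );
Flow report2Flow = new HadoopFlowConnector( properties )
    .connect( intermediate, report2Sink, report2Head );

// The CascadeConnector orders the flows by their shared tap dependencies.
new CascadeConnector().connect( sharedFlow, report1Flow, report2Flow ).complete();

But this feels heavyweight for what should be a simple split, so I'd rather avoid it if a single flow can do the job.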
Please help!
Katie