Hadoop Cascading - create a flow with one source, two sinks

I use Cascading 2 to create Hadoop jobs and am trying to create a flow that starts from a single source. After several functions are applied to the data, I need to split the flow so that this data is used to create two separate reports (in two separate sinks).

    //SOURCE
    Scheme sourceScheme = new TextLine( new Fields( "line" ) );
    Tap source = new Hfs( sourceScheme, input );

    //REPORT1 SINK
    Scheme report1SinkScheme = new TextDelimited( Fields.ALL, ",", "\"" );
    Tap report1Sink = new Hfs( report1SinkScheme, output1, SinkMode.REPLACE );

    //REPORT2 SINK
    Scheme report2SinkScheme = new TextDelimited( Fields.ALL, ",", "\"" );
    Tap report2Sink = new Hfs( report2SinkScheme, output2, SinkMode.REPLACE );

    //INITIAL FUNCTIONS
    Pipe firstPipe = new Pipe( "firstPipe" );
    firstPipe = new Each( firstPipe, new Fields( "line" ), functionA );
    firstPipe = new Each( firstPipe, functionB, Fields.ALL );

    //REPORT1 FUNCTION
    report1Pipe = new Each( firstPipe, Fields.ALL, function1, Fields.RESULTS );

    //REPORT2 FUNCTION
    report2Pipe = new Each( firstPipe, Fields.ALL, function2, Fields.RESULTS );

    //CONNECT FLOW PARTS
    FlowDef flowDef = new FlowDef()
        .setName( "report-flow" )
        .addSource( firstPipe, source )
        .addSink( report1Pipe, report1Sink )
        .addSink( report2Pipe, report2Sink );

    new HadoopFlowConnector( properties ).connect( flowDef ).complete();

This currently gives me the error "java.lang.IllegalArgumentException: cannot add duplicate sink: firstPipe", but even after I messed around with it for a while, I get a lot of other problems related to setting up the flow.

Can someone explain how to build a flow of this form (one source, two sinks)? Do I need to create a Cascade instead? Or do I need an intermediate sink to store the data before the split?

Please help!

2 answers

You can use the split pattern as described in the Cascading documentation. Here is an example:

    public static void main( String[] args )
      {
      // source and sink
      Scheme sourceScheme = new TextLine( new Fields( "line" ) );
      Tap source = new FileTap( sourceScheme, args[ 0 ] );

      Fields sinkFields = new Fields( "word", "count" );
      Scheme sinkScheme = new TextLine( sinkFields, sinkFields );
      Tap sink_one = new FileTap( sinkScheme, "out-one.txt" );
      Tap sink_two = new FileTap( sinkScheme, "out-two.txt" );

      // the pipe assembly
      Pipe assembly = new Pipe( "wordcount" );

      String regex = "\\w+";
      Function function = new RegexGenerator( new Fields( "word" ), regex );
      assembly = new Each( assembly, new Fields( "line" ), function );

      Aggregator count = new Count( new Fields( "count" ) );

      // ...split into two pipes
      Pipe countOne = new Pipe( "count-one", assembly );
      countOne = new GroupBy( countOne, new Fields( "word" ) );
      countOne = new Every( countOne, count );

      Pipe countTwo = new Pipe( "count-two", assembly );
      countTwo = new GroupBy( countTwo, new Fields( "word" ) );
      countTwo = new Every( countTwo, count );

      // create the flow
      final List<Pipe> pipes = new ArrayList<Pipe>( 2 );
      pipes.add( countOne );
      pipes.add( countTwo );

      final Map<String, Tap> sinks = new HashMap<String, Tap>();
      sinks.put( "count-one", sink_one );
      sinks.put( "count-two", sink_two );

      FlowConnector flowConnector = new LocalFlowConnector();
      Flow flow = flowConnector.connect( source, sinks, pipes );
      flow.complete();
      }

The split pattern is described in the Cascading user guide at: http://docs.cascading.org/cascading/2.1/userguide/htmlsingle/#N21362

Another (simpler) example is included in Cascading for the Impatient, Parts 5 and 6.

One point about the code shown above: there are apparently no variable declarations for report1Pipe and report2Pipe. To use the split pattern, each branch requires a name, and the names must be different.

The exception gets thrown because the two branches inherit the same name from earlier in the assembly, so, for example, those calls to flowDef.addSink(..) are ambiguous to the flow planner.
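
For example, here is a minimal sketch of how the assembly from the question might be repaired; the branch names "report1" and "report2" are my own choice, and the function variables are assumed from the question:

    // Declare each report branch as a new, explicitly named Pipe off
    // firstPipe, so the planner can tell the two tails apart.
    Pipe report1Pipe = new Pipe( "report1", firstPipe );
    report1Pipe = new Each( report1Pipe, Fields.ALL, function1, Fields.RESULTS );

    Pipe report2Pipe = new Pipe( "report2", firstPipe );
    report2Pipe = new Each( report2Pipe, Fields.ALL, function2, Fields.RESULTS );

    // The sinks now bind unambiguously to the distinct branch names.
    FlowDef flowDef = new FlowDef()
        .setName( "report-flow" )
        .addSource( firstPipe, source )
        .addSink( report1Pipe, report1Sink )
        .addSink( report2Pipe, report2Sink );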

So, in Part 5 of "Impatient", see how the branches "D", "DF", and "TF" get named in their operations.

It may seem a little counterintuitive that Cascading requires these names, but they become very important in large, complex workflows, when you attach failure traps, debugging, etc. to particular branches.
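
For instance, assuming the branch names from the sketch above, a failure trap can be attached to just one branch through FlowDef (the trap path here is made up for illustration):

    // Hypothetical trap: tuples that cause a failure in the "report1"
    // branch are diverted to this tap instead of failing the whole flow.
    Tap report1Trap = new Hfs( new TextLine(), "output/traps/report1", SinkMode.REPLACE );
    flowDef = flowDef.addTrap( "report1", report1Trap );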

Alternatively, the Cascalog DSL in Clojure is much more declarative, so this is handled by the language directly: branches are subqueries, and traps, etc. are handled within the closure of a subquery.


Source: https://habr.com/ru/post/1432107/

