How to send the output of two different spouts to the same bolt?

I have two Kafka spouts whose values I want to send to the same bolt.

Is it possible?

2 answers

Yes, it is possible:

TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new KafkaSpout(...));
b.setSpout("topic_2", new KafkaSpout(...));
b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1").shuffleGrouping("topic_2");

You can also use any other grouping.

Update:

To distinguish tuples (i.e. topic_1 or topic_2) in a consumer bolt, there are two possibilities:

1) You can use the component IDs, i.e. the names assigned in setSpout(...) (as suggested by @user-4870385):

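// component IDs are case-sensitive, so compare with equals()
if (input.getSourceComponent().equals("topic_1")) {
    // tuple was emitted by the spout registered as "topic_1"
} else {
    // tuple was emitted by the spout registered as "topic_2"
}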
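
With more than two sources, the if/else chain above can be replaced by a lookup table keyed by component ID. The following is a minimal sketch in plain Java with no Storm dependency. `SourceRouter`, `on` and `route` are hypothetical names, and the assumption is that a bolt's execute() would call route(input.getSourceComponent(), input.getValues()).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical helper (not part of Storm): maps a source component ID
// to the handler that should process tuples coming from that component.
class SourceRouter {
    private final Map<String, Consumer<List<Object>>> handlers = new HashMap<>();

    // Register a handler for one component ID, e.g. "topic_1".
    SourceRouter on(String componentId, Consumer<List<Object>> handler) {
        handlers.put(componentId, handler);
        return this;
    }

    // Dispatch the values to the handler registered for the source.
    // Returns false if no handler is known for that component ID.
    boolean route(String componentId, List<Object> values) {
        Consumer<List<Object>> handler = handlers.get(componentId);
        if (handler == null) {
            return false;
        }
        handler.accept(values);
        return true;
    }
}
```

In a bolt, such a router would probably be built in prepare(...) rather than in the constructor, because the bolt instance is serialized when the topology is submitted and lambdas held in a field are generally not serializable.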

2) You can use stream names (as suggested by @zenbeni). In this case, both spouts need to declare named streams, and the bolt needs to be connected to the spouts by stream name:

public class MyKafkaSpout extends KafkaSpout {
  final String streamName;

  public MyKafkaSpout(String stream) {
    // the call to super(...) with the spout configuration is omitted here
    this.streamName = stream;
  }

  // other stuff omitted

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // same fields as in KafkaSpout.declareOutputFields(...), but declared on the named stream
    declarer.declare(streamName, _spoutConfig.scheme.getOutputFields());
  }
}

When building the topology, you now need to use the stream names:

TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new MyKafkaSpout("stream_t1"));
b.setSpout("topic_2", new MyKafkaSpout("stream_t2"));
b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1", "stream_t1").shuffleGrouping("topic_2", "stream_t2");

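In MyBolt:

// in MyBolt.execute():
// the stream ID must match the name passed to MyKafkaSpout
if (input.getSourceStreamId().equals("stream_t1")) {
  // tuple arrived on stream_t1 (from spout topic_1)
} else {
  // tuple arrived on stream_t2 (from spout topic_2)
}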
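
The string compared in execute() has to match the stream name given to the spout exactly, otherwise the first branch never fires. One way to reduce that risk, sketched here in plain Java with hypothetical names (`StreamNames`, `Origin`, `originOf`), is to define the names once and refer to the constants from both the topology code and the bolt.

```java
// Hypothetical constants holder (not part of Storm): one place for the
// stream names used by both the topology builder and the bolt.
final class StreamNames {
    static final String T1 = "stream_t1";
    static final String T2 = "stream_t2";

    enum Origin { TOPIC_1, TOPIC_2, UNKNOWN }

    private StreamNames() { }

    // Classify a stream ID, e.g. the result of input.getSourceStreamId().
    static Origin originOf(String streamId) {
        if (T1.equals(streamId)) {
            return Origin.TOPIC_1;
        }
        if (T2.equals(streamId)) {
            return Origin.TOPIC_2;
        }
        // Covers null and misspelled names such as "Topic1".
        return Origin.UNKNOWN;
    }
}
```

The topology code could then pass StreamNames.T1 and StreamNames.T2 to the spouts and to shuffleGrouping(...), while the bolt switches on originOf(input.getSourceStreamId()). An UNKNOWN result makes a naming mismatch visible instead of silently falling into the else branch.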

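Discussion:

While the second approach using stream names is more natural (according to @zenbeni), the first one is more flexible (IMHO). Stream names are declared by the spout/bolt itself (i.e., at the time the spout/bolt code is written); component IDs, in contrast, are assigned when the topology is put together (i.e., at the time the spout/bolt is used).

Assume you are given three bolts as class files (no source code). The first two are to be used as producers, and both declare an output stream with the same name. If the third bolt, the consumer, distinguishes its input tuples by stream name, this will not work. Even if the two producers declare different stream names, the names the consumer expects may be hard-coded and may not match, so it does not work either. If the consumer distinguishes incoming tuples by component ID (even a hard-coded one), the expected IDs can simply be assigned when the topology is built.

Of course, you could inherit from the given classes (if they are not declared final) and override declareOutputFields(...) to assign your own stream names. That is additional work, though.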


Source: https://habr.com/ru/post/1589323/

