Yes, it is possible:
    TopologyBuilder b = new TopologyBuilder();
    b.setSpout("topic_1", new KafkaSpout(...));
    b.setSpout("topic_2", new KafkaSpout(...));
    b.setBolt("bolt", new MyBolt(...))
        .shuffleGrouping("topic_1")
        .shuffleGrouping("topic_2");
You can also use any other grouping.
Update:
To distinguish tuples (i.e. topic_1 or topic_2) in a consumer bolt, there are two possibilities:
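1) You can use the component IDs assigned in setSpout(...) (as suggested by @user-4870385):
    // Component IDs are case-sensitive, so compare with equals().
    if (input.getSourceComponent().equals("topic_1")) {
        // handle a tuple from topic_1
    } else {
        // handle a tuple from topic_2
    }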
2) You can use stream names (as suggested by @zenbeni). In this case, both spouts must declare named streams, and the bolt must subscribe to each spout by stream name:
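    public class MyKafkaSpout extends KafkaSpout {
        private final SpoutConfig spoutConfig;
        private final String streamName;

        public MyKafkaSpout(SpoutConfig spoutConfig, String streamName) {
            super(spoutConfig); // KafkaSpout has no no-arg constructor
            this.spoutConfig = spoutConfig;
            this.streamName = streamName;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Named streams are declared with declareStream(), not declare().
            // Caveat: this only declares the stream; it assumes the spout
            // also emits its tuples on streamName.
            declarer.declareStream(streamName, spoutConfig.scheme.getOutputFields());
        }
    }
When you build the topology, subscribe the bolt using both the component ID and the stream name:
    TopologyBuilder b = new TopologyBuilder();
    b.setSpout("topic_1", new MyKafkaSpout(..., "stream_t1"));
    b.setSpout("topic_2", new MyKafkaSpout(..., "stream_t2"));
    b.setBolt("bolt", new MyBolt(...))
        .shuffleGrouping("topic_1", "stream_t1")
        .shuffleGrouping("topic_2", "stream_t2");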
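In MyBolt, compare against the stream name that was declared by the spout:
    if (input.getSourceStreamId().equals("stream_t1")) {
        // handle a tuple from topic_1
    } else {
        // handle a tuple from topic_2
    }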
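If you want to try out the two dispatch styles without a running cluster, here is a minimal sketch. Note that FakeTuple is a hypothetical stand-in that only mimics the two accessors used above (getSourceComponent() and getSourceStreamId()); it is not Storm's Tuple class, and the handler labels are made up for illustration.

```java
public class TupleRouting {

    /** Hypothetical stand-in for the two Tuple accessors used in the answer. */
    public static final class FakeTuple {
        private final String sourceComponent;
        private final String sourceStreamId;

        public FakeTuple(String sourceComponent, String sourceStreamId) {
            this.sourceComponent = sourceComponent;
            this.sourceStreamId = sourceStreamId;
        }

        public String getSourceComponent() { return sourceComponent; }
        public String getSourceStreamId() { return sourceStreamId; }
    }

    /** Option 1: branch on the component ID given to setSpout(...). */
    public static String routeByComponent(FakeTuple input) {
        switch (input.getSourceComponent()) {
            case "topic_1": return "handle-topic-1";
            case "topic_2": return "handle-topic-2";
            default:        return "unknown";
        }
    }

    /** Option 2: branch on the stream name declared by the spout. */
    public static String routeByStream(FakeTuple input) {
        switch (input.getSourceStreamId()) {
            case "stream_t1": return "handle-topic-1";
            case "stream_t2": return "handle-topic-2";
            default:          return "unknown";
        }
    }

    public static void main(String[] args) {
        FakeTuple t = new FakeTuple("topic_2", "stream_t2");
        System.out.println(routeByComponent(t));
        System.out.println(routeByStream(t));
    }
}
```

Both methods give the same answer for the same tuple; the difference is only in where the name being tested was fixed: the component ID in the topology code, the stream name in the spout code.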