Spout doesn't get acked until long after the bolt finishes

In one of our topologies, which contains 1 spout and 1 bolt, I had a hunch that the bolt finishes fine (and acks the tuple), but the spout still doesn't receive the ack.

I tried to confirm this by having a TaskHook as shown below -

import java.util.Map;

import org.apache.storm.hooks.BaseTaskHook;      // backtype.storm.* on older Storm releases
import org.apache.storm.hooks.info.*;
import org.apache.storm.task.TopologyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BaseHook extends BaseTaskHook {

    private Logger logger;
    private String topology;
    private String component;

    public BaseHook(String component) {
        this.component = component;
    }

    @Override
    public void prepare(Map conf, TopologyContext context) {
        logger = LoggerFactory.getLogger(this.getClass());
        this.topology = (String) conf.get("topology.name");
    }

    @Override
    public void emit(EmitInfo info) {
        log("EMITTED >> Value = " + info.values);
    }

    @Override
    public void spoutAck(SpoutAckInfo info) {
        log("ACKED >> Tuple = " + info.messageId + ", Latency = " + info.completeLatencyMs);
    }

    @Override
    public void spoutFail(SpoutFailInfo info) {
        log("FAILED >> Tuple = " + info.messageId + ", Latency = " + info.failLatencyMs);
    }

    @Override
    public void boltExecute(BoltExecuteInfo info) {
        log("EXECUTED >> Tuple = " + info.tuple.getValues() + ", Latency = " + info.executeLatencyMs);
    }

    @Override
    public void boltAck(BoltAckInfo info) {
        log("ACKED >> Tuple = " + info.tuple.getValues() + ", Latency = " + info.processLatencyMs);
    }

    @Override
    public void boltFail(BoltFailInfo info) {
        log("FAILED >> Tuple = " + info.tuple.getValues() + ", Latency = " + info.failLatencyMs);
    }

    private void log(String msg) {
        logger.info(">>>>> " + topology + " >> " + component + " >> " + msg);
    }
}

It turned out that my guess was correct. The logs look something like this:

 >>>>> TopologyX >> SpoutX >> EMITTED >> Value = [XXXXXXXXX]
 >>>>> TopologyX >> BoltX >> ACKED >> Tuple = [XXXXXXXXX], Latency = 1972
 >>>>> TopologyX >> BoltX >> EXECUTED >> Tuple = [XXXXXXXXX], Latency = 1973
 >>>>> TopologyX >> SpoutX >> FAILED >> Tuple = XXXXXXXXX, Latency = 53913

The bolt takes almost 2 seconds to execute and ack the tuple, but the spout's fail() is only called after about 53 seconds (almost half of topology.message.timeout.secs * 2, i.e. close to the timeout itself).
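For context, this is roughly how that timeout is configured when a topology is submitted; the 60-second value below is an assumption for illustration, not necessarily what our cluster uses:

import org.apache.storm.Config;   // backtype.storm.Config on older Storm releases

Config conf = new Config();
// Tuples that are not fully acked within this window are failed back to the spout
conf.setMessageTimeoutSecs(60);   // assumed value; the actual setting may differ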

I would expect the spout's ack() to be called within the same 2-3 seconds. The spout is not blocked, and both the spouts and the bolts have enough spare capacity.

Does anyone have any hints as to what could be causing this?


Update

So, this is what the Storm cluster looked like -

  • 4 Topologies
    • T1 = S > B > B > B > Ack/Fail
    • T2 = S > B > Ack/Fail
    • T3 = S > B > B > Ack/Fail
    • T4 =
      • S > B > Ack/Fail
      • S > B > Ack/Fail

The topology in question is T4, i.e. the one with 2 different spouts and 2 bolts. One of these streams usually works fine (they have different message IDs that uniquely identify the tuples).
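To make the T4 layout concrete, here is a hedged sketch of how such a topology is typically wired; the component names, classes and parallelism hints are illustrative, not our exact code:

import org.apache.storm.topology.TopologyBuilder;   // backtype.storm.* on older Storm releases

TopologyBuilder builder = new TopologyBuilder();
// First independent chain: spout A -> bolt A
builder.setSpout("spout-a", new SpoutA(), 1);
builder.setBolt("bolt-a", new BoltA(), 1).shuffleGrouping("spout-a");
// Second independent chain: spout B -> bolt B, in the same topology
builder.setSpout("spout-b", new SpoutB(), 1);
builder.setBolt("bolt-b", new BoltB(), 1).shuffleGrouping("spout-b");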

Could this be a problem?

Anyway,

  • We tried to reduce the number of executors as much as we could, but this did not improve anything in T4.
  • We disabled all the other topologies and everything worked fine for T4
  • We turned T1 back on and T4 still worked fine
  • We turned on T2 (and, on another occasion, T3) and tuples in T4 started failing again

Now,

  • In one odd case, T4 even worked with both T1 and T3 turned on.
  • But otherwise, every time T2 or T3 is activated, T4 starts failing tuples.

Points for notes -

  • T3 and T4 are fast topologies, i.e. their tuples complete end-to-end in < 100 ms
  • Both T3 and T4 have only 1 executor per spout and per bolt
  • Both T3 and T4 have max spout pending = 1
  • We rate-limit the spouts of both T3 and T4 (we also tried without any rate limiting); see the sketch after this list
    • Try 1: no throttling at all
    • Try 2: sleep for 50 ms before emitting
    • Try 3: sleep for 50 ms after emitting
    • Try 4: do not sleep, but only emit if 50 ms have passed since the last emission
    • Nothing helped
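For clarity, this is roughly what Try 4 looked like inside the spout; fetchNextRecord() and the field names are placeholders for illustration, not our real code:

private long lastEmitMs = 0L;
private static final long MIN_EMIT_INTERVAL_MS = 50L;

@Override
public void nextTuple() {
    long now = System.currentTimeMillis();
    // Try 4: throttle without sleeping -- skip this round if the last emit was < 50 ms ago
    if (now - lastEmitMs < MIN_EMIT_INTERVAL_MS) {
        return;
    }
    String record = fetchNextRecord();               // hypothetical, non-blocking source read
    if (record != null) {
        collector.emit(new Values(record), record);  // record doubles as the unique message ID
        lastEmitMs = now;
    }
}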

Additional information based on comment

All spouts extend the BaseSpout class -

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;   // backtype.storm.* on older Storm releases
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.FailedException;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public abstract class BaseSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // Register the logging hook for this component
        context.addTaskHook(new BaseHook(this.getClass().getSimpleName()));
        try {
            this.collector = collector;
            open();
        } catch (Exception e) {
            throw new RuntimeException("Error when preparing spout", e);
        }
    }

    @Override
    public void nextTuple() {
        try {
            getTuple();
        } catch (Throwable t) {
            if (!(t instanceof FailedException)) {
                t = new FailedException("nextTuple()", t);
            }
            collector.reportError(t);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        String[] fields = getFields();
        if (fields != null) {
            declarer.declare(new Fields(fields));
        }
    }

    // Every emit carries an explicit message ID, so tuples are always tracked
    protected void emit(Values values, String msgId) {
        collector.emit(values, msgId);
    }

    protected abstract void open() throws Exception;

    protected abstract void getTuple() throws Exception;

    protected abstract String[] getFields();
}
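As a concrete (hypothetical) example of how our spouts are written on top of this, a subclass might look like the following; the queue and field names are assumptions for illustration, not our production code:

public class SpoutX extends BaseSpout {

    private java.util.concurrent.BlockingQueue<String> queue;   // hypothetical in-memory source

    @Override
    protected void open() throws Exception {
        queue = new java.util.concurrent.LinkedBlockingQueue<>();
    }

    @Override
    protected void getTuple() throws Exception {
        String record = queue.poll();   // non-blocking; returns null if nothing is pending
        if (record != null) {
            // Every emit goes through BaseSpout.emit(), so it always carries a message ID
            emit(new Values(record), record);
        }
    }

    @Override
    protected String[] getFields() {
        return new String[] { "record" };
    }
}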

and all bolts extend the BaseBolt class -

import java.util.Map;

import org.apache.storm.task.OutputCollector;          // backtype.storm.* on older Storm releases
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.FailedException;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public abstract class BaseBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // Register the logging hook for this component
        context.addTaskHook(new BaseHook(this.getClass().getSimpleName()));
        try {
            this.collector = collector;
            prepare();
        } catch (Exception e) {
            throw new RuntimeException("Error when preparing bolt", e);
        }
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            process(tuple);
            collector.ack(tuple);
        } catch (Throwable t) {
            if (!(t instanceof FailedException)) {
                t = new FailedException("execute(" + tuple + ")", t);
            }
            collector.reportError(t);
            collector.fail(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        String[] fields = getFields();
        if (fields != null) {
            declarer.declare(new Fields(fields));
        }
    }

    // Emits are always anchored to the incoming tuple
    protected void emit(Tuple tuple, Values values) {
        collector.emit(tuple, values);
    }

    protected abstract void prepare() throws Exception;

    protected abstract void process(Tuple tuple) throws Exception;

    protected abstract String[] getFields();
}
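And, for symmetry, a hypothetical bolt built on top of BaseBolt might look like this; the field name and the body of process() are illustrative only:

public class BoltX extends BaseBolt {

    @Override
    protected void prepare() throws Exception {
        // e.g. open connections to downstream systems (omitted here)
    }

    @Override
    protected void process(Tuple tuple) throws Exception {
        String record = tuple.getString(0);
        // ... do the actual work; any exception thrown here makes BaseBolt fail the tuple ...
        emit(tuple, new Values(record));   // anchored emit via BaseBolt.emit()
    }

    @Override
    protected String[] getFields() {
        return new String[] { "record" };
    }
}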

So, in short, there is no way to emit a tuple without a message ID (from a spout) or an unanchored tuple (from a bolt).

1 answer

The problem is that the calls to Spout.nextTuple() and Spout.ack() or Spout.fail() happen on the same thread. If you push a large number of tuples into your topology, the ack or fail messages end up waiting to be processed by the originating spout, which drives up the ack/fail latency.

You also mentioned that "sleep" had no effect. If you mean that you called Thread.sleep() in nextTuple(), that only makes the situation worse, because you are blocking the very thread that would handle the acks/fails.
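To illustrate the point (my own sketch, not code from this answer): sleeping inside nextTuple() stalls the one executor thread that also delivers acks and fails, so the spout should return instead of blocking. The source field here is a hypothetical non-blocking queue:

@Override
public void nextTuple() {
    String record = source.poll();   // hypothetical non-blocking read
    if (record == null) {
        // Do NOT call Thread.sleep() here: nextTuple(), ack() and fail() all run on the
        // same executor thread, so blocking it also delays ack/fail processing.
        // Returning immediately lets Storm deliver acks/fails and call nextTuple() again.
        return;
    }
    collector.emit(new Values(record), record);
}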
