Can Storm HdfsBolt flush data after a timeout?

We use Storm to process streaming data and store it in HDFS. We have everything working, but there is one problem. I understand that we can specify the number of tuples after which the data is flushed to HDFS using a SyncPolicy, something like this:

SyncPolicy syncPolicy = new CountSyncPolicy(Integer.parseInt(args[3]));

The question I have is whether data can also be flushed after a timeout. E.g. we set the SyncPolicy above to 1000 tuples. If for some reason we receive 995 tuples and then the data stops coming in for some time, is there a way for Storm to flush those 995 records to HDFS after a specified timeout (say, 5 seconds)?

Thanks in advance for your help!

  • Shay
2 answers

Yes, if you send a tick tuple to the HDFS bolt, it will cause the bolt to try to sync to the HDFS file system. All of this happens in the HDFS bolt's execute() method.

Tick tuples are set up in your topology configuration. In Java, configuring a tick every 300 seconds would look like this:

Config topologyConfig = new Config();
topologyConfig.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 300);

StormSubmitter.submitTopology("mytopology", topologyConfig, builder.createTopology());

You will have to customize this last line according to your circumstances.
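For reference, a tick tuple arrives from Storm's system component on the system tick stream, which is how a bolt distinguishes it from normal data tuples. A minimal, Storm-free sketch of that check (the string values "__system" and "__tick" are what Storm's Constants.SYSTEM_COMPONENT_ID and Constants.SYSTEM_TICK_STREAM_ID resolve to):

```java
public class TickTupleCheck {
    // Values of Storm's Constants.SYSTEM_COMPONENT_ID and
    // Constants.SYSTEM_TICK_STREAM_ID, inlined here so the sketch
    // runs without the Storm jars on the classpath.
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    // The same test a bolt applies to each incoming tuple:
    // tick tuples come from the system component on the tick stream.
    static boolean isTickTuple(String sourceComponent, String sourceStreamId) {
        return SYSTEM_COMPONENT_ID.equals(sourceComponent)
                && SYSTEM_TICK_STREAM_ID.equals(sourceStreamId);
    }

    public static void main(String[] args) {
        System.out.println(isTickTuple("__system", "__tick"));    // tick tuple
        System.out.println(isTickTuple("kafkaSpout", "default")); // normal data tuple
    }
}
```

In a real bolt you would call `tuple.getSourceComponent()` and `tuple.getSourceStreamId()` instead of passing the strings in, as the second answer's code below does.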


There is an alternative solution to this problem.

Suppose your sync policy is 1000 tuples. HdfsBolt then calls hsync() inside execute() only once 1000 tuples have arrived, so if the stream stalls before reaching that count, the buffered records are never flushed.

You can still keep rotating output files by size, for example:

 FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(100.0f, Units.KB);

The idea is to extend AbstractHdfsBolt and record the arrival time of every normal tuple in execute. On each tick tuple, compute the diff between that timestamp and the tick tuple's time; if the difference exceeds your threshold, force a sync and rotate the output file.

You will also need to configure tick tuples for the topology, as described in the other answer.

Here is the code:

public class CustomHdfsBolt1 extends AbstractHdfsBolt {

private static final Logger LOG = LoggerFactory.getLogger(CustomHdfsBolt1.class);
private transient FSDataOutputStream out;
private RecordFormat format;
private long offset = 0L;
private int tickTupleCount = 0;
private String type;
private long normalTupleTime;
private long tickTupleTime;

public CustomHdfsBolt1() {

}


public CustomHdfsBolt1(String type) {
    this.type = type;
}

public CustomHdfsBolt1 withFsUrl(String fsUrl) {
    this.fsUrl = fsUrl;
    return this;
}

public CustomHdfsBolt1 withConfigKey(String configKey) {
    this.configKey = configKey;
    return this;
}

public CustomHdfsBolt1 withFileNameFormat(FileNameFormat fileNameFormat) {
    this.fileNameFormat = fileNameFormat;
    return this;
}

public CustomHdfsBolt1 withRecordFormat(RecordFormat format) {
    this.format = format;
    return this;
}

public CustomHdfsBolt1 withSyncPolicy(SyncPolicy syncPolicy) {
    this.syncPolicy = syncPolicy;
    return this;
}

public CustomHdfsBolt1 withRotationPolicy(FileRotationPolicy rotationPolicy) {
    this.rotationPolicy = rotationPolicy;
    return this;
}

public CustomHdfsBolt1 addRotationAction(RotationAction action) {
    this.rotationActions.add(action);
    return this;
}

protected static boolean isTickTuple(Tuple tuple) {
    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
            && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}


public void execute(Tuple tuple) {
    try {

        if (isTickTuple(tuple)) {
            tickTupleTime = Calendar.getInstance().getTimeInMillis();

            // time elapsed since the last normal tuple arrived
            long timeDiff = tickTupleTime - normalTupleTime;

            long diffInSeconds = TimeUnit.MILLISECONDS.toSeconds(timeDiff);

            if (diffInSeconds > 5) {  // specify the value you want.
                this.rotateWithOutFileSize(tuple);

            }

        } else {

            normalTupleTime = Calendar.getInstance().getTimeInMillis();
            this.rotateWithFileSize(tuple);
        }
    } catch (IOException var6) {
        LOG.warn("write/sync failed.", var6);
        this.collector.fail(tuple);
    }

}


public void rotateWithFileSize(Tuple tuple) throws IOException {

    syncHdfs(tuple);

    this.collector.ack(tuple);

    if (this.rotationPolicy.mark(tuple, this.offset)) {
        this.rotateOutputFile();
        this.offset = 0L;
        this.rotationPolicy.reset();
    }
}


public void rotateWithOutFileSize(Tuple tuple) throws IOException {

    syncHdfs(tuple);

    this.collector.ack(tuple);

    this.rotateOutputFile();
    this.offset = 0L;
    this.rotationPolicy.reset();

}

public void syncHdfs(Tuple tuple) throws IOException {
    byte[] e = this.format.format(tuple);

    synchronized (this.writeLock) {
        this.out.write(e);
        this.offset += (long) e.length;
        if (this.syncPolicy.mark(tuple, this.offset)) {
            if (this.out instanceof HdfsDataOutputStream) {
                ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
            } else {
                this.out.hsync();
            }

            this.syncPolicy.reset();
        }
    }


}

public void closeOutputFile() throws IOException {
    this.out.close();
}

public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
    LOG.info("Preparing HDFS Bolt...");
    this.fs = FileSystem.get(URI.create(this.fsUrl), this.hdfsConfig);
    this.tickTupleCount = 0;
    this.normalTupleTime = 0;
    this.tickTupleTime = 0;

}

public Path createOutputFile() throws IOException {
    Path path = new Path(this.fileNameFormat.getPath(),
            this.fileNameFormat.getName((long) this.rotation, System.currentTimeMillis()));
    this.out = this.fs.create(path);
    return path;
}
}
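The timeout check at the heart of execute() above can be isolated and sanity-checked on its own. A minimal, Storm-free sketch, using the same TimeUnit conversion and the example 5-second threshold from the code above:

```java
import java.util.concurrent.TimeUnit;

public class FlushTimeoutCheck {
    // Same example threshold as in execute(); adjust to taste.
    static final long THRESHOLD_SECONDS = 5;

    // Returns true when the gap between the last normal tuple and the
    // current tick tuple exceeds the threshold, i.e. the stream has
    // stalled and buffered records should be flushed to HDFS.
    static boolean shouldForceFlush(long normalTupleTimeMillis, long tickTupleTimeMillis) {
        long diffInSeconds =
                TimeUnit.MILLISECONDS.toSeconds(tickTupleTimeMillis - normalTupleTimeMillis);
        return diffInSeconds > THRESHOLD_SECONDS;
    }

    public static void main(String[] args) {
        long lastTuple = 1_000L;
        System.out.println(shouldForceFlush(lastTuple, lastTuple + 2_000));  // 2 s gap: keep buffering
        System.out.println(shouldForceFlush(lastTuple, lastTuple + 10_000)); // 10 s gap: force flush
    }
}
```

Note that the subtraction must be tick time minus normal-tuple time; reversing it yields a negative diff and the flush branch never fires.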

Hope this helps.

Source: https://habr.com/ru/post/1610647/

