There is an alternative solution to this problem,
. 1000, HdfsBolt 1000 , hsync() execute(), , , .
, , .
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(100.0f, Units.KB);
, , - excecute . diff , , .
- , .
. :
public class CustomHdfsBolt1 extends AbstractHdfsBolt {
private static final Logger LOG = LoggerFactory.getLogger(CustomHdfsBolt1.class);
private transient FSDataOutputStream out;
private RecordFormat format;
private long offset = 0L;
private int tickTupleCount = 0;
private String type;
private long normalTupleTime;
private long tickTupleTime;
public CustomHdfsBolt1() {
}
public CustomHdfsBolt1(String type) {
this.type = type;
}
public CustomHdfsBolt1 withFsUrl(String fsUrl) {
this.fsUrl = fsUrl;
return this;
}
public CustomHdfsBolt1 withConfigKey(String configKey) {
this.configKey = configKey;
return this;
}
public CustomHdfsBolt1 withFileNameFormat(FileNameFormat fileNameFormat) {
this.fileNameFormat = fileNameFormat;
return this;
}
public CustomHdfsBolt1 withRecordFormat(RecordFormat format) {
this.format = format;
return this;
}
public CustomHdfsBolt1 withSyncPolicy(SyncPolicy syncPolicy) {
this.syncPolicy = syncPolicy;
return this;
}
public CustomHdfsBolt1 withRotationPolicy(FileRotationPolicy rotationPolicy) {
this.rotationPolicy = rotationPolicy;
return this;
}
public CustomHdfsBolt1 addRotationAction(RotationAction action) {
this.rotationActions.add(action);
return this;
}
protected static boolean isTickTuple(Tuple tuple) {
return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}
public void execute(Tuple tuple) {
try {
if (isTickTuple(tuple)) {
tickTupleTime = Calendar.getInstance().getTimeInMillis();
long timeDiff = normalTupleTime - tickTupleTime;
long diffInSeconds = TimeUnit.MILLISECONDS.toSeconds(timeDiff);
if (diffInSeconds > 5) {
this.rotateWithOutFileSize(tuple);
}
} else {
normalTupleTime = Calendar.getInstance().getTimeInMillis();
this.rotateWithFileSize(tuple);
}
} catch (IOException var6) {
LOG.warn("write/sync failed.", var6);
this.collector.fail(tuple);
}
}
public void rotateWithFileSize(Tuple tuple) throws IOException {
syncHdfs(tuple);
this.collector.ack(tuple);
if (this.rotationPolicy.mark(tuple, this.offset)) {
this.rotateOutputFile();
this.offset = 0L;
this.rotationPolicy.reset();
}
}
public void rotateWithOutFileSize(Tuple tuple) throws IOException {
syncHdfs(tuple);
this.collector.ack(tuple);
this.rotateOutputFile();
this.offset = 0L;
this.rotationPolicy.reset();
}
public void syncHdfs(Tuple tuple) throws IOException {
byte[] e = this.format.format(tuple);
synchronized (this.writeLock) {
this.out.write(e);
this.offset += (long) e.length;
if (this.syncPolicy.mark(tuple, this.offset)) {
if (this.out instanceof HdfsDataOutputStream) {
((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
} else {
this.out.hsync();
}
this.syncPolicy.reset();
}
}
}
public void closeOutputFile() throws IOException {
this.out.close();
}
public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
LOG.info("Preparing HDFS Bolt...");
this.fs = FileSystem.get(URI.create(this.fsUrl), this.hdfsConfig);
this.tickTupleCount = 0;
this.normalTupleTime = 0;
this.tickTupleTime = 0;
}
public Path createOutputFile() throws IOException {
Path path = new Path(this.fileNameFormat.getPath(),
this.fileNameFormat.getName((long) this.rotation, System.currentTimeMillis()));
this.out = this.fs.create(path);
return path;
}
}
.
,