Fortunately, Kafka 0.10.1.0brought us this ability. I saw this fascinating feature on board the Confluence Jira, but I could not find any documentation related to this topic, irony, right?
So, I went to the source code and found a way to create themes on the fly. Hope this will be helpful for some of you. Of course, if you have a better solution, please feel free to share it with us.
, .
public List<String> propagateTopics(int partitions, short replication, int timeout) throws IOException {
CreateTopicsRequest.TopicDetails topicDetails = new CreateTopicsRequest.TopicDetails(partitions, replication);
Map<String, CreateTopicsRequest.TopicDetails> topicConfig = mTopics.stream()
.collect(Collectors.toMap(k -> k, v -> topicDetails));
CreateTopicsRequest request = new CreateTopicsRequest(topicConfig, timeout);
try {
CreateTopicsResponse response = createTopic(request, BOOTSTRAP_SERVERS_CONFIG);
return response.errors().entrySet().stream()
.filter(error -> error.getValue() == Errors.NONE)
.map(Map.Entry::getKey)
.collect(Collectors.toList());
} catch (IOException e) {
log.error(e);
}
return null;
}
1 TopicDetails, . , mTopics - , .
2 Kafka, , - CreateTopicsRequest timeout
3 CreateTopicsResponse
private static final short apiKey = ApiKeys.CREATE_TOPICS.id;
private static final short version = 0;
private static final short correlationId = -1;
private static CreateTopicsResponse createTopic(CreateTopicsRequest request, String client) throws IllegalArgumentException, IOException {
String[] comp = client.split(":");
if (comp.length != 2) {
throw new IllegalArgumentException("Wrong client directive");
}
String address = comp[0];
int port = Integer.parseInt(comp[1]);
RequestHeader header = new RequestHeader(apiKey, version, client, correlationId);
ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf());
header.writeTo(buffer);
request.writeTo(buffer);
byte byteBuf[] = buffer.array();
byte[] resp = requestAndReceive(byteBuf, address, port);
ByteBuffer respBuffer = ByteBuffer.wrap(resp);
ResponseHeader.parse(respBuffer);
return CreateTopicsResponse.parse(respBuffer);
}
private static byte[] requestAndReceive(byte[] buffer, String address, int port) throws IOException {
try(Socket socket = new Socket(address, port);
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
DataInputStream dis = new DataInputStream(socket.getInputStream())
) {
dos.writeInt(buffer.length);
dos.write(buffer);
dos.flush();
byte resp[] = new byte[dis.readInt()];
dis.readFully(resp);
return resp;
} catch (IOException e) {
log.error(e);
}
return new byte[0];
}
, , .
4 CreateTopicsResponse errors, a Map<String, Errors>, key - , . , , , , , Errors.None, .