Kafka Object Serialization

I have started experimenting with Kafka. I set up the ZooKeeper configuration and was able to send and consume String messages. Now I'm trying to pass an object (in Java), but for some reason the consumer hits a header problem when parsing the message. I tried several serialization options (using a decoder/encoder), and all of them produced the same header problem.

Here is my producer code:

    Properties props = new Properties();
    props.put("zk.connect", "localhost:2181");
    props.put("serializer.class", "com.inneractive.reporter.kafka.EventsDataSerializer");

    ProducerConfig config = new ProducerConfig(props);
    Producer<Long, EventDetails> producer = new Producer<Long, EventDetails>(config);
    ProducerData<Long, EventDetails> data =
        new ProducerData<Long, EventDetails>("test3", 1L, Arrays.asList(new EventDetails()));
    try {
        producer.send(data);
    } finally {
        producer.close();
    }

And the consumer:

    Properties props = new Properties();
    props.put("zk.connect", "localhost:2181");
    props.put("zk.connectiontimeout.ms", "1000000");
    props.put("groupid", "test_group");

    // Create the connection to the cluster
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

    // create 4 partitions of the stream for topic "test", to allow 4 threads to consume
    Map<String, List<KafkaMessageStream<EventDetails>>> topicMessageStreams =
        consumerConnector.createMessageStreams(ImmutableMap.of("test3", 4), new EventsDataSerializer());
    List<KafkaMessageStream<EventDetails>> streams = topicMessageStreams.get("test3");

    // create list of 4 threads to consume from each of the partitions
    ExecutorService executor = Executors.newFixedThreadPool(4);

    // consume the messages in the threads
    for (final KafkaMessageStream<EventDetails> stream : streams) {
        executor.submit(new Runnable() {
            public void run() {
                for (EventDetails event : stream) {
                    System.err.println("********** Got message" + event.toString());
                }
            }
        });
    }

And my serializer:

    public class EventsDataSerializer implements Encoder<EventDetails>, Decoder<EventDetails> {

        public Message toMessage(EventDetails eventDetails) {
            try {
                ObjectMapper mapper = new ObjectMapper(new SmileFactory());
                byte[] serialized = mapper.writeValueAsBytes(eventDetails);
                return new Message(serialized);
            } catch (IOException e) {
                e.printStackTrace();
                return null; // TODO
            }
        }

        public EventDetails toEvent(Message message) {
            EventDetails event = new EventDetails();
            ObjectMapper mapper = new ObjectMapper(new SmileFactory());
            try {
                // TODO handle error
                return mapper.readValue(message.payload().array(), EventDetails.class);
            } catch (IOException e) {
                e.printStackTrace();
                return null;
            }
        }
    }

And this is the error I get:

 org.codehaus.jackson.JsonParseException: Input does not start with Smile format header (first byte = 0x0) and parser has REQUIRE_HEADER enabled: can not parse at [Source: N/A; line: -1, column: -1] 

When I tried MessagePack, and also plain Java serialization through an ObjectOutputStream, I got a similar problem with the header. I also tried adding the CRC32 of the payload to the message, but that didn't help either.

What am I doing wrong here?

2 answers

Hm, I did not run into the header problem you are facing, but my project did not compile correctly when I did not provide a VerifiableProperties constructor in my encoder/decoder. It seems odd that a missing constructor would corrupt Jackson's deserialization, though.

Perhaps try splitting your encoder and decoder into separate classes and including the VerifiableProperties constructor in both; you do not need to implement Decoder[T] for serialization. I was able to implement JSON de/serialization successfully using ObjectMapper, following the format of this post.
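A minimal sketch of what such a split could look like, under loud assumptions: the `Encoder` and `Decoder` interfaces below are local stand-ins declared only so the example compiles on its own (in a real project they would come from Kafka, and their exact method names depend on the Kafka version), `java.util.Properties` stands in for `VerifiableProperties`, plain Java serialization stands in for the Jackson `ObjectMapper`, and `EventDetails` with its `name` field is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;

public class SplitCodecSketch {

    // Local stand-ins (assumption): NOT Kafka's interfaces, just the same general shape.
    public interface Encoder<T> { byte[] toBytes(T value); }
    public interface Decoder<T> { T fromBytes(byte[] bytes); }

    // Hypothetical event type used only for this illustration.
    public static class EventDetails implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String name;
        public EventDetails(String name) { this.name = name; }
    }

    // Encoder in its own class, with a constructor that accepts configuration.
    public static class EventDetailsEncoder implements Encoder<EventDetails> {
        public EventDetailsEncoder(Properties props) { /* no settings needed here */ }

        @Override
        public byte[] toBytes(EventDetails value) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            // Closing the ObjectOutputStream flushes everything into the buffer.
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(value);
            } catch (IOException e) {
                throw new IllegalStateException("could not serialize event", e);
            }
            return buffer.toByteArray();
        }
    }

    // Decoder in its own class, with the same kind of constructor.
    public static class EventDetailsDecoder implements Decoder<EventDetails> {
        public EventDetailsDecoder(Properties props) { /* no settings needed here */ }

        @Override
        public EventDetails fromBytes(byte[] bytes) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return (EventDetails) in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException("could not deserialize event", e);
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        byte[] wire = new EventDetailsEncoder(props).toBytes(new EventDetails("click"));
        EventDetails decoded = new EventDetailsDecoder(props).fromBytes(wire);
        System.out.println(decoded.name); // prints "click"
    }
}
```

Keeping the two directions in separate classes means the producer side only ever loads the encoder, and each class can be instantiated reflectively from configuration through its single-argument constructor.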

Good luck


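The ByteBuffer.array() method is not very reliable here. What it returns depends on the buffer's implementation: it hands back the whole backing array and ignores the buffer's position, limit, and offset, so when the payload is a slice of a larger buffer the array can begin with bytes that precede the payload. That may be why your parser sees 0x0 as the first byte. You might want to try: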

    ByteBuffer bb = message.payload();
    byte[] b = new byte[bb.remaining()];   // only the readable bytes of the payload
    bb.get(b, 0, b.length);
    return mapper.readValue(b, EventDetails.class);
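The difference can be reproduced with the JDK alone. The sketch below assumes a made-up message layout of one leading header byte (0x0) followed by the payload; this is an illustration only, not Kafka's actual wire format, and `samplePayload` and `copyRemaining` are hypothetical helper names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadCopyDemo {

    // Copies only the readable bytes (position..limit) of the buffer.
    // Works on a duplicate so the caller's position is left untouched.
    public static byte[] copyRemaining(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    // Hypothetical layout (assumption): 1 header byte, then the payload "hello".
    // The returned buffer is a slice that starts after the header byte.
    public static ByteBuffer samplePayload() {
        byte[] payload = "hello".getBytes(StandardCharsets.US_ASCII);
        ByteBuffer message = ByteBuffer.allocate(1 + payload.length);
        message.put((byte) 0);
        message.put(payload);
        message.flip();
        message.position(1);
        return message.slice();
    }

    public static void main(String[] args) {
        ByteBuffer payload = samplePayload();

        byte[] viaArray = payload.array();       // whole backing array, header included
        byte[] viaCopy = copyRemaining(payload); // just the payload bytes

        // prints "array(): length=6, first byte=0"
        System.out.println("array(): length=" + viaArray.length + ", first byte=" + viaArray[0]);
        // prints "copy: length=5, text=hello"
        System.out.println("copy: length=" + viaCopy.length
                + ", text=" + new String(viaCopy, StandardCharsets.US_ASCII));
    }
}
```

A parser handed `viaArray` would see the header byte first, which matches the kind of "first byte = 0x0" complaint in the question; handed `viaCopy`, it sees only the serialized payload.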

Source: https://habr.com/ru/post/920523/

