I started playing with Kafka. I set up the ZooKeeper configuration and was able to send and consume String messages. Now I'm trying to pass an object (in Java), but for some reason the consumer has problems with the header when parsing the message. I tried several serialization options (using a decoder / encoder) and all of them produced the same header problem.
Here is my producer code:
    Properties props = new Properties();
    props.put("zk.connect", "localhost:2181");
    props.put("serializer.class", "com.inneractive.reporter.kafka.EventsDataSerializer");

    ProducerConfig config = new ProducerConfig(props);
    Producer<Long, EventDetails> producer = new Producer<Long, EventDetails>(config);

    // The key type is Long, so the key literal must be 1L rather than 1
    ProducerData<Long, EventDetails> data =
        new ProducerData<Long, EventDetails>("test3", 1L, Arrays.asList(new EventDetails()));

    try {
        producer.send(data);
    } finally {
        producer.close();
    }
And the consumer:
    Properties props = new Properties();
    props.put("zk.connect", "localhost:2181");
    props.put("zk.connectiontimeout.ms", "1000000");
    props.put("groupid", "test_group");

    // Create the connection to the cluster
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

    // create 4 partitions of the stream for topic "test3", to allow 4 threads to consume
    Map<String, List<KafkaMessageStream<EventDetails>>> topicMessageStreams =
        consumerConnector.createMessageStreams(ImmutableMap.of("test3", 4), new EventsDataSerializer());
    List<KafkaMessageStream<EventDetails>> streams = topicMessageStreams.get("test3");

    // create list of 4 threads to consume from each of the partitions
    ExecutorService executor = Executors.newFixedThreadPool(4);

    // consume the messages in the threads
    for (final KafkaMessageStream<EventDetails> stream : streams) {
        executor.submit(new Runnable() {
            public void run() {
                for (EventDetails event : stream) {
                    System.err.println("********** Got message" + event.toString());
                }
            }
        });
    }
and my Serializer:
    public class EventsDataSerializer implements Encoder<EventDetails>, Decoder<EventDetails> {
        public Message toMessage(EventDetails eventDetails) {
            try {
                ObjectMapper mapper = new ObjectMapper(new SmileFactory());
                byte[] serialized = mapper.writeValueAsBytes(eventDetails);
                return new Message(serialized);
            } catch (IOException e) {
                e.printStackTrace();
                return null;
And this is the error I get:
org.codehaus.jackson.JsonParseException: Input does not start with Smile format header (first byte = 0x0) and parser has REQUIRE_HEADER enabled: can not parse at [Source: N/A; line: -1, column: -1]
When I used MessagePack, and also when I simply wrote the object with an ObjectOutputStream, I got a similar problem with the header. I also tried adding a CRC32 of the payload to the message, but that didn't help either.
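One thing the error suggests checking (first byte = 0x0) is whether the decoder reads bytes that come before the payload. This is only an assumption, since it depends on how the bytes are taken out of the message: if the payload is exposed as a java.nio.ByteBuffer that is a view into a larger buffer, calling array() on it returns the whole backing array, header included, rather than only the payload. Below is a minimal sketch in plain Java with no Kafka classes; the byte values, the PayloadBytes class and the payloadOnly helper are made up for illustration.

```java
import java.nio.ByteBuffer;

final class PayloadBytes {

    // Hypothetical helper: copies only the bytes between position and limit,
    // ignoring anything else that lives in the backing array.
    public static byte[] payloadOnly(ByteBuffer payload) {
        byte[] bytes = new byte[payload.remaining()];
        // duplicate() keeps the caller's position and limit untouched
        payload.duplicate().get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        // Stand-in for a stored message: two made-up header bytes (the first
        // is 0x0), followed by the Smile header bytes ':' ')' '\n' and one more byte.
        byte[] raw = {0x00, 0x01, ':', ')', '\n', 0x00};

        ByteBuffer whole = ByteBuffer.wrap(raw);
        whole.position(2);
        // slice() is a view that starts at the payload but shares the backing array
        ByteBuffer payload = whole.slice();

        // array() ignores the slice offset, so index 0 is the first header byte
        System.out.println("array()[0]       = " + payload.array()[0]);
        // copying via get() starts at the first payload byte
        System.out.println("payloadOnly()[0] = " + payloadOnly(payload)[0]);
    }
}
```

If the decoder passes the result of array() to the Smile parser, the parser would see the leading header byte instead of the Smile header, which would also explain why MessagePack and ObjectOutputStream fail in a similar way.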
What am I doing wrong here?