I'm new to Scala and trying to find a way to send serialized Java objects over a RabbitMQ queue to a Spark Streaming application.
I can successfully enqueue objects that have been serialized with ObjectOutputStream. To receive my objects on the Spark end, I downloaded a custom RabbitMQ InputDStream and Receiver implementation from here - https://github.com/Stratio/rabbitmq-receiver
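For reference, the producer side described above amounts to a Java serialization round trip like the minimal sketch below. It uses only `java.io`; the `Payload` case class and the `SerializationRoundTrip`, `toBytes` and `fromBytes` names are hypothetical stand-ins (the real objects are `SAMRecord`s), and the `Array[Byte]` it produces is what would be published as the RabbitMQ message body.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for the real message type; Scala case classes are Serializable.
case class Payload(readName: String, position: Int)

object SerializationRoundTrip {
  // Serialize any Serializable object into a byte array (the message body to publish).
  def toBytes(obj: java.io.Serializable): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close() // close() flushes into the buffer
    buffer.toByteArray
  }

  // Rebuild the object from the bytes; the caller states the expected type.
  def fromBytes[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = Payload("read-1", 42)
    val restored = fromBytes[Payload](toBytes(original))
    println(restored == original) // true
  }
}
```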
However, as far as I can tell, that codebase only supports String messages, not binary ones. So I started hacking on the code so that it can read a binary message and store it as a byte array, which I can then deserialize on the Spark end. This attempt is here - https://github.com/llevar/rabbitmq-receiver
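Why a String-only receiver cannot carry these messages can be checked with a small standard-library experiment. This is a sketch under an assumption: it models the String path as a UTF-8 decode followed by a re-encode (the charset the original receiver actually uses is not stated here, and some charsets would behave differently). Java serialization streams start with the magic bytes `0xAC 0xED`, which are not valid UTF-8, so the decode replaces them and the bytes that come back no longer match. Keeping the body as `Array[Byte]` avoids the conversion entirely. The `StringBodyCorruption`, `serialize` and `throughString` names are hypothetical.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.charset.StandardCharsets
import java.util.Arrays

object StringBodyCorruption {
  // Serialize a value the same way the producer does (ObjectOutputStream).
  def serialize(obj: java.io.Serializable): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Assumed model of a String-based receiver: bytes -> String -> bytes (UTF-8).
  def throughString(body: Array[Byte]): Array[Byte] =
    new String(body, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    val body = serialize("hello")
    // Java serialization streams begin with the magic bytes 0xAC 0xED
    println(f"${body(0) & 0xFF}%02X ${body(1) & 0xFF}%02X") // AC ED
    // Invalid UTF-8 sequences are replaced during decoding, so the payload changes
    println(Arrays.equals(body, throughString(body))) // false
  }
}
```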
I then have the following code in my Spark driver program:
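```scala
// Imports assumed; SAMRecord/SerializationUtils packages depend on library versions
import com.stratio.receiver.RabbitMQUtils
import htsjdk.samtools.SAMRecord
import org.apache.commons.lang.SerializationUtils
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

val conf = new SparkConf().setMaster("local[6]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Stream backed by the modified RabbitMQ receiver, reading from "mappingQueue"
val receiverStream: ReceiverInputDStream[scala.reflect.ClassTag[AnyRef]] =
  RabbitMQUtils.createStreamFromAQueue(ssc,
    "localhost",
    5672,
    "mappingQueue",
    StorageLevel.MEMORY_AND_DISK_SER_2)
// Each element is expected to hold the raw message body as Array[Byte]
val parsedStream = receiverStream.map { m =>
  SerializationUtils.deserialize(m.asInstanceOf[Array[Byte]]).asInstanceOf[SAMRecord]
}
parsedStream.print()
ssc.start()
ssc.awaitTermination() // keep the driver alive so batches keep being processed
```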
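Because the stream is declared as `ReceiverInputDStream[scala.reflect.ClassTag[AnyRef]]`, the `asInstanceOf[Array[Byte]]` cast is unchecked until the map actually runs. One way to exercise the mapping logic outside Spark is to factor it into a plain function and apply it to an ordinary collection. The sketch below does that with a hypothetical `decodeMessage` helper, uses `String` payloads as stand-ins for `SAMRecord`, and fails with a descriptive message when an element is not a byte array. Inside the job, the same function could be passed to `receiverStream.map`, assuming the receiver really stores `Array[Byte]` elements.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object DecodeMessage {
  // Hypothetical helper: decode one received element into the expected type,
  // or report what the receiver actually stored instead of a bare ClassCastException.
  def decodeMessage[T](m: Any): T = m match {
    case bytes: Array[Byte] =>
      val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
      try in.readObject().asInstanceOf[T] finally in.close()
    case other =>
      val kind = if (other == null) "null" else other.getClass.getName
      throw new IllegalArgumentException(s"Expected Array[Byte] but got $kind")
  }

  // Serialize a value for the local check in main.
  def serialize(obj: java.io.Serializable): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  def main(args: Array[String]): Unit = {
    // A plain Seq stands in for one micro-batch of the DStream.
    val batch: Seq[Any] = Seq(serialize("record-1"), serialize("record-2"))
    val decoded = batch.map(m => decodeMessage[String](m))
    println(decoded) // List(record-1, record-2)
  }
}
```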
Unfortunately, this does not work. The data is consumed from the queue and I don't get any errors, but I don't get the output I expect either.
This is all I get:
```
2015-07-24 23:33:38 WARN BlockManager: 71 - Block input-0-1437795218845 replicated to only 0 peer(s) instead of 1 peers
2015-07-24 23:33:38 WARN BlockManager: 71 - Block input-0-1437795218846 replicated to only 0 peer(s) instead of 1 peers
2015-07-24 23:33:38 WARN BlockManager: 71 - Block input-0-1437795218847 replicated to only 0 peer(s) instead of 1 peers
2015-07-24 23:33:38 WARN BlockManager: 71 - Block input-0-1437795218848 replicated to only 0 peer(s) instead of 1 peers
```
The store() call in my modified receiver is here - https://github.com/llevar/rabbitmq-receiver/blob/master/src/main/scala/com/stratio/receiver/RabbitMQInputDStream.scala#L106
SerializationUtils delivery.getBody, , , DStream .