Reading Avro Messages from Kafka with Spark 2.0.2 (Structured Streaming)

I have a Spark 2.0 application that reads messages from Kafka using a Spark stream (with spark-streaming-kafka-0-10_2.11).

Structured streaming looks really cool, so I want to try and migrate the code, but I cannot figure out how to use it.

In regular streaming, I used KafkaUtils.createDirectStream, and in the parameters I passed the value deserializer.
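For reference, the DStream-based setup being migrated from looks roughly like this; a simplified sketch, where the group id and deserializer classes are placeholders, and the key point is that the value deserializer is passed directly in the Kafka parameters:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    // The Kafka parameters carry the deserializer classes directly.
    val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        // e.g. Confluent's KafkaAvroDeserializer, or a custom Deserializer implementation
        "value.deserializer" -> classOf[io.confluent.kafka.serializers.KafkaAvroDeserializer],
        "group.id" -> "my-group",
        "auto.offset.reset" -> "latest"
    )

    // ssc is an existing StreamingContext; the resulting stream already yields deserialized values.
    val stream = KafkaUtils.createDirectStream[String, Object](
        ssc,
        PreferConsistent,
        Subscribe[String, Object](Array("RED-test-tal4"), kafkaParams)
    )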

In Structured Streaming, the documentation says that I have to deserialize using DataFrame operations, but I can't figure out exactly what that means.

I looked at examples like the ones in the docs, but my Avro object in Kafka is quite complex and cannot simply be cast to a String as in those examples.

So far I have tried code like this (which I saw here in another question):

    import spark.implicits._

    val ds1 = spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "RED-test-tal4")
        .load()

    ds1.printSchema()
    ds1.select("value").printSchema()

    val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()

    val query = ds2.writeStream
        .outputMode("append")
        .format("console")
        .start()

and I get a "data type mismatch: cannot cast BinaryType to StructType(StructField(..." error.

How can I deserialize the value?

+5
4 answers

I'm still not very familiar with how Spark serialization works in conjunction with the new/experimental Structured Streaming, but the approach below works, although I'm not sure if it's the best way (IMHO the approach has a somewhat clunky look and feel).

I will try to answer your question using a custom data type (here: a Foo case class) as an example rather than Avro specifically, but I hope it helps you anyway. The idea is to use Kryo serialization to serialize/deserialize your custom type; see Tuning: Data Serialization in the Spark documentation.

Note: Spark supports serializing case classes out of the box with built-in (implicit) encoders, which you import via import spark.implicits._. But let's ignore this functionality for the sake of this example.

Imagine you have defined the following Foo case class as your custom type (TL;DR hint: to prevent running into strange Spark serialization complaints/errors you should put the code into a separate Foo.scala file):

    // This could also be your auto-generated Avro class/type
    case class Foo(s: String)

Now you have the following Structured Streaming code for reading data from Kafka, where the input topic contains Kafka messages whose message value is a binary-encoded String, and your goal is to create Foo instances from those messages (i.e. similar to how you would deserialize binary data into instances of an Avro class):

    val messages: DataFrame = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
        .option("subscribe", "my-input-topic")
        .load()

Now we deserialize the values into instances of our custom type Foo, for which we first need to define an implicit Encoder[Foo]:

    implicit val myFooEncoder: Encoder[Foo] = org.apache.spark.sql.Encoders.kryo[Foo]

    val foos: Dataset[Foo] =
        messages.map(row => Foo(new String(row.getAs[Array[Byte]]("value"))))
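To actually see output you still need to start a sink; here is a minimal sketch (not part of the original answer) that maps back to a plain field first, because the Kryo-encoded Dataset would otherwise show up as a single opaque binary column on the console:

    import spark.implicits._

    // Map Foo back to a plain String column before printing, since the Kryo
    // encoder stores Foo as one binary column.
    val query = foos.map(_.s).writeStream
        .outputMode("append")
        .format("console")
        .start()

    query.awaitTermination()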

Returning to your Avro question, you will need:

  • Create the right Encoder for your needs.
  • Replace the Foo(new String(row.getAs[Array[Byte]]("value"))) code with code that deserializes the Avro binary data into an Avro POJO, i.e. code that takes the Avro binary data out of the message value ( row.getAs[Array[Byte]]("value") ) and returns, say, an Avro GenericRecord or whatever SpecificCustomAvroObject you have defined elsewhere (see the sketch after this list).
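A rough sketch of what those two steps could look like with a GenericRecord; this assumes plain binary-encoded Avro in the message value (no Confluent wire-format header), and mySchemaJson is a placeholder for your Avro writer schema:

    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
    import org.apache.avro.io.DecoderFactory
    import org.apache.spark.sql.{Dataset, Encoder, Encoders}

    // 1. An Encoder for the Avro type; Kryo also works for GenericRecord.
    implicit val genericRecordEncoder: Encoder[GenericRecord] = Encoders.kryo[GenericRecord]

    // 2. Deserialize the raw bytes of the Kafka "value" column into GenericRecords.
    //    mySchemaJson is a placeholder for the writer schema (a JSON string).
    val records: Dataset[GenericRecord] = messages.map { row =>
        val bytes = row.getAs[Array[Byte]]("value")
        // Reader and decoder are created per record to keep the closure serializable (fine for a sketch).
        val schema = new Schema.Parser().parse(mySchemaJson)
        val reader = new GenericDatumReader[GenericRecord](schema)
        reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null))
    }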

If someone else knows a shorter / better / ... way to answer Tal's question, I'm all ears. :-)


+2

As noted above, as of Spark 2.1.0 Avro is supported with the batch reader, but not with SparkSession.readStream(). Here is how I got it to work in Scala based on the other answers. I have simplified the schema for brevity.

    package com.sevone.sparkscala.mypackage

    import org.apache.spark.sql._
    import org.apache.avro.io.DecoderFactory
    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

    object MyMain {

        // Case class for the decoded message
        case class KafkaMessage (
            deviceId: Int,
            deviceName: String
        )

        // Create avro schema and reader
        val schemaString = """{
            "fields": [
                { "name": "deviceId", "type": "int"},
                { "name": "deviceName", "type": "string"}
            ],
            "name": "kafkamsg",
            "type": "record"
        }"""
        val messageSchema = new Schema.Parser().parse(schemaString)
        val reader = new GenericDatumReader[GenericRecord](messageSchema)

        // Factory to deserialize binary avro data
        val avroDecoderFactory = DecoderFactory.get()

        // Register implicit encoder for map operation
        implicit val encoder: Encoder[GenericRecord] = org.apache.spark.sql.Encoders.kryo[GenericRecord]

        def main(args: Array[String]) {

            val KafkaBroker = args(0)
            val InTopic = args(1)
            val OutTopic = args(2)

            // Get Spark session
            val session = SparkSession
                .builder
                .master("local[*]")
                .appName("myapp")
                .getOrCreate()

            // Load streaming data
            import session.implicits._

            val data = session
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", KafkaBroker)
                .option("subscribe", InTopic)
                .load()
                .select($"value".as[Array[Byte]])
                .map(d => {
                    val rec = reader.read(null, avroDecoderFactory.binaryDecoder(d, null))
                    val deviceId = rec.get("deviceId").asInstanceOf[Int]
                    val deviceName = rec.get("deviceName").asInstanceOf[org.apache.avro.util.Utf8].toString
                    new KafkaMessage(deviceId, deviceName)
                })

            // start a streaming query on `data` here (see the sketch below)
        }
    }
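The snippet above stops after building data; to actually run it you would still start a query inside main(), for example with a console sink (a sketch, not part of the original answer):

    // Inside main(), after building `data`: start a sink on the decoded records.
    val query = data.writeStream
        .outputMode("append")
        .format("console")
        .start()

    query.awaitTermination()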
+2

So in fact someone at my company solved this for me, so I will post it here for future readers.

Basically, what I was missing on top of what miguno suggested is the decoding part:

    def decodeMessages(iter: Iterator[KafkaMessage], schemaRegistryUrl: String): Iterator[<YourObject>] = {
        val decoder = AvroTo<YourObject>Decoder.getDecoder(schemaRegistryUrl)
        iter.map(message => {
            val record = decoder.fromBytes(message.value).asInstanceOf[GenericData.Record]
            val field1 = record.get("field1Name").asInstanceOf[GenericData.Record]
            val field2 = record.get("field2Name").asInstanceOf[GenericData.String]
            ...
            // create an object with the fields extracted from the GenericRecord
        })
    }

Now you can read messages from Kafka and decode them like this:

    val ds = spark
        .readStream
        .format(config.getString(ConfigUtil.inputFormat))
        .option("kafka.bootstrap.servers", config.getString(ConfigUtil.kafkaBootstrapServers))
        .option("subscribe", config.getString(ConfigUtil.subscribeTopic))
        .load()
        .as[KafkaMessage]

    val decodedDs = ds.mapPartitions(decodeMessages(_, schemaRegistryUrl))

* KafkaMessage is just a case class that holds the generic record you get when reading from Kafka (key, value, topic, partition, offset, timestamp).
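A minimal sketch of such a case class, using the column names and types exposed by the Structured Streaming Kafka source:

    import java.sql.Timestamp

    // Mirrors the columns of the "kafka" source so that .as[KafkaMessage] binds by name.
    case class KafkaMessage(
        key: Array[Byte],
        value: Array[Byte],
        topic: String,
        partition: Int,
        offset: Long,
        timestamp: Timestamp
    )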

AvroTo<YourObject>Decoder is a class that decodes your object, given the schema registry URL.

For example, using Confluent's KafkaAvroDeserializer and the schema registry:

    import scala.collection.JavaConverters._

    val kafkaProps = Map("schema.registry.url" -> schemaRegistryUrl)
    val client = new CachedSchemaRegistryClient(schemaRegistryUrl, 20)

    // If you have Avro encoded keys
    val keyDeserializer = new KafkaAvroDeserializer(client)
    keyDeserializer.configure(kafkaProps.asJava, true) // isKey = true

    // Avro encoded values
    val valueDeserializer = new KafkaAvroDeserializer(client)
    valueDeserializer.configure(kafkaProps.asJava, false) // isKey = false

Then call .deserialize(topicName, bytes).asInstanceOf[GenericRecord] on the appropriate deserializer to get the Avro object.
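Put together, decoding a single record value would look roughly like this, where topicName and bytes stand in for the topic and message value of the record being processed:

    import org.apache.avro.generic.GenericRecord

    // topicName and bytes are placeholders for the consumed record's topic and value bytes.
    val record = valueDeserializer
        .deserialize(topicName, bytes)
        .asInstanceOf[GenericRecord]

    val field1 = record.get("field1Name").toString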

Hope this helps someone

+1

Use the following steps:

  • Define the Kafka message.
  • Define a consumer utility that returns a Dataset of your Avro object.
  • Define your processing logic.

Kafka message:

 case class KafkaMessage(key: String, value: Array[Byte], topic: String, partition: String, offset: Long, timestamp: Timestamp) 

Kafka Consumer:

    import java.util.Collections

    import com.typesafe.config.{Config, ConfigFactory}
    import io.confluent.kafka.serializers.KafkaAvroDeserializer
    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord
    import org.apache.spark.sql.SparkSession

    import scala.reflect.runtime.universe._

    object KafkaAvroConsumer {

        private val conf: Config = ConfigFactory.load().getConfig("kafka.consumer")

        val valueDeserializer = new KafkaAvroDeserializer()
        valueDeserializer.configure(
            Collections.singletonMap("schema.registry.url", conf.getString("schema.registry.url")),
            false)

        def transform[T <: GenericRecord : TypeTag](msg: KafkaMessage, schemaStr: String) = {
            val schema = new Schema.Parser().parse(schemaStr)
            Utils.convert[T](schema)(valueDeserializer.deserialize(msg.topic, msg.value))
        }

        def createDataStream[T <: GenericRecord with Product with Serializable : TypeTag]
                            (schemaStr: String)
                            (subscribeType: String, topics: String, appName: String, startingOffsets: String = "latest") = {

            val spark = SparkSession
                .builder
                .master("local[*]")
                .appName(appName)
                .getOrCreate()

            import spark.implicits._

            // Create DataSet representing the stream of KafkaMessage from kafka
            val ds = spark
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", conf.getString("bootstrap.servers"))
                .option(subscribeType, topics)
                .option("startingOffsets", "earliest")
                .load()
                .as[KafkaMessage]
                .map(msg => KafkaAvroConsumer.transform[T](msg, schemaStr)) // Transform it Avro object.

            ds
        }
    }

Update

Utils:

    import org.apache.avro.Schema
    import org.apache.avro.file.DataFileReader
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
    import org.apache.avro.specific.SpecificData

    import scala.reflect.runtime.universe._

    object Utils {
        def convert[T <: GenericRecord: TypeTag](targetSchema: Schema)(record: AnyRef): T = {
            SpecificData.get.deepCopy(targetSchema, record).asInstanceOf[T]
        }
    }
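For completeness, a hypothetical usage of the utility above. MyRecord is a made-up type assumed to satisfy the GenericRecord with Product with Serializable bound of createDataStream (for example, a hand-written case class that implements GenericRecord); the schema string, topic, and app name are placeholders:

    // MyRecord is hypothetical; it must extend GenericRecord and be a Product
    // to satisfy the type bound of createDataStream above.
    val schemaStr: String = "..." // the Avro writer schema as a JSON string

    val ds = KafkaAvroConsumer.createDataStream[MyRecord](schemaStr)(
        subscribeType = "subscribe",
        topics = "my-input-topic",
        appName = "avro-structured-streaming"
    )

    // Apply whatever logic you need on ds, then start a sink.
    val query = ds.writeStream
        .outputMode("append")
        .format("console")
        .start()

    query.awaitTermination()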
0

Source: https://habr.com/ru/post/1260036/

