In the end, someone in my company figured this out for me, so I will post it here for future readers.
Basically, what I had skipped, on top of what miguno suggested, is this part of the decoding:
    import org.apache.avro.generic.GenericData

    def decodeMessages(iter: Iterator[KafkaMessage], schemaRegistryUrl: String): Iterator[<YourObject>] = {
      val decoder = AvroTo<YourObject>Decoder.getDecoder(schemaRegistryUrl)
      iter.map { message =>
        val record = decoder.fromBytes(message.value).asInstanceOf[GenericData.Record]
        val field1 = record.get("field1Name").asInstanceOf[GenericData.Record]
        // Avro decodes strings as org.apache.avro.util.Utf8, so convert via toString
        val field2 = record.get("field2Name").toString
        ...
      }
    }
Now you can read messages from Kafka and decode them like this:
    val ds = spark
      .readStream
      .format(config.getString(ConfigUtil.inputFormat))
      .option("kafka.bootstrap.servers", config.getString(ConfigUtil.kafkaBootstrapServers))
      .option("subscribe", config.getString(ConfigUtil.subscribeTopic))
      .load()
      .as[KafkaMessage]

    val decodedDs = ds.mapPartitions(decodeMessages(_, schemaRegistryUrl))
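To actually run the streaming query you still need to attach a sink. For example, a console sink for debugging (the sink, output mode, and trigger are up to you, not part of the original code):

    val query = decodedDs.writeStream
      .format("console")   // debug sink; use kafka/parquet/etc. in production
      .outputMode("append")
      .start()

    query.awaitTermination()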
* KafkaMessage is just a case class matching the schema of the records you get when reading from Kafka (key, value, topic, partition, offset, timestamp)
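A minimal sketch of such a case class, matching the columns Spark's Kafka source produces (I'm omitting the timestampType column, which the source also emits):

    case class KafkaMessage(
      key: Array[Byte],
      value: Array[Byte],
      topic: String,
      partition: Int,
      offset: Long,
      timestamp: java.sql.Timestamp
    )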
AvroTo<YourObject>Decoder is a class that decodes your object, given the URL of the schema registry. It can be built, for example, on Confluent's KafkaAvroDeserializer and schema registry client:
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
    import io.confluent.kafka.serializers.KafkaAvroDeserializer
    import scala.collection.JavaConverters._

    val kafkaProps = Map("schema.registry.url" -> schemaRegistryUrl)
    val client = new CachedSchemaRegistryClient(schemaRegistryUrl, 20)

    // If you have Avro encoded keys
    val keyDeserializer = new KafkaAvroDeserializer(client)
    keyDeserializer.configure(kafkaProps.asJava, true) // isKey = true

    // Avro encoded values
    val valueDeserializer = new KafkaAvroDeserializer(client)
    valueDeserializer.configure(kafkaProps.asJava, false) // isKey = false
On these, call .deserialize(topicName, bytes).asInstanceOf[GenericRecord] to get the Avro object.
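Putting it together, here is a hedged sketch of what a decoder along the lines of AvroTo<YourObject>Decoder could look like. The class name AvroDecoder and the lazy initialization are my own choices, not the original code; the Confluent deserializer takes care of the wire-format magic byte and schema id for you:

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
    import io.confluent.kafka.serializers.KafkaAvroDeserializer
    import org.apache.avro.generic.GenericRecord
    import scala.collection.JavaConverters._

    // Hypothetical helper, not the author's actual class
    class AvroDecoder(schemaRegistryUrl: String, topic: String) extends Serializable {
      // Created lazily on each executor, since the deserializer itself is not serializable
      @transient private lazy val deserializer: KafkaAvroDeserializer = {
        val client = new CachedSchemaRegistryClient(schemaRegistryUrl, 20)
        val d = new KafkaAvroDeserializer(client)
        d.configure(Map("schema.registry.url" -> schemaRegistryUrl).asJava, false) // isKey = false
        d
      }

      def fromBytes(bytes: Array[Byte]): GenericRecord =
        deserializer.deserialize(topic, bytes).asInstanceOf[GenericRecord]
    }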
Hope this helps someone.