How to deserialize records from Kafka using Structured Streaming in Java?

I am using Spark 2.1.

I am trying to read records from Kafka with Spark Structured Streaming, deserialize them, and then apply clustering to them.

I have the following code:

    SparkSession spark = SparkSession
            .builder()
            .appName("Statistics")
            .getOrCreate();

    Dataset<Row> df = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaUri)
            .option("subscribe", "Statistics")
            .option("startingOffsets", "earliest")
            .load();

    df.selectExpr("CAST(value AS STRING)");

I want to deserialize the value field into my own object rather than just cast it to a String.

I already have a dedicated deserializer for this:

    public StatisticsRecord deserialize(String s, byte[] bytes)
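
For reference, that signature matches Kafka's Deserializer interface (deserialize(String topic, byte[] data)). A minimal sketch of such a deserializer, assuming a JSON payload parsed with Jackson (both of which are assumptions for illustration only, not part of the question), might look like this:

    import java.io.IOException;
    import java.io.Serializable;
    import java.util.Map;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.common.serialization.Deserializer;

    // Illustrative sketch only: a Kafka Deserializer that maps the raw bytes
    // to StatisticsRecord with Jackson. It is Serializable so it can also be
    // used inside Spark closures; the ObjectMapper is transient and built lazily.
    public class StatisticsRecordDeserializer
            implements Deserializer<StatisticsRecord>, Serializable {

        private transient ObjectMapper mapper;

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) { }

        @Override
        public StatisticsRecord deserialize(String topic, byte[] bytes) {
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
            try {
                return mapper.readValue(bytes, StatisticsRecord.class);
            } catch (IOException e) {
                throw new RuntimeException("Could not deserialize StatisticsRecord", e);
            }
        }

        @Override
        public void close() { }
    }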

How can I do this in Java?


The only relevant link I found is https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html, but it is for Scala.

+4
2 answers

If the value is JSON, you can define a schema for it:

    StructType schema = DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField("Id", DataTypes.IntegerType, false),
            DataTypes.createStructField("Name", DataTypes.StringType, false),
            DataTypes.createStructField("DOB", DataTypes.DateType, false) });

Then parse the message with from_json using that schema and map the result onto a bean. MessageData here is a JavaBean whose properties match the JSON.

    Dataset<MessageData> df = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaUri)
            .option("subscribe", "Statistics")
            .option("startingOffsets", "earliest")
            .load()
            .selectExpr("CAST(value AS STRING) as message")
            .select(functions.from_json(functions.col("message"), schema).as("json"))
            .select("json.*")
            .as(Encoders.bean(MessageData.class));
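
For completeness, MessageData in the snippet above would be a plain JavaBean whose properties correspond to the schema. The sketch below follows the Id/Name/DOB example fields and is only illustrative; Spark's column resolution is case-insensitive by default, so Id/Name/DOB should map onto id/name/dob:

    import java.io.Serializable;
    import java.sql.Date;

    // Illustrative JavaBean matching the example schema above; rename the
    // fields to whatever your actual JSON payload contains.
    public class MessageData implements Serializable {
        private Integer id;
        private String name;
        private Date dob;

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }

        public Date getDob() { return dob; }
        public void setDob(Date dob) { this.dob = dob; }
    }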
+3

I don't write Java, but after the Kafka load() you end up with a Dataset<Row>, so

    df.select("value")

gives you a Dataset<Row> with the raw binary value column.


Since the Spark API is Scala-first, in Scala it would look something like this:

    import org.apache.spark.sql.Encoders
    import org.apache.spark.sql.functions.udf
    import spark.implicits._

    implicit val statisticsRecordEncoder = Encoders.product[StatisticsRecord]
    val myDeserializerUDF = udf { (bytes: Array[Byte]) => deserialize("hello", bytes) }
    df.select(myDeserializerUDF($"value") as "value_des")

...but that is Scala; porting it to Java is left to you :)

That way the deserialization stays inside Spark SQL.
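
A rough Java alternative to the UDF approach above (my own sketch, not part of the original answer) is to take the binary value column as a typed Dataset<byte[]> and run the deserializer through a typed map with a bean encoder. This assumes StatisticsRecord is a JavaBean and that the deserializer class is serializable; StatisticsRecordDeserializer is a placeholder name for whatever class exposes the deserialize(String, byte[]) method from the question:

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;

    // Sketch: apply your own deserializer to the raw binary value column.
    // The deserializer is captured by the lambda and shipped to the executors,
    // so it must be serializable.
    StatisticsRecordDeserializer deserializer = new StatisticsRecordDeserializer();

    Dataset<StatisticsRecord> records = df
            .select("value")                       // keep only the binary payload
            .as(Encoders.BINARY())                 // Dataset<byte[]>
            .map((MapFunction<byte[], StatisticsRecord>) bytes ->
                            deserializer.deserialize("Statistics", bytes),
                 Encoders.bean(StatisticsRecord.class));

records is then a streaming Dataset<StatisticsRecord> that you can aggregate or write out with writeStream().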

+2
