How to write a streaming Dataset to Kafka?

I am trying to enrich some Kafka topics: read from Kafka, apply the enrichment, and write the result back to Kafka using Spark Structured Streaming.

val ds = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("subscribe", "topicname")
      .load()


val enriched = ds.select("key", "value", "topic").as[(String, String, String)]
      .map(record => enrich(record._1, record._2, record._3))

val query = enriched.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("topic", "desttopic")
      .start()

But I get an exception:

Exception in thread "main" java.lang.UnsupportedOperationException: Data source kafka does not support streamed writing
    at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:287)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:266)
    at kafka_bridge.KafkaBridge$.main(KafkaBridge.scala:319)
    at kafka_bridge.KafkaBridge.main(KafkaBridge.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

Any workarounds?

2 answers

As T. Gawęda said, there is no kafka format for writing streaming Datasets to Kafka (i.e. no Kafka sink) in Spark 2.1.

The currently recommended solution for Spark 2.1 is to use the foreach operator.

The foreach operation allows arbitrary logic to be run on the output data. As of Spark 2.1, it is available only for Scala and Java. To use it, you implement the ForeachWriter interface (Scala/Java docs), whose methods are called whenever a sequence of rows is generated as output after a trigger.
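For example, here is a minimal sketch of such a writer, assuming kafka-clients is on the classpath and that the enriched Dataset contains (String, String) key/value pairs; the class name and field types are illustrative, not part of any Spark API:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.ForeachWriter

// Illustrative writer that publishes each enriched (key, value) pair to Kafka.
class KafkaForeachWriter(bootstrapServers: String, topic: String)
    extends ForeachWriter[(String, String)] {

  var producer: KafkaProducer[String, String] = _

  // Called once per partition and trigger; create the producer here.
  override def open(partitionId: Long, version: Long): Boolean = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producer = new KafkaProducer[String, String](props)
    true
  }

  // Called for every output row.
  override def process(record: (String, String)): Unit = {
    producer.send(new ProducerRecord(topic, record._1, record._2))
  }

  // Called when the partition is done or fails.
  override def close(errorOrNull: Throwable): Unit = {
    if (producer != null) producer.close()
  }
}

The query from the question would then be started with foreach instead of format("kafka"):

val query = enriched.writeStream
      .foreach(new KafkaForeachWriter(bootstrapServers, "desttopic"))
      .start()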


Spark 2.1 (the latest release of Spark at the time of writing) does not have it. The next release, 2.2, will have a Kafka writer for streaming Datasets; the Kafka sink is the same thing as the Kafka writer. A sketch of what that looks like is shown below.
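As a minimal sketch of the built-in Kafka sink available from Spark 2.2 onwards, assuming the enriched Dataset exposes string key and value columns (the checkpoint path here is illustrative):

val query = enriched
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("topic", "desttopic")
      .option("checkpointLocation", "/tmp/kafka-bridge-checkpoint")
      .start()

The built-in sink requires a value column (string or binary), the key column is optional, and a checkpoint location must be provided, either via the checkpointLocation option or spark.sql.streaming.checkpointLocation.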


Source: https://habr.com/ru/post/1673096/

