How to access file paths in records from Kafka and create datasets?

I am using Java.

I receive file paths through Kafka messages. I need to load each file into a Spark RDD, process it, and write the result to HDFS.

I can read the file path from the Kafka message, and I want to create a Dataset / RDD over that file.

I cannot run a map function on the Kafka message Dataset. It fails with an NPE, because the SparkContext is not available on the worker.

I cannot run foreach on the Kafka message Dataset. It fails with the message "Queries with streaming sources must be executed with writeStream.start();;"

I cannot collect the data from the Kafka message Dataset either. It fails with the same message: "Queries with streaming sources must be executed with writeStream.start();;"

I assume this is a very common use case that many setups need to handle.

How can I load a file as an RDD from the paths that I receive in Kafka messages?

CODE BELOW:

SparkSession spark = SparkSession.builder()
    .appName("MyKafkaStreamReader")
    .master("local[4]")
    .config("spark.executor.memory", "2g")
    .getOrCreate();

// Create a Dataset representing the stream of input lines from Kafka
Dataset<String> kafkaValues = spark.readStream()
    .format("kafka")
    // NOTE: this is a SparkConf setting for receiver-based DStreams,
    // not a Kafka source option, so it has no effect here
    .option("spark.streaming.receiver.writeAheadLog.enable", true)
    .option("kafka.bootstrap.servers", Configuration.KAFKA_BROKER)
    .option("subscribe", Configuration.KAFKA_TOPIC)
    .option("fetchOffset.retryIntervalMs", 100)
    // NOTE: checkpointLocation is read from writeStream(), not readStream()
    .option("checkpointLocation", "file:///tmp/checkpoint")
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as(Encoders.STRING());

Dataset<String> messages = kafkaValues.map(x -> {
  ObjectMapper mapper = new ObjectMapper();
  String m = mapper.readValue(x.getBytes(), String.class);
  return m;
}, Encoders.STRING() );

// ====================
// TEST 1 : FAILS
// ====================    
// Code trying to run map on the streaming Dataset
// This fails with a NullPointerException because "spark" is not available on the worker nodes

/*
Dataset<String> statusRDD = messages.map(message -> {

  // BELOW STATEMENT FAILS
  Dataset<Row> fileDataset = spark.read().option("header", "true").csv(message); 
  Dataset<Row> dedupedFileDataset = fileDataset.dropDuplicates();
  dedupedFileDataset.rdd().saveAsTextFile(getHdfsLocation());
  return getHdfsLocation();

}, Encoders.STRING());

  StreamingQuery query2 = statusRDD.writeStream().outputMode("append").format("console").start();
  */

// ====================    
// TEST 2 : FAILS
// ====================    
// CODE BELOW FAILS WITH EXCEPTION
// "Queries with streaming sources must be executed with writeStream.start();;"
// Attempt: run the deduplication on the worker side using foreach
/*
JavaRDD<String> messageRDD = messages.toJavaRDD();

messageRDD.foreach( message -> {

  Dataset<Row> fileDataset = spark.read().option("header", "true").csv(message);
  Dataset<Row> dedupedFileDataset = fileDataset.dropDuplicates();
  dedupedFileDataset.rdd().saveAsTextFile(getHdfsLocation());

});
*/

// ====================    
// TEST 3 : FAILS
// ====================
// CODE TRYING TO COLLECT ALSO FAILS WITH EXCEPTION
// "Queries with streaming sources must be executed with writeStream.start();;"
// List<String> mess = messages.collectAsList();

Any idea how I can read the file paths and create RDDs over those files?

1 answer

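In Structured Streaming, I don't think there is a way to materialize the data from one stream so that it can be used as a parameter for a Dataset operation.

Within the Spark ecosystem, this is possible by combining Spark Streaming and Spark SQL (Datasets). We can use Spark Streaming to consume the Kafka topic and then, using Spark SQL, load the corresponding files and apply the desired processing.

An approximate example of how this could look (this is in Scala; Java code would follow the same structure, only a bit more verbose):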

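// imports for the Kafka 0.8 direct-stream integration used below
import kafka.serializer.StringDecoder
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// configure and create the Spark session
val spark = SparkSession
    .builder
    .config(...)
    .getOrCreate()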

// create streaming context with a 30-second interval - adjust as required
val streamingContext = new StreamingContext(spark.sparkContext, Seconds(30))

// this uses the Kafka 0.8 client; the Kafka 0.10 integration subscribes differently

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> kafkaBootstrapServer,
  "group.id" -> "job-group-id",
  "auto.offset.reset" -> "largest",
  "enable.auto.commit" -> "false"
)

// create a kafka direct stream
val topics = Set("topic")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
     streamingContext, kafkaParams, topics)

// extract the values from the kafka message
val dataStream = stream.map{case (id, data) => data}     

// process the data
dataStream.foreachRDD { dataRDD => 
  // get all data received in the current interval
  // We are assuming that this data fits in memory. 
  // We're not processing a million files per second, are we?
  val files = dataRDD.collect()
  files.foreach{ file => 
    // this is the process proposed in the question;
    // notice that the spark session is accessible here, because the body of
    // foreachRDD runs on the driver
    val fileDataset = spark.read.option("header", "true").csv(file)
    val dedupedFileDataset = fileDataset.dropDuplicates()
    // written with the Dataset API instead of rdd.saveAsTextFile;
    // CSV is used because the "text" format accepts only a single string column
    dedupedFileDataset.write.mode("overwrite").option("header", "true").csv(getHdfsLocation())
  }
}

// start the streaming process
streamingContext.start()
streamingContext.awaitTermination()
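
One detail to watch in the loop above: every file in a batch is written to `getHdfsLocation()` with `mode("overwrite")`. If that helper returns the same path on each call (the question does not show it, so this is an assumption), later files would replace the output of earlier ones. Below is a minimal plain-Java sketch of driver-side helpers that could be applied to the collected paths before the loop. The class name `BatchPaths` and both methods are hypothetical and not part of Spark or of the original code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helpers for the driver-side loop; names are illustrative only.
public class BatchPaths {

    // Trims each message, skips null/blank entries, and keeps only the first
    // occurrence of each path, preserving arrival order.
    public static List<String> distinctPaths(List<String> messages) {
        Set<String> seen = new LinkedHashSet<>();
        for (String message : messages) {
            if (message == null) {
                continue;
            }
            String path = message.trim();
            if (!path.isEmpty()) {
                seen.add(path);
            }
        }
        return new ArrayList<>(seen);
    }

    // Builds an output directory under baseDir named after the input file,
    // with the file extension removed.
    public static String outputDirFor(String baseDir, String inputPath) {
        String name = inputPath.substring(inputPath.lastIndexOf('/') + 1);
        int dot = name.lastIndexOf('.');
        if (dot > 0) {
            name = name.substring(0, dot);
        }
        String base = baseDir.endsWith("/")
                ? baseDir.substring(0, baseDir.length() - 1)
                : baseDir;
        return base + "/" + name;
    }

    public static void main(String[] args) {
        // Example batch with a duplicate, a blank entry, and stray whitespace.
        List<String> batch = Arrays.asList(
                " /data/in/a.csv", "/data/in/b.csv", "", "/data/in/a.csv");
        for (String path : distinctPaths(batch)) {
            System.out.println(path + " -> " + outputDirFor("hdfs://namenode/out/", path));
        }
    }
}
```

In the streaming job, the result of `outputDirFor` would take the place of `getHdfsLocation()` for each file. Note that two input files with the same name in different directories would still map to the same output directory, so a real job may need a more distinctive naming scheme.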

Source: https://habr.com/ru/post/1686398/

