I am using Java.
I receive file paths through Kafka messages, and I need to load each file into a Spark RDD, process it, and write the result to HDFS.
I can read the file path from the Kafka message, and I want to create a Dataset/RDD over that file.
I cannot use a map function on the Kafka message Dataset: it fails with an NPE, since the SparkContext is not available on the workers.
I cannot use foreach on the Kafka message Dataset: it fails with "Queries with streaming sources must be executed with writeStream.start();".
I cannot collect the data from the Kafka message Dataset either: it fails with the same "Queries with streaming sources must be executed with writeStream.start();" error.
I suppose this is a very common use case and should work in many setups.
How can I load a file as an RDD from the paths that I receive in Kafka messages?
CODE BELOW:
SparkSession spark = SparkSession.builder()
    .appName("MyKafkaStreamReader")
    .master("local[4]")
    .config("spark.executor.memory", "2g")
    .getOrCreate();

Dataset<String> kafkaValues = spark.readStream()
    .format("kafka")
    .option("spark.streaming.receiver.writeAheadLog.enable", true)
    .option("kafka.bootstrap.servers", Configuration.KAFKA_BROKER)
    .option("subscribe", Configuration.KAFKA_TOPIC)
    .option("fetchOffset.retryIntervalMs", 100)
    .option("checkpointLocation", "file:///tmp/checkpoint")
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as(Encoders.STRING());

// The MapFunction cast is needed in Java, otherwise the map(lambda, encoder)
// call is ambiguous between the Scala Function1 and Java MapFunction overloads.
Dataset<String> messages = kafkaValues.map(
    (MapFunction<String, String>) x -> {
        // Extract the file path from the JSON message value
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(x.getBytes(), String.class);
    },
    Encoders.STRING());
Any idea how I can read the messages, extract the file paths, and create RDDs over those files?
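For context, this is a sketch of the kind of per-file processing I am after. It assumes foreachBatch (available from Spark 2.4) can be used so that each micro-batch of paths is handed to the driver, where the SparkSession is usable; the broker address, topic name, and the processing/output steps are placeholders, not my real configuration:

```java
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class PathBatchProcessor {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("MyKafkaStreamReader")
                .master("local[4]")
                .getOrCreate();

        // Stream of file paths arriving as Kafka message values
        Dataset<String> paths = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
                .option("subscribe", "file-paths")                   // placeholder topic
                .load()
                .selectExpr("CAST(value AS STRING)")
                .as(Encoders.STRING());

        // foreachBatch runs on the driver for each micro-batch, so the
        // SparkSession is available here (unlike inside map/foreach on executors).
        StreamingQuery query = paths.writeStream()
                .option("checkpointLocation", "file:///tmp/checkpoint")
                .foreachBatch((VoidFunction2<Dataset<String>, Long>) (batch, batchId) -> {
                    for (String path : batch.collectAsList()) {
                        // Load the referenced file as a Dataset on the driver
                        Dataset<String> file = spark.read().textFile(path);
                        // process the file and write the result to HDFS, e.g.:
                        // file.write().text("hdfs:///output/..."); // placeholder output
                    }
                })
                .start();

        query.awaitTermination();
    }
}
```

If that is a sound direction, the remaining question is whether collecting the paths per batch on the driver scales for my message volume.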