You can stream files from HDFS with the fileStream method on a StreamingContext (ssc):
val ssc = new StreamingContext(sparkConf, Seconds(batchTime))
val dStream = ssc.fileStream[LongWritable, Text, TextInputFormat](streamDirectory, (x: Path) => true, newFilesOnly = false)
In the API above, the filter parameter is a function that decides which paths/files get processed.
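For example, a filter that only picks up files ending in .csv might look like this minimal sketch (the csvOnly name and the .csv suffix are just illustrative; ssc and streamDirectory are assumed from the snippet above):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Accept only files whose names end in ".csv"; everything else in the
// monitored directory is skipped by the file stream.
val csvOnly = (path: Path) => path.getName.endsWith(".csv")

val csvStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
  streamDirectory, csvOnly, newFilesOnly = false)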
If your condition cannot be expressed in terms of the path/file name but depends on the data itself, you need to stop the streaming context once the condition is met.
To do this, you need a two-thread implementation (see the sketch after this list).
1) In one thread, keep checking whether the streaming context has stopped; once ssc is stopped, wait for it to shut down and then create a new streaming context.
2) In the second thread, check the condition and, when it is satisfied, stop the streaming context.
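A minimal sketch of that pattern is below. The shouldStop flag, the createContext and startMonitor helpers, and the "STOP" marker in the data check are all hypothetical names for illustration, not part of the Spark API; sparkConf, batchTime and streamDirectory are assumed to be defined as in the snippet above.

import java.util.concurrent.atomic.AtomicBoolean
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Flag flipped once the data-dependent condition is met (hypothetical name).
val shouldStop = new AtomicBoolean(false)

// Build a fresh streaming context each time the previous one is stopped.
def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkConf, Seconds(batchTime))
  val dStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
    streamDirectory, (p: Path) => true, newFilesOnly = false)
  dStream.map(_._2.toString).foreachRDD { rdd =>
    // Replace this with your real data check; "STOP" is purely illustrative.
    if (rdd.filter(_.contains("STOP")).count() > 0) shouldStop.set(true)
  }
  ssc
}

// Thread 2: poll the flag and stop the running context when the condition holds.
def startMonitor(ssc: StreamingContext): Unit = {
  new Thread(new Runnable {
    def run(): Unit = {
      while (!shouldStop.get()) Thread.sleep(1000)
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }
  }).start()
}

// Thread 1 (the driver's main loop): run a context, wait until it is stopped,
// then create a new one and repeat.
while (true) {
  shouldStop.set(false)
  val ssc = createContext()
  ssc.start()
  startMonitor(ssc)
  ssc.awaitTermination()   // returns once the monitor thread stops the context
}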
Please let me know if you need an explanation.