How to make Spark Streaming count words in a file in unit test?

I have successfully created a very simple Spark Streaming application in Java, based on the Scala HdfsCount example .

When I send this application to my local Spark, it expects the file to be written to this directory, and when I create this file, it will successfully print the word count. I end the application by pressing Ctrl + C.

Now I tried to create a very basic unit test for this function, but in the test I was not able to print the same information, i.e. the number of words.

What am I missing?

Below is the unit test file, after which I also included a code snippet that shows the countWords method:

StarterAppTest.java

import com.google.common.io.Files; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.junit.*; import java.io.*; public class StarterAppTest { JavaStreamingContext ssc; File tempDir; @Before public void setUp() { ssc = new JavaStreamingContext("local", "test", new Duration(3000)); tempDir = Files.createTempDir(); tempDir.deleteOnExit(); } @After public void tearDown() { ssc.stop(); ssc = null; } @Test public void testInitialization() { Assert.assertNotNull(ssc.sc()); } @Test public void testCountWords() { StarterApp starterApp = new StarterApp(); try { JavaDStream<String> lines = ssc.textFileStream(tempDir.getAbsolutePath()); JavaPairDStream<String, Integer> wordCounts = starterApp.countWords(lines); ssc.start(); File tmpFile = new File(tempDir.getAbsolutePath(), "tmp.txt"); PrintWriter writer = new PrintWriter(tmpFile, "UTF-8"); writer.println("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin"); writer.close(); System.err.println("===== Word Counts ======="); wordCounts.print(); System.err.println("===== Word Counts ======="); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } Assert.assertTrue(true); } } 

This test compiles and runs, Spark Streaming prints a lot of diagnostic messages on the console, but the wordCounts.print() call does not print anything, and they do it in StarterApp.java itself.

I also tried adding ssc.awaitTermination(); after ssc.start() , but nothing has changed in this regard. After that, I also tried to manually create a new file in the directory that tested this Spark Streaming application, but this time it gave an error.

For completeness, below is the wordCounts method:

 public JavaPairDStream<String, Integer> countWords(JavaDStream<String> lines) { JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { return Lists.newArrayList(SPACE.split(x)); } }); JavaPairDStream<String, Integer> wordCounts = words.mapToPair( new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<>(s, 1); } }).reduceByKey((i1, i2) -> i1 + i2); return wordCounts; } 
+5
source share
1 answer

A few pointers:

  • Give at least 2 cores a SparkStreaming context. 1 for streaming and 1 for spark processing. "local" β†’ "local [2]"
  • The interval between threads is 3000 ms, so somewhere in your program you need to wait - at least this is the time to wait for the exit.
  • Spark Streaming takes some time to configure listeners. The file is created immediately after the release of ssc.start . There is no guarantee that the file system listener is already in place. I would do sleep(xx) after ssc.start

In streaming mode, it's all about the right time.

+2
source

Source: https://habr.com/ru/post/1208567/


All Articles