I have created a very simple Spark Streaming application in Java, based on the Scala HdfsCount example.
When I submit this application to my local Spark, it waits for a file to be written to a given directory, and when I create that file it successfully prints the word count. I terminate the application by pressing Ctrl + C.
Now I've tried to create a very basic unit test for this functionality, but in the test I am not able to print the same information, that is, the number of words.
What am I missing?
Below is the unit test file, followed by a code snippet that shows the countWords method:
StarterAppTest.java
import com.google.common.io.Files;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.*;

import java.io.*;

public class StarterAppTest {
  JavaStreamingContext ssc;
  File tempDir;

  @Before
  public void setUp() {
    ssc = new JavaStreamingContext("local", "test", new Duration(3000));
    tempDir = Files.createTempDir();
    tempDir.deleteOnExit();
  }

  @After
  public void tearDown() {
    ssc.stop();
    ssc = null;
  }

  @Test
  public void testInitialization() {
    Assert.assertNotNull(ssc.sc());
  }

  @Test
  public void testCountWords() {
    StarterApp starterApp = new StarterApp();

    try {
      JavaDStream<String> lines = ssc.textFileStream(tempDir.getAbsolutePath());
      JavaPairDStream<String, Integer> wordCounts = starterApp.countWords(lines);
      ssc.start();
      File tmpFile = new File(tempDir.getAbsolutePath(), "tmp.txt");
      PrintWriter writer = new PrintWriter(tmpFile, "UTF-8");
      writer.println("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin");
      writer.close();
      System.err.println("===== Word Counts =======");
      wordCounts.print();
      System.err.println("===== Word Counts =======");
    } catch (FileNotFoundException e) {
      e.printStackTrace();
    } catch (UnsupportedEncodingException e) {
      e.printStackTrace();
    }

    Assert.assertTrue(true);
  }
}
This test compiles and runs, and Spark Streaming prints a lot of diagnostic messages on the console, but the wordCounts.print() call does not print anything, even though it does in StarterApp.java itself.
I also tried adding ssc.awaitTermination(); after ssc.start(), but nothing changed in this respect. After that I also tried to create a file manually in the directory that this Spark Streaming application was monitoring, but this time it gave an error.
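One thing I have been meaning to try (just a sketch, assuming the Spark Streaming 1.x Java API, where JavaStreamingContext has an awaitTermination(long) overload that takes a timeout in milliseconds): write the file only after ssc.start() returns, and then block long enough for at least one batch interval to fire, instead of letting the test method return immediately. The Spark documentation says textFileStream only picks up files that appear in the directory after streaming has begun, and with a 3000 ms batch duration the test as written probably finishes before the first batch is ever processed.

```java
ssc.start();

// Create the file only after the context has started; textFileStream
// only sees files that appear in the directory after streaming begins.
File tmpFile = new File(tempDir.getAbsolutePath(), "tmp.txt");
try (PrintWriter writer = new PrintWriter(tmpFile, "UTF-8")) {
  writer.println("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin");
}

// Block long enough for at least one 3000 ms batch to be processed,
// rather than returning from the test method immediately.
ssc.awaitTermination(10000);
```

The 10000 ms timeout is an arbitrary value I picked for illustration; I have not confirmed that this alone makes wordCounts.print() produce output in the test.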
For completeness, below is the countWords method:
public JavaPairDStream<String, Integer> countWords(JavaDStream<String> lines) {
  JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String x) {
      return Lists.newArrayList(SPACE.split(x));
    }
  });

  JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<>(s, 1);
        }
      }).reduceByKey((i1, i2) -> i1 + i2);

  return wordCounts;
}
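For reference, the splitting-and-counting logic itself can be sanity-checked without Spark at all. The plain-Java sketch below mirrors what flatMap + mapToPair + reduceByKey do per line, on a List<String> instead of a JavaDStream (assuming SPACE.split behaves like splitting on whitespace); the class and method names here are hypothetical, not part of StarterApp:

```java
import java.util.*;

public class WordCountCheck {
  // Mirrors countWords: split each line on whitespace, emit (word, 1),
  // and sum the counts per word, but over a plain List instead of a DStream.
  static Map<String, Integer> countWords(List<String> lines) {
    Map<String, Integer> counts = new HashMap<>();
    for (String line : lines) {
      for (String word : line.split("\\s+")) {
        if (!word.isEmpty()) {
          counts.merge(word, 1, Integer::sum);
        }
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    Map<String, Integer> counts =
        countWords(Arrays.asList("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin"));
    System.out.println(counts.get("Emre"));  // 3
    System.out.println(counts.get("Ergin")); // 3
  }
}
```

This at least confirms that the counting logic is sound, which points the problem at the streaming/timing side of the test rather than at countWords itself.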