Scala vs Java Streaming: Scala does not print anything, Java works

I am comparing Scala and Java Reactive Spec implementations, using akka-stream and RxJava respectively. My use case is a simplified grep: given a directory, a file filter, and a search text, I look in that directory for all matching files that contain the text, then emit (filename → matching lines) pairs. This works fine in Java, but in Scala nothing is printed. There is no exception, but no output either. The test data is downloaded from the Internet, but as you can see, the code can easily be tested with any local directory.

Scala

object Transformer {
  implicit val system = ActorSystem("transformer")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext: ExecutionContext = {
    implicitly
  }

  import collection.JavaConverters._

  def run(path: String, text: String, fileFilter: String) = {
    Source.fromIterator { () =>
      Files.newDirectoryStream(Paths.get(path), fileFilter).iterator().asScala
    }.map(p => {
      val lines = io.Source.fromFile(p.toFile).getLines().filter(_.contains(text)).map(_.trim).to[ImmutableList]
      (p, lines)
    })
      .runWith(Sink.foreach(e => println(s"${e._1} -> ${e._2}")))
  }
}

Java

public class Transformer {
    public static void run(String path, String text, String fileFilter) {
        Observable.from(files(path, fileFilter)).flatMap(p -> {
            try {
                return Observable.from((Iterable<Map.Entry<String, List<String>>>) Files.lines(p)
                        .filter(line -> line.contains(text))
                        .map(String::trim)
                        .collect(collectingAndThen(groupingBy(pp -> p.toAbsolutePath().toString()), Map::entrySet)));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }).toBlocking().forEach(e -> System.out.printf("%s -> %s.%n", e.getKey(), e.getValue()));
    }

    private static Iterable<Path> files(String path, String fileFilter) {
        try {
            return Files.newDirectoryStream(Paths.get(path), fileFilter);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

Unit test with ScalaTest:

class TransformerSpec extends FlatSpec with Matchers {
  "Transformer" should "extract temperature" in {
    Transformer.run(NoaaClient.currentConditionsPath(), "temp_f", "*.xml")
  }

  "Java Transformer" should "extract temperature" in {
    JavaTransformer.run(JavaNoaaClient.currentConditionsPath(false), "temp_f", "*.xml")
  }
}

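Dang, I forgot that running the Source returns a Future, so the test finished without waiting for the stream to complete. @MrWiggles' comment gave me the hint. The following Scala code produces the same result as the Java version.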

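Note: the code in my question never closed the DirectoryStream, which, for directories with a large number of files, caused java.io.IOException: Too many open files in system. The code below closes it.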

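import scala.concurrent.Await
import scala.concurrent.duration._

def run(path: String, text: String, fileFilter: String) = {
  val files = Files.newDirectoryStream(Paths.get(path), fileFilter)

  try {
    val future = Source(files.asScala.toList).map { p =>
      val src = io.Source.fromFile(p.toFile)
      // Close each file as soon as its matching lines have been collected.
      val lines =
        try src.getLines().filter(_.contains(text)).map(_.trim).to[ImmutableList]
        finally src.close()
      (p, lines)
    }
      .filter(_._2.nonEmpty)
      .runWith(Sink.foreach(e => println(s"${e._1} -> ${e._2}")))

    // Block until the stream completes; without this nothing waits for the output.
    Await.result(future, 10.seconds)
  } finally {
    files.close() // release the directory handle even if the stream fails or times out
  }

  true // for testing
}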
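
For comparison, both points (wait for the asynchronous result, close what you open) can be sketched on the Java side with nothing but the JDK. This is only an illustration, not the RxJava code from the question: the class name GrepSketch and the methods grep and grepAsync are made up for this example, and CompletableFuture stands in for the Future that akka-stream hands back. It also keys the result by file name rather than absolute path, purely to keep the output short.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GrepSketch { // hypothetical name, not part of the original code

    // Synchronous grep: file name -> trimmed lines that contain `text`.
    static Map<String, List<String>> grep(String dir, String text, String glob) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        // try-with-resources closes the directory handle even if reading fails.
        try (DirectoryStream<Path> files = Files.newDirectoryStream(Paths.get(dir), glob)) {
            for (Path p : files) {
                // Files.lines keeps the file open until the returned stream is closed.
                try (Stream<String> lines = Files.lines(p)) {
                    List<String> hits = lines
                            .filter(line -> line.contains(text))
                            .map(String::trim)
                            .collect(Collectors.toList());
                    if (!hits.isEmpty()) {
                        result.put(p.getFileName().toString(), hits);
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    // Asynchronous variant: returns immediately, the work runs on a pool thread.
    static CompletableFuture<Map<String, List<String>>> grepAsync(String dir, String text, String glob) {
        return CompletableFuture.supplyAsync(() -> grep(dir, text, glob));
    }

    public static void main(String[] args) {
        String dir = args.length > 0 ? args[0] : ".";
        // join() blocks until the future completes, much like Await.result in Scala.
        // Without it, main could return before anything has been printed.
        grepAsync(dir, "temp_f", "*.xml").join()
                .forEach((file, lines) -> System.out.printf("%s -> %s%n", file, lines));
    }
}
```

Dropping the join() call here gives roughly the symptom from the question: no exception, and possibly no output, because the caller finishes before the background work does.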

Source: https://habr.com/ru/post/1627418/

