Reading CSV files using Akka streams

I am reading a CSV file. I use Akka Streams for this so that I can build a graph of actions to perform on each line. I have the following toy example, and it works.

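    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.Sink
    import scala.io.Source

    def main(args: Array[String]): Unit = {
      implicit val system = ActorSystem("MyAkkaSystem")
      implicit val materializer = ActorMaterializer()

      // scala.io.Source reads the lines; the Akka Source wraps that iterator
      val source = akka.stream.scaladsl.Source.fromIterator(() => Source.fromFile("a.csv").getLines())
      val sink = Sink.foreach(println)
      source.runWith(sink)
    }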

Having two different Source types in play doesn't sit well with me. Is this idiomatic, or is there a better way to write it?

3 answers

In fact, Akka Streams can read directly from a file:

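    import java.nio.file.Paths
    import akka.stream.scaladsl.{FileIO, Framing}
    import akka.util.ByteString

    FileIO.fromPath(Paths.get("a.csv"))
      // split the byte stream into lines of at most 256 bytes each
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true).map(_.utf8String))
      .runForeach(println)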

Here, runForeach simply prints each line. If you have a suitable Sink for processing these strings, use it instead. For example, to split each line on ',' and print the number of values in it:

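    import akka.Done
    import akka.stream.scaladsl.Sink
    import scala.concurrent.Future

    // Sink takes two type parameters: the element type and the materialized value
    val sink: Sink[String, Future[Done]] = Sink.foreach(x => println(x.split(",").size))

    FileIO.fromPath(Paths.get("a.csv"))
      .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
      .to(sink)
      .run()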

An idiomatic way to read a CSV file with Akka Streams is to use the Alpakka CSV connector. The following example reads a CSV file, turns each row into a map from column names (assumed to be on the first line of the file) to ByteString values, converts those ByteString values to String values, and prints each row:

    FileIO.fromPath(Paths.get("a.csv"))
      .via(CsvParsing.lineScanner())
      .via(CsvToMap.toMap())
      .map(_.mapValues(_.utf8String))
      .runForeach(println)

Yes, this is fine, because these are two different Source types. But if you don't like scala.io.Source, you can open the file yourself (which is sometimes necessary, for example when the source CSV file is encrypted) and then parse the resulting InputStream like this:

    // input is an java.io.InputStream that you have already opened (and decrypted, if needed)
    StreamConverters.fromInputStream(() => input)
      .via(Framing.delimiter(ByteString("\n"), 4096))
      .map(_.utf8String)
      .collect { line => line }

Having said that, consider using Apache Commons CSV together with Akka Streams. You can write less code :)
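
As a rough illustration of that suggestion, here is a minimal sketch that wraps a Commons CSV parser in an Akka Streams Source. The object name CommonsCsvSketch and the helper rows are made up for this example, and it assumes commons-csv and akka-stream are on the classpath and that the file is plain comma-separated text in the default format.

```scala
import java.io.{FileReader, Reader}

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import org.apache.commons.csv.CSVFormat

import scala.collection.JavaConverters._

object CommonsCsvSketch {

  // Hypothetical helper: emits one element per CSV record, each converted to a
  // Vector of its fields. The parser is created when the stream is materialized,
  // and the reader is consumed, so a given reader can back only one run.
  def rows(reader: Reader): Source[Vector[String], NotUsed] =
    Source
      .fromIterator(() => CSVFormat.DEFAULT.parse(reader).iterator().asScala)
      .map(record => record.iterator().asScala.toVector)

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("MyAkkaSystem")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher // execution context for onComplete

    val reader = new FileReader("a.csv")

    rows(reader)
      .runForeach(println)
      .onComplete { _ =>
        // release the file and shut the actor system down once the stream ends
        reader.close()
        system.terminate()
      }
  }
}
```

Commons CSV takes care of quoting and embedded delimiters, which the plain Framing.delimiter approach does not. Note that the parser does blocking I/O, so for large files you may want to run this stage on a dedicated dispatcher.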


Source: https://habr.com/ru/post/1011664/

