Scala: Iterating over CSV files in a functional way?

I have CSV files with comment lines that contain the column names, and the columns change throughout the file:

    #c1,c2,c3
    a,b,c
    d,e,f
    #c4,c5
    g,h
    i,j

I want to provide a way to iterate over (only) the data rows of the file as Maps of column names to values (all Strings). So the above would become:

    Map(c1 -> a, c2 -> b, c3 -> c)
    Map(c1 -> d, c2 -> e, c3 -> f)
    Map(c4 -> g, c5 -> h)
    Map(c4 -> i, c5 -> j)

The files are very large, so reading everything into memory is not an option. Right now I have an Iterator class that keeps some ugly state between hasNext() and next(); I also provide accessors for the current line number and for the last data line and comment line actually read (in case consumers care about field order). I would like to try something more functional.

My first idea was a for comprehension: I can iterate over the lines of the file, skipping the comment lines with a filter clause. I can yield a tuple containing the map, the line number, and so on. The problem is that I need to remember the last column names seen so that I can create Maps from them. For comprehensions understandably try to discourage retaining state by only allowing you to set new vals. I learned from this question that I can update member variables in the yield block, but that is precisely when I don't want to update them in my case!

I could call a function in the iteration clause that updates the state, but that seems dirty. So what is the best way to do this in a functional style? Abuse for comprehensions? Hack scanLeft? Use a library? Bring out the big guns and use parser combinators? Or is a functional style just not a good fit for this problem?

+6
7 answers

State Monad FTW!

Actually, I suck at the State monad. I had a hell of a time writing this, and I have a strong feeling that it could be done much better. In particular, it seems to me that traverse is the way to go, but...

    // Get Scalaz on the job
    import scalaz._
    import Scalaz._

    // Some type aliases to make stuff clearer
    type Input         = Stream[String]
    type Header        = String
    type InternalState = (Input, Header)
    type Output        = Option[(Header, String)]
    type MyState       = State[InternalState, Output]

    // Detect headers
    def isHeader(line: String) = line(0) == '#'

    // From a state, produce an output
    def makeLine: (InternalState => Output) = {
      case (head #:: _, _) if isHeader(head) => None
      case (head #:: _, header)              => Some(header -> head)
      case _                                 => None
    }

    // From a state, produce the next state
    def nextLine: (InternalState => InternalState) = {
      case (head #:: tail, _) if isHeader(head) => tail -> head
      case (_ #:: tail, header)                 => tail -> header
      case _                                    => Stream.empty -> ""
    }

    // My state is defined by the functions producing the next state
    // and the output
    val myState: MyState = state(s => nextLine(s) -> makeLine(s))

    // Some input to test it. I'm trimming it to avoid problems on REPL
    val input = """#c1,c2,c3
    a,b,c
    d,e,f
    #c4,c5
    g,h
    i,j""".lines.map(_.trim).toStream

    // My State/Output Stream -- def to avoid keeping a reference to the head
    def stateOutputStream = Stream.iterate(myState(input, "")){
      case (s, _) => myState(s)
    } takeWhile { case ((stream, _), output) => stream.nonEmpty || output.nonEmpty }

    // My Output Stream -- flatMap gets rid of the None from the headers
    def outputStream = stateOutputStream flatMap { case (_, output) => output }

    // Now just get the map
    def outputToMap: (Header, String) => Map[String, String] = {
      case (header, line) =>
        val keys = header substring 1 split ","
        val values = line split ","
        keys zip values toMap
    }

    // And this is the result -- note that I'm still avoiding "val" so memory
    // won't leak
    def result = outputStream map outputToMap.tupled
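Stripped of the Scalaz machinery, the idea above is a pure step function from (state, line) to a new state, iterated lazily over the input. Here is a minimal Python sketch of that idea, using itertools.accumulate as a stand-in for scanLeft. The names step and rows_as_dicts are made up for this illustration, and the initial argument assumes Python 3.8 or later:

```python
from itertools import accumulate

def step(state, line):
    """Pure transition: (header, previous output) plus a line gives (header, output)."""
    header, _ = state
    if line.startswith('#'):
        # A header line replaces the column names and produces no output.
        return line[1:].split(','), None
    # A data line keeps the header and produces one dict.
    return header, dict(zip(header, line.split(',')))

def rows_as_dicts(lines):
    """Lazily yield one dict per data row; header lines yield nothing."""
    states = accumulate(lines, step, initial=([], None))
    return (output for _, output in states if output is not None)

sample = ["#c1,c2,c3", "a,b,c", "d,e,f", "#c4,c5", "g,h", "i,j"]
result = list(rows_as_dicts(sample))
```

Nothing is mutated here: the current header only ever travels through the accumulated state, and accumulate pulls one line at a time, so the whole file is never held in memory.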
+5

Here is a way you could do it with Iteratees. The stream is represented as a function from Iteratee to Iteratee, so it is never realized in memory. I'm using the State monad to track the last header encountered.

    import scalaz._
    import Scalaz._
    import IterV._

    type Header = List[String]
    type MyState[A] = State[Header, A]
    type Out = Map[String, String]

    // Detect headers
    def isHeader(line: String) = line(0) == '#'

    type Enumeratee[A, B, C] =
      IterV[B, C] => Iteratee[MyState, A, IterV[B, C]]

    // Enumerate a list. Just for demonstration.
    def enumerateM[M[_]: Monad, E, A]:
      (List[E], Iteratee[M, E, A]) => Iteratee[M, E, A] = {
        case (Nil, i) => i
        case (x :: xs, Iteratee(m)) => Iteratee(for {
          v <- m
          o <- v match {
            case d@DoneM(_, _) => d.pure[M]
            case ContM(k) => enumerateM.apply(xs, k(El(x))).value
          }
        } yield o)
      }

    def stateTrans[A]: Enumeratee[String, Map[String, String], A] =
      i => Iteratee(i.fold(
        done = (_, _) => DoneM(i, Empty.apply).pure[MyState],
        cont = k => ContM((x: Input[String]) => x match {
          case El(e) => Iteratee[MyState, String, IterV[Out, A]](for {
            h <- init
            o <- if (isHeader(e))
                   put(e substring 1 split "," toList) map (_ => Empty[Out])
                 else El((h zip (e split ",")).toMap).pure[MyState]
            v <- stateTrans(k(o)).value
          } yield v)
          case Empty() => stateTrans(k(Empty.apply))
          case EOF() => stateTrans(k(EOF.apply))
        }).pure[MyState]
      ))

Here is a test of this, taking the head of the output stream:

    scala> (enumerateM[MyState, String, IterV[Out, Option[Out]]].apply(
         | List("#c1,c2,c3","a,b,c","d,e,f"), stateTrans(head)).value ! List())
         | match { case DoneM(a, _) => a match { case Done(b, _) => b } }
    res0: Option[Out] = Some(Map(c1 -> a, c2 -> b, c3 -> c))

This could be made much nicer by abstracting some of this stuff out into helper functions.
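For readers unfamiliar with iteratees, the shape of the approach can be sketched in a few lines of Python. This is a loose illustration, not a translation of the Scalaz code: Done, Cont, EOF, head, state_trans, enumerate_lines and run are all hypothetical names, and the current header is passed as a plain argument rather than through a State monad:

```python
from dataclasses import dataclass
from typing import Any, Callable

EOF = object()  # sentinel marking the end of the input

@dataclass(frozen=True)
class Done:
    value: Any  # the iteratee has finished with this result

@dataclass(frozen=True)
class Cont:
    step: Callable[[Any], Any]  # fed an element or EOF, returns Done or Cont

def head():
    """Iteratee that finishes with the first element it is fed (None on EOF)."""
    return Cont(lambda x: Done(None if x is EOF else x))

def state_trans(inner, header=()):
    """Wrap an iteratee over row dicts into an iteratee over raw lines."""
    if isinstance(inner, Done):
        return inner
    def step(x):
        if x is EOF:
            return state_trans(inner.step(EOF), header)
        if x.startswith('#'):
            # Header line: remember the new columns, feed nothing downstream.
            return state_trans(inner, tuple(x[1:].split(',')))
        # Data line: feed the corresponding dict to the wrapped iteratee.
        return state_trans(inner.step(dict(zip(header, x.split(',')))), header)
    return Cont(step)

def enumerate_lines(lines, iteratee):
    """Feed lines one at a time until the input ends or the iteratee is done."""
    for line in lines:
        if isinstance(iteratee, Done):
            break
        iteratee = iteratee.step(line)
    return iteratee

def run(iteratee):
    """Signal EOF if the iteratee still wants input, then extract its result."""
    if isinstance(iteratee, Cont):
        iteratee = iteratee.step(EOF)
    return iteratee.value

first = run(enumerate_lines(["#c1,c2,c3", "a,b,c", "d,e,f"], state_trans(head())))
```

As in the test above, taking the head stops the enumeration early: the consumer decides how much of the input is read, which is what keeps the stream out of memory.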

+3

Here is a possible solution:

First, have a look at the answer to the question "Split up a list at each element satisfying a predicate (Scala)", which gives you a groupPrefix function. It splits a list into a list of lists, with a split occurring wherever an element satisfies a given predicate. That way, you get lists that each start with a comment line (the column definitions) and contain the corresponding data rows after it.

This routine then converts one of those sublists (starting with the column names) into a list of the corresponding maps.

    import scala.collection.immutable.ListMap
    // to keep the order of the columns. If not needed, just use Map

    def toNamedFields(lines: List[String]) : List[Map[String, String]] = {
      val columns = lines.head.tail.split(",").toList // tail to discard the #
      lines.tail.map{line => ListMap(columns.zip(line.split(",")): _*)}
    }

With that, you split your lines and get the maps in each group, obtaining a list of lists of maps, which you turn into a single list with flatten:

    groupPrefix(lines){_.startsWith("#")}.map(toNamedFields).flatten
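The groupPrefix function itself lives in the linked answer and is not reproduced here. As a rough idea of what such a split does, here is a hypothetical Python sketch (group_prefix and to_named_fields are names invented for this example). It is written as a generator so that only one group is held in memory at a time, and it assumes the input starts with a header line:

```python
def group_prefix(items, is_prefix):
    """Yield lists of items, starting a new list at each item where is_prefix is true."""
    group = []
    for item in items:
        if is_prefix(item) and group:
            yield group
            group = []
        group.append(item)
    if group:
        yield group

def to_named_fields(group):
    """Turn [header, row, row, ...] into a list of dicts keyed by column name."""
    columns = group[0][1:].split(',')  # [1:] discards the leading '#'
    return [dict(zip(columns, row.split(','))) for row in group[1:]]

lines = ["#c1,c2,c3", "a,b,c", "d,e,f", "#c4,c5", "g,h", "i,j"]
groups = list(group_prefix(lines, lambda line: line.startswith('#')))
maps = [m for group in groups for m in to_named_fields(group)]
```

Note that a List-based version, as in the Scala snippet, needs all lines in memory, which the question rules out for very large files; a lazy variant like this one only needs each header's block of rows to fit.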
+2

It could probably be more elegant, but you get the idea:

    def read(lines: Iterator[String],
             currentHeadings: Option[Seq[String]] = None): Stream[Option[Map[String, String]]] =
      if (lines.hasNext) {
        val l = lines.next
        if (l.startsWith("#"))
          Stream.cons(
            None,
            read(lines, Some(l.tail.split(","))))
        else
          Stream.cons(
            currentHeadings.map(_.zip(l.split(",")).toMap),
            read(lines, currentHeadings))
      } else Stream.cons(None, Stream.Empty)

    def main(args: Array[String]): Unit = {
      val lines = scala.io.Source.fromFile("data.csv").getLines
      println(read(lines).flatten.toList)
    }

Prints:

    List(Map(c1 -> a, c2 -> b, c3 -> c), Map(c1 -> d, c2 -> e, c3 -> f), Map(c4 -> g, c5 -> h), Map(c4 -> i, c5 -> j))
+2

Well here is Python ...

    from collections import namedtuple

    def read_shifty_csv(csv_file):
        cols = None
        for line in csv_file:
            line = line.strip()
            if line.startswith('#'):
                cols = namedtuple('cols', line[1:].split(','))
            else:
                yield cols(*line.split(','))._asdict()

Drop the _asdict() call if you would rather work with a tuple than a dict. This only materializes one line at a time in memory.

Edit to try to be a little more functional:

    from collections import namedtuple
    from itertools import imap

    def read_shifty_csv(csv_file):
        cols = None
        for line in imap(str.strip, csv_file):
            if line.startswith('#'):
                cols = namedtuple('cols', line[1:].split(','))
            else:
                yield cols(*line.split(','))._asdict()

I just dropped the evil reassignment line = line.strip().
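The snippets above are Python 2: itertools.imap no longer exists in Python 3, where the built-in map is already lazy. A sketch of the same generator for Python 3 follows, with an in-memory io.StringIO standing in for a real file (the sample data and the dict() normalisation are additions for this example):

```python
import io
from collections import namedtuple

def read_shifty_csv(csv_file):
    cols = None
    # The built-in map is lazy in Python 3, so it replaces itertools.imap.
    for line in map(str.strip, csv_file):
        if line.startswith('#'):
            # A comment line defines a new set of column names.
            cols = namedtuple('cols', line[1:].split(','))
        else:
            yield cols(*line.split(','))._asdict()

sample = io.StringIO("#c1,c2,c3\na,b,c\nd,e,f\n#c4,c5\ng,h\ni,j\n")
# dict() normalises the result, since _asdict() returned an OrderedDict before Python 3.8.
rows = [dict(row) for row in read_shifty_csv(sample)]
```

This assumes the file starts with a header line, has no blank lines, and uses column names that are valid Python identifiers; otherwise cols is None or namedtuple raises an error.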

+2

Inspired by @schmichael's attempt at a functional Python solution, here is my attempt at pushing things too far. I don't claim that it is maintainable, efficient, exemplary or easy to follow, but it is functional:

    from itertools import imap, groupby, izip, chain
    from collections import deque
    from operator import itemgetter, methodcaller
    from functools import partial

    def shifty_csv_dicts(lines):
        last = lambda seq: deque(seq, maxlen=1).pop()
        parse_header = lambda header: header[1:-1].split(',')
        parse_row = lambda row: row.rstrip('\n').split(',')
        mkdict = lambda keys, vals: dict(izip(keys,vals))
        headers_then_rows = imap(itemgetter(1), groupby(lines, methodcaller('startswith', '#')))
        return chain.from_iterable(imap(partial(mkdict, parse_header(last(headers))),
                                        imap(parse_row, next(headers_then_rows)))
                                   for headers in headers_then_rows)

OK, let's unpack that.

The basic idea is to (ab)use itertools.groupby to recognize the changes from headers to data rows. We use argument evaluation semantics to control the order of operations.

First, we tell groupby to group the lines by whether or not they start with '#':

    methodcaller('startswith', '#')

creates a function that takes a line and calls line.startswith('#') (it is equivalent to the stylistically preferred but less efficient lambda line: line.startswith('#')).

Thus, groupby takes the incoming iterable of lines and alternates between returning an iterator of header lines (usually just one header) and an iterator of data rows. It actually returns a tuple of (group_val, group_iter), where in this case group_val is a bool indicating whether the group is a header. So, we do the equivalent of (group_val, group_iter)[1] on all the tuples to pick out the iterators: itemgetter(1) is just a function that runs "[1]" on whatever you give it (again, equivalent to but more efficient than lambda t: t[1]). So we use imap to run our itemgetter function on each tuple returned by groupby, to pick out the header / data iterators:

    imap(itemgetter(1), groupby(lines, methodcaller('startswith', '#')))
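To make the alternation concrete, here is a small Python 3 sketch of what groupby produces for the sample file. The groups are turned into lists only so they can be inspected; the real code keeps them as iterators. The names lines, is_header and groups are chosen for this example:

```python
from itertools import groupby
from operator import methodcaller

lines = ["#c1,c2,c3\n", "a,b,c\n", "d,e,f\n", "#c4,c5\n", "g,h\n", "i,j\n"]
is_header = methodcaller('startswith', '#')

# Each item from groupby is (key, iterator over the consecutive lines sharing that key).
groups = [(key, list(group)) for key, group in groupby(lines, is_header)]

# groups is now:
# [(True,  ['#c1,c2,c3\n']),
#  (False, ['a,b,c\n', 'd,e,f\n']),
#  (True,  ['#c4,c5\n']),
#  (False, ['g,h\n', 'i,j\n'])]
```

Because every group iterator draws from the same underlying iterator, a group must be consumed before moving on to the next one, which is why the order of evaluation matters so much in the rest of this answer.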

We evaluate this expression first and give it a name because we will use it twice later, first for the headers and then for the data. The outermost call:

    chain.from_iterable(... for headers in headers_then_rows)

walks through the iterators returned from groupby. We are being tricky and naming the value headers because some other code inside the ... picks off the rows when we are not looking, advancing the groupby iterator in the process. This outer generator expression will only ever produce headers (remember, they alternate: headers, data, headers, data...). The trick is to make sure the headers get consumed before the rows, because they both share the same underlying iterator. chain.from_iterable simply stitches the results of all the data-row iterators together into a single iterator.

So what are we stitching together? Well, we need to take the (last) header, zip it with each row of values, and make dicts out of that. This:

    last = lambda seq: deque(seq, maxlen=1).pop()

is a somewhat dirty but efficient hack to get the last item from an iterator, in this case our header line. We then parse the header by trimming the leading # and the trailing newline and splitting on , to get a list of column names:

    parse_header = lambda header: header[1:-1].split(',')
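A quick Python 3 check of these two helpers, including one caveat of the [1:-1] slice (the sample strings are made up for this example):

```python
from collections import deque

last = lambda seq: deque(seq, maxlen=1).pop()
parse_header = lambda header: header[1:-1].split(',')

# A deque with maxlen=1 discards everything but the most recent item,
# so popping it yields the last element of any iterator.
final_header = last(iter(["#old1,old2\n", "#c1,c2,c3\n"]))
columns = parse_header(final_header)

# Caveat: the slice assumes a trailing newline. Without one,
# the last character of the last column name is cut off.
truncated = parse_header("#c1,c2")
```

Here final_header is the second header line, columns is ['c1', 'c2', 'c3'], and truncated is ['c1', 'c'], so the helper is only safe for header lines that end in a newline. Calling last on an empty iterator raises IndexError.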

But we want to do this only once for each associated row iterator, because it exhausts our header iterator (and we wouldn't want to copy it into some kind of mutable state now, would we?). We also have to make sure that the header iterator is consumed before the rows. The solution is to build a partially applied function, evaluating and fixing the headers as the first parameter and taking a row as the second parameter:

    partial(mkdict, parse_header(last(headers)))

The mkdict function uses the column names as keys and the row data as values to create a dict:

    mkdict = lambda keys, vals: dict(izip(keys,vals))

This gives us a function that freezes the first parameter (keys) and lets us simply pass the second parameter (vals): exactly what we need to create a bunch of dicts with the same keys and different values.
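A small Python 3 illustration of the partial application (zip replaces izip in Python 3; make_row and the sample values are names and data chosen for this example):

```python
from functools import partial

mkdict = lambda keys, vals: dict(zip(keys, vals))

# Fix the keys once; the resulting function only needs the values.
make_row = partial(mkdict, ['c1', 'c2', 'c3'])

rows = list(map(make_row, [['a', 'b', 'c'], ['d', 'e', 'f']]))
```

The argument to partial is evaluated when partial is called, not when make_row is later invoked. That is the property the full solution relies on to consume the header iterator before any rows are read.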

To use it, we parse each row just as you would expect:

    parse_row = lambda row: row.rstrip('\n').split(',')

recalling that next(headers_then_rows) will return the iterator of data rows from groupby (since we have already consumed the header iterator):

    imap(parse_row, next(headers_then_rows))

Finally, we map our partially applied dict-maker function over the parsed rows:

    imap(partial(...), imap(parse_row, next(headers_then_rows)))

And all of these are stitched together by chain.from_iterable to make one big, happy, functional stream of shifty CSV dicts.
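Putting the pieces together for Python 3, where map and zip are lazy and imap / izip are gone, might look like the sketch below. It is a port under two stated assumptions, not the original code: parse_header uses rstrip('\n') instead of the [1:-1] slice, and the input is well formed (it starts with a header and every header is followed by at least one data row):

```python
from itertools import groupby, chain
from collections import deque
from operator import itemgetter, methodcaller
from functools import partial

def shifty_csv_dicts(lines):
    last = lambda seq: deque(seq, maxlen=1).pop()
    # Assumption: rstrip('\n') replaces the original [1:-1] slice.
    parse_header = lambda header: header[1:].rstrip('\n').split(',')
    parse_row = lambda row: row.rstrip('\n').split(',')
    mkdict = lambda keys, vals: dict(zip(keys, vals))
    # Alternating iterators: header lines, data rows, header lines, ...
    headers_then_rows = map(itemgetter(1), groupby(lines, methodcaller('startswith', '#')))
    return chain.from_iterable(
        # Arguments are evaluated left to right: the header group is consumed
        # first, then next() fetches the matching group of data rows.
        map(partial(mkdict, parse_header(last(headers))),
            map(parse_row, next(headers_then_rows)))
        for headers in headers_then_rows)

sample = ["#c1,c2,c3\n", "a,b,c\n", "d,e,f\n", "#c4,c5\n", "g,h\n", "i,j\n"]
result = list(shifty_csv_dicts(sample))
```

One thing to watch in Python 3.7 and later: if a header is the last line of the input, next(headers_then_rows) raises StopIteration inside the generator expression, which surfaces as a RuntimeError rather than ending the stream quietly.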

For the record, this could probably be simplified, and I would still do things @schmichael's way. But I learned a few things figuring this out, and I will try applying these ideas to a Scala solution.

+1

EDIT: Scratch this, I don't think you need monads.

0

Source: https://habr.com/ru/post/897156/

