Enumerable / Stream lookahead

I am starting to learn Elixir and have run into a problem that I could not solve easily.

I am trying to write a function that takes an Enumerable.t and returns another Enumerable.t in which each element is paired with the n elements that follow it. It behaves slightly differently from Enum.chunk(e, n, 1, []) in that the number of elements in the result is always equal to the number of elements in the original enumerable. I also need it to support Streams.

@spec lookahead(Enumerable.t, non_neg_integer) :: Enumerable.t 

This is best illustrated by the doctest syntax:

    iex> lookahead(1..6, 1) |> Enum.to_list
    [[1,2],[2,3],[3,4],[4,5],[5,6],[6]]
    iex> lookahead(1..4, 2) |> Enum.to_list
    [[1,2,3],[2,3,4],[3,4],[4]]
    iex> Stream.cycle(1..4) |> lookahead(2) |> Enum.take(5)
    [[1,2,3],[2,3,4],[3,4,1],[4,1,2],[1,2,3]]
    iex> {:ok,io} = StringIO.open("abcd")
    iex> IO.stream(io,1) |> lookahead(2) |> Enum.to_list
    [["a","b","c"],["b","c","d"],["c","d"],["d"]]
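For reference, the expected semantics can be pinned down with an eager sketch. It handles only finite lists, so it deliberately ignores the Stream requirement. The module name LookaheadReference is hypothetical and is not taken from any of the answers:

```elixir
defmodule LookaheadReference do
  # Hypothetical, eager reference for the expected output. It only accepts
  # finite lists, so it does NOT meet the Stream requirement of the question.
  @spec lookahead(list, non_neg_integer) :: [list]
  def lookahead([], _n), do: []

  # Each window is the current element plus up to n following elements,
  # so the result always has exactly one window per input element.
  def lookahead([_ | rest] = list, n) when is_integer(n) and n >= 0 do
    [Enum.take(list, n + 1) | lookahead(rest, n)]
  end
end

LookaheadReference.lookahead([1, 2, 3, 4], 2)
# => [[1, 2, 3], [2, 3, 4], [3, 4], [4]]
```

Each window is cut with Enum.take/2, so the windows near the end naturally shrink. This is the behaviour that plain chunking with a step of 1 does not reproduce.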

I looked into how the Enumerable protocol is implemented, but I don't quite understand the Enumerable.reduce interface.

Is there a concise / elegant way to do this?

My use case is a small fixed n (1 or 2) on a binary stream, so bonus points for a version optimized for that. However, since I am learning Elixir, I am interested in solutions for a range of use cases. Performance matters. I will benchmark the solutions for various values of n and publish the results.

Benchmark Update - April 8, 2015

Six working solutions have been posted. Detailed benchmark information is available at https://gist.github.com/spitsw/fce5304ec6941578e454. The benchmarks were run on a list of 500 items for various values of n.

For n = 1, the results were:

    PatrickSuspend.lookahead         104.90 µs/op
    Warren.lookahead                 174.00 µs/op
    PatrickChunk.lookahead           310.60 µs/op
    PatrickTransform.lookahead       357.00 µs/op
    Jose.lookahead                   647.60 µs/op
    PatrickUnfold.lookahead      1484000.00 µs/op

For n = 50, the results were:

    PatrickSuspend.lookahead         220.80 µs/op
    Warren.lookahead                 320.60 µs/op
    PatrickTransform.lookahead       518.60 µs/op
    Jose.lookahead                  1390.00 µs/op
    PatrickChunk.lookahead          3058.00 µs/op
    PatrickUnfold.lookahead      1345000.00 µs/op (faster than n=1)
+6
6 answers

As discussed in the comments, my first attempt had some performance issues and did not work with streams that have side effects, such as I/O streams. I took the time to dig deeper into the Stream library and finally came up with this solution:

    defmodule MyStream do
      def lookahead(enum, n) do
        step = fn val, _acc -> {:suspend, val} end
        next = &Enumerable.reduce(enum, &1, step)
        &do_lookahead(n, :buffer, [], next, &1, &2)
      end

      # stream suspended
      defp do_lookahead(n, state, buf, next, {:suspend, acc}, fun) do
        {:suspended, acc, &do_lookahead(n, state, buf, next, &1, fun)}
      end

      # stream halted
      defp do_lookahead(_n, _state, _buf, _next, {:halt, acc}, _fun) do
        {:halted, acc}
      end

      # initial buffering
      defp do_lookahead(n, :buffer, buf, next, {:cont, acc}, fun) do
        case next.({:cont, []}) do
          {:suspended, val, next} ->
            new_state = if length(buf) < n, do: :buffer, else: :emit
            do_lookahead(n, new_state, buf ++ [val], next, {:cont, acc}, fun)
          {_, _} ->
            do_lookahead(n, :emit, buf, next, {:cont, acc}, fun)
        end
      end

      # emitting
      defp do_lookahead(n, :emit, [_|rest] = buf, next, {:cont, acc}, fun) do
        case next.({:cont, []}) do
          {:suspended, val, next} ->
            do_lookahead(n, :emit, rest ++ [val], next, fun.(buf, acc), fun)
          {_, _} ->
            do_lookahead(n, :emit, rest, next, fun.(buf, acc), fun)
        end
      end

      # buffer empty, halting
      defp do_lookahead(_n, :emit, [], _next, {:cont, acc}, _fun) do
        {:halted, acc}
      end
    end

It may look intimidating at first, but it's actually not that difficult. I'll try to break it down for you, although that is hard to do with a full-blown example like this one.

Let's start with a simpler example: a stream that endlessly repeats the value given to it. To emit a stream, we can return a function that takes an accumulator and a function as arguments. To emit a value, we call that function with two arguments: the value to emit and the accumulator. The acc accumulator is a tuple of a command (:cont, :suspend or :halt) and the actual accumulator value. The command tells us what the consumer wants us to do, and the result we must return depends on it. If the stream should be suspended, we return a three-element tuple of the atom :suspended, the accumulator, and a function that will be called when the enumeration continues (sometimes called a "continuation"). For the :halt command we simply return {:halted, acc}, and for :cont we emit a value by performing the recursive step described above. Altogether it looks like this:

    defmodule MyStream do
      def repeat(val) do
        &do_repeat(val, &1, &2)
      end

      defp do_repeat(val, {:suspend, acc}, fun) do
        {:suspended, acc, &do_repeat(val, &1, fun)}
      end

      defp do_repeat(_val, {:halt, acc}, _fun) do
        {:halted, acc}
      end

      defp do_repeat(val, {:cont, acc}, fun) do
        do_repeat(val, fun.(val, acc), fun)
      end
    end
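To see the protocol from the consumer's side, the sketch below copies the repeat/1 code above under the hypothetical module name RepeatDemo. It then drives the stream in two ways: through Enum.take/2, and by hand with a custom reducer (also hypothetical) that halts after three values. This works because a 2-arity function is itself an Enumerable.

```elixir
defmodule RepeatDemo do
  # Same shape as MyStream.repeat/1: the returned 2-arity function is the stream.
  def repeat(val), do: &do_repeat(val, &1, &2)

  # Consumer asked to pause: hand back a continuation.
  defp do_repeat(val, {:suspend, acc}, fun), do: {:suspended, acc, &do_repeat(val, &1, fun)}
  # Consumer asked to stop.
  defp do_repeat(_val, {:halt, acc}, _fun), do: {:halted, acc}
  # Consumer wants more: emit val and recurse on whatever command comes back.
  defp do_repeat(val, {:cont, acc}, fun), do: do_repeat(val, fun.(val, acc), fun)
end

# Enum functions speak the protocol for us.
RepeatDemo.repeat(:x) |> Enum.take(3)
# => [:x, :x, :x]

# Driving it by hand: this reducer collects values and halts after three.
reducer = fn val, acc ->
  acc = [val | acc]
  if length(acc) == 3, do: {:halt, acc}, else: {:cont, acc}
end

RepeatDemo.repeat(1).({:cont, []}, reducer)
# => {:halted, [1, 1, 1]}
```

The reducer's return value ({:cont, acc} or {:halt, acc}) becomes the command for the next recursive call. That is how the consumer stays in control of an otherwise infinite stream.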

This is only one piece of the puzzle, though: we can emit a stream, but we are not yet consuming an incoming one. Again, to explain how that works, it makes sense to build a simpler example. Here I will build a function that takes an enumerable and simply suspends on, and re-emits, each of its values.

    defmodule MyStream do
      def passthrough(enum) do
        step = fn val, _acc -> {:suspend, val} end
        next = &Enumerable.reduce(enum, &1, step)
        &do_passthrough(next, &1, &2)
      end

      defp do_passthrough(next, {:suspend, acc}, fun) do
        {:suspended, acc, &do_passthrough(next, &1, fun)}
      end

      defp do_passthrough(_next, {:halt, acc}, _fun) do
        {:halted, acc}
      end

      defp do_passthrough(next, {:cont, acc}, fun) do
        case next.({:cont, []}) do
          {:suspended, val, next} -> do_passthrough(next, fun.(val, acc), fun)
          {_, _} -> {:halted, acc}
        end
      end
    end

The first function sets up the next function, which is passed to do_passthrough and serves to fetch the next value from the incoming stream. The step function used internally makes the enumeration suspend on every element of that stream. The rest is pretty similar, except for the last clause. There we call next with {:cont, []} to get a new value and handle the result with a case statement. If there is a value, we get back {:suspended, val, next}. If not, the incoming stream has finished, and we pass that on to the consumer.
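To make the role of the step function concrete, the snippet below calls Enumerable.reduce/3 directly and pulls the values out one at a time, exactly as next does inside do_passthrough/3. It uses a plain list instead of an arbitrary stream only to keep the example self-contained.

```elixir
# Same step function as in passthrough/1: suspend after every element and hand
# the element itself back as the suspended accumulator.
step = fn val, _acc -> {:suspend, val} end

# The first call runs until the step function suspends on the first element.
{:suspended, 1, next} = Enumerable.reduce([1, 2, 3], {:cont, []}, step)

# Each continuation call resumes the enumeration and yields the next element.
{:suspended, 2, next} = next.({:cont, []})
{:suspended, 3, next} = next.({:cont, []})

# When the input is exhausted, the enumerable reports :done with the accumulator
# it was resumed with. This is what the `{_, _}` branch in do_passthrough/3 sees.
{:done, []} = next.({:cont, []})
```

Each pattern match doubles as an assertion: if the protocol behaved differently, the script would fail with a MatchError.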

I hope this clarifies a few things about building streams by hand in Elixir. Unfortunately, working with streams at this level requires a lot of boilerplate. If you go back to the lookahead implementation now, you will see that only a few small differences remain, and those are the really interesting parts. There are two additional parameters. The first is state, which distinguishes between the :buffer and :emit phases. The second is buf, which is pre-filled with n+1 elements during the initial buffering phase. In the emitting phase, the current buffer is emitted and then shifted one position to the left on each iteration. We are done once the input stream is exhausted and the buffer has been drained, or when our own stream is halted directly.


I leave my original answer here for reference:

Here's a solution that uses Stream.unfold/2 to emit a true stream of values, as per your description. This means that you need to add Enum.to_list to the end of your first two examples to get the actual values.

    defmodule MyStream do
      def lookahead(stream, n) do
        Stream.unfold(split(stream, n + 1), fn
          {[], _stream} ->
            nil
          {[_ | buf] = current, stream} ->
            {value, stream} = split(stream, 1)
            {current, {buf ++ value, stream}}
        end)
      end

      defp split(stream, n) do
        {Enum.take(stream, n), Stream.drop(stream, n)}
      end
    end

The general idea is that we carry a buffer, buf, over from the previous iteration. On each iteration, we emit the current buffer, take one value from the stream, and append it to the end of the buffer. This repeats until the buffer is empty.
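The cost of split/2 comes from Stream.drop/2 being lazy. The "rest" it returns is the original stream plus a drop step, so every later Enum.take/2 re-enumerates the source from the beginning. That fits the performance and side-effect problems mentioned at the top of this answer. A small experiment with a counting Agent makes the re-evaluation visible. The harness is illustrative only and is not part of the answer.

```elixir
# Count how many times the (possibly expensive or side-effecting) source runs.
{:ok, counter} = Agent.start_link(fn -> 0 end)

source =
  Stream.map(1..5, fn x ->
    Agent.update(counter, &(&1 + 1))
    x
  end)

# Two steps in the style of split/2: take a head, keep a lazily dropped tail.
{[1, 2], rest} = {Enum.take(source, 2), Stream.drop(source, 2)}
{[3], _rest} = {Enum.take(rest, 1), Stream.drop(rest, 1)}

# Only three distinct elements were consumed, but the mapper ran five times:
# the second take re-evaluated elements 1 and 2 before reaching 3.
evaluations = Agent.get(counter, & &1)
```

With an I/O stream, each of those re-evaluations is a fresh read from the device. That is why this version could not handle side-effecting streams.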

Example:

    iex> MyStream.lookahead(1..6, 1) |> Enum.to_list
    [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6]]
    iex> MyStream.lookahead(1..4, 2) |> Enum.to_list
    [[1, 2, 3], [2, 3, 4], [3, 4], [4]]
    iex> Stream.cycle(1..3) |> MyStream.lookahead(2) |> Enum.take(5)
    [[1, 2, 3], [2, 3, 1], [3, 1, 2], [1, 2, 3], [2, 3, 1]]
+5

Here is an inefficient implementation of such a function:

    defmodule Lookahead do
      def lookahead(enumerable, n) when n > 0 do
        enumerable
        |> Stream.chunk(n + 1, 1, [])
        |> Stream.flat_map(fn list ->
          length = length(list)
          if length < n + 1 do
            # A short (leftover) chunk of length L is followed by L - 1 ever
            # shorter windows; scanning over tl(list) yields exactly that many.
            [list | Enum.scan(tl(list), list, fn _, acc -> Enum.drop(acc, 1) end)]
          else
            [list]
          end
        end)
      end
    end

It builds on @hahuang65's implementation, except that we use Stream.flat_map/2 to check the length of each emitted chunk. As soon as we notice that an emitted chunk is shorter than n + 1, we add the missing windows.

A hand-written implementation from scratch would be faster, because it would not need to call length(list) on every iteration. The implementation above may be fine if n is small, though. If n is fixed, you can even pattern match explicitly on the emitted list.
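As a sketch of the fixed-n idea, here is a hypothetical lookahead1/1 for n = 1. It pattern matches on the previously seen element instead of measuring list lengths, and it appends a make_ref() end marker to the input so the last window can be flushed. It is an illustration under those assumptions, not a benchmarked implementation.

```elixir
defmodule FixedLookahead do
  # Hypothetical specialisation for n = 1: each window is [current, next],
  # and the very last element is emitted alone as [last].
  def lookahead1(enum) do
    # A fresh reference cannot be equal to any element of `enum`,
    # so it safely marks the end of the input.
    stop = make_ref()

    enum
    |> Stream.concat([stop])
    |> Stream.transform(:none, fn
      # First element: nothing to emit yet, just remember it. (On empty input
      # this clause sees the marker itself, and nothing is ever emitted.)
      x, :none -> {[], {:prev, x}}
      # End marker: flush the last element as a one-element window.
      ^stop, {:prev, prev} -> {[[prev]], :none}
      # Any other element completes the window started by the previous one.
      x, {:prev, prev} -> {[[prev, x]], {:prev, x}}
    end)
  end
end
```

Because Stream.concat/2 and Stream.transform/3 are lazy, the sketch also works on infinite input such as Stream.cycle(1..4) combined with Enum.take/2.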

+3

I started a discussion about my proposed Stream.mutate function on the elixir-core mailing list, where Peter Hamilton suggested another way to solve this problem. Using make_ref to create a globally unique reference, we can build a padding stream and concatenate it to the original enumerable, so that windows keep being emitted after the original stream has ended. This can then be combined with Stream.chunk, which means that we need to remove the unwanted references in the last step:

    def lookahead(enum, n) do
      stop = make_ref()
      enum
      |> Stream.concat(List.duplicate(stop, n))
      |> Stream.chunk(n+1, 1)
      |> Stream.map(&Enum.reject(&1, fn x -> x == stop end))
    end

I think this is the nicest solution from a syntactic point of view. Alternatively, we can use Stream.transform to build up the buffer manually, which is very similar to the manual solution I proposed earlier:

    def lookahead(enum, n) do
      stop = make_ref()
      enum
      |> Stream.concat(List.duplicate(stop, n+1))
      |> Stream.transform([], fn val, acc ->
        case {val, acc} do
          {^stop, []}                         -> {[]    , []           }
          {^stop, [_|rest] = buf}             -> {[buf] , rest         }
          {val  , buf} when length(buf) < n+1 -> {[]    , buf ++ [val] }
          {val  , [_|rest] = buf}             -> {[buf] , rest ++ [val]}
        end
      end)
    end

I have not measured the performance of these solutions, but I believe the second one, although a little clunkier, should perform a bit better, because it does not need to iterate over every chunk again.

By the way, the second solution can be written without a case statement once Elixir allows the pin operator in function heads (possibly in version 1.1):

    def lookahead(enum, n) do
      stop = make_ref()
      enum
      |> Stream.concat(List.duplicate(stop, n+1))
      |> Stream.transform([], fn
        ^stop, []                         -> {[]    , []           }
        ^stop, [_|rest] = buf             -> {[buf] , rest         }
        val  , buf when length(buf) < n+1 -> {[]    , buf ++ [val] }
        val  , [_|rest] = buf             -> {[buf] , rest ++ [val]}
      end)
    end
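Stream.chunk has since been deprecated in favour of Stream.chunk_every/4. Assuming a reasonably recent Elixir, the make_ref padding idea from this answer could be written as shown below; the module name SentinelLookahead is hypothetical. The :discard option drops incomplete chunks, so the n padding references guarantee exactly one window per input element.

```elixir
defmodule SentinelLookahead do
  # The make_ref/chunk idea, written against Stream.chunk_every/4.
  def lookahead(enum, n) when is_integer(n) and n >= 0 do
    # A fresh reference cannot collide with any value in `enum`.
    stop = make_ref()

    enum
    # Pad the end with n markers so every real element starts a full window.
    |> Stream.concat(List.duplicate(stop, n))
    # Sliding windows of size n + 1 with step 1; :discard drops the short tail.
    |> Stream.chunk_every(n + 1, 1, :discard)
    # Strip the padding back out of the trailing windows.
    |> Stream.map(fn window -> Enum.reject(window, &(&1 == stop)) end)
  end
end
```

Passing :discard avoids relying on how the leftover chunk is handled, which is exactly the detail where plain chunking falls short of the specification in the question.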
+3

You can use Stream.chunk/4.

It will look something like this:

    defmodule MyMod do
      def lookahead(enum, amount) do
        Stream.chunk(enum, amount + 1, 1, [])
      end
    end

With your inputs:

    iex(2)> MyMod.lookahead(1..6, 1) |> Enum.to_list
    [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6]]
    iex(3)> MyMod.lookahead(1..4, 2) |> Enum.to_list
    [[1, 2, 3], [2, 3, 4], [3, 4]]
    iex(4)> Stream.cycle(1..3) |> MyMod.lookahead(1) |> Enum.take(5)
    [[1, 2], [2, 3], [3, 1], [1, 2], [2, 3]]
+1

The solution below uses Stream.resource together with the ability of Enumerable.reduce to suspend. All of the examples pass.

In short, it uses Enumerable.reduce to build up the first list. It then suspends the reducer on each iteration, removing the head of the list and appending the new element to its tail. Finally, it emits the trailing windows of the stream once the reducer is :done or :halted. All of this is coordinated by Stream.resource.

This would be more efficient if a FIFO queue were used instead of appending to the end of a list on every iteration.

Please give feedback on any possible simplifications, efficiency improvements or errors.

    defmodule MyModule do
      def lookahead(enum, n) when n >= 0 do
        reducer = fn ->
          Enumerable.reduce(enum, {:cont, {0, []}}, fn
            item, {c, list} when c < n -> {:cont, {c+1, list ++ [item]}}     # Build up the first list
            item, {c, list} when c == n -> {:suspend, {c+1, list ++ [item]}} # Suspend on first full list
            item, {c, [_|list]} -> {:suspend, {c, list ++ [item]}}           # Remove the first item and emit
          end)
        end

        Stream.resource(reducer,
          fn
            {:suspended, {_, list} = acc, fun} -> {[list], fun.({:cont, acc})}
            {:halted, _} = result -> lookahead_trail(n, result) # Emit the trailing items
            {:done, _} = result -> lookahead_trail(n, result)   # Emit the trailing items
          end,
          fn
            {:suspended, acc, fun} -> fun.({:halt, acc}) # Ensure the reducer is halted after suspend
            _ -> :ok
          end)
      end

      defp lookahead_trail(n, acc) do
        case acc do
          {action, {c, [_|rest]}} when c > n -> {[], {action, {c-1, rest}}} # List already emitted here
          {action, {c, [_|rest] = list}} -> {[list], {action, {c-1, rest}}} # Emit the next tail item
          acc -> {:halt, acc}                                               # Finish of the stream
        end
      end
    end
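The FIFO-queue remark above can be sketched with Erlang's :queue module as the buffer. The example uses Stream.transform/3 with a make_ref() end marker rather than Stream.resource. The module name QueueLookahead is hypothetical. Every emitted window still has to be materialised as a fresh list with :queue.to_list/1, so the work per element stays proportional to n. Whether this actually beats list ++ [item] for small n is something to benchmark, not something assumed here.

```elixir
defmodule QueueLookahead do
  # Hypothetical variant: the buffer is an Erlang :queue, so appending to the
  # back (:queue.in/2) and dropping the front (:queue.drop/1) avoid copying it.
  def lookahead(enum, n) when is_integer(n) and n >= 0 do
    stop = make_ref()

    enum
    |> Stream.concat([stop])
    |> Stream.transform({:queue.new(), 0}, fn
      # End of input: emit every remaining window, shrinking from the front.
      ^stop, {queue, _size} ->
        {drain(queue, []), {:queue.new(), 0}}

      # Fewer than n items buffered: just collect.
      item, {queue, size} when size < n ->
        {[], {:queue.in(item, queue), size + 1}}

      # n items buffered: the new item completes a window of n + 1; emit it,
      # then drop the oldest item so n items remain.
      item, {queue, size} ->
        full = :queue.in(item, queue)
        {[:queue.to_list(full)], {:queue.drop(full), size}}
    end)
  end

  # Emit the current buffer, then the buffer without its head, until empty.
  defp drain(queue, acc) do
    if :queue.is_empty(queue) do
      Enum.reverse(acc)
    else
      drain(:queue.drop(queue), [:queue.to_list(queue) | acc])
    end
  end
end
```

QueueLookahead.lookahead(1..4, 2) |> Enum.to_list() should match the second doctest in the question.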
+1

Taking inspiration from Warren's answer, I made this. Basic usage:

    iex> {peek, enum} = StreamSplit.peek 1..10, 3
    {[1, 2, 3], #Function<57.77324385/2 in Stream.transform/3>}
    iex> Enum.take(enum, 5)
    [1, 2, 3, 4, 5]

https://hex.pm/packages/stream_split

0

Source: https://habr.com/ru/post/983996/

