As discussed in the comments, my first attempt had some performance issues and did not work with streams that have side effects, such as I/O streams. I took the time to dig deeper into the stream library and finally came up with this solution:
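defmodule MyStream do
  def lookahead(enum, n) do
    # suspend the input after every element so we can pull values one at a time
    step = fn val, _acc -> {:suspend, val} end
    next = &Enumerable.reduce(enum, &1, step)
    &do_lookahead(n, :buffer, [], next, &1, &2)
  end

  # stream suspended
  defp do_lookahead(n, state, buf, next, {:suspend, acc}, fun) do
    {:suspended, acc, &do_lookahead(n, state, buf, next, &1, fun)}
  end

  # stream halted: let the input stream clean up (e.g. close a file) if it is still open
  defp do_lookahead(_n, _state, _buf, next, {:halt, acc}, _fun) do
    if is_function(next), do: next.({:halt, []})
    {:halted, acc}
  end

  # initial buffering: collect n + 1 elements
  defp do_lookahead(n, :buffer, buf, next, {:cont, acc}, fun) do
    case next.({:cont, []}) do
      {:suspended, val, next} ->
        new_state = if length(buf) < n, do: :buffer, else: :emit
        do_lookahead(n, new_state, buf ++ [val], next, {:cont, acc}, fun)

      {_, _} ->
        # input exhausted: mark it :done so its continuation is never called again
        do_lookahead(n, :emit, buf, :done, {:cont, acc}, fun)
    end
  end

  # emitting after the input is exhausted: drain the remaining buffer
  defp do_lookahead(n, :emit, [_ | rest] = buf, :done, {:cont, acc}, fun) do
    do_lookahead(n, :emit, rest, :done, fun.(buf, acc), fun)
  end

  # emitting: fetch the next value, emit the buffer, then shift it left by one
  defp do_lookahead(n, :emit, [_ | rest] = buf, next, {:cont, acc}, fun) do
    case next.({:cont, []}) do
      {:suspended, val, next} ->
        do_lookahead(n, :emit, rest ++ [val], next, fun.(buf, acc), fun)

      {_, _} ->
        do_lookahead(n, :emit, rest, :done, fun.(buf, acc), fun)
    end
  end

  # buffer empty: the enumeration is done
  defp do_lookahead(_n, :emit, [], _next, {:cont, acc}, _fun) do
    {:done, acc}
  end
end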
It might look intimidating at first, but it's actually not that difficult. I will try to break it down for you, but that is hard to do with a full-blown example like this one.
Let's start with a simpler example: a stream that endlessly repeats the value given to it. To emit a stream, we can return a function that takes an accumulator and a function as arguments. To emit a value, we call that function with two arguments: the value to emit and the accumulator. The acc argument is a tuple consisting of a command (:cont, :suspend or :halt) and the actual accumulator value; the command tells us what the consumer wants us to do, and the result we need to return depends on it. If the stream should be suspended, we return a three-element tuple of the atom :suspended, the accumulator, and a function that will be called when the enumeration continues (sometimes called a continuation). For the :halt command we simply return {:halted, acc}, and for :cont we emit a value by calling the function and recursing with the command it returns. It all looks like this:
defmodule MyStream do
  def repeat(val) do
    &do_repeat(val, &1, &2)
  end

  # consumer wants us to pause: return a continuation
  defp do_repeat(val, {:suspend, acc}, fun) do
    {:suspended, acc, &do_repeat(val, &1, fun)}
  end

  # consumer wants us to stop
  defp do_repeat(_val, {:halt, acc}, _fun) do
    {:halted, acc}
  end

  # consumer wants more: emit val and recurse with the next command
  defp do_repeat(val, {:cont, acc}, fun) do
    do_repeat(val, fun.(val, acc), fun)
  end
end
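repeat/1 never runs out of values, so it never returns {:done, acc}, which is the result the Enumerable.reduce/3 contract expects when a stream finishes on its own. As a minimal sketch of that case, here is a hypothetical countdown stream (CountdownSketch and countdown/1 are made-up names, not part of the answer) built the same way, with one extra clause that returns {:done, acc} once there is nothing left to emit.

```elixir
defmodule CountdownSketch do
  # Hypothetical example stream: emits n, n - 1, ..., 1 and then finishes.
  def countdown(n) when is_integer(n) and n >= 0, do: &do_countdown(n, &1, &2)

  # Consumer wants us to pause: hand back a continuation.
  defp do_countdown(n, {:suspend, acc}, fun) do
    {:suspended, acc, &do_countdown(n, &1, fun)}
  end

  # Consumer wants us to stop early.
  defp do_countdown(_n, {:halt, acc}, _fun), do: {:halted, acc}

  # Nothing left to emit: the enumeration finished on its own.
  defp do_countdown(0, {:cont, acc}, _fun), do: {:done, acc}

  # Emit n, then recurse with whatever command the consumer returned.
  defp do_countdown(n, {:cont, acc}, fun) do
    do_countdown(n - 1, fun.(n, acc), fun)
  end
end
```

Assuming the module above is compiled, Enum.to_list(CountdownSketch.countdown(3)) should give [3, 2, 1], while Enum.take(CountdownSketch.countdown(5), 2) gives [5, 4] and stops the stream through the :halt clause.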
Now, this is only one piece of the puzzle. We can emit a stream, but we are not yet consuming an input stream. Again, to explain how this works, it makes sense to build a simpler example. Here I will build a function that takes an enumerable and simply suspends and re-emits each of its values.
defmodule MyStream do
  def passthrough(enum) do
    # suspend the input after every element so we can pull values one at a time
    step = fn val, _acc -> {:suspend, val} end
    next = &Enumerable.reduce(enum, &1, step)
    &do_passthrough(next, &1, &2)
  end

  defp do_passthrough(next, {:suspend, acc}, fun) do
    {:suspended, acc, &do_passthrough(next, &1, fun)}
  end

  defp do_passthrough(next, {:halt, acc}, _fun) do
    # let the input stream clean up before we stop
    next.({:halt, []})
    {:halted, acc}
  end

  defp do_passthrough(next, {:cont, acc}, fun) do
    case next.({:cont, []}) do
      {:suspended, val, next} -> do_passthrough(next, fun.(val, acc), fun)
      {_, _} -> {:done, acc}
    end
  end
end
The first few lines set up the next function, which is passed to the do_passthrough function. It serves to get the next value from the input stream. The inner step function makes the input stream suspend after every element, handing that element back to us. The remaining clauses are pretty similar to repeat/1, with the exception of the last one. There we call next with {:cont, []} to get a new value and process the result with a case expression. If the input returns {:suspended, val, next}, we emit val and continue with the new continuation; if not, the input stream has ended, so we stop as well and return the accumulator to the consumer.
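To see what next actually returns, here is a small sketch that drives the same suspending step function by hand over a plain list (the list [:a, :b, :c] is just sample input). Each call resumes the input and yields one value plus a fresh continuation; once the input is exhausted it reports {:done, acc}, and a continuation can also be given {:halt, acc} to stop the input early.

```elixir
# The same suspending reducer that passthrough/1 uses
step = fn val, _acc -> {:suspend, val} end
next = &Enumerable.reduce([:a, :b, :c], &1, step)

# Each call resumes the input and returns the next value plus a new continuation
{:suspended, :a, next} = next.({:cont, []})
{:suspended, :b, next} = next.({:cont, []})
{:suspended, :c, next} = next.({:cont, []})

# No values left: the input reports that it is done
{:done, []} = next.({:cont, []})

# Stopping early: pass {:halt, acc} to a continuation instead
first = &Enumerable.reduce([:a, :b, :c], &1, step)
{:suspended, :a, rest} = first.({:cont, []})
{:halted, []} = rest.({:halt, []})
```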
I hope this clarifies a few things about how to create streams in Elixir manually. Unfortunately, working with streams at this level requires a lot of boilerplate. If you go back to the lookahead implementation now, you will see that only a few small differences are the really interesting parts. There are two additional parameters: state, which distinguishes between the :buffer and :emit phases, and buf, which is pre-filled with n + 1 elements during the initial buffering phase. In the emitting phase, the current buffer is emitted and then shifted to the left on each iteration. We are done once the input stream is exhausted and the buffer is empty, or when the consumer halts our stream.
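Stripped of the suspension plumbing, the two phases can be modelled on plain lists. The sketch below is only an illustration under that simplifying assumption: LookaheadModel and windows/2 are hypothetical names, and it works on a finite list rather than a lazy stream, but it buffers n + 1 elements and then emits and shifts the buffer the same way lookahead/2 does.

```elixir
defmodule LookaheadModel do
  # Hypothetical list-only model of lookahead/2's :buffer and :emit phases.
  def windows(list, n) when is_list(list) and is_integer(n) and n >= 0 do
    # :buffer phase: pre-fill the buffer with up to n + 1 elements
    {buf, rest} = Enum.split(list, n + 1)
    emit(buf, rest, [])
  end

  # buffer empty: we are done
  defp emit([], _rest, out), do: Enum.reverse(out)

  # :emit phase, input exhausted: keep emitting while draining the buffer
  defp emit([_ | tail] = buf, [], out), do: emit(tail, [], [buf | out])

  # :emit phase: emit the buffer, shift it left and append the next input element
  defp emit([_ | tail] = buf, [val | rest], out), do: emit(tail ++ [val], rest, [buf | out])
end
```

For example, LookaheadModel.windows([1, 2, 3, 4], 2) returns [[1, 2, 3], [2, 3, 4], [3, 4], [4]], the same windows as the stream version.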
I'll leave my original answer here for reference:
Here's a solution that uses Stream.unfold/2 to emit a true stream of values, as per your description. This means that you need to add Enum.to_list at the end of your first two examples to get the actual values.
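defmodule MyStream do
  def lookahead(stream, n) do
    Stream.unfold(split(stream, n + 1), fn
      # buffer empty: stop
      {[], _stream} ->
        nil

      # emit the current buffer, then drop its head and append the next value
      {[_ | buf] = current, stream} ->
        {value, stream} = split(stream, 1)
        {current, {buf ++ value, stream}}
    end)
  end

  # take the first n elements and lazily drop them from the rest
  defp split(stream, n) do
    {Enum.take(stream, n), Stream.drop(stream, n)}
  end
end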
The general idea is that we keep a buffer (buf) from the previous iteration. At each iteration, we emit the current buffer, drop its first element, take one value from the stream, and append it to the end of the buffer. This is repeated until the buffer is empty.
Example:
iex> MyStream.lookahead(1..6, 1) |> Enum.to_list
[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6]]
iex> MyStream.lookahead(1..4, 2) |> Enum.to_list
[[1, 2, 3], [2, 3, 4], [3, 4], [4]]
iex> Stream.cycle(1..3) |> MyStream.lookahead(2) |> Enum.take(5)
[[1, 2, 3], [2, 3, 1], [3, 1, 2], [1, 2, 3], [2, 3, 1]]
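The split/2 helper in this original version is what causes the performance and side-effect problems mentioned at the top: Stream.drop/2 is lazy, so every Enum.take/2 on the dropped stream replays the source from the beginning. The sketch below makes that visible by counting, with an Erlang :counters reference, how many elements a side-effecting source (a hypothetical Stream.map over 1..4) produces.

```elixir
# Count how many times the side-effecting source produces an element.
counter = :counters.new(1, [])

source =
  Stream.map(1..4, fn x ->
    :counters.add(counter, 1, 1)
    x
  end)

# What split(stream, 2) does: take two elements, lazily drop them from the rest
{[1, 2], rest} = {Enum.take(source, 2), Stream.drop(source, 2)}
2 = :counters.get(counter, 1)

# Taking the next value from `rest` replays the source from the start (1, 2, then 3)
[3] = Enum.take(rest, 1)
5 = :counters.get(counter, 1)
```

Getting the third element costs three more evaluations of the source, and each further step of the unfold replays the source again, which is why the suspension-based version is the better fit for I/O streams.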