You read the documentation correctly, Stream.resource is just a convenient method for calculating values. It is also true that if you want to use values, you need to implement the Collectable protocol. You can study the source code of File.Stream , which implements both Enumerable and Collectable .
For demonstration purposes, the ChunkedWriter module is ChunkedWriter , which stores the values ββuntil the buffer is full, and then discards it when the limit is reached:
defmodule ChunkedWriter do def open(chunk_size) do Agent.start_link fn -> {[], chunk_size} end end def write(agent, value) do Agent.update agent, fn {old_buffer, chunk_size} -> buffer = [value | old_buffer] new_buffer = cond do length(buffer) < chunk_size -> buffer true -> do_flush(buffer) end {new_buffer, chunk_size} end end def flush(agent) do Agent.update agent, fn {buffer, chunk_size} -> {do_flush(buffer), chunk_size} end end defp do_flush(buffer) do buffer |> Enum.reverse |> Enum.each(&IO.puts/1) IO.puts "---" [] end def close(agent) do flush(agent) Agent.stop(agent) end def stream(chunk_size) do %ChunkedWriter.Stream{chunk_size: chunk_size} end end
This module will be used as follows:
writer = ChunkedWriter.open(3) ChunkedWriter.write(writer, 1) ChunkedWriter.write(writer, 2) ChunkedWriter.write(writer, 3) ChunkedWriter.write(writer, 4) ChunkedWriter.write(writer, 5) ChunkedWriter.close(writer)
Displays
1 2 3 --- 4 5 ---
Now the ChunkedWriter.stream/1 method simply sets the structure, which will then be sent to ChunkedWriter.Stream . Here is the ChunkedWriter.Stream module with its Collectable implementation, so we can connect Enumerable to it.
defmodule ChunkedWriter.Stream do defstruct chunk_size: 1 defimpl Collectable do def into(stream = %ChunkedWriter.Stream{chunk_size: chunk_size}) do {:ok, writer} = ChunkedWriter.open(chunk_size) {stream, fn _acc, {:cont, value} -> ChunkedWriter.write(writer, value) _acc, :done -> :ok = ChunkedWriter.close(writer) stream _, :halt -> :ok = ChunkedWriter.close(writer) end} end end end
In action:
Stream.cycle([1,2,3]) |> Stream.take(10) |> Stream.into(ChunkedWriter.stream(4)) |> Stream.run
Fingerprints:
1 2 3 1 --- 2 3 1 2 --- 3 1 ---