Is Stream.resource only from, not to?

Reading the documentation, Stream.resource only seems to be designed to create a resource that can read / receive values, rather than write / put in. Did I understand correctly or am I reading this incorrectly? And if I understand correctly, what type of resource should I create for recording / transferring from a stream, Collectable ?

+5
source share
2 answers

You read the documentation correctly, Stream.resource is just a convenient method for calculating values. It is also true that if you want to use values, you need to implement the Collectable protocol. You can study the source code of File.Stream , which implements both Enumerable and Collectable .

For demonstration purposes, the ChunkedWriter module is ChunkedWriter , which stores the values ​​until the buffer is full, and then discards it when the limit is reached:

 defmodule ChunkedWriter do def open(chunk_size) do Agent.start_link fn -> {[], chunk_size} end end def write(agent, value) do Agent.update agent, fn {old_buffer, chunk_size} -> buffer = [value | old_buffer] new_buffer = cond do length(buffer) < chunk_size -> buffer true -> do_flush(buffer) end {new_buffer, chunk_size} end end def flush(agent) do Agent.update agent, fn {buffer, chunk_size} -> {do_flush(buffer), chunk_size} end end defp do_flush(buffer) do buffer |> Enum.reverse |> Enum.each(&IO.puts/1) IO.puts "---" [] end def close(agent) do flush(agent) Agent.stop(agent) end def stream(chunk_size) do %ChunkedWriter.Stream{chunk_size: chunk_size} end end 

This module will be used as follows:

 writer = ChunkedWriter.open(3) ChunkedWriter.write(writer, 1) ChunkedWriter.write(writer, 2) ChunkedWriter.write(writer, 3) ChunkedWriter.write(writer, 4) ChunkedWriter.write(writer, 5) ChunkedWriter.close(writer) 

Displays

 1 2 3 --- 4 5 --- 

Now the ChunkedWriter.stream/1 method simply sets the structure, which will then be sent to ChunkedWriter.Stream . Here is the ChunkedWriter.Stream module with its Collectable implementation, so we can connect Enumerable to it.

 defmodule ChunkedWriter.Stream do defstruct chunk_size: 1 defimpl Collectable do def into(stream = %ChunkedWriter.Stream{chunk_size: chunk_size}) do {:ok, writer} = ChunkedWriter.open(chunk_size) {stream, fn _acc, {:cont, value} -> ChunkedWriter.write(writer, value) _acc, :done -> :ok = ChunkedWriter.close(writer) stream _, :halt -> :ok = ChunkedWriter.close(writer) end} end end end 

In action:

 Stream.cycle([1,2,3]) |> Stream.take(10) |> Stream.into(ChunkedWriter.stream(4)) |> Stream.run 

Fingerprints:

 1 2 3 1 --- 2 3 1 2 --- 3 1 --- 
+7
source

You're right. Stream.resource / 3 - convenience for determining sources. At the bottom, Stream.resource / 3 implements Enumerable, which is the protocol of all sources (and transformers, such as a card).

The Enumerable protocol is really good at taking values ​​from things and passing them forward, however, it does not work to collect these values ​​into something reasonable. To do this, you must implement the Collectable protocol.

Well, you may wonder what if you want both?

For an example, you can look at the File.Stream or GenEvent.Stream in Elixir code. In short, if you have a MyData module, you can define a function MyData.stream/0 that will return a %MyData.Stream{} structure that implements both the Enumerable and Collectable protocols, allowing you to provide both mechanisms.

+4
source

Source: https://habr.com/ru/post/1208855/


All Articles