The implementation of Task.async_stream changed between Elixir 1.4.5 and 1.5.1.
Let's see what happens.
Elixir 1.4.5
In this version, the timeout is part of the after clause of a receive block:
receive do
  {{^monitor_ref, position}, value} ->
    # ...
after
  timeout ->
    # ...
end
This receive block waits for update messages about the spawned tasks, which are sent by a monitoring process. I shortened the code for simplicity.
What does this mean in practice? Task.async_stream only times out if a full timeout period (in milliseconds) passes without any message arriving from a spawned task. Every received message restarts the clock.
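The inactivity behaviour described above can be reproduced with a plain receive/after loop. This is a minimal sketch, not the code from Task.Supervised: the module InactivityTimeout and its functions run/2 and collect/4 are hypothetical names chosen for illustration, and delays are given in milliseconds to keep the example fast.

```elixir
defmodule InactivityTimeout do
  # Hypothetical helper, not part of Elixir. Spawns one process per delay;
  # each process sleeps for its delay and then reports the delay back.
  def run(delays, timeout) do
    parent = self()
    # A unique tag keeps late messages from earlier runs out of this one.
    tag = make_ref()

    for delay <- delays do
      spawn(fn ->
        :timer.sleep(delay)
        send(parent, {tag, delay})
      end)
    end

    collect(tag, length(delays), timeout, [])
  end

  # All expected messages arrived: return them in arrival order.
  defp collect(_tag, 0, _timeout, acc), do: {:ok, Enum.reverse(acc)}

  defp collect(tag, remaining, timeout, acc) do
    receive do
      {^tag, value} ->
        # Recursing enters a fresh receive, so the `after` clock restarts.
        collect(tag, remaining - 1, timeout, [value | acc])
    after
      timeout ->
        # No message arrived for `timeout` milliseconds.
        {:timeout, Enum.reverse(acc)}
    end
  end
end
```

With a 300 ms timeout, InactivityTimeout.run([400, 200, 50], 300) should return {:ok, [50, 200, 400]} even though the slowest process needs 400 ms, because no gap between two messages exceeds 300 ms. InactivityTimeout.run([600, 50], 300) should return {:timeout, [50]}, since nothing arrives for more than 300 ms after the first message. Unlike Task.async_stream, this sketch returns values in arrival order.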
Example
Let's try your example using [10, 3, 4]:
iex> Task.async_stream([10, 3, 4], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list
** (exit) exited in: Task.Supervised.stream(5000)
    ** (EXIT) time out
    (elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
    (elixir) lib/enum.ex:1776: Enum.reverse/2
    (elixir) lib/enum.ex:2528: Enum.to_list/1
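As we can see, this leads to a timeout, as expected: assuming the three tasks run concurrently, results arrive after 3 and 4 seconds, and then nothing arrives for 6 seconds, which is longer than the default timeout of 5000 ms.
Now, does it work with [10, 5]?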
iex> Task.async_stream([10, 5], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
** (exit) exited in: Task.Supervised.stream(5000)
    ** (EXIT) time out
    (elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
    (elixir) lib/enum.ex:1776: Enum.reverse/2
    (elixir) lib/enum.ex:2528: Enum.to_list/1
Apparently the first task takes too long for the 5 second timeout. But as soon as we add an intermediate step, it works. How about adding 1?
iex> Task.async_stream([10, 5, 1], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
[ok: 10, ok: 5, ok: 1]
Elixir 1.5.1
In Elixir 1.5.1, the timeout logic works differently. It uses Process.send_after to send a timeout message to the monitoring process for each spawned task.
# Schedule a timeout message to ourselves, unless the timeout was set to :infinity
timer_ref = case timeout do
  :infinity -> nil
  timeout -> Process.send_after(self(), {:timeout, {monitor_ref, ref}}, timeout)
end
This message is then handled in the same receive block that spawned the task and scheduled the :timeout message.
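The per-task timer described above can be sketched with Process.send_after and Process.cancel_timer. This is only an illustration of the idea, not the code from Task.Supervised: the module PerTaskTimeout, its functions, and the message shapes are hypothetical, and delays are given in milliseconds.

```elixir
defmodule PerTaskTimeout do
  # Hypothetical helper, not part of Elixir. Every spawned process gets its
  # own timer, so a single slow process triggers the timeout no matter how
  # often the other processes report back.
  def run(delays, timeout) do
    parent = self()
    # A unique tag keeps late messages from earlier runs out of this one.
    tag = make_ref()

    timers =
      for {delay, id} <- Enum.with_index(delays), into: %{} do
        spawn(fn ->
          :timer.sleep(delay)
          send(parent, {tag, :result, id, delay})
        end)

        # Schedule a timeout message to ourselves for this particular task.
        {id, Process.send_after(parent, {tag, :timeout, id}, timeout)}
      end

    collect(tag, timers, [])
  end

  # No timers left means every task reported back in time.
  defp collect(_tag, timers, acc) when map_size(timers) == 0 do
    {:ok, Enum.reverse(acc)}
  end

  defp collect(tag, timers, acc) do
    receive do
      {^tag, :result, id, value} ->
        # The task finished, so its timer is no longer needed.
        {timer_ref, timers} = Map.pop(timers, id)
        Process.cancel_timer(timer_ref)
        collect(tag, timers, [value | acc])

      {^tag, :timeout, id} ->
        if Map.has_key?(timers, id) do
          # This task is still pending: cancel the other timers and give up.
          Enum.each(Map.values(timers), &Process.cancel_timer/1)
          {:timeout, id}
        else
          # The timer fired just before it was cancelled; ignore it.
          collect(tag, timers, acc)
        end
    end
  end
end
```

PerTaskTimeout.run([400, 200, 50], 500) should return {:ok, [50, 200, 400]} (arrival order), while PerTaskTimeout.run([400, 200, 50], 300) should return {:timeout, 0}, because the first process needs 400 ms and its own 300 ms timer fires first. Under an inactivity-style timeout the second call would pass, since no gap between two messages exceeds 300 ms.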
Examples
As soon as one process takes longer than the specified timeout, the whole stream exits, as it should.
iex> Task.async_stream([10, 5, 1], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
** (exit) exited in: Task.Supervised.stream(5000)
    ** (EXIT) time out
    (elixir) lib/task/supervised.ex:237: Task.Supervised.stream_reduce/7
    (elixir) lib/enum.ex:1847: Enum.reverse/1
    (elixir) lib/enum.ex:2596: Enum.to_list/1
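If the slowest task legitimately needs more time than the default of 5000 ms, the limit can be raised with the :timeout option of Task.async_stream. A small sketch, with the sleep times scaled down to milliseconds so that it finishes quickly (the concrete numbers are only illustrative):

```elixir
# Each task sleeps for i milliseconds; the limit is 1000 ms instead of
# the default 5000 ms.
result =
  [300, 150, 30]
  |> Task.async_stream(fn i -> :timer.sleep(i); i end, timeout: 1_000)
  |> Enum.to_list()

# Results come back in input order, not in completion order.
IO.inspect(result)
```

Here result should be [ok: 300, ok: 150, ok: 30], because every task finishes well within its limit.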
TL;DR
Elixir 1.4.5 restarts the timeout every time it receives a result from a spawned process. Elixir 1.5.1 tracks it separately for each spawned process.