Task.async_stream timeout

The Task.async_stream documentation describes the :timeout option as follows:

The maximum amount of time (in milliseconds) each task is allowed to execute for. Defaults to 5000.

In my testing, I did the following:

    iex(8)> Task.async_stream([10, 4, 5], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list
    [ok: 10, ok: 4, ok: 5]
    iex(10)> Task.async_stream([10], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list
    ** (exit) exited in: Task.Supervised.stream(5000)
        ** (EXIT) time out
        (elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
        (elixir) lib/enum.ex:1776: Enum.reverse/2
        (elixir) lib/enum.ex:2528: Enum.to_list/1

Why does the first example not time out (even though it takes ~10 seconds to execute), while the second example shows the expected timeout behavior?

1 answer

The implementation of Task.async_stream changed between Elixir 1.4.5 and 1.5.1.

Let's see what happens.

Elixir 1.4.5

In this version, the timeout is implemented as the after clause of a receive block:

    receive do
      {{^monitor_ref, position}, value} ->
        # ...
      {:down, {^monitor_ref, position}, reason} ->
        # ...
      {:DOWN, ^monitor_ref, _, ^monitor_pid, reason} ->
        # ...
    after
      timeout ->
        # ...
    end

This receive block waits for update messages about the spawned tasks, which are sent by the monitoring process. I have shortened the code for simplicity.

What does this mean in practice? Task.async_stream only times out if a full timeout period (in milliseconds) passes without it receiving any message from a spawned task.
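The "timeout restarts on every message" behavior can be reproduced outside of Task.async_stream. The following is only a minimal sketch of the idea, not the Elixir 1.4.5 source: the InactivityTimeout module, its function names, and the millisecond values are made up for illustration, and results are collected in arrival order rather than input order.

```elixir
defmodule InactivityTimeout do
  # Hypothetical helper, not part of Elixir: spawns one process per delay and
  # collects their results with a plain receive ... after loop.
  def run(delays, timeout) do
    parent = self()
    ref = make_ref()

    for ms <- delays do
      spawn(fn ->
        Process.sleep(ms)
        send(parent, {ref, ms})
      end)
    end

    collect(ref, length(delays), timeout, [])
  end

  # All results are in.
  defp collect(_ref, 0, _timeout, acc), do: {:ok, Enum.reverse(acc)}

  defp collect(ref, remaining, timeout, acc) do
    receive do
      {^ref, value} ->
        # Re-entering receive starts a fresh `timeout` window.
        collect(ref, remaining - 1, timeout, [value | acc])
    after
      timeout ->
        # Only reached if `timeout` ms pass with no message at all.
        {:timeout, Enum.reverse(acc)}
    end
  end
end
```

Under these assumptions, InactivityTimeout.run([500, 100, 300], 400) should succeed even though one process needs 500 ms, because no gap between two messages exceeds 400 ms, while InactivityTimeout.run([500], 400) should report a timeout.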

Example

Let's try a variation of your example, using [10, 3, 4]:

    iex> Task.async_stream([10, 3, 4], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list
    ** (exit) exited in: Task.Supervised.stream(5000)
        ** (EXIT) time out
        (elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
        (elixir) lib/enum.ex:1776: Enum.reverse/2
        (elixir) lib/enum.ex:2528: Enum.to_list/1

As we can see, this leads to a timeout, as expected: once the 4-second task has reported, 6 seconds pass before the 10-second task finishes.

Now, if we try to use [10, 5] , will it work?

    iex> Task.async_stream([10, 5], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
    ** (exit) exited in: Task.Supervised.stream(5000)
        ** (EXIT) time out
        (elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
        (elixir) lib/enum.ex:1776: Enum.reverse/2
        (elixir) lib/enum.ex:2528: Enum.to_list/1

It seems that the wait for the first result is still too long for the 5-second timeout. But as soon as we add an intermediate step, it works. How about 1?

    iex> Task.async_stream([10, 5, 1], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
    [ok: 10, ok: 5, ok: 1]

Elixir 1.5.1

In Elixir 1.5.1, the timeout logic works differently. It uses Process.send_after to schedule a timeout message to the monitoring process for each spawned task.

    # Schedule a timeout message to ourselves, unless the timeout was set to :infinity
    timer_ref =
      case timeout do
        :infinity -> nil
        timeout -> Process.send_after(self(), {:timeout, {monitor_ref, ref}}, timeout)
      end

This message is then handled in the same receive block that spawned the task and scheduled the :timeout message.
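To illustrate the per-task timer idea in isolation, here is a rough sketch built on Process.send_after. It is not the Elixir 1.5.1 implementation: the PerTaskTimeout module, the message shapes, and the millisecond values are assumptions made for this example, and unlike a real implementation it ignores stale timers instead of cancelling them.

```elixir
defmodule PerTaskTimeout do
  # Hypothetical sketch, not part of Elixir: every spawned process gets its
  # own timer, so a slow one is caught no matter how often others report in.
  def run(delays, timeout) do
    parent = self()
    ref = make_ref()

    indexes =
      for {ms, index} <- Enum.with_index(delays) do
        spawn(fn ->
          Process.sleep(ms)
          send(parent, {ref, index, ms})
        end)

        # One timeout message per process, tagged with that process's index.
        Process.send_after(parent, {:timeout, ref, index}, timeout)
        index
      end

    collect(ref, MapSet.new(indexes), [])
  end

  defp collect(ref, pending, acc) do
    if MapSet.size(pending) == 0 do
      {:ok, Enum.reverse(acc)}
    else
      receive do
        {^ref, index, value} ->
          collect(ref, MapSet.delete(pending, index), [value | acc])

        {:timeout, ^ref, index} ->
          if MapSet.member?(pending, index) do
            # This process has not reported within its own time limit.
            {:timeout, index}
          else
            # Timer of a process that already finished: ignore it.
            collect(ref, pending, acc)
          end
      end
    end
  end
end
```

With this sketch, PerTaskTimeout.run([500, 100, 300], 400) should return a timeout for the first element, since its own 400 ms timer fires before it finishes, regardless of the messages that arrived in between.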


Examples

As soon as one task takes longer than the specified timeout, the entire stream exits, as it should.

    iex> Task.async_stream([10, 5, 1], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
    ** (exit) exited in: Task.Supervised.stream(5000)
        ** (EXIT) time out
        (elixir) lib/task/supervised.ex:237: Task.Supervised.stream_reduce/7
        (elixir) lib/enum.ex:1847: Enum.reverse/1
        (elixir) lib/enum.ex:2596: Enum.to_list/1
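For quicker experiments, the same check can be scaled down to milliseconds by passing the :timeout option explicitly. This is a sketch that assumes Elixir 1.5.1 or later; the variable names and durations are arbitrary, and the exit is caught only so that the outcome can be inspected.

```elixir
sleep_and_return = fn ms ->
  Process.sleep(ms)
  ms
end

# Every task finishes well inside the 400 ms limit.
all_fast =
  [100, 200]
  |> Task.async_stream(sleep_and_return, timeout: 400)
  |> Enum.to_list()

# One task needs 500 ms, which exceeds the limit, so the caller exits.
one_slow =
  try do
    [500, 100, 300]
    |> Task.async_stream(sleep_and_return, timeout: 400)
    |> Enum.to_list()
  catch
    :exit, {:timeout, _details} -> :timed_out
  end
```

On these versions, all_fast should hold the two :ok results and one_slow should be :timed_out, even though the faster tasks in the second list report in before the limit.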

TL;DR

Elixir 1.4.5 restarts the timeout every time it receives a result from a spawned task. Elixir 1.5.1 tracks the timeout separately for each spawned task.


Source: https://habr.com/ru/post/1271203/

