Resolving a collection of futures sequentially

I have a set of futures that I want to combine into one future that forces them to be executed sequentially.

I looked at the function futures_ordered. It appears to return the results in order, but the futures themselves are executed concurrently.

I tried to fold the futures, combining them with and_then. However, the type system makes this difficult.

let tasks = vec![ok(()), ok(()), ok(())];

let combined_task = tasks.into_iter().fold(
    ok(()),                             // seed
    |acc, task| acc.and_then(|_| task), // accumulator
);


This results in the following error:

error[E0308]: mismatched types
  --> src/main.rs:10:21
   |
10 |         |acc, task| acc.and_then(|_| task), // accumulator
   |                     ^^^^^^^^^^^^^^^^^^^^^^ expected struct `futures::FutureResult`, found struct `futures::AndThen`
   |
   = note: expected type `futures::FutureResult<_, _>`
              found type `futures::AndThen<futures::FutureResult<_, _>, futures::FutureResult<(), _>, [closure@src/main.rs:10:34: 10:42 task:_]>`

I am probably approaching this the wrong way, but I have run out of ideas.

3 answers

Combine iter_ok and Stream::for_each:

use futures::Stream;
use futures::future::ok;
use futures::stream::iter_ok;

let tasks = vec![ok(()), ok(()), ok(())];

let combined_task = iter_ok::<_, ()>(tasks).for_each(|f| f);

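iter_ok produces a stream of the passed items and never yields an error (which is why you sometimes need to spell out the error type). The closure passed to for_each then returns a Future to run for each item; here that is simply the item itself.

for_each drives each returned future to completion before moving on to the next one, as you wanted. It also aborts on the first error it encounters, and it requires the inner futures to return () on success.

for_each itself returns a Future that either fails (as described above) or resolves to () on completion.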
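To make the ordering and early-exit behaviour concrete, here is a minimal sketch that mirrors it with std::future and async/await rather than the futures 0.1 combinators used above. The names run_sequentially, step and block_on are hypothetical helpers written for this illustration, and the busy-polling block_on is only suitable for futures that are ready immediately.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that never actually wait.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical minimal executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Hypothetical helper: awaits each task in order and stops at the first error,
/// which is the behaviour described for for_each above.
async fn run_sequentially<I, F, E>(tasks: I) -> Result<(), E>
where
    I: IntoIterator<Item = F>,
    F: Future<Output = Result<(), E>>,
{
    for task in tasks {
        task.await?;
    }
    Ok(())
}

/// Hypothetical task: records its id when it actually runs, optionally failing.
async fn step(id: u32, log: &RefCell<Vec<u32>>, fail: bool) -> Result<(), String> {
    log.borrow_mut().push(id);
    if fail {
        Err(format!("step {} failed", id))
    } else {
        Ok(())
    }
}
```

Running three such steps where the second one fails leaves only ids 1 and 2 in the log: the third task is never polled, and the combined future resolves to the second task's error.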

test tests::bench_variant_buffered ... bench:      22,356 ns/iter (+/- 1,816)
test tests::bench_variant_boxed ...    bench:       8,575 ns/iter (+/- 1,042)
test tests::bench_variant_for_each ... bench:       4,070 ns/iter (+/- 531)

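Stream has a buffered function, which lets you limit how many futures are polled concurrently.

If you have a collection of futures, you can create a stream from it and use buffered like this: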

let tasks = vec![future1, future2];
let stream = ::futures::stream::iter_ok(tasks);
let mut when_result_ready = stream.buffered(1);

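when_result_ready is now a Stream implementation that polls only one future at a time and moves on to the next one once the current future completes.

buffered carries noticeable overhead, so an alternative is to convert each Future into a Stream and flatten them: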

iter_ok(tasks).map(|f|f.into_stream()).flatten()

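According to its documentation, flatten exhausts each individual stream before looking at the next one, so no Future is polled before the previous one has completed. In my local profiling this was roughly 80% faster than the buffered approach.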


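Both approaches above produce a Stream of results, in which each source Future is polled sequentially. If what you actually want is a single Future at the end rather than the result of each source Future, the answer from Stefan may be more useful.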


As mentioned in the comments, your types are too concrete. You can create a trait object by boxing each step:

let combined_task: Box<dyn Future<Item = (), Error = ()>> =
    tasks.into_iter().fold(Box::new(ok(())), |acc, task| {
        // Boxing erases the ever-growing AndThen<...> type at every step.
        Box::new(acc.and_then(|_| task))
    });
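For comparison, the same boxing idea can be sketched with std::future instead of futures 0.1. This is only an illustration under that assumption; BoxedTask, chain_boxed, step and block_on are hypothetical names, and the busy-polling block_on only suits futures that are ready immediately.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// One trait-object type that every step of the fold can be coerced to.
type BoxedTask = Pin<Box<dyn Future<Output = Result<(), String>>>>;

/// Hypothetical helper: folds the tasks into one boxed future in which each
/// task starts only after the previous one has succeeded.
fn chain_boxed<I, F>(tasks: I) -> BoxedTask
where
    I: IntoIterator<Item = F>,
    F: Future<Output = Result<(), String>> + 'static,
{
    let seed: BoxedTask = Box::pin(async { Ok::<(), String>(()) });
    tasks.into_iter().fold(seed, |acc, task| {
        // Re-boxing keeps the accumulator type fixed across iterations.
        Box::pin(async move {
            acc.await?;
            task.await
        })
    })
}

/// Hypothetical task: records its id when it actually runs.
async fn step(id: u32, log: Rc<RefCell<Vec<u32>>>) -> Result<(), String> {
    log.borrow_mut().push(id);
    Ok(())
}

/// Waker that does nothing; enough for futures that never actually wait.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical minimal executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

The cost of this approach is one heap allocation per task, which is consistent with the boxed variant landing between buffered and for_each in the benchmark above.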

Source: https://habr.com/ru/post/1691696/

