RxCpp: observer lifetime when using observe_on(rxcpp::observe_on_new_thread())

What is the correct way to wait until every observer's on_completed has been called when the observers use observe_on(rxcpp::observe_on_new_thread())?

For example:

{
    Foo foo;
    auto generator = [&](rxcpp::subscriber<int> s)
    {
        s.on_next(1);
        // ...
        s.on_completed();
    };
    auto values = rxcpp::observable<>::create<int>(generator).publish();
    auto s1 = values.observe_on(rxcpp::observe_on_new_thread())
                    .subscribe([&](int) { slow_function(foo); });

    auto lifetime = rxcpp::composite_subscription();
    lifetime.add([&](){ wrapper.log("unsubscribe");  });
    auto s2 = values.ref_count().as_blocking().subscribe(lifetime);

    // I would like to call something here that waits until
    // s1's on_completed has finished
}

// the program usually crashes here when foo goes out of scope because 
// the slow_function(foo) is still working on foo.  I also noticed that
// s1 on_completed never got called.

My question is how to wait for s1's on_completed to finish without having to set and poll flag variables.

The motivation for using observe_on() is that there are usually several observers on values, and I would like each observer to run concurrently. There may be other ways to achieve the same goal; I am open to suggestions.


One way is to compose the two streams and block until both have completed:

{
    Foo foo;
    auto generator = [&](rxcpp::subscriber<int> s)
    {
        s.on_next(1);
        s.on_next(2);
        // ...
        s.on_completed();
    };

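    // publish() turns the source into a connectable observable shared by all subscribers
    auto values = rxcpp::observable<>::create<int>(generator).publish();

    // slow path: each value is handled on a new thread
    auto work = values.
        observe_on(rxcpp::observe_on_new_thread()).
        tap([&](int) {
            slow_function(foo);
        }).
        finally([](){printf("s1 completed\n");}).
        as_dynamic();

    // ref_count() connects the published source once this stream is subscribed
    auto start = values.
        ref_count().
        finally([](){printf("s2 completed\n");}).
        as_dynamic();

    // wait for all to finish: the blocking subscribe returns only after
    // both merged streams have completed, so foo is still alive until then
    rxcpp::observable<>::from(work, start).
        merge(rxcpp::observe_on_new_thread()).
        as_blocking().subscribe();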
}

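A few points:

For merge to work, both streams must emit the same type. To combine streams of different types, use combine_latest instead.

The order of the observables passed to observable<>::from() matters. The start stream carries ref_count, so it must come last; that way merge has already subscribed to work before the generator starts.

merge is called from two threads, so it needs a thread-safe coordination. rxcpp is pay-for-use: by default, operators assume that all calls arrive on the same thread. Any operator that receives calls from several threads must be given a thread-safe coordination, which it uses to protect its state and its output calls.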
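As a rough analogy for what the merge plus as_blocking() combination in the code above achieves, here is a sketch that uses only the C++ standard library, not RxCpp. Everything in it is a hypothetical stand-in: Foo and slow_function mimic the names from the question, while MergedSink and run_and_wait are invented for illustration. The mutex plays the role of the thread-safe coordination, and the two join() calls play the role of the blocking subscribe.

```cpp
#include <cassert>
#include <chrono>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the Foo in the question.
struct Foo {
    int calls = 0;
};

// Hypothetical stand-in for slow_function: takes a while, then touches foo.
void slow_function(Foo& foo) {
    std::this_thread::sleep_for(std::chrono::milliseconds(20));
    ++foo.calls;
}

// Hypothetical merged sink: two threads deliver values to it, so access
// is serialized with a mutex (the job a thread-safe coordination does).
struct MergedSink {
    std::mutex m;
    std::vector<int> values;

    void on_next(int v) {
        std::lock_guard<std::mutex> lock(m);
        values.push_back(v);
    }
};

// Runs both streams and returns only after both have completed.
int run_and_wait() {
    Foo foo;
    MergedSink sink;

    // "work": slow processing on its own thread.
    std::thread work([&] {
        for (int v : {1, 2}) {
            slow_function(foo);
            sink.on_next(v);
        }
    });

    // "start": the fast path on another thread.
    std::thread start([&] {
        for (int v : {1, 2}) {
            sink.on_next(v);
        }
    });

    // Blocking here keeps foo alive until slow_function is done with it.
    work.join();
    start.join();

    assert(sink.values.size() == 4);
    return foo.calls;
}
```

Calling run_and_wait() returns only after both threads have finished, so foo is never destroyed while slow_function is still using it. In the RxCpp version the library does this bookkeeping for you; the sketch only shows the shape of the guarantee.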


Source: https://habr.com/ru/post/1605658/

