How to cleanly break the tokio-core event loop and a futures::Stream in Rust

I am dabbling in tokio-core and can figure out how to spawn an event loop. However, there are two things that I'm not sure about: how to gracefully exit the event loop, and how to exit a stream running inside the event loop. For example, consider this simple piece of code that spawns two listeners into the event loop and waits for another thread to indicate an exit condition:

extern crate tokio_core;
extern crate futures;

use tokio_core::reactor::Core;
use futures::sync::mpsc::unbounded;
use tokio_core::net::TcpListener;
use std::net::SocketAddr;
use std::str::FromStr;
use futures::{Stream, Future};
use std::thread;
use std::time::Duration;
use std::sync::mpsc::channel;

/// Runs a tokio-core event loop with two TCP listeners on a worker thread and
/// steers it from the main thread through a control channel.
fn main() {
    // One-shot std channel: the worker thread uses it to hand the control
    // sender back to the main thread.
    let (get_tx, get_rx) = channel();

    let worker = thread::spawn(move || {
        let mut core = Core::new().unwrap();
        let (tx, rx) = unbounded();
        get_tx.send(tx).unwrap(); // <<<<<<<<<<<<<<< (1)

        // Listener-0
        {
            let addr = SocketAddr::from_str("127.0.0.1:44444").unwrap();
            let handle = core.handle();
            let listener = TcpListener::bind(&addr, &handle).unwrap();

            let accept_loop = listener
                .incoming()
                .for_each(|(_sock, peer)| {
                    println!("Accepted from {}", peer);
                    Ok(())
                })
                .map_err(|e| println!("----- {:?}", e));

            handle.spawn(accept_loop);
        }

        // Listener1
        {
            let addr = SocketAddr::from_str("127.0.0.1:55555").unwrap();
            let handle = core.handle();
            let listener = TcpListener::bind(&addr, &handle).unwrap();

            let accept_loop = listener
                .incoming()
                .for_each(|(_sock, peer)| {
                    println!("Accepted from {}", peer);
                    Ok(())
                })
                .map_err(|e| println!("----- {:?}", e));

            handle.spawn(accept_loop);
        }

        // Control stream: `true` keeps the loop running, `false` ends it by
        // turning into an error.
        let work = rx.for_each(|v| match v {
            // (3) I want to shut down listener-0 above and release the resources
            true => Ok(()),
            false => Err(()), // <<<<<<<<<<<<<<< (2)
        });

        // The result is deliberately ignored: the error is the exit signal.
        let _ = core.run(work);
        println!("Exiting event loop thread");
    });

    let tx = get_rx.recv().unwrap();
    let pause = Duration::from_secs(2);

    thread::sleep(pause);
    println!("Want to terminate listener-0"); // <<<<<< (3)
    tx.send(true).unwrap();

    thread::sleep(pause);
    println!("Want to exit event loop");
    tx.send(false).unwrap();

    worker.join().unwrap();
}

So, say, after the sleep in the main thread I want a clean exit of the event loop thread. Currently I send something to the event loop to make it complete, thereby freeing the thread.

However, both (1) and (2) feel hacky: I am forcing an error as the exit condition. My questions:

1) Am I doing this right? If not, what is the correct way to gracefully exit the event loop thread?

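2) I don't even know how to do (3), i.e. signal a condition from outside to shut down listener-0 and free all of its resources. How do I achieve this?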

+4
1

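The event loop (core) stops when it is no longer being turned (e.g. by run()) or when it is forgotten (drop()ed). There is no synchronous exit. core.run() returns and stops turning the loop when the Future passed to it completes.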

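A Stream completes by yielding None (marked with (3) in the code below). When, for example, a TCP connection is closed, the Stream representing it completes, and the other way around.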

extern crate tokio_core;
extern crate futures;

use tokio_core::reactor::Core;
use futures::sync::mpsc::unbounded;
use tokio_core::net::TcpListener;
use std::net::SocketAddr;
use std::str::FromStr;
use futures::{Async, Stream, Future, Poll};
use std::thread;
use std::time::Duration;

/// Pairs a wrapped `stream` with a `completer` that acts as a stop signal.
///
/// The `Stream` impl for this type forwards items from `stream` for as long
/// as `completer` is not ready, and reports completion as soon as
/// `completer` yields an item, finishes, or fails.
///
/// No trait bounds are declared on the struct itself; they are stated on the
/// function and impl that actually need them.
struct CompletionPact<S, C> {
    /// The stream whose items are passed through.
    stream: S,
    /// The stream whose first event ends the pact.
    completer: C,
}

/// Builds a `CompletionPact` that passes through the items of `s` and is cut
/// short by `c`.
fn stream_completion_pact<S: Stream, C: Stream>(s: S, c: C) -> CompletionPact<S, C> {
    CompletionPact { stream: s, completer: c }
}

impl<S, C> Stream for CompletionPact<S, C>
    where S: Stream,
          C: Stream,
{
    type Item = S::Item;
    type Error = S::Error;

    fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
        match self.completer.poll() {
            Ok(Async::Ready(None)) |
            Err(_) |
            Ok(Async::Ready(Some(_))) => {
                // We are done, forget us
                Ok(Async::Ready(None)) // <<<<<< (3)
            },
            Ok(Async::NotReady) => {
                self.stream.poll()
            },
        }
    }
}

/// Runs the event loop on a worker thread, then stops listener-0 and finally
/// the whole loop from the main thread, using one channel for each signal.
fn main() {
    // unbounded() is the Stream counterpart of channel(). Both channels are
    // created right here, so no Sender has to be received from the worker.
    let (tx, rx) = unbounded::<()>();
    // The second channel exists only to make the loop forget listener-0.
    let (l0tx, l0rx) = unbounded::<()>();

    let worker = thread::spawn(move || {
        let mut core = Core::new().unwrap();

        // Listener-0
        {
            let addr = SocketAddr::from_str("127.0.0.1:44444").unwrap();
            let handle = core.handle();
            let listener = TcpListener::bind(&addr, &handle).unwrap();

            // The Stream of incoming connections usually never completes.
            // Wrapping it yields a Stream that does complete once the other
            // side of l0rx is drop()ed or sends something.
            let accept_loop = stream_completion_pact(listener.incoming(), l0rx)
                .for_each(|(_sock, peer)| {
                    println!("Accepted from {}", peer);
                    Ok(())
                })
                .map_err(|e| println!("----- {:?}", e));

            handle.spawn(accept_loop);
        }

        // Listener1
        {
            let addr = SocketAddr::from_str("127.0.0.1:55555").unwrap();
            let handle = core.handle();
            let listener = TcpListener::bind(&addr, &handle).unwrap();

            let accept_loop = listener
                .incoming()
                .for_each(|(_sock, peer)| {
                    println!("Accepted from {}", peer);
                    Ok(())
                })
                .map_err(|e| println!("----- {:?}", e));

            handle.spawn(accept_loop);
        }

        // run() returns once this future completes, i.e. once rx receives
        // an item or its sender is dropped.
        let shutdown = rx.into_future();
        let _ = core.run(shutdown);
        println!("Exiting event loop thread");
    });

    let pause = Duration::from_secs(2);

    thread::sleep(pause);
    println!("Want to terminate listener-0");
    // Dropping the sender completes the rx-side Stream, which shows up as
    // Ok(Async::Ready(None)); our wrapper reacts the same way when an item
    // is received. Once the event loop sees a completed Stream it forgets
    // about it. That propagates to a drop() which close()es the file
    // descriptor, and the port is closed if nothing else uses it.
    l0tx.send(()).unwrap(); // alternatively: drop(l0tx);
    // This is asynchronous: it is only the signal that starts the forgetting.

    thread::sleep(pause);
    println!("Want to exit event loop");
    // Same idea: receiving an item (or a drop()) completes the Stream, and
    // a completed Future makes run() return.
    tx.send(()).unwrap();

    worker.join().unwrap();
}
+3

Source: https://habr.com/ru/post/1670844/


All Articles