Tokio - How to stop the event loop


#1

I’m trying to understand Tokio but not as easy as expected. Does anyone have a way to stop the event loop in Tokio? I launched with core.run(…)… and after, my code is :

#[async]
fn client(stream: TcpStream) -> Result<()> {
let (writer, reader) = stream.framed(MyCodec).split();

let mut client = Client::new(writer);

client = await!(client.send_handshake_req())?;

#[async]
for msg in reader {
    client = await!(client.handle_msg(msg))?;
}
println!("Client closed");
Ok(())

}

I will process all msg received, but at some point, my client want to close, he wants to close the connection. How is it possible to stop that ??


#2

Core::run returns when the future you give it resolves (either with success or an error). If you have an “infinite” future (e.g. a TcpListener’s Incoming stream that you for_each over), then a common technique is to take that future and select() it with another; this other one can be a timeout or a “shutdown” signal (using something like the oneshot::Receiver: https://docs.rs/futures/0.1.18/futures/unsync/oneshot/struct.Receiver.html). In this latter case, you pass the oneshot::Sender to a place that will want to indicate shutdown, and that place will send a message (typically just the unit: ()) when it’s time to go. That will make select return, and the Core::run method will exit.

Alternatively, if you’re for_each'ing over a finite Stream whose end-of-stream is the signal to exit, then you don’t need to do anything special.

Let me know if this doesn’t make sense and I’ll try to expand some more.


#3

I understand what you suggested. What I’m not sure to know is how to write that. I saw the function Future::select, but since I’m using the futures-await crate, I’m not sure how to apply that. I tried this :

if let Err(e) = core.run(connect_client(handle).select(wait_cancel_event())) {
    println!("Error running the client: {}", e);
}

where connect_client and wait_cancel_event function have #[async] before.

#[async]
fn connect_client(handle: Handle) -> io::Result<()> {

}

#[async]
fn wait_cancel_event(rx: Receiver<()>) -> io::Result<()> {
…}

But I don’t understand the type of the error returned by Core.run() because I got an error on the line
println!(“Error running the client: {}”, e);

error[E0277]: the trait bound (std::io::Error, futures::SelectNext<impl futures::__rt::MyFuture<std::result::Result<(), std::io::Error>>, impl futures::__rt::MyFuture<std::result::Result<(), std::io::Error>>>): std::fmt::Display is not satisfied

In conclusion : I’m not sure if it will work that way. I didn’t find a way to write the line to wait on a receiver. Still working on that, but if you have good hint, let me know :slight_smile: Thanks


#4

In

println!(“Error running the client: {}”, e);

Try replacing {} by {:?}. Looks like the Error you get is not printable as is.

As for the select part I have no idea.


#5

The core.run(...) part looks fine to me. I should also mention there’s a select2() combinator that allows the futures to resolve to different types; that’s a minor point though.

The error is because that type the compiler spit out does not implement the std::fmt::Display trait, and so you cannot format (or print in your case) it with “{}”. You can try asking for Debug output via “{:?}” but I’m not sure if those synthesized structs that futures-await generates implement it. If they don’t, you’ll get a similar error except it’ll refer to std::fmt::Debug. Try it though and if it doesn’t work, we can go from there.


#6

I had already tried with :? and it didn’t work :

error[E0277]: the trait bound impl futures::__rt::MyFuture<std::result::Result<(), std::io::Error>>: std::fmt::Debug is not satisfied

Thanks


#7

Ok, so the synthetic type the compiler generated that implements futures::__rt::MyFuture does not derive/impl Debug either - that’s pretty annoying.

If you want to stick with using futures-await, then the only choice I see is to not attempt to print the error.

@alexcrichton, do you by chance know what the plan is (if there is one) to allow at least debug printing impl trait synthetic structs, particularly as it relates to the futures-await use case?


#8

I have difficulty to understand all the type passed and returned to function. I looked the doc of the function select and here is what we can read :

“This function will return a new future which awaits for either this or the other future to complete. The returned future will finish with both the value resolved and a future representing the completion of the other work.”

If one of them finish with and error, what will be returned ? An error + a future for the other work ? But how to extract the error from there ?


#9

That’s right - it resolves to the error + future for the other work in this case. Here’re the Item and Error types for Select's implementation of Future:

type Item = (A::Item, SelectNext<A, B>)
type Error = (A::Error, SelectNext<A, B>)

SelectNext is basically the “future for the other work”. So if you were (for example) pattern matching, you’d have code like this to extract the error:

match core.run(first.select(second)) {
   Err((e, _other)) => // e is the error,
   Ok((item, _other)) => // item is the yield item
}

I whipped up a quick playground for you to play around with this. It uses immediately resolved futures so you don’t need a reactor to drive them.


#10

Awesome, thank you very much, I understand. And since I used select2, I found that it is

                          match x {
                              Either::A((e, _)) => {
                              },
                              Either::B((e, _)) => {
                              }

One more question. Is there any reason to use or not futures-await ? I have difficulty to understand the equivalent. I started from an example using futures-await so I’m working with that, but what could be the equivalent of this code if you don’t use it (no need to translate it, but maybe, could you explain what it would look like ?) Thanks

fn main() {

core.run(connect_client(handle));

}

#[async]
fn connect_client(handle: Handle) -> io::Result<()> {
let port = 12345;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);

let stream = await!(TcpStream::connect(&addr, &handle))?;

await!(client(stream))
    .map_err(|error| println!("Error handling client: {}", error));
Ok(())

}

#[async]
fn client(stream: TcpStream) -> Result<()> {
let (writer, reader) = stream.framed(MyCodec).split();

let mut client = Client::new(writer);

client = await!(client.send_handshake_req())?;

#[async]
for msg in reader {
    client = await!(client.handle_msg(msg))?;
}

#11

A sketch would be something like:

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    core.run(connect_client(&handle)).unwrap();
}

fn connect_client(handle: &Handle) -> Box<Future<Item = (), Error = std::io::Error>> {
    let port = 12345;
    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
    let fut = TcpStream::connect(&addr, &handle);
    let fut = fut.and_then(|sock| {
       let (writer, reader) = sock.framed(MyCodec).split();
       let c = Client::new(writer);
       let handshake = c.send_handshake_req();
       let server = handshake.and_then(move |client| {
           reader.for_each(|msg| client.handle_msg(msg))
                 .map_err(|error| println!("Error handling client: {}", error)))
       });
       server
    });
    Box::new(fut)
}