How to "re-run" a Future once it's done?

Hi All,

I have an applications that runs several network services. It roughly goes like this (being a newbie I'm not sure this is the right way to run several futures - please let me know if it doesn't - but it seem to work):

let service1 = Service1Future::new()
let service2 = Service2Future::new()
let service3 = Service3Future::new()

tokio::run(future::lazy(|| {
  tokio::spawn(service1);
  tokio::spawn(service2);
  tokio::spawn(service3);
  Ok(())
}))
...

I want service1 to be able to restart service2. I thought of using mpsc::channel to send restart notifications, and using select on service2 to stop listening upon receive. I'm not sure how to restart service2 once it stopped though.

Any ideas?

Thanks in advance

Haim

1 Like

The Future type, by design, is one-time-use-only. Once you get a value of out it, you're not allowed to touch it any more.

So you have to design your code differently. For example, make a function that returns you a new service each time. Or make service a permanent object that has run_again() that returns a Future for each run.

4 Likes

Also keep in mind that if you're using any streams, shutdown() and possible ones running in each future for a clean sever of the connection

Thanks @kornel, that was partly a poor choice of words on my side. and partly mis-understanding of the model :slightly_smiling_face:. The thing I'm trying to achieve is a loop around service2 (wether inside or outside run) that waits for it to finish (because of a message) and then spawn a new one.

I have no idea where it's valid to put such a loop (and how to wait for a future to finish).

Thanks @nologik,

shutdown() is for stopping the listener, right? This is next on my list (clean exit) :smile:

You would call shutdown() on a stream, such as a TcpStream

What's wrong with calling tokio::spawn(service2) from service1?

Nothing, except I don't know that the previous instance finished. service2 binds to a port and I have to wait until it's done before I can spawn a new instance.

You can make your service2 contain a receiver for a channel, which receives a future to spawn once it finishes. When service2 finishes, you just check if there's anything in the channel, and spawn it if so, otherwise drop the receiver.

When you want to spawn the next service2 you just put it in the channel. If the current service2 already finished, the receiver was dropped and the send will fail, in which case you spawn it from service1 immediately.

Aha :slightly_smiling_face:,

As usual you save my day. Many thanks.

Actually turns out there's a race condition in my suggestion:

service1                 service2
                         calls receive (empty)
calls send (success)
                         drops channel

If things happen in exactly this order, you wont restart service2. Luckily there's another solution, which is arguably simpler.

You use an oneshot channel to send the next task to run. Note that this is a special future-channel, so instead of calling recv, the receiver is a future. This way you can end the service2 task with a receive and spawn, which works regardless of whether service1 is done or not, since if it is not done, the future in service2 will just wait.

1 Like

I'm not sure I completely understand the race condition, and I'll have to be really creative to find a way to use oneshot channel as this restart procedure could be repeated. Here's a more detailed description of the system:

service1 is a management service and it's constantly running and listening to connections. One of its API requests is to reload the web server (in order to refresh self signed certificate - this is why I have to completely shut down the listener on the web server). This is why I liked your first idea. I could clone the consumer channel and give a new one to a function constructing the web server future and then launch it again. Using oneshot means I have to replace the current producer (so I can use it next time). Note that service1 keeps running.

In any case, you gave me some ideas to experiment with. I'll try to play with it over the weekend and see if I can get something to work.

Thanks

This is one of my abstractions for wrapping an adapter around a stream:

/// `stream`: The data source to listen upon
/// `outbound_rx`: The receiver which then sends information across the stream
/// `forward_inbound_tx`: The transmitter for sending information after reception from stream
/// `stop_cxn_tube_rx`: The receiver for kill signals
pub fn wrap_base64<S: 'static + AsyncWrite + AsyncRead + Send>(stream: S, outbound_rx: UnboundedReceiver<OutboundItem>, forward_inbound_tx: UnboundedSender<InboundItem>, stop_cxn_tube_rx: Receiver<()>) -> impl Future<Item=(), Error=HyxeError> {
    let framed = Framed::new(stream, Base64Codec::new());

    let (outbound_sink, inbound_stream) = framed.split();


    let inbound = inbound_stream.map_err(|_| ()).for_each(move |item| {
        forward_inbound_tx.unbounded_send(item).map_err(|_| ())
    }).map_err(|_| ());


    let outbound = outbound_rx.forward(outbound_sink.sink_map_err(|_| ())).map(|_| ()).map_err(|_| ());

    let stopper = stop_cxn_tube_rx.from_err::<HyxeError>().for_each(move |_| {
        println!("[AsyncStreamHandler] Shutting down stream");
        HyxeError::throw("[AsyncStreamHandler] Shutting down stream")
    }).map(|_| ()).map_err(|err| { err });

    inbound.join(outbound).from_err::<HyxeError>().join(stopper).map_err(|mut err| { err.printf() }).and_then(|_| {
        //insert shutdown expression here (one possible location)
        Ok(())
    })
}

You will have to change some things, but essentially, you wrap this around a connection once it's initiated and make sure that you pass the outbound-sender and the inbound-receiver into the appropriate locations to react asynchronously. Additionally, there is a stopper here, so all you have to do is send a () via the stop_connection_tube_tx, and the underlying stream ends.

Also:

/// Useful for toggling between Input types
pub type OutboundItem = BytesMut;

/// Useful for toggling between Output types
pub type InboundItem = BytesMut;

And

use bytes::BytesMut;
use futures::Future;
use futures::sink::Sink;
use futures::stream::Stream;
use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::codec::Framed;
use tokio::prelude::{AsyncRead, AsyncWrite};

You could consider something like this.

1 Like

The basic idea behind the race condition is that when service2 finishes, other threads (e.g. something that calls send) can run in between the call to recv and the receiver being dropped, and since the send was between them, it didn't fail, since it only fails when it is dropped.

Wow, thanks a lot.

This is a little above my head at the moment :slightly_smiling_face:, I'll have to play with it a little to get the idea...

1 Like

I felt the same way at the beginning of this year. Keep immersing yourself, and it'l be a cakewalk for you soon :slight_smile:

1 Like

Nice,

Looking forward for the weekend to play with it. I'll update once done.

Thanks.

Ok, here's the solution i reached (just in case anyone tackles similar problem) - couldn't wait for the weekend :smile: :

I couldn't make the receiver future spawn a new service because at the time of the spawning, the receiver future is still running and so is the web server - so the address binding fails. What I ended up doing is creating a function that constructs the web server future wrapped with the receiver and another function that calls the function above and uses and_then to recursively calls itself. Here's an example (I'm not using two services but a thread::spawn that sends a message after it sleeps for several seconds):

impl SupervisedServer {
    fn new(sleep: u64) -> Self {
        SupervisedServer { sleep }
    }

    fn run() -> Box<Future<Item = (), Error = ()> + Send> {
        Box::new(
            SupervisedServer::new(5)
                .supervised()
                .and_then(|_| SupervisedServer::run()),
        )
    }

    fn supervised(self) -> Box<impl Future<Item = (), Error = ()> + Send> {
        let (tx, rx) = oneshot::channel::<i32>();
        let stop_web_hook = rx.and_then(|_| {
            println!("received a message, exiting...");
            Ok(())
        });
        let server = make_server();
        let supervised = server.select2(stop_web_hook);
        Box::new(future::ok(Ok(())).and_then(move |_: Result<(), ()>| {
            send_stop_signal_after_sleep(tx, self.sleep);
            supervised.then(|_| Ok(()))
        }))
    }
}

and I'm invoking it with:

    tokio::run(SupervisedServer::run().map_err(|e| println!("error: {:?}", e)));

Thanks to all of you for the help.