Composing streams and futures for tokio runner


#1

Hi all,
I am trying to pull messages (which are futures themselves) off an unbounded queue every N seconds and spawn them onto the tokio handle.
I’ve tried dozens of iterations but cannot find the right approach. It looks like it should be possible, but I always hit a future type mismatch or end up with borrow issues.

This is the code that shows more or less what I want:

let fut = Interval::new_interval(Duration::from_secs(1))
            .for_each(|num| vantage_dequeuer.into_future() )
            .for_each(|message:VantageMessage |{
                handle.spawn(message);
                return Ok(());
            })
            .map_err(|e| panic!("delay errored; err={:?}", e));

core.run(fut);

Playground link: here


#2

See if this helps:

let join_handle = thread::spawn(move || {
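    // Cell<Option<_>> so the receiver can be moved out with take() and put back with set();
    // this assumes vantage_dequeuer is the bare UnboundedReceiver.
    let vantage_dequeuer = Cell::new(Some(vantage_dequeuer));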
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let fut = Interval::new_interval(Duration::from_secs(1))
        .then(|_| vantage_dequeuer.take().unwrap().into_future())
        .map_err(|(e, _)| panic!("delay errored; err={:?}", e) )
        .for_each(|(message, queue)| {
            vantage_dequeuer.set(Some(queue));
            message.map(|message| handle.spawn(message)).ok_or(())
        })
        .map_err(|e| panic!("delay errored; err={:?}", e) );

    core.run(fut).unwrap();
    println!("Returned!");
});

I think it is a bit suboptimal because, as you can see, I used a Cell to work around the multiple-mutable-reference problem. Try it without the Cell; maybe you can write a better solution.

Nevertheless, what I am doing is the following:

  1. Each tick of the Interval stream is mapped (with then) to the future obtained from UnboundedReceiver::into_future().
  2. A StreamFuture has an Error type equal to a tuple of the actual error and the stream object (in this case the UnboundedReceiver). I use map_err to avoid carrying a tuple as the Error in the following steps.
  3. The Item is now a tuple of the Option<VantageMessage> and the UnboundedReceiver. The latter is stored back in the Cell; the message is mapped, passed to spawn, and transformed into a Result<(), ()> (the error is () if the message was None).
  4. The final map_err is mostly useless, because the error can only be an empty tuple. I left it there because I think you could return a proper error from spawn, or when the message is empty.

Tell me what you think; my code can surely be improved quite a bit.


#3

Thanks a lot Edoardo.
Looking at it now, it doesn’t look like I can escape that level of indirection (the Cell). Since stream.into_future() passes the stream itself down the chain, the stream is needed both in the next link and in its own, so I have to take it out and then put it back, the way you did.
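
For anyone reading along, here is a minimal std-only sketch of that take-and-put-back pattern, with no tokio involved. The Queue type and the drain function are hypothetical stand-ins: Queue::next consumes the queue and hands it back next to the item, which is roughly the shape of into_future() as described above, and the Cell<Option<_>> lets a closure that only has shared access move the queue out and back in.

```rust
use std::cell::Cell;

/// Hypothetical stand-in for a stream: `next` takes the queue by value and
/// returns it together with the item, similar in shape to `into_future()`.
struct Queue {
    items: Vec<u32>,
}

impl Queue {
    fn next(mut self) -> (Option<u32>, Queue) {
        let item = if self.items.is_empty() {
            None
        } else {
            Some(self.items.remove(0))
        };
        (item, self)
    }
}

/// Calls the by-value `next` method `n` times from a closure that only
/// captures the slot by shared reference.
fn drain(queue: Queue, n: usize) -> Vec<Option<u32>> {
    // The Option lets us leave a `None` behind while the queue is "in flight".
    let slot = Cell::new(Some(queue));
    let step = || {
        // Move the queue out of the cell...
        let queue = slot.take().expect("queue is always put back");
        let (item, queue) = queue.next();
        // ...and put it back for the next call.
        slot.set(Some(queue));
        item
    };
    (0..n).map(|_| step()).collect()
}
```

Calling drain on a queue holding 1 and 2 with n = 3 yields the two items followed by None, since the queue is empty on the third call.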


#4

Have you looked into zip()'ing the two streams?


#5

Yeah vitalyd, I actually ended up implementing it with zip. It has only one minor issue when used with timers.
Zip takes the timer event and then waits for the second stream. If there is a pause longer than the timer period and then two events arrive in a row on the secondary stream, the first one uses the cached event from the timer (primary) stream; zip then immediately polls the timer again and gets another event because the time has already passed. So I end up with two events in a row and no wait between them, but that’s OK: it doesn’t happen very often, so I can work with that.
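
To make the burst concrete, here is a small std-only model of the behaviour, not the real zip or Interval. It assumes tick k fires at the fixed deadline k * period (missed ticks are not skipped), that arrival times are sorted, and that zip emits pair k as soon as both tick k and message k are available. The function name zip_emit_times and the millisecond timestamps are made up for illustration.

```rust
/// Simplified model of zipping a periodic timer with a message stream.
/// Assumption: tick `k` (k = 1, 2, ...) is available at `k * period_ms`,
/// message `k` is available at `arrivals_ms[k - 1]` (sorted ascending),
/// and the pair is emitted once both are available.
fn zip_emit_times(period_ms: u64, arrivals_ms: &[u64]) -> Vec<u64> {
    arrivals_ms
        .iter()
        .enumerate()
        .map(|(i, &arrival)| {
            let tick_deadline = (i as u64 + 1) * period_ms;
            // The pair waits for whichever side shows up later.
            tick_deadline.max(arrival)
        })
        .collect()
}
```

Under this model, a 1000 ms period with messages arriving at 500, 5000 and 5100 ms gives emit times of 1000, 5000 and 5100 ms: after the long pause the timer already has ticks waiting, so the last two messages go out only 100 ms apart.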


#6

Yeah, that’s true. If you want, you can keep track of the last time a message was sent and, if it’s been less than your period, add a Delay future into the mix. This Delay would wait for the balance of the period and then spawn the message. It’s a bit more work, but if you end up thinking it’s needed, it’s doable.

Alternatively, skip the Interval entirely and just do the above directly off items coming from the channel - track last sent time, and every time a new item is picked up off the channel, add the corresponding Delay, if any.
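
The bookkeeping for "the balance of the period" can be sketched with std alone; wiring the result into an actual Delay future is left out here. The helper name remaining_delay is hypothetical, and it assumes last_sent is None before the first message.

```rust
use std::time::{Duration, Instant};

/// How long to wait before sending the next message so that consecutive
/// sends are at least `period` apart. Returns zero if nothing has been
/// sent yet or if the period has already elapsed.
fn remaining_delay(last_sent: Option<Instant>, now: Instant, period: Duration) -> Duration {
    match last_sent {
        None => Duration::from_secs(0),
        Some(last) => {
            // Saturates to zero if `now` is somehow earlier than `last`.
            let elapsed = now.saturating_duration_since(last);
            // `checked_sub` is None once `elapsed` exceeds `period`.
            period.checked_sub(elapsed).unwrap_or(Duration::from_secs(0))
        }
    }
}
```

Each time an item comes off the channel, you would compute this value, wait that long if it is non-zero, spawn the message, and then record the send time as the new last_sent.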