Simulating time with Futures 0.3

Hi, I am writing asynchronous code with Futures 0.3 that needs to respond to the passage of time. For example, a keep-alive message needs to be sent to a remote host if no messages were sent for a while.
I have a problem simulating the passage of time in some of my tests with my chosen design. This problem is described below.

I chose the following design to handle time: an incoming stream of ticks, Stream<Item = TimerTick>.
A tick happens every constant amount of time (for example, one second). A task that needs to be time sensitive is given an incoming stream of ticks as an argument. By responding to the incoming TimerTick-s, the task can react to the passage of time.
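
For illustration, a time-sensitive task of this kind might look roughly like this (a simplified sketch, not actual offst code; keepalive_task, Message and KEEPALIVE_TICKS are made-up names, and a real task would also reset the counter whenever an ordinary message is sent, e.g. by select!-ing over both event sources):

use futures::channel::mpsc;
use futures::{SinkExt, Stream, StreamExt};

struct TimerTick;
enum Message { KeepAlive }

const KEEPALIVE_TICKS: usize = 8; // hypothetical constant

// Send a keep-alive if nothing was sent for KEEPALIVE_TICKS ticks:
async fn keepalive_task<S>(mut timer_stream: S, mut sender: mpsc::Sender<Message>)
where
    S: Stream<Item = TimerTick> + Unpin,
{
    let mut ticks_since_send: usize = 0;
    while let Some(TimerTick) = await!(timer_stream.next()) {
        ticks_since_send += 1;
        if ticks_since_send >= KEEPALIVE_TICKS {
            if await!(sender.send(Message::KeepAlive)).is_err() {
                return; // the receiving side was dropped
            }
            ticks_since_send = 0;
        }
    }
}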

For most unit tests this approach was very easy to deal with. I could advance time carefully tick by tick and expect certain things to happen (for example, expect messages to be sent through certain mpsc channels when a timeout occurs). However, when writing large integration tests I ran into an issue.

Consider two operations that should be separated by the passage of some time. A naive approach would be something like this (code taken from the offst project):

// Do first operation

// Wait for 0x100 time ticks:
for _ in 0 .. 0x100usize {
    await!(tick_sender.send(())).unwrap();
}

// Do second operation

In the code above we created, ahead of time, some kind of time service that we can control manually by sending ticks through tick_sender (which is of type mpsc::Sender). There are many tasks spawned onto a ThreadPool, and many of those tasks hold an incoming mpsc::Receiver of timer ticks.

The code written above for sending the ticks does not work very well in most cases. This is because the tasks that receive the timer ticks are not given enough poll iterations to respond to them. In other words, this code doesn't really simulate the passage of time, because the tasks do not finish what they would usually finish in that amount of time.

My next idea to solve this problem was to add some kind of yield mechanism:

// Based on:
// - https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.13/src/futures_test/future/pending_once.rs.html#14-17
// - https://github.com/rust-lang-nursery/futures-rs/issues/869
pub struct Yield(usize);

impl Yield {
    pub fn new(num_yields: usize) -> Self {
        Yield(num_yields)
    }
}

impl Future for Yield {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
        let count = &mut self.as_mut().0;
        *count = count.saturating_sub(1);
        if *count == 0 {
            Poll::Ready(())
        } else {
            // Ask to be polled again, but return Pending so that
            // other spawned tasks get a chance to run in the meantime:
            waker.wake();
            Poll::Pending
        }
    }
}

This mechanism should allow our main test task to let other tasks run for a while.
We can now change the time-passage for-loop to something of this form:

for _ in 0 .. 0x100usize {
    await!(tick_sender.send(())).unwrap();
    await!(Yield::new(YIELD_ITERS));
}

Note: In my tests, YIELD_ITERS = 0x1000.

This time most of the tasks will finish handling the passage of time correctly, because 0x1000 is a pretty large number of yields. However, this solution is pretty hacky. There is no single correct number to put as YIELD_ITERS, because on different occasions we might need a different number of polls to let all the other tasks complete.

A possible solution would be to let the executor run until no more progress can be made. My tests do not involve any external communication, and no more timer ticks are sent, so this state is guaranteed to be reached at some point. I am not sure if this is possible, and if so, how to do it.

My current workaround is to put a pretty large number in YIELD_ITERS. It makes the tests run very slowly, and sometimes I get a non-deterministic glitch (maybe YIELD_ITERS polls is sometimes not enough?).

I had different solutions to this problem in the past, when I was writing Python code. Twisted has mechanisms for testing scheduling, and when I used asyncio I wrote a dedicated executor that allows time travel, used mostly for testing.

I would appreciate any ideas on this. Mostly:

  • What would be a good way to design tasks that deal with the passage of time?
  • Is there a way to wait for an executor to run until no more progress can be made?

Thanks!

I don't know if there's some way to get at this executor state; I didn't find anything in a quick look, and I'm sure you've looked harder. It's not the first time I've gone looking for an "idle loop" hook that can run futures only when there's nothing better to do, and I think that's the general-purpose framework element that's needed here.

One thing I think isn't helping is that your yield is basically busy-waiting and probably exacerbating the problem by taking cycles. It might work to have it depend on some actual time passing, basically only doing a partial simulation.

If you're on unix, you could have a future that wraps a mutex, a signal handler that unblocks the mutex, and schedule a SIGALRM every so often. There are other external events that you can wait for from time to time to help, though they're all a bit side-effecty.

Perhaps they should! Absent other solutions, this is an easy thing to wait for and lets tests behave (more) like they would in real life. If you had a separate external "ticker" network source that can run at a controllable rate, it could at least give you some macro-granularity pacing of internal ticks. Something like: every Nth internal tick waits for an external message from the network ticker.

Are you using purely simulated time, with no connection to real-world time? (It doesn't sound like it.) In that case you could maybe use an atomic usize for the tick counter. Depending on the accuracy needed, some Future is responsible for increasing it and tracking when to notify; how the notification is done could be just spawning futures or calling closures.
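
A minimal sketch of the purely simulated case, assuming a shared tick counter (SimClock and its methods are made-up names; the interesting part, notifying the waiting tasks, is left out here):

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Purely simulated clock: just a shared tick counter. The test driver
// advances it; whoever advances it is also responsible for notifying
// waiting tasks (by spawning futures or calling registered closures).
#[derive(Clone)]
struct SimClock(Arc<AtomicUsize>);

impl SimClock {
    fn new() -> Self {
        SimClock(Arc::new(AtomicUsize::new(0)))
    }
    fn now(&self) -> usize {
        self.0.load(Ordering::SeqCst)
    }
    fn advance(&self, ticks: usize) {
        self.0.fetch_add(ticks, Ordering::SeqCst);
    }
}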

In the case of real-world time:

You don't have that luxury outside of an RTOS; you usually only get "run this after some time has passed". In this case you often keep hold of the time of the last occurrence. Linked with ticks, you can advance by a specific amount, which avoids any long-term lag in the simulation.
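
A rough sketch of what "keep hold of the time of the last occurrence" could look like (TickState and elapsed_ticks are made-up names):

use std::time::{Duration, Instant};

// Real-time case: remember when the last tick happened and advance it by
// whole tick lengths, so that rounding errors and scheduling lag do not
// accumulate over a long run.
struct TickState {
    last_tick: Instant,
    tick_len: Duration,
}

impl TickState {
    // How many ticks elapsed since the last call; advances `last_tick`
    // by exactly that many tick lengths.
    fn elapsed_ticks(&mut self) -> u32 {
        let now = Instant::now();
        let mut ticks = 0;
        while self.last_tick + self.tick_len <= now {
            self.last_tick += self.tick_len;
            ticks += 1;
        }
        ticks
    }
}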

It gets worse too, see;

Did you consider using a framework? For example, the run_later() method of actix's actors sounds like a good choice for your use case.

Hi, thanks for all the replies! I got some interesting ideas from reading this thread.

This could have been a solution. For example, I could have started a real timer (like I do in the final application binary). However, some of the tests simulate very long waits, for example 100+ seconds. The integration tests do pass when I use this method; however, they run very slowly.
Another issue with using real timers (or any real external events) is that they might cause the tests to be non-deterministic.

I agree. Currently I have one timer task that sends timer ticks through channels to all other futures. The ideal is that a time tick happens every constant amount of time (about one second), but I realize there could be some deviation from this amount.

As I am using things like keepalives for the TCP connections I am maintaining, I have to assume some ability of the device running my code to track time. My general assumption is that one tick will be more or less one second; even a 40% deviation from one second should still allow the communication to function properly. I am somewhat new to this. Do you know a better way?
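
For reference, the central timer task I mentioned is roughly of this shape (a simplified sketch, not the actual offst code):

use futures::channel::mpsc;
use futures::{SinkExt, Stream, StreamExt};

struct TimerTick;

// A single central timer task: every incoming tick (from a real interval in
// production, or from the test driver) is forwarded to all registered clients.
// Clients whose receiving side was dropped are silently removed.
async fn timer_task<S>(mut incoming_ticks: S, mut clients: Vec<mpsc::Sender<TimerTick>>)
where
    S: Stream<Item = ()> + Unpin,
{
    while let Some(()) = await!(incoming_ticks.next()) {
        let mut live_clients = Vec::new();
        for mut client in clients {
            if await!(client.send(TimerTick)).is_ok() {
                live_clients.push(client);
            }
        }
        clients = live_clients;
    }
}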

Interesting! Given that I use run_later(), is there a way for me to simulate the passage of time without actually waiting this amount of time?
For example, if my code contains a wait of 100 seconds, is it possible for me to somehow skip this wait period in my tests, while still having a monotonicity guarantee for all other waits?

Solution proposal

I had this idea: I create a struct SpawnerWait which wraps something that implements the Spawn trait.
Then I use SpawnerWait as my spawner for the rest of the test code.

The wrapper struct SpawnerWait has a system to track all spawned futures. It tracks the beginning and end of poll() calls, and any calls to wake(). Whenever a poll() call ends, SpawnerWait checks whether any more progress is possible inside the executor. If not, it notifies anyone that was waiting that no more progress is possible.
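
In very rough terms, the bookkeeping behind this check looks something like the following (a simplified sketch, not the actual code linked below; the names are made up). It is shared, behind an Arc<Mutex<...>>, between the wrapped futures/wakers and the waiter:

use futures::channel::oneshot;

struct Progress {
    polling: usize,                      // poll() calls currently running
    pending_wakes: usize,                // wake() calls not yet matched by a poll()
    waiter: Option<oneshot::Sender<()>>, // completed when no more progress is possible
}

impl Progress {
    // Called when a wake() arrives for one of the wrapped tasks:
    fn woken(&mut self) {
        self.pending_wakes += 1;
    }

    // Called when a wrapped poll() starts (consuming one pending wake):
    fn poll_started(&mut self) {
        self.pending_wakes = self.pending_wakes.saturating_sub(1);
        self.polling += 1;
    }

    // Called whenever a wrapped poll() returns:
    fn poll_ended(&mut self) {
        self.polling -= 1;
        // If nothing is being polled and no task has been woken,
        // the executor can make no more progress:
        if self.polling == 0 && self.pending_wakes == 0 {
            if let Some(sender) = self.waiter.take() {
                let _ = sender.send(());
            }
        }
    }
}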

The full code is here.
Beware, it could be incorrect. I haven't yet run it on my large integration test cases.

An example for using SpawnerWait (From the test suite):

#[test]
fn test_channel_full() {
    let mut thread_pool = ThreadPool::new().unwrap();

    let mut wspawner = SpawnerWait::new(thread_pool.clone());
    let waiter = wspawner.wait();

    let arc_mutex_res = Arc::new(Mutex::new(0usize));

    let c_arc_mutex_res = Arc::clone(&arc_mutex_res);
    wspawner.spawn(async move {
        // Channel has limited capacity:
        let (mut sender, _receiver) = mpsc::channel::<u32>(8);

        // We keep sending into the channel.
        // At some point this loop should be stuck, because the channel is full.
        loop {
            await!(sender.send(0)).unwrap();
            let mut res_guard = c_arc_mutex_res.lock().unwrap();
            *res_guard = res_guard.checked_add(1).unwrap();
        }

    }).unwrap();

    thread_pool.run(waiter).unwrap();
    let res_guard = arc_mutex_res.lock().unwrap();
    assert_eq!(*res_guard, 9);
}

In the code above we have a future that does the following:

  • Creates a channel of capacity 8.
  • Starts an infinite loop of sending through the sender but never reading the receiver. This should cause the future to be stuck after 9 writes into the channel.

If we ran thread_pool.run() directly over our future, our program would be stuck forever. However, if we instead use SpawnerWait we can stop waiting when we detect that no more progress can happen in the thread pool.


The trio asynchronous framework for Python has a testing facility for simulating time. You might find some kind of inspiration there. The doc page about testing is here: time and timeouts.


First of all, I see you are using a lot of hand-made solutions. Before diving deeper into actix I would recommend thinking about whether this is really an option for you. As far as I can see, switching to actix would mean a complete rewrite of your project. (Don't forget to take a look at actix-web.)
In my first Rust project I did something pretty similar: building up an asynchronous message-passing system myself. It was pretty late in the project that I learned I don't have to fear dependencies in Rust like I did when programming C. Switching from my hand-made solution to actix cost me about 3 days but resulted in an instant 3x speedup.

About your request about aborting the wait:
The run_later() method does not prevent an actor from processing other messages during the wait, so your actor could hold some flag to abort the wait. I still don't understand exactly what you want to achieve, but here is some code showing how I would do it:

#[derive(Message)]
struct Wait {
  time: Duration,
  factor: u32,
}

#[derive(Message)]
struct AbortWait;

struct MyActor(bool);

impl Actor for MyActor {
  type Context = Context<Self>;
}

impl Handler<Wait> for MyActor {
  type Result = ();

  fn handle(&mut self, msg: Wait, ctx: &mut Self::Context) -> Self::Result {
    let dur = msg.time * msg.factor; // adjust the Duration as needed; maybe a better scaling is required
    ctx.run_later(dur, move |my_actor, _ctx| {
      if my_actor.0 { println!("Waited"); }
      else { println!("Wait aborted"); }
    });
  }
}

impl Handler<AbortWait> for MyActor {
  type Result = ();

  fn handle(&mut self, _msg: AbortWait, _ctx: &mut Self::Context) -> Self::Result {
    // Clear the flag; the closure scheduled by run_later() will see it and
    // report that the wait was aborted.
    self.0 = false;
  }
}

Hey, thanks for the very detailed reply. I think I did not explain well enough what I'm trying to do.
I just finished writing a basic util to solve my problem; you can check it out here:

The relevant code can be seen here:

fn main() {
    let mut thread_pool = ThreadPool::new().unwrap();

    let mut wspawner = WaitSpawner::new(thread_pool.clone());
    let waiter = wspawner.wait();

    let (mut a_sender, mut b_receiver) = mpsc::channel::<u32>(0);
    let (mut b_sender, mut a_receiver) = mpsc::channel::<u32>(0);

    // We spawn two futures that have a conversation.
    // Spawn first future:
    wspawner.spawn(async move {
        await!(a_sender.send(0)).unwrap();
        assert_eq!(await!(a_receiver.next()).unwrap(), 1);
        await!(a_sender.send(2)).unwrap();
    }).unwrap();

    // A shared result value, used to make sure that the second future
    // has finished
    let arc_mutex_res = Arc::new(Mutex::new(false));
    let c_arc_mutex_res = Arc::clone(&arc_mutex_res);

    // Spawn second future:
    wspawner.spawn(async move {
        assert_eq!(await!(b_receiver.next()).unwrap(), 0);
        await!(b_sender.send(1)).unwrap();
        assert_eq!(await!(b_receiver.next()).unwrap(), 2);
        let mut res_guard = c_arc_mutex_res.lock().unwrap();
        *res_guard = true;
    }).unwrap();

    // >>>>>>>> This is what I was trying to achieve <<<<<<<
    // Keep running until no more progress is possible
    thread_pool.run(waiter);

    // Make sure that the second future has finished:
    let res_guard = arc_mutex_res.lock().unwrap();
    assert!(*res_guard);
}

In the very simplified example above, there are two futures that have a conversation.
The problem is that if I spawn both of them, I have no means of waiting until the conversation is over, because I have no way to get a signal from them when the conversation ends.

In most of the unit tests in my code I don't have this problem, because I construct the code so that I get some kind of signal that the asynchronous execution has ended (roughly as sketched below). However, when I write integration tests some components (like servers) are self-contained.
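
The kind of explicit signal I mean in the unit tests looks roughly like this (a simplified sketch using a oneshot channel):

use futures::channel::oneshot;
use futures::executor::ThreadPool;
use futures::task::SpawnExt;

fn main() {
    let mut thread_pool = ThreadPool::new().unwrap();
    let (done_sender, done_receiver) = oneshot::channel::<()>();

    thread_pool.spawn(async move {
        // ... the actual asynchronous work of the test ...

        // Explicitly signal that the work has ended:
        let _ = done_sender.send(());
    }).unwrap();

    // Block until the spawned future reports completion:
    thread_pool.run(done_receiver).unwrap();
}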

One of my tests, for example, spawns a few relay servers and lets them talk to each other. How can I know that the servers have finished talking?

The code above shows a possible solution. I just got it working today, and I am not yet sure how correct it is, but it might be a start.

EDIT: Interestingly, when I used WaitSpawner with some of my larger tests, some of them crashed (with a segmentation fault). Since I have no unsafe code, I think this might be a problem in futures-rs. I filed an issue here: