Test async combinators without sleep

The great thing about combinators is that you can easily embed business logic asynchronously. However, I have yet to come up with a good strategy for testing this cleanly.

  • Sometimes I need to wait for the task to get scheduled so I can check whether the desired side effect happened
  • Sometimes I need to wait to make sure the task ran and it decided not to do something

I want to do this in a way that will run as fast as my CPU is capable, and won’t break under heavy load.

Here’s a naive example (playground) where I want to make sure my filter worked properly:

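    // Assumed imports: the snippet looks like futures 0.1 and tokio 0.1
    use std::sync::{Arc, Mutex};
    use std::thread;
    use std::time::Duration;
    use futures::future::ok;
    use futures::sync::mpsc;
    use futures::Stream;
    use tokio::runtime::Runtime;
    
    let mut rt = Runtime::new().unwrap();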
    
    let counter = Arc::new(Mutex::new(0u32));
    let counter_1 = counter.clone();
    let (mut chan_tx, chan_rx) = mpsc::channel(64);
    
    let task = chan_rx
        .filter(|val| val % 2 == 1) // only allow odd numbers
        .map(move |_| *counter_1.lock().unwrap() += 1)
        .map_err(drop)
        .for_each(ok);
    rt.spawn(task);
    
    let _ = chan_tx.try_send(1);
    
    // This sleep is required to make sure the increment happened
    thread::sleep(Duration::from_millis(50));
    assert_eq!(*counter.lock().unwrap(), 1);
    
    let _ = chan_tx.try_send(2);
    
    // This sleep is required to make sure the increment _didn't_ happen
    thread::sleep(Duration::from_millis(50));
    assert_eq!(*counter.lock().unwrap(), 1);
    
    let _ = chan_tx.try_send(3);
    
    // This sleep is required to make sure the increment happened
    thread::sleep(Duration::from_millis(50));
    assert_eq!(*counter.lock().unwrap(), 2);

If I remove any of the three thread::sleeps then the test does not work as intended.

I can think of some ways to work around this but they’re all kind of horrible. Can anyone recommend general strategies for turning the above code into something testable?

The thing is that you need to wait for your stream to produce a value. But since you want to inspect the state “in between”, you cannot use filter: a filtered-out element produces nothing to wait on. Instead, do the filtering inside the map itself, so that filtered-out elements still yield a () with which you can synchronize.
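A minimal sketch of that idea, using only the standard library: a plain thread and std::sync::mpsc channels stand in for the tokio runtime and the futures stream, and a second channel carries the () that every element yields. The names spawn_counter and check_filter_without_sleep are hypothetical, made up for this illustration.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical worker: increments `counter` for odd values and sends a `()`
/// acknowledgement for every value it has handled, odd or even.
fn spawn_counter(counter: Arc<Mutex<u32>>) -> (Sender<u32>, Receiver<()>) {
    let (tx, rx) = mpsc::channel::<u32>();
    let (ack_tx, ack_rx) = mpsc::channel::<()>();
    thread::spawn(move || {
        for val in rx {
            // The "filter" lives inside the processing step...
            if val % 2 == 1 {
                *counter.lock().unwrap() += 1;
            }
            // ...so even a skipped value still yields something to wait on.
            if ack_tx.send(()).is_err() {
                break; // nobody is listening any more
            }
        }
    });
    (tx, ack_rx)
}

/// Same three checks as the sleep-based test, synchronised on acknowledgements.
fn check_filter_without_sleep() -> u32 {
    let counter = Arc::new(Mutex::new(0u32));
    let (tx, acks) = spawn_counter(counter.clone());

    tx.send(1).unwrap();
    acks.recv().unwrap(); // returns once 1 has been handled
    assert_eq!(*counter.lock().unwrap(), 1);

    tx.send(2).unwrap();
    acks.recv().unwrap(); // 2 was handled, and deliberately not counted
    assert_eq!(*counter.lock().unwrap(), 1);

    tx.send(3).unwrap();
    acks.recv().unwrap();
    assert_eq!(*counter.lock().unwrap(), 2);

    let total = *counter.lock().unwrap();
    total
}
```

Each recv() returns only after the worker has finished with the corresponding value, so the assertions cannot race with the increment, and the test takes no longer than the work itself.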

Yeah I see, thanks for the example! That makes sense. I think the pattern I’m coming up against is leaning on for_each() to turn my Streams into Futures so I can spawn them, which prevents this kind of end-of-stream observation. If I take my logic out of the for_each and move it into a prior map, I can break off my Stream at that point. Then I have the choice of driving it synchronously in a test like your example, or futurising it and spawning it. Definitely something to experiment with.
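Roughly what that split could look like, again as a standard-library sketch rather than real futures code: a blocking iterator over a std::sync::mpsc receiver stands in for the Stream (in futures 0.1, Stream::wait() should give a comparable blocking iterator). The names pipeline, run_stepped and run_spawned are hypothetical.

```rust
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical pipeline: all the logic lives in `map`, and nothing is
/// consumed yet, so the caller decides how to drive it.
fn pipeline(rx: Receiver<u32>, counter: Arc<Mutex<u32>>) -> impl Iterator<Item = ()> {
    rx.into_iter().map(move |val| {
        if val % 2 == 1 {
            *counter.lock().unwrap() += 1;
        }
    })
}

/// Test-style use: pull one element at a time and record the counter after each.
fn run_stepped(values: Vec<u32>) -> Vec<u32> {
    let counter = Arc::new(Mutex::new(0u32));
    let (tx, rx) = mpsc::channel();
    let mut steps = pipeline(rx, counter.clone());
    let mut seen = Vec::new();
    for v in values {
        tx.send(v).unwrap();
        steps.next().expect("sender is still alive");
        seen.push(*counter.lock().unwrap());
    }
    seen
}

/// Production-style use: consume the whole pipeline on a background thread,
/// the analogue of `for_each` followed by `spawn`.
fn run_spawned(values: Vec<u32>) -> u32 {
    let counter = Arc::new(Mutex::new(0u32));
    let (tx, rx) = mpsc::channel();
    let steps = pipeline(rx, counter.clone());
    let worker = thread::spawn(move || steps.for_each(drop));
    for v in values {
        tx.send(v).unwrap();
    }
    drop(tx); // closing the channel ends the iterator
    worker.join().unwrap();
    let total = *counter.lock().unwrap();
    total
}
```

The point of the split is that pipeline contains everything worth testing, while run_spawned is thin enough that it hardly needs a test of its own.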