Understanding Tokio Blocking Behavior

Hi,

I have an async processing pipeline for a stream of items. One processing step takes a lot longer than the previous ones, and I want to keep the latency from input to output low. I can afford to drop input values, but I want to be notified when that happens.
I implemented a LossyStream that continuously fetches values from the input stream, caches the latest value, and emits it upon request. When values are dropped, a closure is called.

LossyStream Implementation
use futures::stream;
use futures::{Stream, StreamExt};
use std::pin::Pin;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::{error::Error, iter, thread, time::Duration};

/// A lossy stream which drops values if they are not fetched in time.
pub struct LossyStream<T> {
    cache: Arc<Mutex<Option<T>>>,
    waker: Arc<Mutex<Option<Waker>>>,
    done: Arc<AtomicBool>,
}

impl<T> Stream for LossyStream<T> {
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut cache = self.cache.lock().unwrap();
        if cache.is_some() {
            // there is a new value available

            // remove the value from the cache and drop the MutexGuard immediately
            let v = cache.take();
            drop(cache);

            Poll::Ready(v)
        } else if self.done.load(std::sync::atomic::Ordering::Relaxed) {
            // the stream is completed
            Poll::Ready(None)
        } else {
            // We are not ready to emit another value yet.
            //   We need to store the current waker, so that we can wake the task,
            //   once another value is available.
            *self.waker.lock().unwrap() = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Transform a stream into a lossy stream.
/// The stream will be moved to another tokio task, where it is continuously polled for new values.
/// The latest value is cached and returned once. If no value is present, the stream waits.
fn into_lossy<T>(
    inner: impl Stream<Item = T> + Send + 'static,
    on_drop: impl Fn(T) + Send + 'static,
) -> LossyStream<T>
where
    T: Send + 'static,
{
    // the currently available value in the output stream
    let cache: Arc<Mutex<Option<T>>> = Arc::new(Mutex::new(None));
    let cache_clone = Arc::clone(&cache);

    // notifier to signal that a new value can be fetched
    let waker = Arc::new(Mutex::new(Option::<Waker>::None));
    let waker_clone = Arc::clone(&waker);

    // indicator that the source stream is completed
    let done = Arc::new(AtomicBool::new(false));
    let done_clone = Arc::clone(&done);

    tokio::spawn(async move {
        let mut inner = Box::pin(inner);
        while let Some(v) = inner.next().await {
            {
                let mut cache = cache_clone.lock().unwrap();

                // The old value is about to be replaced, invoke the drop fn
                if let Some(v) = cache.take() {
                    on_drop(v);
                }

                *cache = Some(v);

                // notify the output stream that a new value is present
                let mut waker = waker_clone.lock().unwrap();
                if let Some(waker) = waker.take() {
                    waker.wake();
                }
            }
        }

        // The stream is done, if the last value has not been picked up,
        //   it is considered dropped.
        let mut cache = cache_clone.lock().unwrap();
        if let Some(v) = cache.take() {
            on_drop(v);
        }

        // send signal that no more values will be available
        done_clone.store(true, std::sync::atomic::Ordering::Relaxed);
    });

    LossyStream { waker, cache, done }
}

// trait for easy integration
pub trait IntoLossyStream<T>
where
    T: Send + 'static,
{
    fn into_lossy(self, on_drop: impl Fn(T) + Send + 'static) -> LossyStream<T>;
}

// blanket implementation for all streams
impl<T, AnyStream: Stream<Item = T> + Send + 'static> IntoLossyStream<T> for AnyStream
where
    T: Send + 'static,
{
    fn into_lossy(self, on_drop: impl Fn(T) + Send + 'static) -> LossyStream<T>
    where
        T: Send + 'static,
    {
        into_lossy(self, on_drop)
    }
}

This implementation works, but only when run() is awaited directly in tokio::main, and I do not understand why.

Directly in tokio::main

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {

    run().await.unwrap();

    Ok(())
}

async fn run() -> Result<(), Box<dyn Error>> {
    let i = iter::repeat_with(|| {
        thread::sleep(Duration::from_millis(100));
        1
    });

    let mut s = stream::iter(i)
        // drop
        .into_lossy(|_| println!("  drop"))
        .map(|x| {
            thread::sleep(Duration::from_millis(320));
            x
        });

    while let Some(_) = s.next().await {
        println!("next");
    }

    Ok(())
}
Wrapped in tokio::spawn

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    tokio::spawn(async move {
        run().await.unwrap();
    });
    loop {
        tokio::task::yield_now().await;
    }

    Ok(())
}

async fn run() -> Result<(), Box<dyn Error>> {
    let i = iter::repeat_with(|| {
        thread::sleep(Duration::from_millis(100));
        1
    });

    let mut s = stream::iter(i)
        // drop
        .into_lossy(|_| println!("  drop"))
        .map(|x| {
            thread::sleep(Duration::from_millis(320));
            x
        });

    while let Some(_) = s.next().await {
        println!("next");
    }

    Ok(())
}

Both examples emit "drop" messages, but only the "Directly in tokio::main" example prints "next" in the main loop (while let Some(_) = s.next().await {...}).

Even when using a multi-threaded scheduler, there still seems to be blocking behaviour inside the LossyStream, but I do not understand why.
My expectation is that the tokio::main fn behaves the same as a tokio::spawn task. Is that not the case?

I haven't checked your LossyStream implementation, but

thread::sleep(Duration::from_millis(320));

will block the runtime and isn't a feasible way to simulate latency for your lossy stream implementation. You can read more about why blocking the runtime is bad in Async: What is blocking? – Alice Ryhl. You could use tokio::time::sleep instead.
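
As an illustration only (a minimal sketch with made-up names, not your actual pipeline), the simulated latency could be expressed with tokio::time::sleep via futures' then combinator, so the worker thread is never blocked:

use futures::StreamExt;
use std::time::Duration;

async fn run_non_blocking() {
    // same shape as the example pipeline, but the delay yields instead of blocking the thread
    let s = futures::stream::iter(0..5).then(|x| async move {
        tokio::time::sleep(Duration::from_millis(320)).await;
        x
    });
    futures::pin_mut!(s);
    while let Some(x) = s.next().await {
        println!("next: {x}");
    }
}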

No, the future passed to Runtime::block_on (which the #[tokio::main] macro generates for you) runs on the current thread. You need to explicitly spawn tasks onto the thread pool of the multi-threaded runtime from that future.

In this example I cannot, because the closures for iter::repeat_with and map are not async. I am also basically doing the same thing when processing the real data: there is a larger block of time where the current thread is busy and does not await. The timeframes (100 ms and 320 ms) are realistic as well.
I would like to keep Tokio for a simpler concurrency design, but I think what you are saying is that it's not the right tool for the job.
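
For what it's worth, here is a minimal sketch (mine, untested, with made-up names) of keeping a CPU-bound step like the 320 ms one on Tokio by handing it to the blocking thread pool with spawn_blocking:

use futures::StreamExt;
use std::{thread, time::Duration};

async fn process_on_blocking_pool() {
    let s = futures::stream::iter(0..5).then(|x| async move {
        // the heavy, non-async work runs on tokio's dedicated blocking threads
        tokio::task::spawn_blocking(move || {
            thread::sleep(Duration::from_millis(320)); // stand-in for the real CPU-bound step
            x
        })
        .await
        .expect("blocking task panicked")
    });
    futures::pin_mut!(s);
    while let Some(x) = s.next().await {
        println!("next: {x}");
    }
}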

That makes me expect the version with the spawned task to work and the one directly in main to fail. But it is the opposite.

I figured out a different implementation that does not rely on a std::sync::Mutex but on a tokio::sync::Mutex. My coding problem is solved, but I still do not understand why the original implementation was blocking.

New LossyStream Implementation
/// Transform a stream into a lossy stream.
/// The stream will be moved to another tokio task, where it is continuously polled for new values.
/// The latest value is cached and returned once. If no value is present, the stream waits.
fn into_lossy<T>(
    inner: impl Stream<Item = T> + Send + 'static,
    on_drop: impl Fn(T) + Send + 'static,
) -> impl Stream<Item = T>
where
    T: Send + 'static,
{
    // the currently available value in the output stream
    let cache: Arc<tokio::sync::Mutex<Option<T>>> = Arc::new(tokio::sync::Mutex::new(None));
    let cache_clone = Arc::clone(&cache);

    // indicator that the source stream is completed
    let done = Arc::new(AtomicBool::new(false));
    let done_clone = Arc::clone(&done);

    tokio::spawn(async move {
        let mut inner = Box::pin(inner);
        while let Some(v) = inner.next().await {
            let mut cache = cache_clone.lock().await;

            // The old value is about to be replaced, invoke the drop fn
            if let Some(v) = cache.take() {
                on_drop(v);
            }

            *cache = Some(v);
        }

        // The stream is done, if the last value has not been picked up,
        //   it is considered dropped.
        let mut cache = cache_clone.lock().await;
        if let Some(v) = cache.take() {
            on_drop(v);
        }

        // send signal that no more values will be available
        done_clone.store(true, std::sync::atomic::Ordering::Relaxed);
    });

    async_stream::stream! {
        loop {
            let mut cache = cache.lock().await;
            if cache.is_some() {
                // there is a new value available

                // remove the value from the cache and drop the MutexGuard immediately
                let v = cache.take();
                drop(cache);

                yield v.unwrap();
            }

            if done.load(std::sync::atomic::Ordering::Relaxed) {
                // the stream is completed
                break;
            }
        }
    }
}

Trying a rudimentary run of the "Directly in tokio::main" vs "Wrapped in tokio::spawn" versions, it appears both print "drop". You mentioned the latter should not. Is there a difference between this playground reproduction and your local implementation?

Also, in the second example the yield_now() loop can be simplified to

let handle = tokio::spawn(async move {
    run().await.unwrap();
});
handle.await.unwrap();

which will also run indefinitely, given that the run() task does not terminate.

This implementation of a poll operation is incorrect, but the incorrectness usually only shows up when multiple threads are used. Whenever you store a waker, you have to check if the condition you’re waiting for has become true after storing the waker, not just before. Otherwise, some other thread might have made the condition become true after you checked the condition, but before you stored the waker, so it doesn’t wake the correct waker, but either a previous waker or no waker. It's a kind of race condition.

I’d guess (but haven’t proven) that the reason you are seeing the problem with a spawned task is that spawning gets threads involved, and your thread::sleep() calls cause the thread’s activities to line up right to trigger this bug. (Under other test conditions, it might happen only very rarely, since it requires events to line up in a small time window.)

Oh, sorry. Yes, both print "drop"; however, only the version directly in tokio::main receives items in the main loop (while let Some(_) = s.next().await { ... }). I adjusted the examples to make that clear.

I don't understand what you mean by checking the condition.
Can you give some examples so I can understand a proper implementation?

For your code, it could look like this (not tested):

impl<T> Stream for LossyStream<T> {
    type Item = T;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // First check
        if let Poll::Ready(v) = self.poll_helper() {
            return Poll::Ready(v);
        }

        *self.waker.lock().unwrap() = Some(cx.waker().clone());

        // Second check in case something changed
        self.poll_helper()
    }
}

impl<T> LossyStream<T> {
    fn poll_helper(&mut self) -> Poll<Option<T>> {
        let mut cache = self.cache.lock().unwrap();
        if cache.is_some() {
            let v = cache.take();
            drop(cache);

            Poll::Ready(v)
        } else if self.done.load(std::sync::atomic::Ordering::Relaxed) {
            Poll::Ready(None)
        } else {
            Poll::Pending
        }
    }
}

Something I forgot to mention: this is only a problem when the state your future consults is not behind a single mutex along with the waker; if it is, the problematic ordering of events can't happen. But you may not want, or it may not be possible, to use a single mutex for everything.
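
For illustration, a minimal sketch (untested; the names Shared, SingleMutexLossyStream and publish are made up) of putting the cached value, the waker and the done flag behind one mutex, so the race described above cannot occur:

use futures::Stream;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

struct Shared<T> {
    cache: Option<T>,
    waker: Option<Waker>,
    done: bool,
}

pub struct SingleMutexLossyStream<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

impl<T> Stream for SingleMutexLossyStream<T> {
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Value, done flag and waker are all inspected and updated under one lock,
        // so the producer either sees the waker we store here or we see its value.
        let mut shared = self.shared.lock().unwrap();
        if let Some(v) = shared.cache.take() {
            Poll::Ready(Some(v))
        } else if shared.done {
            Poll::Ready(None)
        } else {
            shared.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

// Producer side: replace the cached value and wake the consumer under the same lock.
// Returns the overwritten value, if any, so the caller can invoke its drop callback.
fn publish<T>(shared: &Arc<Mutex<Shared<T>>>, v: T) -> Option<T> {
    let mut s = shared.lock().unwrap();
    let dropped = s.cache.replace(v);
    if let Some(w) = s.waker.take() {
        w.wake();
    }
    dropped
}

When the inner stream ends, the producer would likewise set done and do a final wake under the same lock.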
