Hi,
I have an async processing pipeline for a stream of items. One processing step takes a lot longer than the previous ones, and I want to keep the latency from input to output low. I can afford to drop input values, but I want to be notified when that happens.
I implemented a LossyStream that continuously fetches values from the input stream, caches the latest value and emits it upon request. When values are dropped, a closure is called.
LossyStream Implementation
use futures::stream;
use futures::{Stream, StreamExt};
use std::pin::Pin;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::{error::Error, iter, thread, time::Duration};
/// A lossy stream which drops values if they are not fetched in time.
pub struct LossyStream<T> {
    /// The latest value produced by the inner stream, waiting to be emitted.
    cache: Arc<Mutex<Option<T>>>,
    /// The waker of the consuming task, stored while no value is available.
    waker: Arc<Mutex<Option<Waker>>>,
    /// Set once the inner stream has completed.
    done: Arc<AtomicBool>,
}
impl<T> Stream for LossyStream<T> {
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut cache = self.cache.lock().unwrap();
        if cache.is_some() {
            // There is a new value available:
            // remove the value from the cache and drop the MutexGuard immediately.
            let v = cache.take();
            drop(cache);
            Poll::Ready(v)
        } else if self.done.load(std::sync::atomic::Ordering::Relaxed) {
            // The stream is completed.
            Poll::Ready(None)
        } else {
            // We are not ready to emit another value yet.
            // We need to store the current waker, so that we can wake the task
            // once another value is available.
            *self.waker.lock().unwrap() = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
/// Transform a stream into a lossy stream.
/// The stream will be moved to another tokio task, where it is continuously polled for new values.
/// The latest value is cached and returned once. If no value is present, the stream waits.
fn into_lossy<T>(
    inner: impl Stream<Item = T> + Send + 'static,
    on_drop: impl Fn(T) + Send + 'static,
) -> LossyStream<T>
where
    T: Send + 'static,
{
    // the currently available value in the output stream
    let cache: Arc<Mutex<Option<T>>> = Arc::new(Mutex::new(None));
    let cache_clone = Arc::clone(&cache);
    // notifier to signal that a new value can be fetched
    let waker = Arc::new(Mutex::new(Option::<Waker>::None));
    let waker_clone = Arc::clone(&waker);
    // indicator that the source stream is completed
    let done = Arc::new(AtomicBool::new(false));
    let done_clone = Arc::clone(&done);

    tokio::spawn(async move {
        let mut inner = Box::pin(inner);
        while let Some(v) = inner.next().await {
            {
                let mut cache = cache_clone.lock().unwrap();
                // The old value is about to be replaced; invoke the drop fn.
                if let Some(v) = cache.take() {
                    on_drop(v);
                }
                *cache = Some(v);
                // Notify the output stream that a new value is present.
                let mut waker = waker_clone.lock().unwrap();
                if let Some(waker) = waker.take() {
                    waker.wake();
                }
            }
        }
        // The stream is done; if the last value has not been picked up,
        // it is considered dropped.
        let mut cache = cache_clone.lock().unwrap();
        if let Some(v) = cache.take() {
            on_drop(v);
        }
        // Send the signal that no more values will be available.
        done_clone.store(true, std::sync::atomic::Ordering::Relaxed);
    });

    LossyStream { waker, cache, done }
}
// Trait for easy integration.
pub trait IntoLossyStream<T>
where
    T: Send + 'static,
{
    fn into_lossy(self, on_drop: impl Fn(T) + Send + 'static) -> LossyStream<T>;
}

// Blanket implementation for all streams.
impl<T, AnyStream: Stream<Item = T> + Send + 'static> IntoLossyStream<T> for AnyStream
where
    T: Send + 'static,
{
    fn into_lossy(self, on_drop: impl Fn(T) + Send + 'static) -> LossyStream<T> {
        into_lossy(self, on_drop)
    }
}
This implementation works, but only when run directly in tokio::main, and I do not understand why.
Directly in tokio::main
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    run().await.unwrap();
    Ok(())
}

async fn run() -> Result<(), Box<dyn Error>> {
    // Source: produces a value every 100 ms.
    let i = iter::repeat_with(|| {
        thread::sleep(Duration::from_millis(100));
        1
    });
    let mut s = stream::iter(i)
        // Report dropped values.
        .into_lossy(|_| println!(" drop"))
        // Slow processing step: takes 320 ms per value.
        .map(|x| {
            thread::sleep(Duration::from_millis(320));
            x
        });
    while let Some(_) = s.next().await {
        println!("next");
    }
    Ok(())
}
Wrapped in tokio::spawn
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    tokio::spawn(async move {
        run().await.unwrap();
    });
    loop {
        tokio::task::yield_now().await;
    }
    Ok(())
}

async fn run() -> Result<(), Box<dyn Error>> {
    // Source: produces a value every 100 ms.
    let i = iter::repeat_with(|| {
        thread::sleep(Duration::from_millis(100));
        1
    });
    let mut s = stream::iter(i)
        // Report dropped values.
        .into_lossy(|_| println!(" drop"))
        // Slow processing step: takes 320 ms per value.
        .map(|x| {
            thread::sleep(Duration::from_millis(320));
            x
        });
    while let Some(_) = s.next().await {
        println!("next");
    }
    Ok(())
}
Both examples emit "drop" messages, but only the "Directly in tokio::main" example prints "next" messages from the consumer loop (while let Some(_) = s.next().await { ... }).
Even when using a multi-threaded scheduler, there still seems to be blocking behaviour inside the LossyStream, and I do not understand why.
My expectation is that the tokio::main function behaves the same as a task started with tokio::spawn. Is that not the case?
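For reference, a multi-threaded scheduler can be selected explicitly like this. This is only a sketch based on the second example; worker_threads = 4 is an illustrative value I chose here, not something from the code above.

// Sketch: the "Wrapped in tokio::spawn" example with the multi-threaded
// scheduler selected explicitly. worker_threads = 4 is an arbitrary example value.
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> Result<(), Box<dyn Error>> {
    tokio::spawn(async move {
        run().await.unwrap();
    });
    loop {
        tokio::task::yield_now().await;
    }
}

As far as I know, multi_thread is already the default flavor of #[tokio::main], so this attribute only makes the choice explicit.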