Hello,
I'd like to write a `Stream` that can run futures concurrently on the tokio runtime. For the sake of learning and understanding `Pin`, I'd like to not use `pin_project`, and write the projections myself. I came up with this:
use std::{
    collections::VecDeque,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};

use futures::{
    stream::{FuturesUnordered, Stream, StreamExt},
    Future,
};
use tokio::{
    task::{JoinError, JoinHandle},
    time::delay_for,
};

/// `ConcurrentFutures` can keep a capped number of futures running concurrently, and yield their
/// result as they finish. When the max number of concurrent futures is reached, new tasks are
/// queued until some in-flight futures finish.
pub struct ConcurrentFutures<T>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    /// in-flight futures
    running: FuturesUnordered<JoinHandle<T::Output>>,
    /// buffered tasks
    pending: VecDeque<T>,
    /// max number of concurrent futures
    max_in_flight: usize,
}

impl<T> ConcurrentFutures<T>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    pub fn new(max_in_flight: usize) -> Self {
        Self {
            running: FuturesUnordered::new(),
            pending: VecDeque::new(),
            max_in_flight,
        }
    }

    pub fn push(&mut self, task: T) {
        self.pending.push_back(task)
    }

    fn project_running(self: Pin<&mut Self>) -> Pin<&mut FuturesUnordered<JoinHandle<T::Output>>> {
        // This is okay because:
        // - the closure doesn't move out of `s`
        // - the returned value cannot be moved as long as `s` is not moved
        // See: https://doc.rust-lang.org/std/pin/struct.Pin.html#method.map_unchecked_mut
        unsafe { self.map_unchecked_mut(|s| &mut s.running) }
    }

    fn project_pending(self: Pin<&mut Self>) -> &mut VecDeque<T> {
        // Not sure whether this is actually safe. When `ConcurrentFutures` is pinned, should we
        // be able to push/pop from one of the fields?
        unsafe { &mut self.get_unchecked_mut().pending }
    }
}

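For reference, the same two kinds of projection can be reproduced with only the standard library. This is a minimal sketch, not the real type: `Logged`, `NoopWake` and `demo` are hypothetical names, the `PhantomPinned` marker is only there to make the struct `!Unpin`, and the safety comments state my current understanding rather than something I am sure of.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `ConcurrentFutures`: one pinned future plus a plain queue.
pub struct Logged<F: Future> {
    inner: F,            // only ever reached through `Pin<&mut F>`
    log: VecDeque<u32>,  // only ever reached through `&mut VecDeque<u32>`
    _pin: PhantomPinned, // makes `Logged` itself `!Unpin`
}

impl<F: Future> Logged<F> {
    pub fn new(inner: F) -> Self {
        Self { inner, log: VecDeque::new(), _pin: PhantomPinned }
    }

    fn project_inner(self: Pin<&mut Self>) -> Pin<&mut F> {
        // Assumed sound: `inner` is never moved out and never handed out as a bare `&mut F`.
        unsafe { self.map_unchecked_mut(|s| &mut s.inner) }
    }

    fn project_log(self: Pin<&mut Self>) -> &mut VecDeque<u32> {
        // Assumed sound: `log` is never exposed as `Pin<&mut _>` anywhere in this type.
        unsafe { &mut self.get_unchecked_mut().log }
    }
}

impl<F: Future> Future for Logged<F> {
    type Output = (F::Output, usize);

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `as_mut()` reborrows, so `self` can be projected several times.
        let n = self.as_mut().project_log().len() as u32;
        self.as_mut().project_log().push_back(n);
        let polled = self.as_mut().project_inner().poll(cx);
        match polled {
            Poll::Ready(out) => Poll::Ready((out, self.project_log().len())),
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Waker that does nothing; enough to poll a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a `Logged` wrapping an already-ready future exactly once.
pub fn demo() -> Poll<(u8, usize)> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(Logged::new(std::future::ready(7u8)));
    fut.as_mut().poll(&mut cx)
}
```

Here `demo()` pushes one entry to the queue through the pinned reference and then polls the inner future through its pinned projection, which mirrors how I use `project_pending` and `project_running` together.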
In the code above, is the `project_pending` method actually correct? It seems to work, but I am not sure whether I am allowed to project the `pending` field like this. After all, when I poll my `ConcurrentFutures<T>` it is pinned, so its content shouldn't move. By popping from the `VecDeque` in `poll_next`, I may trigger a re-allocation, so I think I'm breaking the contract of `Pin`?
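To make the worry concrete, here is a minimal std-only sketch. `Holder` and `grow_while_pinned` are hypothetical names and not part of my crate. It grows a queue through a pinned reference and compares the address of the pinned struct before and after. As far as I can tell the struct itself stays put and only the queue's heap buffer is free to move, so my question boils down to whether `Pin` cares about that buffer.

```rust
use std::collections::VecDeque;
use std::marker::PhantomPinned;
use std::pin::Pin;

/// Hypothetical pinned container with a queue field (not the real `ConcurrentFutures`).
pub struct Holder {
    queue: VecDeque<u64>,
    _pin: PhantomPinned, // makes `Holder` `!Unpin`
}

impl Holder {
    /// Same shape as `project_pending`: a plain `&mut` to a field of a pinned struct.
    fn queue_mut(self: Pin<&mut Self>) -> &mut VecDeque<u64> {
        unsafe { &mut self.get_unchecked_mut().queue }
    }
}

/// Returns (struct address before, struct address after, final queue length).
pub fn grow_while_pinned(n: u64) -> (usize, usize, usize) {
    let mut holder = Box::pin(Holder {
        queue: VecDeque::new(),
        _pin: PhantomPinned,
    });
    let before = &*holder as *const Holder as usize;
    for i in 0..n {
        // Growing the queue may reallocate its heap buffer.
        holder.as_mut().queue_mut().push_back(i);
    }
    // The address of the `Holder` value itself is read again after the pushes.
    let after = &*holder as *const Holder as usize;
    let len = holder.as_mut().queue_mut().len();
    (before, after, len)
}
```

Calling `grow_while_pinned(1000)` starts from an empty queue with no capacity, so the buffer has to be allocated and grown along the way, while the two struct addresses can be compared directly.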
You can build and run the code from this repo:
git clone https://github.com/little-dude/learn-pin
cd learn-pin
cargo run