Is my pin projection actually safe?

Hello,

I'd like to write a Stream that can run futures concurrently on the tokio runtime. For the sake of learning and understanding Pin, I'd like to not use pin_project, and write the projections myself. I came up with this:

use std::{
    collections::VecDeque,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};

use futures::{
    stream::{FuturesUnordered, Stream, StreamExt},
    Future,
};
use tokio::{
    task::{JoinError, JoinHandle},
    time::delay_for,
};

/// `ConcurrentFutures` can keep a capped number of futures running concurrently, and yield their
/// result as they finish. When the max number of concurrent futures is reached, new tasks are
/// queued until some in-flight futures finish.
pub struct ConcurrentFutures<T>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    /// in-flight futures
    running: FuturesUnordered<JoinHandle<T::Output>>,
    /// buffered tasks
    pending: VecDeque<T>,
    /// max number of concurrent futures
    max_in_flight: usize,
}

impl<T> ConcurrentFutures<T>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    pub fn new(max_in_flight: usize) -> Self {
        Self {
            running: FuturesUnordered::new(),
            pending: VecDeque::new(),
            max_in_flight,
        }
    }

    pub fn push(&mut self, task: T) {
        self.pending.push_back(task)
    }

    fn project_running(self: Pin<&mut Self>) -> Pin<&mut FuturesUnordered<JoinHandle<T::Output>>> {
        // This is okay because:
        // - the closure doesn't move out for `s`
        // - the returned value cannot be moved as long as `s` is not moved
        // See: https://doc.rust-lang.org/std/pin/struct.Pin.html#method.map_unchecked_mut
        unsafe { self.map_unchecked_mut(|s| &mut s.running) }
    }

    fn project_pending(self: Pin<&mut Self>) -> &mut VecDeque<T> {
        // Not sure whether this is actually safe. When `ConcurrentFutures` is pinned, should we
        // be able to push/pop from one of the fields?
        unsafe { &mut self.get_unchecked_mut().pending }
    }
}

In the code above, is the project_pending method actually correct? It seems to work, but I am not sure whether I am allowed to project the pending field like this. After all, when I poll my ConcurrentFutures<T> it is pinned, so its content shouldn't move. By popping from the VecDeque in poll_next, I may trigger a re-allocation, so I think I'm breaking the contract of Pin?

You can build and run the code from this repo:

git clone https://github.com/little-dude/learn-pin
cd learn-pin
cargo run

You can outsource safety to the proven pin-project crate.

I know but as I said:

For the sake of learning and understanding Pin , I'd like to not use pin_project , and write the projections myself

1 Like

It may seem counter-intuitive that the field of a pinned struct might not be pinned, but that is actually the easiest choice: if a Pin <&mut Field> is never created, nothing can go wrong! So, if you decide that some field does not have structural pinning, all you have to ensure is that you never create a pinned reference to that field.

In other words, have you ever polled any of those pending futures, or otherwise created a Pin<&mut T> for one of them? Are you sure that futures only go from pending to running, and never, ever go the other way?

1 Like

In this case you do not need any pin projections whatsoever, since none are stored directly in your future. Your struct can be unconditionally Unpin.

As for VecDeque, I assume you never create a pinned reference directly to anything inside it, in which case it's fine.

1 Like

Your struct can be unconditionally Unpin.

For the struct to be Unpin, I need the T: Unpin trait bound I think.

You don't need that. FuturesUnordered is unconditionally Unpin because it pins its contents by boxing them, so it can be moved without moving its contents. The VecDeque never pins its contents in the first place, so they can be freely moved without issue.

Ah yes indeed! But I have to manually impl Unpin for ConcurrentFuture, because:

pub struct ConcurrentFutures<T>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    /// in-flight futures
    running: FuturesUnordered<JoinHandle<T::Output>>,
    /// buffered tasks
    pending: VecDeque<T>,
    /// max number of concurrent futures
    max_in_flight: usize,
}

impl<T> ConcurrentFutures<T>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    pub fn new(max_in_flight: usize) -> Self {
        Self {
            running: FuturesUnordered::new(),
            pending: VecDeque::new(),
            max_in_flight,
        }
    }

    pub fn push(&mut self, task: T) {
        self.pending.push_back(task)
    }
}


impl<T> Stream for ConcurrentFutures<T>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    type Item = Result<T::Output, JoinError>;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        while this.running.len() < this.max_in_flight {
            if let Some(pending) = this.pending.pop_front() {
                let handle = tokio::spawn(pending);
                this.running.push(handle);
            } else {
                break;
            }
        }
        Pin::new(&mut this.running).poll_next(cx)
    }
}

Fails to compile with:

error[E0277]: `T` cannot be unpinned
  --> src/main.rs:45:25
   |
45 |         let this = self.get_mut();
   |                         ^^^^^^^ within `ConcurrentFutures<T>`, the trait `Unpin` is not implemented for `T`
   |
   = note: required because it appears within the type `PhantomData<T>`
   = note: required because it appears within the type `Unique<T>`
   = note: required because it appears within the type `alloc::raw_vec::RawVec<T>`
   = note: required because it appears within the type `VecDeque<T>`
   = note: required because it appears within the type `ConcurrentFutures<T>`
help: consider further restricting this bound
   |
40 |     T: Future + Send + 'static + Unpin,
   |                                ^^^^^^^

Shouldn't this automatically be implemented, since all the fields are Unpin? It's actually because of that message that I initially started looking into projections...

Yes, the standard library does not yet have more general implementations like impl<T> Unpin for VecDeque<T>, they have unnecessary where T: Unpin bounds on them.

As long as you don't use any unsafe in the implementation you are free to impl Unpin for Self (since this is safe there is no way to cause unsoundness without writing unsafe somewhere else).

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.