Scoped async spawn works*

... but wait, isn't that fundamentally unsound? Yes, a "real" spawn onto the executor (where the spawned tasks progress in parallel) is unavoidably unsound; there's no[1] way to prevent capturing of state borrowed by the parent task[2].

However, half the point of async is that it provides structured concurrency without mandating parallelism, e.g. select!, join!, et al. So if we "spawn" tasks not onto the executor but rather multiplexed into the parent task's Future::poll, there's no issue of tasks potentially outliving their captured state anymore.

API summary and example:

pub async fn scope<'env, F, T>(f: F) -> T
where F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> ScopedJoinHandle<'scope, T>;

pub fn Scope<'scope, 'env>::spawn<F>(&'scope self, f: F) -> ScopedJoinHandle<'scope, F::Output>
where F: Future<Output: Send> + Send + 'scope;

impl<'scope, T> Future<Output = T> for ScopedJoinHandle<'scope, T>;

let x = String::from("cad97");
scope(|s| s.spawn(async {
    s.spawn(async { println!("{x:?}") });
    s.spawn(async { println!("{x:?}") });
}))
.await;

It's theoretically possible to remove the requirement for the outer s.spawn, but it requires some ugly trait shimming and runs into known limitations in type inference, making it not quite work properly yet.

Like thread::scope, there is a nonzero cost to using scoped compared to a global spawn. However, the overhead is a bit more for async spawns just due to the different execution model[3].

Basic implementation

Utilizing a lot of the futures crates' utilities for convenience. Only implements the Send version. As such, definitely contains allocation and synchronization which isn't strictly necessary for this application.

[playground]

use futures::{
    channel::{mpsc, oneshot},
    future::FutureObj,
    prelude::*,
};
use std::{
    future::poll_fn,
    marker::PhantomData,
    mem::transmute,
    pin::Pin,
    task::{ready, Context, Poll},
};

type PhantomInvariant<'a> = PhantomData<fn(&'a ()) -> &'a ()>;
type PhantomCovariant<'a> = PhantomData<fn() -> &'a ()>;

pub struct Scope<'scope, 'env: 'scope> {
    send: mpsc::UnboundedSender<FutureObj<'static, ()>>,
    scope: PhantomInvariant<'scope>,
    env: PhantomInvariant<'env>,
}

pub struct ScopedJoinHandle<'scope, T> {
    recv: oneshot::Receiver<T>,
    scope: PhantomCovariant<'scope>,
}

pub async fn scope<'env, F, T>(f: F) -> T
where
    F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> ScopedJoinHandle<'scope, T>,
{
    let (send, recv) = mpsc::unbounded();
    let scope = Scope {
        send,
        scope: PhantomData,
        env: PhantomData,
    };

    async fn inner<'scope, T>(
        mut recv: mpsc::UnboundedReceiver<FutureObj<'scope, ()>>,
        result: ScopedJoinHandle<'scope, T>,
    ) -> T {
        let mut tasks = stream::FuturesUnordered::new();
        poll_fn(|cx| {
            while let Poll::Ready(task) = recv.select_next_some().poll_unpin(cx) {
                tasks.push(task);
            }
            if ready!(tasks.poll_next_unpin(cx)).is_some() {
                cx.waker().wake_by_ref();
                Poll::Pending
            } else {
                Poll::Ready(())
            }
        })
        .await;

        recv.close();
        assert!(tasks.is_empty());
        assert!(matches!(recv.try_next(), Ok(None)));
        result.now_or_never().expect("task should be completed")
    }

    let recv = apply_receiver_covariance(recv);
    inner(recv, f(&scope)).await
}

fn apply_receiver_covariance<'snd: 'rcv, 'rcv>(
    receiver: mpsc::UnboundedReceiver<FutureObj<'snd, ()>>,
) -> mpsc::UnboundedReceiver<FutureObj<'rcv, ()>> {
    // SAFETY: FutureObj<'a, _> is covariant over 'a
    // SAFETY: Receiver<T> is logically fn() -> T, thus logically covariant in T
    // SAFETY: The only type adjustment is the variant-position lifetime
    unsafe { transmute(receiver) }
}

impl<'scope, 'env> Scope<'scope, 'env> {
    pub fn spawn<F>(&'scope self, f: F) -> ScopedJoinHandle<'scope, F::Output>
    where
        F: Future + Send + 'scope,
        F::Output: Send,
    {
        let (send, recv) = oneshot::channel();
        self.send(async move {
            // Allow send failure when join handle is dropped.
            _ = send.send(f.await);
        });
        ScopedJoinHandle {
            recv,
            scope: PhantomData,
        }
    }

    fn send(&self, task: impl Future<Output = ()> + Send + 'scope) {
        let task: FutureObj<'scope, ()> = Box::pin(task).into();
        // SAFETY: Dropping the sender never drops queued futures
        // SAFETY: The receiver always receives as FutureObj<'scope, ()>
        // SAFETY: The only type adjustment is the variant-position lifetime
        let task: FutureObj<'static, ()> = unsafe { transmute(task) };
        let Ok(()) = self.send.unbounded_send(task) else {
            // A send failure happens iff the receiver has been dropped.
            // I.e. the `<fn scope>::Output` is being dropped, and some
            // mischievous future *still in the channel* attempted to
            // spawn from its drop implementation. This shouldn't happen.
            // We won't panic if this happens from a running task, as the
            // task pool drops before the channel receiver. When directly
            // spawning async blocks, that is the only possible scenario.
            panic!("attempted to spawn scoped task during cancellation");
        };
    }
}

impl<'scope, T> Future for ScopedJoinHandle<'scope, T> {
    type Output = T;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<T> {
        self.recv
            .poll_unpin(cx)
            .map(|out| out.expect("task should not be cancelled"))
    }
}

// ----------------

pub async fn test() {
    let x = String::from("cad97");
    scope(|s| {
        s.spawn(async {
            s.spawn(async { println!("{x:?}") });
            s.spawn(async { println!("{x:?}") });
        })
    })
    .await;
}

fn main() {
    futures::executor::block_on(test());
}

This requires solely the same bit of unsafe that thread::scope does: erasing the lifetime of spawned tasks in order to put them into a 'static shaped hole, although for subtly different reasons[4].


If nobody blows a hole in my soundness logic, I think I'll PR (a refined version of[5]) this implementation to the futures crate family. This feels like the kind of useful but non-trivial utility which should exist in the library ecosystem.


  1. No way is a bit of an overstatement; a scoped async task spawn would be sound if (and only if) the only captured environment is owned by the parent task. It is possible to enforce this property by doing captures manually, but that loses most of the convenience provided by the scoped spawn API. ↩︎

  2. This is unsound because the borrow lifetime can expire without informing the spawned task, if the parent task is forgotten/leaked. ↩︎

  3. An async scope has essentially the same overhead as thread scope, but with the additional requisite costs of a) polling spawned tasks needs to walk the parent's poll stack and b) adding a frame to the waker stack do differentiate between subtask wakes here as opposed to at the top level. ↩︎

  4. It's almost possible without any unsafe, except Scope can't be dropped if it contains Sender<FutureObj<'scope>>, as we rely on borrowing it for 'scope. So, we unsafely apply a variance transmute in order to have a channel of FutureObj<'scope, ()> typed as a channel of FutureObj<'static, ()>. This results in a quite subtle requirement not relevant to scoped threads — cancelling async fn scope (dropping the future after polling) must drop any scoped tasks before dropping the Scope, as otherwise they may still have a reference to the scope and try to use it, which would alias the &mut self used to drop the scope. Thankfully, dropping the mpsc receiver drains the channel contents, and dropping a mpsc sender never drops items queued on the channel.

    Technically this relies on non-guaranteed properties of the futures crate's channel implementation. ↩︎

  5. Since the spawns can be presumed to all come from the same task tree, the full mpsc::unbounded channel is a bit overkill. Synchronization is still needed in case someone uses scoped threads to try to spawn in parallel, but since that is exceedingly unlikely, spawning doesn't need the full atomic queue; a trylock dequeue should be sufficient.

    Also it should be made capable of spawning any UnsafeFutureObj if it's going to live in the futures crate and store FutureObj instead of just using Pin<Box<dyn Future>>. ↩︎

5 Likes

So it would have block_in_place starvation?

Yes, any blocking within a scoped task tree will block the entire task tree, just the same as with select! or join!. So it isn't a solution for wrapping blocking IO, you still need to utilize readiness based IO and/or buffered channels to spawn_blocking pools.

Reducing top-level spawns does make you more "vulnerable" to accidental blocking causing sibling task starvation. While defense in depth is good, the solution to accidental blocking is better code structure less likely to block rather than less structured concurrency, IMHO.

1 Like

I haven't double-checked your implementation, but yes, something along these lines is possible.

Not an issue in the logic, but certainly in the assumptions in the context of your “basic implementation”:

use futures::poll;
use std::pin::pin;
use futures::future::pending;

pub async fn unsoundness_test() {
    let fut = scope(|s| {
        s.spawn(async {
            let fut = async {
                struct PanicOnDrop;
                impl Drop for PanicOnDrop {
                    fn drop(&mut self) {
                        panic!("boom");
                    }
                }
                let _guard = PanicOnDrop;
                eprintln!("initialized PanicOnDrop future");
                pending::<()>().await;
            };
            let mut fut = Box::pin(fut);
            let _ = poll!(&mut fut);
            s.spawn(fut);
            let fut = async {
                #[allow(dead_code)]
                struct AccessScopeOnDrop<'scope, 'env>(&'scope Scope<'scope, 'env>);
                impl Drop for AccessScopeOnDrop<'_, '_> {
                    fn drop(&mut self) {
                        eprintln!("accessing scope");
                        let _ = &*self.0; // miri reports UB
                    }
                }
                let _guard = AccessScopeOnDrop(s);
                eprintln!("initialized AccessScopeOnDrop future");
                pending::<()>().await;
            };
            let mut fut = Box::pin(fut);
            let _ = poll!(&mut fut);
            s.spawn(fut);
        })
    });
    let mut fut = pin!(fut);

    let _ = poll!(&mut fut);

    drop(fut);
}

fn main() {
    futures::executor::block_on(unsoundness_test());
}

Rust Playground

   Compiling playground v0.0.1 (/playground)
    Finished dev [unoptimized + debuginfo] target(s) in 0.29s
     Running `/playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/bin/cargo-miri runner target/miri/x86_64-unknown-linux-gnu/debug/playground`
initialized PanicOnDrop future
initialized AccessScopeOnDrop future
thread 'main' panicked at src/main.rs:134:25:
boom
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
accessing scope
error: Undefined Behavior: trying to retag from <7808> for SharedReadOnly permission at alloc875[0x58], but that tag does not exist in the borrow stack for this location
   --> src/main.rs:150:33
    |
150 |                         let _ = &*self.0; // miri reports UB
    |                                 ^^^^^^^^
    |                                 |
    |                                 trying to retag from <7808> for SharedReadOnly permission at alloc875[0x58], but that tag does not exist in the borrow stack for this location
    |                                 this error occurs as part of retag at alloc875[0x58..0x60]
    |
    = help: this indicates a potential bug in the program: it performed an invalid operation, but the Stacked Borrows rules it violated are still experimental
    = help: see https://github.com/rust-lang/unsafe-code-guidelines/blob/master/wip/stacked-borrows.md for further information
help: <7808> was created by a SharedReadOnly retag at offsets [0x58..0x60]
   --> src/main.rs:153:48
    |
153 |                 let _guard = AccessScopeOnDrop(s);
    |                                                ^
help: <7808> was later invalidated at offsets [0x58..0x60] by a Unique retag
   --> src/main.rs:167:1
    |
167 | }
    | ^
    = note: BACKTRACE (of the first span):
    = note: inside `<unsoundness_test::{closure#0}::{closure#0}::{closure#0}::{closure#1}::AccessScopeOnDrop<'_, '_> as std::ops::Drop>::drop` at src/main.rs:150:33: 150:41
    = note: inside `std::ptr::drop_in_place::<unsoundness_test::{closure#0}::{closure#0}::{closure#0}::{closure#1}::AccessScopeOnDrop<'_, '_>> - shim(Some(unsoundness_test::{closure#0}::{closure#0}::{closure#0}::{closure#1}::AccessScopeOnDrop<'_, '_>))` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:507:1: 507:56
    = note: inside `std::ptr::drop_in_place::<{async block@src/main.rs:144:23: 156:14}> - shim(Some({async block@src/main.rs:144:23: 156:14}))` at src/main.rs:156:13: 156:14
    = note: inside `std::ptr::drop_in_place::<std::boxed::Box<{async block@src/main.rs:144:23: 156:14}>> - shim(Some(std::boxed::Box<{async block@src/main.rs:144:23: 156:14}>))` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:507:1: 507:56
    = note: inside `std::ptr::drop_in_place::<std::pin::Pin<std::boxed::Box<{async block@src/main.rs:144:23: 156:14}>>> - shim(Some(std::pin::Pin<std::boxed::Box<{async block@src/main.rs:144:23: 156:14}>>))` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:507:1: 507:56
    = note: inside `std::ptr::drop_in_place::<{async block@src/main.rs:83:19: 86:10}> - shim(Some({async block@src/main.rs:83:19: 86:10}))` at src/main.rs:83:19: 86:10
    = note: inside `std::ptr::drop_in_place::<std::boxed::Box<dyn futures::Future<Output = ()>>> - shim(Some(std::boxed::Box<dyn futures::Future<Output = ()>>))` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:507:1: 507:56
    = note: inside `std::ptr::drop_in_place::<std::pin::Pin<std::boxed::Box<dyn futures::Future<Output = ()>>>> - shim(Some(std::pin::Pin<std::boxed::Box<dyn futures::Future<Output = ()>>>))` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:507:1: 507:56
    = note: inside `std::mem::drop::<std::pin::Pin<std::boxed::Box<dyn futures::Future<Output = ()>>>>` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/mem/mod.rs:992:24: 992:25
    = note: inside `futures_task::future_obj::if_alloc::<impl futures::future::UnsafeFutureObj<'_, ()> for std::pin::Pin<std::boxed::Box<{async block@src/main.rs:83:19: 86:10}>>>::drop` at /playground/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-task-0.3.30/src/future_obj.rs:262:13: 262:48
    = note: inside `<futures::future::LocalFutureObj<'_, ()> as std::ops::Drop>::drop` at /playground/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-task-0.3.30/src/future_obj.rs:90:18: 90:45
    = note: inside `std::ptr::drop_in_place::<futures::future::LocalFutureObj<'_, ()>> - shim(Some(futures::future::LocalFutureObj<'_, ()>))` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:507:1: 507:56
    = note: inside `std::ptr::drop_in_place::<futures::future::FutureObj<'_, ()>> - shim(Some(futures::future::FutureObj<'_, ()>))` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:507:1: 507:56
    = note: inside `std::ptr::drop_in_place::<std::option::Option<futures::future::FutureObj<'_, ()>>> - shim(Some(std::option::Option<futures::future::FutureObj<'_, ()>>))` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:507:1: 507:56
    = note: inside `std::ptr::drop_in_place::<futures::futures_channel::mpsc::queue::Node<futures::future::FutureObj<'_, ()>>> - shim(Some(futures::futures_channel::mpsc::queue::Node<futures::future::FutureObj<'_, ()>>))` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:507:1: 507:56
    = note: inside `std::ptr::drop_in_place::<std::boxed::Box<futures::futures_channel::mpsc::queue::Node<futures::future::FutureObj<'_, ()>>>> - shim(Some(std::boxed::Box<futures::futures_channel::mpsc::queue::Node<futures::future::FutureObj<'_, ()>>>))` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:507:1: 507:56
    = note: inside `std::mem::drop::<std::boxed::Box<futures::futures_channel::mpsc::queue::Node<futures::future::FutureObj<'_, ()>>>>` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/mem/mod.rs:992:24: 992:25
    = note: inside `<futures::futures_channel::mpsc::queue::Queue<futures::future::FutureObj<'_, ()>> as std::ops::Drop>::drop` at /playground/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-channel-0.3.30/src/mpsc/queue.rs:169:17: 169:41
    = note: inside `std::ptr::drop_in_place::<futures::futures_channel::mpsc::queue::Queue<futures::future::FutureObj<'_, ()>>> - shim(Some(futures::futures_channel::mpsc::queue::Queue<futures::future::FutureObj<'_, ()>>))` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:507:1: 507:56
    = note: inside `std::ptr::drop_in_place::<futures::futures_channel::mpsc::UnboundedInner<futures::future::FutureObj<'_, ()>>> - shim(Some(futures::futures_channel::mpsc::UnboundedInner<futures::future::FutureObj<'_, ()>>))` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:507:1: 507:56
    = note: inside `std::sync::Arc::<futures::futures_channel::mpsc::UnboundedInner<futures::future::FutureObj<'_, ()>>>::drop_slow` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/sync.rs:1752:18: 1752:67
    = note: inside `<std::sync::Arc<futures::futures_channel::mpsc::UnboundedInner<futures::future::FutureObj<'_, ()>>> as std::ops::Drop>::drop` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/sync.rs:2407:13: 2407:29
    = note: inside `std::ptr::drop_in_place::<std::sync::Arc<futures::futures_channel::mpsc::UnboundedInner<futures::future::FutureObj<'_, ()>>>> - shim(Some(std::sync::Arc<futures::futures_channel::mpsc::UnboundedInner<futures::future::FutureObj<'_, ()>>>))` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:507:1: 507:56
    = note: inside `std::ptr::drop_in_place::<futures::futures_channel::mpsc::UnboundedSenderInner<futures::future::FutureObj<'_, ()>>> - shim(Some(futures::futures_channel::mpsc::UnboundedSenderInner<futures::future::FutureObj<'_, ()>>))` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:507:1: 507:56
    = note: inside `std::ptr::drop_in_place::<std::option::Option<futures::futures_channel::mpsc::UnboundedSenderInner<futures::future::FutureObj<'_, ()>>>> - shim(Some(std::option::Option<futures::futures_channel::mpsc::UnboundedSenderInner<futures::future::FutureObj<'_, ()>>>))` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:507:1: 507:56
    = note: inside `std::ptr::drop_in_place::<futures::futures_channel::mpsc::UnboundedSender<futures::future::FutureObj<'_, ()>>> - shim(Some(futures::futures_channel::mpsc::UnboundedSender<futures::future::FutureObj<'_, ()>>))` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:507:1: 507:56
    = note: inside `std::ptr::drop_in_place::<Scope<'_, '_>> - shim(Some(Scope<'_, '_>))` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:507:1: 507:56
    = note: inside `std::ptr::drop_in_place::<{async fn body@src/main.rs:31:1: 65:2}> - shim(Some({async fn body@src/main.rs:31:1: 65:2}))` at src/main.rs:65:1: 65:2
note: inside closure
   --> src/main.rs:167:1
    |
167 | }
    | ^
    = note: inside closure at /playground/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-executor-0.3.30/src/local_pool.rs:317:23: 317:42
    = note: inside closure at /playground/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-executor-0.3.30/src/local_pool.rs:90:37: 90:47
    = note: inside `std::thread::LocalKey::<std::sync::Arc<futures_executor::local_pool::ThreadNotify>>::try_with::<{closure@futures_executor::local_pool::run_executor<(), {closure@futures::executor::block_on<{async fn body@src/main.rs:127:33: 167:2}>::{closure#0}}>::{closure#0}}, ()>` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/local.rs:286:16: 286:31
    = note: inside `std::thread::LocalKey::<std::sync::Arc<futures_executor::local_pool::ThreadNotify>>::with::<{closure@futures_executor::local_pool::run_executor<(), {closure@futures::executor::block_on<{async fn body@src/main.rs:127:33: 167:2}>::{closure#0}}>::{closure#0}}, ()>` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/local.rs:262:9: 262:25
    = note: inside `futures_executor::local_pool::run_executor::<(), {closure@futures::executor::block_on<{async fn body@src/main.rs:127:33: 167:2}>::{closure#0}}>` at /playground/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-executor-0.3.30/src/local_pool.rs:86:5: 102:7
    = note: inside `futures::executor::block_on::<{async fn body@src/main.rs:127:33: 167:2}>` at /playground/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-executor-0.3.30/src/local_pool.rs:317:5: 317:43
note: inside `main`
   --> src/main.rs:170:5
    |
170 |     futures::executor::block_on(unsoundness_test());
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace

error: aborting due to 1 previous error

I mean, you almost point out the weakness yourself

Thankfully, dropping the mpsc receiver drains the channel contents, and dropping a mpsc sender never drops items queued on the channel.

Technically this relies on non-guaranteed properties of the futures crate's channel implementation.

except you don’t rely on non-guaranteed properties, but on non-properties. While "dropping the mpsc receiver drains the channel contents" is true, it’s not true that this is a properly enforced thing, as a soundness property, in the presence of panics.

I.e. if one channel element panics, the remaining ones stay in the channel and are dropped with the sender after all.

6 Likes

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.