Tokio::sync::oneshot close before sending message

I would like to send a message between two tasks by using tokio::sync::oneshot, but Unfortunately, before sending data, the is_closed() returns true.
I am sending the oneshot::sender in #L38 and receive message in #L40
https://github.com/SajjadPourali/oneshot_sender-is_closed/blob/master/src/main.rs#L38
I am sending the message, but in #L61 method returns it's closed.
https://github.com/SajjadPourali/oneshot_sender-is_closed/blob/master/src/main.rs#L61

$cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.11s
     Running `target/debug/problem`
[src/main.rs:60] oneshot_sender.is_closed() = true
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: 123', src/libcore/result.rs:1165:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace.

Update:

  • I have simplified the code.
  • Since I'm maintaining a library which its problem is dependent on this, I have asked the question in stackoverflow.

Your dropping end in poll_write, need to keep the future until it is ready.

struct UdpStream {
 active: Option<Pin<Box<dyn Future<...

fn poll_write {
  if self.active.is_none() {
     ....
     self.active = Some(future);
  }

//  poll_flush() } fn poll_flush {

  match self.active { // might need to take
     None => Poll::Ready(Ok(())),
     Some(f) => {
         let poll = f.poll(
         if let Ready(_ = poll {
            self.active = None
         }
         poll
     }
  }
}

Edit: Commented out incorrect poll_flush use.

1 Like

Thank you for your response. Yes, you right the oneshot::receiver will drop in poll_write, but unfortunately, I couldn't understand/implement your proposed solution.
Could you please give me either more details or more complete code in the example code?

That would take a long time to iron out types. All my code is suggesting is you add a variable to the struct then check it when poll_write is called. (Reuse same code for poll_flush.)

1 Like

Thank you @jonh ,
I have added a field of struct, but it is not possible to call pull_flush end of poll_write, because of return type is mismatch, moreover, I can not use oneshot::Receiver<_> in both of poll_write and poll_flush methods, because the Copy trait is not implemented on it.

here is my modified code :

use futures::future::{Future, TryFutureExt};
use std::cell::Cell;
use std::{
    pin::Pin,
    task::{Context, Poll},
};

use tokio::{
    io::{AsyncWrite, AsyncWriteExt},
    sync::{mpsc, oneshot},
};

use std::io::{Error, ErrorKind};

macro_rules! pin_mut {
    ($($x:ident),*) => { $(
        let mut $x = $x;
        #[allow(unused_mut)]
        let mut $x = unsafe {
            Pin::new_unchecked(&mut $x)
        };
    )* }
}

struct UdpStream {
    writer: mpsc::Sender<oneshot::Sender<usize>>,
    temp: Cell<Option<oneshot::Receiver<usize>>>,
}

impl AsyncWrite for UdpStream {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context,
        _buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        let (tx, rx) = oneshot::channel::<usize>();
        let mut writer = self.writer.clone();
        self.temp.set(Some(rx));
        let future = writer
            .send(tx)
            .map_err(|_| Error::new(ErrorKind::Other, "oh no!"))
            .and_then(|_| {
                self.temp
                    .take() // the oneshot is not implemented Copy trait and It is onetime using variable.  
                    .unwrap()
                    .map_err(|_| Error::new(ErrorKind::Other, "oh no!"))
            });

        pin_mut!(future);
        future.poll(cx).map(|len| len)
    }
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (tx, mut rx) = mpsc::channel::<oneshot::Sender<usize>>(10);
    let mut x = UdpStream {
        writer: tx,
        temp: Cell::new(None),
    };

    tokio::spawn(async move {
        let oneshot_sender = rx.recv().await.unwrap();
        dbg!(oneshot_sender.is_closed());
        oneshot_sender.send(123).unwrap();
    });

    let x = x.write("Hey".as_bytes()).await.unwrap();
    dbg!(x);

    Ok(())
}

I have changed the code to the simpler one.
https://github.com/SajjadPourali/oneshot_sender-is_closed

Sorry it was a incorrect to use poll_flush. (Only comes in use if a write is accepted but not fully processed.)

Your changed code is still only calling future.poll( once. It isn't what I envisioned. I guess the best thing really is test examples that highlight if code is working.

x.write("Hey".as_bytes()).await
A little tracing through source. This code leads to the write future which calls poll_write repeatedly with same buffer until ready.

Added extreme case note: When receiver is added to your structure there is possibility some code will drop it rather than finishing a await (e.g. a timeout triggered.) This would then be a path where the oneshot is closed before send.

1 Like

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.