$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.11s
Running `target/debug/problem`
[src/main.rs:60] oneshot_sender.is_closed() = true
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: 123', src/libcore/result.rs:1165:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace.
Update:
I have simplified the code.
Since I maintain a library that depends on solving this problem, I have also asked the question on Stack Overflow.
Thank you for your response. Yes, you are right: the oneshot::Receiver is dropped in poll_write. Unfortunately, I couldn't understand or implement your proposed solution.
Could you please give me either more details or a more complete code example?
Ironing out the types would take a long time. All my code suggests is that you add a field to the struct and then check it each time poll_write is called. (Reuse the same code for poll_flush.)
Thank you, @jonh.
I have added a field to the struct, but I cannot call poll_flush at the end of poll_write because the return types do not match. Moreover, I cannot use the same oneshot::Receiver<_> in both poll_write and poll_flush, because it does not implement the Copy trait.
Here is my modified code:
use futures::future::{Future, TryFutureExt};
use std::cell::Cell;
use std::{
    pin::Pin,
    task::{Context, Poll},
};
use tokio::{
    io::{AsyncWrite, AsyncWriteExt},
    sync::{mpsc, oneshot},
};
use std::io::{Error, ErrorKind};

macro_rules! pin_mut {
    ($($x:ident),*) => { $(
        let mut $x = $x;
        #[allow(unused_mut)]
        let mut $x = unsafe {
            Pin::new_unchecked(&mut $x)
        };
    )* }
}

struct UdpStream {
    writer: mpsc::Sender<oneshot::Sender<usize>>,
    temp: Cell<Option<oneshot::Receiver<usize>>>,
}

impl AsyncWrite for UdpStream {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context,
        _buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        let (tx, rx) = oneshot::channel::<usize>();
        let mut writer = self.writer.clone();
        self.temp.set(Some(rx));
        let future = writer
            .send(tx)
            .map_err(|_| Error::new(ErrorKind::Other, "oh no!"))
            .and_then(|_| {
                self.temp
                    .take() // oneshot::Receiver does not implement Copy and can be used only once, so take it out of the Cell.
                    .unwrap()
                    .map_err(|_| Error::new(ErrorKind::Other, "oh no!"))
            });
        pin_mut!(future);
        future.poll(cx).map(|len| len)
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (tx, mut rx) = mpsc::channel::<oneshot::Sender<usize>>(10);
    let mut x = UdpStream {
        writer: tx,
        temp: Cell::new(None),
    };
    tokio::spawn(async move {
        let oneshot_sender = rx.recv().await.unwrap();
        dbg!(oneshot_sender.is_closed());
        oneshot_sender.send(123).unwrap();
    });
    let x = x.write("Hey".as_bytes()).await.unwrap();
    dbg!(x);
    Ok(())
}
Sorry, it was incorrect of me to suggest poll_flush. (It only comes into use when a write has been accepted but not fully processed.)
Your changed code still calls future.poll(cx) only once: the future is rebuilt and then dropped on every poll_write call. That isn't what I envisioned. I think the best thing really is to write test examples that show whether the code is working.
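To make the idea concrete without ironing out the tokio types, here is a minimal std-only sketch of the pattern: create the request once, keep its receiver in the struct, and poll that same receiver on every poll_write call. Everything in it is a hypothetical stand-in, not tokio's API: Reply models oneshot::Receiver<usize>, the outbox Vec models the mpsc channel, and NoopWake is a do-nothing waker used only to drive the polls by hand.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough for polling by hand in this sketch.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical stand-in for oneshot::Receiver<usize>.
struct Reply(Arc<Mutex<Option<usize>>>);

impl Future for Reply {
    type Output = usize;
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<usize> {
        // A real receiver would also store the waker so it can be woken later.
        match self.0.lock().unwrap().take() {
            Some(n) => Poll::Ready(n),
            None => Poll::Pending,
        }
    }
}

struct Stream {
    // Requests handed to the worker; stands in for the mpsc channel.
    outbox: Vec<Arc<Mutex<Option<usize>>>>,
    // The reply being waited for survives between poll_write calls.
    pending: Option<Reply>,
}

impl Stream {
    fn poll_write(&mut self, cx: &mut Context<'_>, _buf: &[u8]) -> Poll<usize> {
        // First call only: create the request and remember its receiver.
        if self.pending.is_none() {
            let slot = Arc::new(Mutex::new(None));
            self.outbox.push(slot.clone());
            self.pending = Some(Reply(slot));
        }
        // Every call: poll the stored receiver instead of a fresh one.
        let reply = self.pending.as_mut().unwrap();
        match Pin::new(reply).poll(cx) {
            Poll::Ready(n) => {
                self.pending = None; // ready for the next write
                Poll::Ready(n)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

// Polls twice with the same buffer; the "worker" replies in between.
fn demo() -> (bool, usize, Poll<usize>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut s = Stream { outbox: Vec::new(), pending: None };
    let first_pending = s.poll_write(&mut cx, b"Hey").is_pending();
    *s.outbox[0].lock().unwrap() = Some(123); // worker sends the reply
    let second = s.poll_write(&mut cx, b"Hey");
    (first_pending, s.outbox.len(), second)
}
```

Calling demo() from main shows the point of the stored field: the second poll does not queue a second request, and it sees the reply that arrived after the first poll returned Pending.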
x.write("Hey".as_bytes()).await
Tracing through the source a little: this code creates the write future, which calls poll_write repeatedly with the same buffer until it returns Ready.
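That behaviour can be modelled in a few lines of std-only Rust. This is a simplified sketch, not tokio's source: SlowWriter is a hypothetical writer that needs three polls, Write imitates the shape of the future behind write, and block_on is a toy busy-polling executor.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; the toy executor below simply polls in a loop.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical writer that accepts the buffer only on its third poll.
struct SlowWriter {
    calls: usize,
    seen: Vec<Vec<u8>>, // copy of the buffer passed to each call
}

impl SlowWriter {
    fn poll_write(&mut self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<usize> {
        self.calls += 1;
        self.seen.push(buf.to_vec());
        if self.calls < 3 {
            cx.waker().wake_by_ref(); // ask to be polled again
            Poll::Pending
        } else {
            Poll::Ready(buf.len())
        }
    }
}

// Simplified model of the future returned by write: it holds the writer and
// the buffer, and each poll forwards to poll_write with that same buffer.
struct Write<'a> {
    writer: &'a mut SlowWriter,
    buf: &'a [u8],
}

impl Future for Write<'_> {
    type Output = usize;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<usize> {
        let me = self.get_mut(); // fine because Write is Unpin
        me.writer.poll_write(cx, me.buf)
    }
}

// Toy executor: polls the future until it is ready.
fn block_on<F: Future + Unpin>(mut fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = Pin::new(&mut fut).poll(&mut cx) {
            return out;
        }
    }
}

// Returns (bytes written, number of poll_write calls, same buffer each time).
fn demo_write() -> (usize, usize, bool) {
    let mut w = SlowWriter { calls: 0, seen: Vec::new() };
    let n = block_on(Write { writer: &mut w, buf: b"Hey" });
    let expected: &[u8] = b"Hey";
    let same = w.seen.iter().all(|b| b.as_slice() == expected);
    (n, w.calls, same)
}
```

Because poll_write is re-entered like this, any state it needs between calls (such as a pending receiver) has to live in the struct rather than in a local variable.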
An added note on an extreme case: once the receiver is stored in your struct, some code may drop it rather than finish an await (for example, when a timeout fires). That is another path on which the oneshot is closed before the send.
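The failure mode itself is easy to reproduce. The sketch below uses std::sync::mpsc as a stand-in for the tokio oneshot (an assumption made to keep it std-only): once the receiving half is dropped, send fails and hands the value back, which matches the `Err` value: 123 in the panic output at the top of the thread.

```rust
use std::sync::mpsc;

// Sends after the receiver is gone; the unsent value comes back in the error.
fn send_after_receiver_dropped() -> Result<(), usize> {
    let (tx, rx) = mpsc::channel::<usize>();
    drop(rx); // models a receiver dropped by a cancelled or timed-out await
    tx.send(123).map_err(|mpsc::SendError(v)| v)
}
```

Handling that Err on the sending side, rather than calling unwrap(), keeps the worker task alive when a writer gives up early.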