Polling through a MutexGuard

Polling requires that self: Pin<&mut Self> as well as a context. How can I poll through a thread-safe wrapper that implies a signature different than self: Pin<&mut Self>? I see that I need to dereference the lock, but doesn't that drop the lock? How can I do it safely?

Need to poll T such that Arc<Mutex<T>>

where Self: Unpin: Pin::new(&mut mutex.lock().unwrap()).poll(cx)

Otherwise, you need a custom mutex that supports the Pin guarantees.

1 Like

Yeah, that looks like that was mostly it. For some reason, I'm getting this error though:

error[E0599]: no method named `poll_fill_buf` found for type `futures_util::io::buf_reader::BufReader<S>` in the current scope
   --> hyxe_crypt\src\net\async_crypt_splitter.rs:163:59
    |
163 |             let bytes: &[u8] = futures::ready!(this.input.poll_fill_buf(cx));
    |                                                           ^^^^^^^^^^^^^ method not found in `futures_util::io::buf_reader::BufReader<S>`

error[E0599]: no method named `consume` found for type `futures_util::io::buf_reader::BufReader<S>` in the current scope
   --> hyxe_crypt\src\net\async_crypt_splitter.rs:164:24
    |
164 |             this.input.consume(bytes.len());
    |                        ^^^^^^^ method not found in `futures_util::io::buf_reader::BufReader<S>`


    fn poll_scramble(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
        let mut this = self.inner.lock();
        if this.waker.is_none() {
            this.waker = Some(cx.waker().clone());
        }

        let mut this = Pin::new(&mut this);
        let window_size = this.window_size.clone();
        for idx in window_size {
            let bytes: &[u8] = futures::ready!(this.input.poll_fill_buf(cx));
            this.input.consume(bytes.len());
            // then, input these bytes through the scrambler
        }

        Poll::Pending
    }

impl<S: AsyncRead + Unpin, O: Sink<Bytes>, T: Sink<Bytes>> Stream for AsyncCryptScrambler<S, O, T> {
    type Item = ();

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.poll_scramble(cx)
    }
}

and, the memory structure of the inner T:

pub struct AsyncCryptScramblerInner<S: AsyncRead + Unpin, O: Sink<Bytes>, T: Sink<Bytes>> {
    input: BufReader<S>,
    udp_output: O,
    tcp_output: T,
    group_receiver_config: GroupReceiverConfig,
    window_size: RangeInclusive<usize>,
    // keeps track of how many plaintext bytes have been processed
    plaintext_cursor: usize,
    waker: Option<Waker>
}

According to: futures::io::BufReader - Rust

AsyncBufRead is implemented for BufReader<R> if R: AsyncRead. In the struct i provided, I ensured this. Why is the method not being recognized?

I think you need to pin this.input. I usually use pin_project to propagate the pin into a struct field

1 Like

Slayed it!!!

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.