Failing to write an async version of recvmsg for Tokio's UnixDatagram

Hello! I'm a Rust beginner and I'm trying to write an extension trait for Tokio's UnixDatagram with two functions: try_recvmsg and recvmsg. I'm using nix's safe wrapper for recvmsg. The trait is as follows.

trait TokioRecvmsgExt {
    fn try_recvmsg<'a, 'outer, 'inner, S>(
        &'a self,
        iov: &'outer mut [IoSliceMut<'inner>],
        cmsg_buf: Option<&'a mut Vec<u8>>,
        flags: MsgFlags,
    ) -> io::Result<RecvMsg<'a, 'outer, S>>
    where
        S: SockaddrLike + 'a,
        'inner: 'outer;

    async fn recvmsg<'a, 'outer, 'inner, S>(
        &'a self,
        iov: &'outer mut [IoSliceMut<'inner>],
        cmsg_buf: Option<&'a mut Vec<u8>>,
        flags: MsgFlags,
    ) -> io::Result<RecvMsg<'a, 'outer, S>>
    where
        S: SockaddrLike + 'a,
        'inner: 'outer;
}

I used the same lifetime annotations as nix's recvmsg, and the try_recvmsg implementation works without problems:

fn try_recvmsg<'a, 'outer, 'inner, S>(
    &'a self,
    iov: &'outer mut [IoSliceMut<'inner>],
    cmsg_buf: Option<&'a mut Vec<u8>>,
    flags: MsgFlags,
) -> io::Result<RecvMsg<'a, 'outer, S>>
where
    S: SockaddrLike + 'a,
    'inner: 'outer,
{
    self.try_io(Interest::READABLE, || {
        recvmsg(self.as_raw_fd(), iov, cmsg_buf, flags).map_err(|e| io::Error::from(e))
    })
}

Unfortunately, implementing the asynchronous function isn't that easy. The following code produces two compiler errors:

async fn recvmsg<'a, 'outer, 'inner, S>(
    &'a self,
    iov: &'outer mut [IoSliceMut<'inner>],
    cmsg_buf: Option<&'a mut Vec<u8>>,
    flags: MsgFlags,
) -> io::Result<RecvMsg<'a, 'outer, S>>
where
    S: SockaddrLike + 'a,
    'inner: 'outer,
{
    self.async_io(Interest::READABLE, || {
        recvmsg(self.as_raw_fd(), iov, cmsg_buf, flags).map_err(|e| io::Error::from(e))
    })
    .await
}

error: captured variable cannot escape `FnMut` closure body
  --> src/main.rs:46:13
   |
37 |         iov: &'outer mut [IoSliceMut<'inner>],
   |         --- variable defined here
...
45 |         self.async_io(Interest::READABLE, || {
   |                                            - inferred to be a `FnMut` closure
46 |             recvmsg(self.as_raw_fd(), iov, cmsg_buf, flags).map_err(|e| io::Error::from(e))
   |             ^^^^^^^^^^^^^^^^^^^^^^^^^^---^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |             |                         |
   |             |                         variable captured here
   |             returns a reference to a captured variable which escapes the closure body
   |
   = note: `FnMut` closures only have access to their captured variables while they are executing...
   = note: ...therefore, they cannot allow references to captured variables to escape

error[E0507]: cannot move out of `cmsg_buf`, a captured variable in an `FnMut` closure
  --> src/main.rs:46:44
   |
38 |         cmsg_buf: Option<&'a mut Vec<u8>>,
   |         -------- captured outer variable
...
45 |         self.async_io(Interest::READABLE, || {
   |                                           -- captured by this `FnMut` closure
46 |             recvmsg(self.as_raw_fd(), iov, cmsg_buf, flags).map_err(|e| io::Error::from(e))
   |                                            ^^^^^^^^ move occurs because `cmsg_buf` has type `std::option::Option<&mut std::vec::Vec<u8>>`, which does not implement the `Copy` trait

I think part of the problem is that async_io calls the closure argument in a loop, while iov is a mutable reference and cmsg_buf is an Option holding a mutable reference to a vector. Maybe interior mutability could help make the closure Fn rather than FnMut. I don't know how to approach these errors, and my intuition may be completely wrong, so I'm asking for help. :pray:
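For readers following along, here is a minimal std-only sketch of what the two errors are about. It does not use Tokio or nix: retry_would_block, fake_recv and recv_owned are hypothetical stand-ins invented for illustration. The driver calls its closure repeatedly, as the question says async_io does, so the closure has to be FnMut. Under that assumption, the closure can neither move cmsg out (the E0507 error) nor return a value that borrows from its captures (the "cannot escape" error). The sketch avoids both by reborrowing the buffers on each call and returning only owned data.

```rust
use std::io;

/// Hypothetical stand-in for an `async_io`-style driver (not Tokio's code):
/// it may call `op` several times, so `op` has to be `FnMut`.
fn retry_would_block<R>(mut op: impl FnMut() -> io::Result<R>) -> io::Result<R> {
    loop {
        match op() {
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
            other => return other,
        }
    }
}

/// Hypothetical stand-in for a receive call: reports `WouldBlock` until
/// `attempts_left` reaches zero, then fills `buf` and the optional `cmsg` buffer.
fn fake_recv(
    attempts_left: &mut u32,
    buf: &mut [u8],
    cmsg: Option<&mut Vec<u8>>,
) -> io::Result<usize> {
    if *attempts_left > 0 {
        *attempts_left -= 1;
        return Err(io::ErrorKind::WouldBlock.into());
    }
    let payload = b"ping";
    let n = payload.len().min(buf.len());
    buf[..n].copy_from_slice(&payload[..n]);
    if let Some(c) = cmsg {
        c.clear();
        c.extend_from_slice(&[1, 2, 3]);
    }
    Ok(n)
}

/// Receives into the caller's buffers and returns only an owned byte count,
/// so nothing borrowed from the closure's captures leaves the closure.
fn recv_owned(
    attempts_left: &mut u32,
    buf: &mut [u8],
    mut cmsg: Option<&mut Vec<u8>>,
) -> io::Result<usize> {
    retry_would_block(|| {
        // `as_deref_mut()` reborrows the Vec for this call only. Passing `cmsg`
        // by value would move it out of the closure and make it `FnOnce`.
        fake_recv(attempts_left, buf, cmsg.as_deref_mut())
    })
}
```

The trade-off is that the result is a plain usize rather than something like RecvMsg that keeps borrowing the buffers. Anything the caller needs from the buffers has to be read from them after the call returns.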

If you have a working try_recvmsg built on try_io, then I recommend that you just combine it with readable like this:

loop {
    stream.readable().await?;
    match stream.try_recvmsg(...) {
        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
        output => return output,
    }
}

Thank you! It works, but for learning purposes I'm wondering how to make it work with async_io. I tried using Option<Rc<RefCell<Vec<u8>>>> for cmsg_buf and borrowing it "by hand", but the compiler still complains that the closure is FnOnce because of a move.
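A likely reason for the FnOnce complaint, sketched with std only: if the closure body uses the captured Option<Rc<RefCell<...>>> by value, the first call moves it out and the closure cannot be called again. The helper names below (call_until_some, fill_shared) are hypothetical and only model a driver that invokes its closure repeatedly. Borrowing through as_ref() inside the body keeps the closure FnMut, and the RefCell borrow ends before the closure returns.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical driver that keeps calling `op` until it yields a value,
/// so `op` has to be `FnMut` (callable more than once).
fn call_until_some<R>(mut op: impl FnMut() -> Option<R>) -> R {
    loop {
        if let Some(r) = op() {
            return r;
        }
    }
}

/// Pushes one byte into the shared buffer on every call and finishes on the
/// third call, returning the number of calls made (an owned value).
fn fill_shared(shared: Option<Rc<RefCell<Vec<u8>>>>) -> usize {
    let mut calls = 0usize;
    call_until_some(|| {
        calls += 1;
        // `shared.as_ref()` only borrows the captured Option. Using `shared`
        // by value here (for example `if let Some(cell) = shared { ... }`)
        // would move it out of the closure and make the closure `FnOnce`.
        if let Some(cell) = shared.as_ref() {
            // The `RefMut` guard is a temporary dropped at the end of this
            // statement, so no borrow of the buffer escapes the closure.
            cell.borrow_mut().push(calls as u8);
        }
        if calls < 3 { None } else { Some(calls) }
    })
}
```

Interior mutability removes the move, but it does not let the closure return a value that borrows from the RefCell, because the guard is dropped when the call ends. Returning owned data is still needed for that part.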
