How do I fix the lifetimes on this function? Tokio + Nix

Hello,

I am working on a function that wraps the Nix crate's recvmmsg in an async function for use in a Tokio application. I'm struggling with lifetime errors in my current implementation, and I was hoping someone could help me understand how to annotate this correctly, or whether my approach is sound at all.

The idea is that the user of this function would provide a mutable buffer that contains space for capacity fixed-size datagrams. I am having a hard time getting the lifetime annotations on that user-provided buffer to correctly propagate through to the returned value of the function.

Sample code

Also pasted here for clarity:

use std::os::unix::io::AsRawFd; // for UdpSocket::as_raw_fd

use nix::sys::{
    socket::{MsgFlags, RecvMmsgData, RecvMsg},
    uio::IoVec,
};
use thiserror::Error;
use tokio::net::UdpSocket;

type CustomResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;

const DatagramSize: usize = 1472;

#[derive(Error, Debug)]
enum RecvmmsgError {
    #[error("no datagrams ready to be read")]
    WouldBlock,
    #[error("unknown")]
    Unknown,
}

async fn recvmmsg<'a>(
    s: &UdpSocket,
    buf: &'a mut [u8],
    capacity: usize,
) -> CustomResult<Vec<RecvMsg<'a>>> {
    let fd = s.as_raw_fd();

    let f = move || {
        let mut msgs: Vec<_> = buf
            .chunks_exact_mut(DatagramSize)
            .map(|buf| [IoVec::from_mut_slice(&mut buf[..])])
            .map(|iov| RecvMmsgData {
                iov,
                cmsg_buffer: None,
            })
            .collect();

        match nix::sys::socket::recvmmsg(fd, &mut msgs, MsgFlags::empty(), None) {
            Ok(r) => Ok(r),
            // nix gives back raw errno values - map them to what tokio expects (std::io::Error)
            Err(nix::errno::Errno::EAGAIN) => {
                Err(std::io::Error::from(std::io::ErrorKind::WouldBlock))
            }
            Err(_) => Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                "unknown error in recvmmsg",
            )),
        }
    };

    s.readable().await?;

    match s.try_io(tokio::io::Interest::READABLE, f) {
        Ok(messages) => Ok(messages),
        Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
            Err(Box::new(RecvmmsgError::WouldBlock))
        }
        Err(_) => Err(Box::new(RecvmmsgError::Unknown)),
    }
}

The error I get:

error[E0495]: cannot infer an appropriate lifetime for autoref due to conflicting requirements
   --> src/recvmmsg.rs:396:14
    |
396 |             .chunks_exact_mut(DatagramSize)
    |              ^^^^^^^^^^^^^^^^
    |
note: first, the lifetime cannot outlive the lifetime `'_` as defined here...
   --> src/recvmmsg.rs:394:13
    |
394 |     let f = move || {
    |             ^^^^^^^
note: ...so that closure can access `buf`
   --> src/recvmmsg.rs:395:32
    |
395 |         let mut msgs: Vec<_> = buf
    |                                ^^^
note: but, the lifetime must be valid for the lifetime `'a` as defined here...
   --> src/recvmmsg.rs:387:19
    |
387 | async fn recvmmsg<'a>(
    |                   ^^
note: ...so that the types are compatible
   --> src/recvmmsg.rs:391:38
    |
391 |   ) -> CustomResult<Vec<RecvMsg<'a>>> {
    |  ______________________________________^
392 | |     let fd = s.as_raw_fd();
393 | |
394 | |     let f = move || {
...   |
427 | |     //    }
428 | | }
    | |_^
    = note: expected `Result<Vec<RecvMsg<'a>>, Box<(dyn std::error::Error + Send + Sync + 'static)>>`
               found `Result<Vec<RecvMsg<'_>>, Box<dyn std::error::Error + Send + Sync>>`

Implementing recvmmsg with that signature is not possible, due to the signature of nix::sys::socket::recvmmsg:

pub fn recvmmsg<'a, I>(
    fd: RawFd,
    data: impl std::iter::IntoIterator<
        Item = &'a mut RecvMmsgData<'a, I>,
        IntoIter = impl ExactSizeIterator + Iterator<Item = &'a mut RecvMmsgData<'a, I>>,
    >,
    flags: MsgFlags,
    timeout: Option<crate::sys::time::TimeSpec>,
) -> Result<Vec<RecvMsg<'a>>>
where
    I: AsRef<[IoVec<&'a mut [u8]>]> + 'a;

Notice how the Item of data has type &'a mut RecvMmsgData<'a, I>. This means that the RecvMmsgData objects must stay borrowed for the entirety of 'a. However, this is incompatible with constructing them within the body of f. The "lifetime '_" that the error message is referencing is really the lifetime of the &mut msgs borrow, which is much shorter than 'a. Therefore, you'll either have to construct the RecvMmsgData objects outside of recvmmsg, or map the RecvMsg<'a> objects into some owned type.
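To see the constraint in isolation, here is a minimal std-only sketch. Slot, Filled, fill_all and received_lengths are hypothetical stand-ins (roughly for RecvMmsgData, RecvMsg, nix's recvmmsg and the wrapper); only the shape of the lifetimes is borrowed from nix, and the chunk size of 4 is arbitrary.

```rust
use std::marker::PhantomData;

// Hypothetical stand-in for RecvMmsgData: borrows one datagram buffer.
struct Slot<'a> {
    buf: &'a mut [u8],
}

// Hypothetical stand-in for RecvMsg: tied to the borrow of the slots.
struct Filled<'a> {
    bytes: usize,
    _slot: PhantomData<&'a ()>,
}

// Same shape as nix's recvmmsg: the slots are borrowed for the whole of 'a.
fn fill_all<'a>(slots: &'a mut [Slot<'a>]) -> Vec<Filled<'a>> {
    slots
        .iter_mut()
        .map(|slot| {
            slot.buf.fill(0xAB); // pretend a datagram arrived
            Filled { bytes: slot.buf.len(), _slot: PhantomData }
        })
        .collect()
}

// Does not compile: `slots` is a local, so it cannot stay borrowed for the
// caller-chosen lifetime 'a that the return type asks for.
//
// fn broken<'a>(buf: &'a mut [u8]) -> Vec<Filled<'a>> {
//     let mut slots: Vec<Slot<'a>> =
//         buf.chunks_exact_mut(4).map(|buf| Slot { buf }).collect();
//     fill_all(&mut slots)
// }

// Compiles: only owned data (the byte counts) leaves the function, so the
// borrow of `slots` can end before `slots` is dropped.
fn received_lengths(buf: &mut [u8]) -> Vec<usize> {
    let mut slots: Vec<Slot<'_>> =
        buf.chunks_exact_mut(4).map(|buf| Slot { buf }).collect();
    let filled = fill_all(&mut slots);
    filled.iter().map(|m| m.bytes).collect()
}
```

With an 8-byte buffer, received_lengths returns two lengths of 4 and leaves the buffer free to be borrowed again afterwards.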

Understanding that the '_ lifetime is the &mut borrow of the local stack object helps this make sense to me.

So inside the closure, I should be able to map the result into something that has the 'a lifetime instead of the anonymous &mut lifetime it currently returns, such as a Vec<&'a mut [u8]>. Would that allow it to be returned from the function?

Thanks for your help.

Yes, that solution should work.
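A minimal std-only sketch of that mapping, assuming the receive step has already been reduced to one owned byte count per datagram. DATAGRAM_SIZE and split_received are hypothetical names chosen for illustration, not part of nix or Tokio.

```rust
// Tiny value for illustration only; a real datagram size would be larger.
const DATAGRAM_SIZE: usize = 4;

/// Pairs each received length with its chunk of `buf` and trims the chunk to
/// that length. The slices borrow directly from `buf`, not from any local, so
/// they can carry the caller's lifetime 'a.
/// Panics if a length exceeds DATAGRAM_SIZE.
fn split_received<'a>(buf: &'a mut [u8], lens: &[usize]) -> Vec<&'a mut [u8]> {
    buf.chunks_exact_mut(DATAGRAM_SIZE)
        .zip(lens)
        .map(|(chunk, &len)| &mut chunk[..len])
        .collect()
}
```

Because zip stops at the shorter side, chunks with no matching length are simply not returned.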

For others who may run into this, here is my solution - likely a lot of room for improvement.
I broke the async bit out into its own function so that I could focus on the lifetime problem. To resolve the lifetime, I mapped the RecvMsg data to a tuple that does not have the CMSG fields in it - they were causing the lifetime to be limited to the duration of the mutable msgs local stack object.

use std::{net::SocketAddr, os::unix::prelude::AsRawFd};

use nix::sys::{
    socket::{MsgFlags, RecvMmsgData, RecvMsg},
    uio::IoVec,
};
use thiserror::Error;
use tokio::net::UdpSocket;

const DatagramSize: usize = 1472;

#[derive(Error, Debug)]
enum RecvmmsgError {
    #[error("no datagrams ready to be read")]
    WouldBlock,
    #[error("unknown")]
    Unknown,
}

type CustomResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;

async fn recvmmsg<'a>(
    s: &UdpSocket,
    buf: &'a mut [u8],
    capacity: usize,
) -> CustomResult<Vec<(SocketAddr, &'a mut [u8])>> {
    let chunks = buf.chunks_exact_mut(DatagramSize);

    let iovs = chunks.map(|buf| [IoVec::from_mut_slice(&mut buf[..])]);

    let mut msgs: Vec<_> = iovs
        .map(|iov| RecvMmsgData {
            iov,
            cmsg_buffer: None, // TODO: Add space for peer IP CMSG
        })
        .collect();

    let res: Vec<_> = recvmmsg_async(s, &mut msgs, nix::sys::socket::MsgFlags::empty(), None)
        .await?
        .iter()
        .map(|msg| (msg.bytes, msg.address))
        .collect();

    Ok(res
        .iter()
        .zip(buf.chunks_exact_mut(DatagramSize))
        .map(|((len, addr), chunk)| {
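            // SockAddr has several variants, so a plain `let` pattern is refutable;
            // match on it and treat anything other than an IP address as a bug.
            let addr = match addr.expect("socketaddr missing address in recvmmsg") {
                nix::sys::socket::SockAddr::Inet(addr) => addr,
                _ => panic!("recvmmsg returned a non-IP peer address"),
            };

            (addr.to_std(), &mut chunk[..*len])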
        })
        .collect())
}

async fn recvmmsg_async<'a, I>(
    s: &UdpSocket,
    data: impl std::iter::IntoIterator<
        Item = &'a mut RecvMmsgData<'a, I>,
        IntoIter = impl ExactSizeIterator + Iterator<Item = &'a mut RecvMmsgData<'a, I>>,
    >,
    flags: MsgFlags,
    timeout: Option<nix::sys::time::TimeSpec>,
) -> CustomResult<Vec<RecvMsg<'a>>>
where
    I: AsRef<[IoVec<&'a mut [u8]>]> + 'a,
{
    s.readable().await.map_err(|_| nix::errno::Errno::EAGAIN)?;

    let fd = s.as_raw_fd();

    let f = move || {
        match nix::sys::socket::recvmmsg(fd, data, flags, timeout) {
            Ok(r) => Ok(r),
            // nix gives back raw errno values - map them to what tokio expects (std::io::Error)
            Err(nix::errno::Errno::EAGAIN) => {
                Err(std::io::Error::from(std::io::ErrorKind::WouldBlock))
            }
            Err(_) => Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                "unknown error in recvmmsg",
            )),
        }
    };

    match s.try_io(tokio::io::Interest::READABLE, f) {
        Ok(messages) => Ok(messages),
        Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
            Err(Box::new(RecvmmsgError::WouldBlock))
        }
        Err(_) => Err(Box::new(RecvmmsgError::Unknown)),
    }
}
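One possible improvement: instead of handing WouldBlock back to the caller, wait for readiness again and retry. Below is a minimal synchronous, std-only sketch of that loop. retry_would_block and its wait_ready parameter are hypothetical; wait_ready stands in for s.readable().await and attempt for the s.try_io(...) call.

```rust
use std::io;

/// Waits for readiness, runs `attempt`, and repeats while the attempt
/// reports `WouldBlock` (a false-positive readiness event).
/// Any other result, success or error, is returned as-is.
fn retry_would_block<T>(
    mut wait_ready: impl FnMut() -> io::Result<()>,
    mut attempt: impl FnMut() -> io::Result<T>,
) -> io::Result<T> {
    loop {
        wait_ready()?;
        match attempt() {
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
            other => return other,
        }
    }
}
```

In the async version the data iterator is consumed by the first attempt, so the RecvMmsgData values would have to be rebuilt on each pass of the loop.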