How do I make the following code async

let res = (0..repeat).filter_map(|_| Self::send_and_recv(socket, pdu, out).ok() ).next() ;

now that send_and_recv is async?

I tried this:

let res = stream::iter(0..repeat)
    .filter_map(|x| async move { Self::send_and_recv(socket, pdu, out).await.ok() })
    .boxed()
    .next();

cannot move out of socket, a captured variable in an FnMut closure

What is the signature of Self::send_and_recv? Is it necessary for it to take ownership of socket?

Have you tried with just an async block instead of an async move block? It seems that the block does not need to take ownership of socket.

As a side note, the .boxed() there isn't necessary; you can call .next() on the stream directly. Also, in the synchronous version, .filter_map().next() is better expressed by .find_map, but unfortunately that method is not present in StreamExt :frowning:.
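To illustrate the synchronous half of that side note, here is a minimal sketch. The `attempt` function is a hypothetical stand-in for a blocking `send_and_recv` (it is not the real function), and both helper names are made up for the example:

```rust
/// Hypothetical stand-in for a blocking `send_and_recv`:
/// fails on attempt 0 and on odd attempts, otherwise returns a byte count.
fn attempt(n: u32) -> Result<usize, String> {
    if n > 0 && n % 2 == 0 {
        Ok(n as usize * 10)
    } else {
        Err(format!("attempt {} failed", n))
    }
}

/// The original form: filter out failures, then take the first success.
fn first_ok_filter_map(repeat: u32) -> Option<usize> {
    (0..repeat).filter_map(|n| attempt(n).ok()).next()
}

/// Same behaviour expressed with `Iterator::find_map`.
fn first_ok_find_map(repeat: u32) -> Option<usize> {
    (0..repeat).find_map(|n| attempt(n).ok())
}
```

Both versions are lazy and stop at the first attempt that succeeds, so no further attempts are made after a success.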

async fn send_and_recv(socket: &mut UdpSocket, pdu: &pdu::Buf, out: &mut [u8]) -> SnmpResult<usize>

Without boxed() it gives an Unpin error:

std::future::from_generator::GenFuture<[static generator@src/tokio_session.rs:61:64: 62:64]> cannot be unpinned

within futures::stream::stream::filter_map::_::__Origin<'_, futures::stream::Iter<std::ops::Range<u32>>, impl futures::Future, [closure@src/tokio_session.rs:61:54: 62:64]>, the trait std::marker::Unpin is not implemented for std::future::from_generator::GenFuture<[static generator@src/tokio_session.rs:61:64: 62:64]>

note: required because it appears within the type impl futures::Future
note: required because it appears within the type `std::option::Option<impl

Oh right. Then you'll have to do this:

let stream = /* create the stream */;
futures::pin_mut!(stream);
let res = stream.next().await;

pin_mut! is a zero-cost operation, in comparison with .boxed() which requires an allocation.
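For comparison, here is a std-only sketch of the two ways of pinning a future. It assumes Rust 1.68 or later for `std::pin::pin!`, which plays a similar role to `futures::pin_mut!`. `NoopWake` and `drive` are hypothetical helpers written just for this example, not part of any library:

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that never return `Pending`.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls an already pinned future until it completes (busy loop, illustration only).
fn drive<F: Future>(mut fut: Pin<&mut F>) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Stack pinning: the future stays in this stack frame, no heap allocation.
fn stack_pinned() -> u32 {
    let fut = async { 40 + 2 }; // async blocks are `!Unpin`
    let fut = pin!(fut); // Pin<&mut impl Future>
    drive(fut)
}

/// Heap pinning: what `.boxed()` or `Box::pin` does, at the cost of one allocation.
fn heap_pinned() -> u32 {
    let mut fut = Box::pin(async { 40 + 2 }); // Pin<Box<impl Future>>
    drive(fut.as_mut())
}
```

Both functions return the same value; the only difference is where the future lives while it is polled.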

Thanks a lot, that helps.
let mystream = stream::iter(0..repeat);
let res = pin_mut!(mystream)
    .filter_map(|_| async move { Self::send_and_recv(socket, pdu, out).await.ok() })
    .next()
    .await;

Now it gives this error:
no method named filter_map found for type bool in the current scope

method not found in bool

note: the method filter_map exists but the following trait bounds were not satisfied:
bool: futures::Stream
which is required by bool: futures::StreamExt
&bool: futures::Stream
which is required by &bool: futures::StreamExt
&mut bool: futures::Stream
which is required by &mut bool: futures::StreamExt
bool: std::iter::Iterator
which is required by &mut bool: std::iter::Iterator rustc(E0599)

pin_mut! is not an expression, unlike most things in Rust: it expands to a statement, so it must be put on a separate line, as in my example. It also needs to be applied after the filter_map, because it's the filter_map that produces a !Unpin type, which pin_mut! can remedy.

Thank you.
Back to the "cannot move out of socket, a captured variable ..." error:
async fn send_and_recv_repeat(socket: &mut UdpSocket, pdu: &pdu::Buf, out: &mut [u8], repeat: u32) -> SnmpResult<usize> {
    let mystream = stream::iter(0..repeat);
    let stream = mystream.filter_map(|_| async move {
        Self::send_and_recv(socket, pdu, out).await.ok()
    });

    pin_mut!(stream);
    let res = stream.next().await;
    match res {
        Some(len) => Ok(len),
        None => Err(SnmpError::ReceiveError),
    }
}

cannot move out of socket, a captured variable in an FnMut closure

move out of socket occurs here rustc(E0507)

tokio_session.rs(61, 35): captured outer variable

tokio_session.rs(63, 57): move out of socket occurs here

tokio_session.rs(64, 40): move occurs because socket has type &mut tokio::net::UdpSocket, which does not implement the Copy trait

tokio_session.rs(64, 40): move occurs due to use in generator

cannot move out of out, a captured variable in an FnMut closure

move out of out occurs here rustc(E0507)

tokio_session.rs(61, 75): captured outer variable

tokio_session.rs(63, 57): move out of out occurs here

tokio_session.rs(64, 52): move occurs because out has type &mut [u8], which does not implement the Copy trait

tokio_session.rs(64, 52): move occurs due to use in generator

cannot move out of out, a captured variable in an FnMut closure

Have you tried my earlier suggestion of using async instead of async move?

Also, when you paste code in please surround it in ``` - it makes it easier to read. For example:

```
fn main() {}
```

becomes:

fn main() {}

Removing the move makes no difference.
Here is the whole code block:

use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use tokio::net::{UdpSocket, ToSocketAddrs};
use tokio::io;
use futures::pin_mut;
use futures::stream::{self, StreamExt};
use std::io::{Error, ErrorKind};
use std::num::Wrapping; // needed for the `req_id` field below
use std::ops;

const BUFFER_SIZE: usize = 4096;

/// Asynchronous SNMP client for Tokio, so that it can work with actix
pub struct TokioSession {
    socket: UdpSocket,
    community: Vec<u8>,
    req_id: Wrapping<i32>,
    send_pdu: Buf,
    recv_buf: [u8; BUFFER_SIZE],
    version: i32,
}

pub struct Buf {
    len: usize,
    buf: [u8; BUFFER_SIZE],
}

impl Default for Buf {
    fn default() -> Buf {
        Buf {
            len: 0,
            // Zero-fill instead of the deprecated `mem::uninitialized()`.
            buf: [0; BUFFER_SIZE],
        }
    }
}

impl ops::Deref for Buf {
    type Target = [u8];
    fn deref(&self) -> &[u8] {
        &self.buf[BUFFER_SIZE - self.len..]
    }
}



impl TokioSession {
    pub async fn new<SA>(destination: SA, community: &[u8], starting_req_id: i32, version:i32) -> io::Result<Self>
        where SA: ToSocketAddrs
    {
        let socket = match destination.to_socket_addrs().await?.next() {
            Some(SocketAddr::V4(_)) => UdpSocket::bind((Ipv4Addr::new(0,0,0,0), 0)).await?,
            Some(SocketAddr::V6(_)) => UdpSocket::bind((Ipv6Addr::new(0,0,0,0,0,0,0,0), 0)).await?,
            None => panic!("empty list of socket addrs"),
        };

        socket.connect(destination).await?;
        Ok(Self {
            socket: socket,
            community: community.to_vec(),
            req_id: Wrapping(starting_req_id),
            send_pdu: Buf::default(),
            recv_buf: [0; 4096],
            version
        })
    }

    async fn send_and_recv(socket: &mut UdpSocket, pdu: &Buf, out: &mut [u8]) -> io::Result<usize> {
        if let Ok(_pdu_len) = socket.send(&pdu[..]).await {
            match socket.recv(out).await {
                Ok(len) => Ok(len),
                Err(_) => Err(Error::new(ErrorKind::Other, "Snmp Rx Error"))
            }
        } else {
            Err(Error::new(ErrorKind::Other, "Snmp Tx Error"))
        }
    }
    
    async fn send_and_recv_repeat(socket: &mut UdpSocket, pdu: &Buf, out: &mut [u8], repeat:u32) -> io::Result<usize> {
        let mystream = stream::iter(0..repeat) ;
        let stream = mystream.filter_map(|_| async {
                Self::send_and_recv(socket, pdu,out).await.ok()
                 
        });
        pin_mut!(stream) ;
       let res = stream.next().await ;
        match res {
           Some(len) => Ok(len),
           None => Err(Error::new(ErrorKind::Other, "Error in Repeat loop"))
        }
        
        
    }
}

Ah, I think I see the problem now. It's because the signature of filter_map requires that the closure can be called before the future of the previous closure has finished executing. So the closure is not able to use the mutable reference to socket, because the previous closure could still be using it in its future. Of course, filter_map will never actually do this, but it is allowed to in its signature - this is just an unfortunate limitation of today's Rust.

As a workaround you can use a simple imperative loop:

for _ in 0..repeat {
    if let Ok(len) = Self::send_and_recv(socket, pdu, out).await {
        return Ok(len);
    }
}
Err(Error::new(ErrorKind::Other, "Error in Repeat loop"))
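
For anyone who wants to try the loop in isolation, here is a std-only sketch under some loud assumptions: `FlakySocket` is a hypothetical mock standing in for the UDP socket, the error type is simplified to `String`, and `block_on` is a tiny busy-polling executor written for this example (it needs Rust 1.68 or later for `std::pin::pin!`). None of these come from tokio or futures.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical mock socket: fails a fixed number of times, then succeeds.
struct FlakySocket {
    failures_left: u32,
    attempts: u32,
}

/// Mock of `send_and_recv`; like the real one it needs `&mut` access.
async fn send_and_recv(socket: &mut FlakySocket, pdu: &[u8], out: &mut [u8]) -> Result<usize, String> {
    socket.attempts += 1;
    if socket.failures_left > 0 {
        socket.failures_left -= 1;
        return Err("timeout".to_string());
    }
    let len = pdu.len().min(out.len());
    out[..len].copy_from_slice(&pdu[..len]); // echo the request back
    Ok(len)
}

/// Each iteration reborrows `socket` and `out`; the borrow ends when the
/// awaited future completes, so the next iteration can borrow them again.
async fn send_and_recv_repeat(socket: &mut FlakySocket, pdu: &[u8], out: &mut [u8], repeat: u32) -> Result<usize, String> {
    for _ in 0..repeat {
        if let Ok(len) = send_and_recv(socket, pdu, out).await {
            return Ok(len);
        }
    }
    Err("Error in Repeat loop".to_string())
}

/// Waker that does nothing; enough because the mock never returns `Pending`.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, only suitable for futures that never wait on I/O.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Runs the retry loop and returns the result plus the number of attempts made.
fn demo(failures: u32, repeat: u32) -> (Result<usize, String>, u32) {
    let mut socket = FlakySocket { failures_left: failures, attempts: 0 };
    let mut out = [0u8; 8];
    let res = block_on(send_and_recv_repeat(&mut socket, b"ping", &mut out, repeat));
    (res, socket.attempts)
}
```

The loop compiles where the closure did not because only one `send_and_recv` future exists at a time, and the borrow checker can see that directly.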

Thank you, :pray:
