futures::select! and futures::channel::mpsc::Receiver::try_next()

Long story short, I need to port the crate tokio_kcp to async_std, so the result will probably be called something like async_std_kcp.

My Rust knowledge is still limited, and I came across this piece of code (GitHub source linked):

tokio::select! {
                        // recv() then input()
                        // Drives the KCP machine forward
                        recv_result = udp_socket.recv(&mut input_buffer), if is_client => {
                            match recv_result {
                                Err(err) => {
                                    error!("[SESSION] UDP recv failed, error: {}", err);
                                }
                                Ok(n) => {
                                    let input_buffer = &input_buffer[..n];
                                    let input_conv = kcp::get_conv(input_buffer);
                                    trace!("[SESSION] UDP recv {} bytes, conv: {}, going to input {:?}",
                                           n, input_conv, ByteStr::new(input_buffer));

                                    let mut socket = session.socket.lock();

                                    // Server may allocate another conv for this client.
                                    if !socket.waiting_conv() && socket.conv() != input_conv {
                                        trace!("[SESSION] UDP input conv: {} replaces session conv: {}", input_conv, socket.conv());
                                        socket.set_conv(input_conv);
                                    }

                                    match socket.input(input_buffer) {
                                        Ok(true) => {
                                            trace!("[SESSION] UDP input {} bytes and waked sender/receiver", n);
                                        }
                                        Ok(false) => {}
                                        Err(err) => {
                                            error!("[SESSION] UDP input {} bytes error: {}, input buffer {:?}",
                                                   n, err, ByteStr::new(input_buffer));
                                        }
                                    }
                                }
                            }
                        }

and I ported it to

futures::select! {
                        // recv() then input()
                        // Drives the KCP machine forward

                        recv_result = udp_socket.recv(&mut input_buffer) => {
                            if is_client {
                                match recv_result {
                                    Err(err) => {
                                        error!("[SESSION] UDP recv failed, error: {}", err);
                                    }
                                    Ok(n) => {
                                        let input_buffer = &input_buffer[..n];
                                        let input_conv = kcp::get_conv(input_buffer);
                                        trace!("[SESSION] UDP recv {} bytes, conv: {}, going to input {:?}",
                                               n, input_conv, ByteStr::new(input_buffer));

                                        let mut socket = session.socket.lock();

                                        // Server may allocate another conv for this client.
                                        if !socket.waiting_conv() && socket.conv() != input_conv {
                                            trace!("[SESSION] UDP input conv: {} replaces session conv: {}", input_conv, socket.conv());
                                            socket.set_conv(input_conv);
                                        }

                                        match socket.input(input_buffer) {
                                            Ok(true) => {
                                                trace!("[SESSION] UDP input {} bytes and waked sender/receiver", n);
                                            }
                                            Ok(false) => {}
                                            Err(err) => {
                                                error!("[SESSION] UDP input {} bytes error: {:?}, input buffer {:?}",
                                                       n, err, ByteStr::new(input_buffer));
                                            }
                                        }
                                    }
                                }
                            }
                        }

                        // bytes received from listener socket

                        input_opt = input_rx.recv() => {
                            if let Some(input_buffer) = input_opt {
                                let mut socket = session.socket.lock();
                                match socket.input(&input_buffer) {
                                    Ok(waked) => {
                                        // trace!("[SESSION] UDP input {} bytes from channel {:?}",
                                        //        input_buffer.len(), ByteStr::new(&input_buffer));
                                        trace!("[SESSION] UDP input {} bytes from channel, waked? {} sender/receiver",
                                               input_buffer.len(), waked);
                                    }
                                    Err(err) => {
                                        error!("[SESSION] UDP input {} bytes from channel failed, error: {:?}, input buffer {:?}",
                                               input_buffer.len(), err, ByteStr::new(&input_buffer));
                                    }
                                }
                            }
                        }

The problem is that input_rx is now of type futures::channel::mpsc::Receiver instead of tokio::sync::mpsc::Receiver, and the former does NOT have a recv function. As far as I can tell, my only way to read the data is try_next. However, that does not work with futures::select!, because try_next is not async and does not return a future.
So the way I see it, I have 3 options.

  • Implement recv() for futures::channel::mpsc::Receiver through a custom trait.
  • Instead of using select, write a loop that constantly checks the polling status of async_std::net::UdpSocket::recv, which honestly I do not know how to do.
  • Still use a loop, but implement some kind of try_recv for async_std::net::UdpSocket through a custom trait, which is probably the most difficult option.

Am I missing something, or am I overcomplicating things and there is an easier solution?
Thanks in advance.

The futures channel does have a direct equivalent to recv, but you need to import the StreamExt trait to access it, because the receiver is a Stream and next() is a StreamExt method.

use futures::stream::StreamExt;

input_opt = input_rx.next() => { ...

Thank you for the solution. It worked really well. However, may I bother you a bit more?
Currently, tokio::net::UdpSocket has the function try_send_to, which is not async.
However, in this line, the write function in impl Write for UdpOutput must not be async, yet async_std::net::UdpSocket only has the send and send_to functions, both of which are async.

So what are my options here?
Thank you.

You could make two sockets: one using std and one using async_std. By setting the std one to non-blocking mode, you can use it to attempt sends without awaiting, and the background task can use the other socket for the async sends.
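
To make the suggestion above more concrete, here is a minimal sketch of the std half using only the standard library. The `UdpOutput` struct here is a hypothetical stand-in with assumed fields (`socket`, `target`), not the actual type from tokio_kcp, and `demo` is an illustrative helper. The idea is that a `std::net::UdpSocket` in non-blocking mode gives you a synchronous `send_to` that returns `WouldBlock` instead of waiting, which is roughly the role `try_send_to` plays on the tokio side.

```rust
use std::io::{self, Write};
use std::net::{SocketAddr, UdpSocket};
use std::time::Duration;

/// Hypothetical stand-in for the crate's `UdpOutput`; field names are assumptions.
struct UdpOutput {
    socket: UdpSocket, // std socket, switched to non-blocking mode in `new`
    target: SocketAddr,
}

impl UdpOutput {
    fn new(socket: UdpSocket, target: SocketAddr) -> io::Result<Self> {
        // After this call, send_to never waits: it either sends or fails
        // immediately with ErrorKind::WouldBlock.
        socket.set_nonblocking(true)?;
        Ok(Self { socket, target })
    }
}

impl Write for UdpOutput {
    // A plain, non-async write, as the `Write` trait requires.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.socket.send_to(buf, self.target)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(()) // UDP datagrams are not buffered in user space here
    }
}

/// Sends one datagram over loopback and returns (bytes sent, payload received).
fn demo() -> io::Result<(usize, Vec<u8>)> {
    let receiver = UdpSocket::bind("127.0.0.1:0")?;
    receiver.set_read_timeout(Some(Duration::from_secs(1)))?;

    let sender = UdpSocket::bind("127.0.0.1:0")?;
    // A second handle to the same OS socket. In the real port this handle
    // could be turned into the async half, for example with
    // async_std::net::UdpSocket::from(background) (not done in this sketch).
    let background = sender.try_clone()?;

    let mut output = UdpOutput::new(sender, receiver.local_addr()?)?;
    let sent = output.write(b"ping")?;

    let mut buf = [0u8; 16];
    let (len, _from) = receiver.recv_from(&mut buf)?;
    drop(background);
    Ok((sent, buf[..len].to_vec()))
}

fn main() -> io::Result<()> {
    let (sent, payload) = demo()?;
    println!("sent {} bytes, received {:?}", sent, payload);
    Ok(())
}
```

If `write` returns `WouldBlock`, the caller can hand the packet to the background task, which owns the async socket and can await the send there.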