futures::select! and futures::channel::mpsc::Receiver::try_next()

So, long story short, I need to port the crate tokio_kcp to use async_std, so it's probably gonna be something like async_std_kcp.

I am still not very experienced with Rust, and I came across this piece of code (click for GitHub source):

tokio::select! {
                        // recv() then input()
                        // Drives the KCP machine forward
                        recv_result = udp_socket.recv(&mut input_buffer), if is_client => {
                            match recv_result {
                                Err(err) => {
                                    error!("[SESSION] UDP recv failed, error: {}", err);
                                }
                                Ok(n) => {
                                    let input_buffer = &input_buffer[..n];
                                    let input_conv = kcp::get_conv(input_buffer);
                                    trace!("[SESSION] UDP recv {} bytes, conv: {}, going to input {:?}",
                                           n, input_conv, ByteStr::new(input_buffer));

                                    let mut socket = session.socket.lock();

                                    // Server may allocate another conv for this client.
                                    if !socket.waiting_conv() && socket.conv() != input_conv {
                                        trace!("[SESSION] UDP input conv: {} replaces session conv: {}", input_conv, socket.conv());
                                        socket.set_conv(input_conv);
                                    }

                                    match socket.input(input_buffer) {
                                        Ok(true) => {
                                            trace!("[SESSION] UDP input {} bytes and waked sender/receiver", n);
                                        }
                                        Ok(false) => {}
                                        Err(err) => {
                                            error!("[SESSION] UDP input {} bytes error: {}, input buffer {:?}",
                                                   n, err, ByteStr::new(input_buffer));
                                        }
                                    }
                                }
                            }
                        }

and I ported it to

futures::select! {
                        // recv() then input()
                        // Drives the KCP machine forward

                        recv_result = udp_socket.recv(&mut input_buffer) => {
                            if is_client {
                                match recv_result {
                                    Err(err) => {
                                        error!("[SESSION] UDP recv failed, error: {}", err);
                                    }
                                    Ok(n) => {
                                        let input_buffer = &input_buffer[..n];
                                        let input_conv = kcp::get_conv(input_buffer);
                                        trace!("[SESSION] UDP recv {} bytes, conv: {}, going to input {:?}",
                                               n, input_conv, ByteStr::new(input_buffer));

                                        let mut socket = session.socket.lock();

                                        // Server may allocate another conv for this client.
                                        if !socket.waiting_conv() && socket.conv() != input_conv {
                                            trace!("[SESSION] UDP input conv: {} replaces session conv: {}", input_conv, socket.conv());
                                            socket.set_conv(input_conv);
                                        }

                                        match socket.input(input_buffer) {
                                            Ok(true) => {
                                                trace!("[SESSION] UDP input {} bytes and waked sender/receiver", n);
                                            }
                                            Ok(false) => {}
                                            Err(err) => {
                                                error!("[SESSION] UDP input {} bytes error: {:?}, input buffer {:?}",
                                                       n, err, ByteStr::new(input_buffer));
                                            }
                                        }
                                    }
                                }
                            }
                        }

                        // bytes received from listener socket

                        input_opt = input_rx.recv() => {
                            if let Some(input_buffer) = input_opt {
                                let mut socket = session.socket.lock();
                                match socket.input(&input_buffer) {
                                    Ok(waked) => {
                                        // trace!("[SESSION] UDP input {} bytes from channel {:?}",
                                        //        input_buffer.len(), ByteStr::new(&input_buffer));
                                        trace!("[SESSION] UDP input {} bytes from channel, waked? {} sender/receiver",
                                               input_buffer.len(), waked);
                                    }
                                    Err(err) => {
                                        error!("[SESSION] UDP input {} bytes from channel failed, error: {:?}, input buffer {:?}",
                                               input_buffer.len(), err, ByteStr::new(&input_buffer));
                                    }
                                }
                            }
                        }

The problem is that input_rx is now of type futures::channel::mpsc::Receiver instead of tokio::sync::mpsc::Receiver, and the former does NOT have a recv function. To read the data, my only choice seems to be try_next. However, that does not work with futures::select!, because try_next is not async.
So the way I see it, I have 3 options.

  • Implement recv() for futures::channel::mpsc::Receiver through a custom trait.
  • Instead of using select, use a loop that constantly checks the polling status of async_std::net::UdpSocket::recv, which honestly I do not know how to do.
  • Still use a loop, but implement some kind of try_recv in a custom trait for async_std::net::UdpSocket, which is probably the most difficult option.

Am I missing something, or am I overcomplicating things when there is an easier solution?
Thanks in advance.

The futures channel does have a direct equivalent to recv, but you need to go through the Stream trait (via the StreamExt extension trait) to access it.

use futures::stream::StreamExt;

input_opt = input_rx.next() => { ...

Thank you for the solution. It worked really well. However, can I bother you a bit more?
Currently, tokio::net::UdpSocket has the function try_send_to, which is not async.
However, in this line, UdpOutput implements Write, so its write function cannot be async, yet async_std::net::UdpSocket only has the send and send_to functions, both of which are async.

So what are my options here?
Thank you.

You could make two sockets: one using std and one using async_std. By setting the std one to non-blocking mode, you can use it to attempt sends, and then the background task can use the other socket for the async sends.
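
A rough sketch of the std half of that idea follows, using only the standard library. The UdpOutput struct here is a hypothetical stand-in for the one in the crate, and make_pair is an invented helper name. The second handle is obtained with try_clone; the assumption (not shown, so the example stays std-only) is that it would then be converted with async_std::net::UdpSocket::from for use in the background task.

```rust
use std::io::{self, Write};
use std::net::{SocketAddr, UdpSocket};

/// Hypothetical stand-in for the crate's `UdpOutput`.
struct UdpOutput {
    socket: UdpSocket, // std socket in non-blocking mode
    peer: SocketAddr,
}

impl Write for UdpOutput {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        match self.socket.send_to(buf, self.peer) {
            Ok(n) => Ok(n),
            // The OS send buffer is full. A real port would probably hand the
            // packet to the background task here so it can do an async send.
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => Err(e),
            Err(e) => Err(e),
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(()) // UDP datagrams are sent immediately, nothing to flush
    }
}

/// Binds one UDP socket and returns two handles to it: a synchronous,
/// non-blocking output and a clone intended for the async side.
fn make_pair(bind_addr: &str, peer: SocketAddr) -> io::Result<(UdpOutput, UdpSocket)> {
    let socket = UdpSocket::bind(bind_addr)?;
    socket.set_nonblocking(true)?;
    // The clone shares the same underlying OS socket, including the
    // non-blocking flag. Assumption: it is later wrapped with
    // `async_std::net::UdpSocket::from(for_async)`.
    let for_async = socket.try_clone()?;
    Ok((UdpOutput { socket, peer }, for_async))
}
```

With this shape, write stays synchronous as the Write trait requires, and only the WouldBlock case needs to fall back to the async socket.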
