Need help receiving packets asynchronously from a set of UDP sockets

Hi,
I'm new to async Rust, and I'm trying to receive data from UDP sockets asynchronously. I'd appreciate some suggestions.

The main thread maintains N tokio::net::UdpSocket instances, each of which is bound to a particular (IP, port) pair.
N is not known at compile time; it is at most 100.

The main thread runs an infinite loop. In each iteration, whenever any one of the UDP sockets receives a packet, that packet is sent through a crossbeam channel to a separate worker thread for processing.

A minimal example that seems to work is as follows:

use tokio::net::UdpSocket;
use tokio::select;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let sock1 = UdpSocket::bind("127.0.0.1:8080").await?;
    let sock2 = UdpSocket::bind("127.0.0.1:7070").await?;
    let mut buf1 = vec![0; 1024];
    let mut buf2 = vec![0; 1024];
    loop {
        let (len, addr, socket_id) = select! {
            Ok((len, addr)) = sock1.recv_from(&mut buf1) => { (len, addr, 0) }
            Ok((len, addr)) = sock2.recv_from(&mut buf2) => { (len, addr, 1) }
        };
        println!("{} {} {}", len, addr, socket_id);
        // buf1/buf2 are sent through sender[socket_id]
    }
}

My questions are:

  1. How can I make the number of sockets dynamic (determined by command-line arguments)?
  2. Is it possible to use only one buffer, rather than buf1, buf2, ...?

For 1: you cannot use the select! macro for that, because its branches are fixed at compile time. An easy solution is the futures::future::select_all() combinator; alternatively, you can collect the futures into a futures::stream::FuturesUnordered, which implements Stream.

But these combinators might not scale well. I would suggest spawning one task per socket instead, because the task scheduler is generally more efficient at polling futures than the combinators are when there are many of them.

For 2: it is possible. Poll the UdpSocket::readable() future instead of UdpSocket::recv_from(), and then call UdpSocket::try_recv_from() only on the sockets that are readable. Note that readable() might report false positives, so you should check the return value of try_recv_from(): an io::ErrorKind::WouldBlock error means there was nothing to read after all.

So should I use tokio::task::spawn_blocking with a loop {} block inside that calls UdpSocket::recv_from repeatedly?
Or is it OK to simply use an ordinary thread?

Just use tokio::spawn; there is no need for native threads. A tokio task has little overhead compared to an OS thread, since it's just a future (actually a wrapper with some bookkeeping state).

for example:

tokio::spawn(async move {
    let mut buf = vec![0; 1024];
    loop {
        match sock.recv_from(&mut buf).await {
            Ok((nbytes, remote_addr)) => {
                // process the packet, e.g. send it through a channel
            }
            Err(e) => {
                // error handling
            }
        }
    }
})