Get wakeup to multiple futures for same underlying socket

Copied from SO

I have the following code, which sends data to multiple UDP endpoints using the same local UdpSocket:

use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::{
    future::Future,
    net::{Ipv4Addr, SocketAddr},
    pin::Pin,
    task::{Context, Poll},
};
use tokio::net::UdpSocket;

#[tokio::main]
async fn main() {
    let server_0: SocketAddr = (Ipv4Addr::UNSPECIFIED, 12000).into();
    let server_2: SocketAddr = (Ipv4Addr::UNSPECIFIED, 12002).into();
    let server_1: SocketAddr = (Ipv4Addr::UNSPECIFIED, 12001).into();

    tokio::spawn(start_server(server_0));
    tokio::spawn(start_server(server_1));
    tokio::spawn(start_server(server_2));

    let client_addr: SocketAddr = (Ipv4Addr::UNSPECIFIED, 12004).into();
    let socket = UdpSocket::bind(client_addr).await.unwrap();

    let mut futs = FuturesUnordered::new();
    futs.push(Task::new(0, &socket, &server_0));
    futs.push(Task::new(1, &socket, &server_1));
    futs.push(Task::new(2, &socket, &server_2));

    while let Some(n) = futs.next().await {
        println!("Done: {:?}", n)
    }
}

async fn start_server(addr: SocketAddr) {
    let mut socket = UdpSocket::bind(addr).await.unwrap();
    let mut buf = [0; 512];
    loop {
        println!("{:?}", socket.recv_from(&mut buf).await);
    }
}

struct Task<'a> {
    value: u32,
    socket: &'a UdpSocket,
    addr: &'a SocketAddr,
}

impl<'a> Task<'a> {
    fn new(value: u32, socket: &'a UdpSocket, addr: &'a SocketAddr) -> Self {
        Self {
            value,
            socket,
            addr,
        }
    }
}

impl Future for Task<'_> {
    type Output = Option<u32>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("Polling for {}", self.value);
        let buf = &self.value.to_be_bytes();

        match self.socket.poll_send_to(cx, buf, self.addr) {
            Poll::Ready(Ok(_)) => {
                println!("Got Ok for {}", self.value);
                Poll::Ready(Some(self.value))
            }
            Poll::Ready(Err(_)) => {
                println!("Got err for {}", self.value);
                Poll::Ready(None)
            }
            Poll::Pending => {
                println!("Got pending for {}", self.value);
                Poll::Pending
            }
        }
    }
}

The problem is that it sometimes gets stuck after sending only one of the values, printing:

Polling for 0
Got pending for 0
Polling for 1
Got pending for 1
Polling for 2
Got pending for 2
Polling for 2
Got Ok for 2
Done: Some(2)
Ok((4, V4(127.0.0.1:12004)))

The tasks with value 0 and 1 are never woken up in this case. How do I reliably signal them to wake them up?

I tried calling cx.waker().wake_by_ref() on receiving Poll::Ready, as I thought that might wake up the others too, but that's not the case.

There is a user claiming that the below usage of FuturesUnordered is wrong:

    while let Some(n) = futs.next().await {
        println!("Done: {:?}", n)
    }

Is this wrong? If so, why?

When poll_send_to returns Poll::Pending, it guarantees to emit a wake-up to the waker provided in the context it was polled with. However, it is only required to emit a wake-up to the last waker it was polled with. Since you are calling poll_send_to on the same socket from multiple tasks, the socket has only promised to emit a wake-up to the one that polled it last.
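The "last waker wins" behaviour can be modelled without any I/O. The following is a minimal sketch using only the standard library, and it is not how tokio is actually implemented: SingleWakerSlot, CountingWaker and wake_counts are hypothetical names made up for this illustration. The slot stands in for a resource that remembers just one waker, and each registration overwrites the previous one.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical stand-in for an I/O resource that remembers only one waker.
struct SingleWakerSlot {
    waker: Option<Waker>,
}

impl SingleWakerSlot {
    fn new() -> Self {
        Self { waker: None }
    }

    /// Models returning Poll::Pending: the previously stored waker is overwritten.
    fn register(&mut self, waker: &Waker) {
        self.waker = Some(waker.clone());
    }

    /// Models the resource becoming ready: only the stored waker is woken.
    fn notify(&mut self) {
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}

/// A waker that only counts how many times it has been woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Registers `n` wakers one after another, fires a single readiness event,
/// and returns how often each waker was woken.
fn wake_counts(n: usize) -> Vec<usize> {
    let counters: Vec<Arc<CountingWaker>> = (0..n)
        .map(|_| Arc::new(CountingWaker(AtomicUsize::new(0))))
        .collect();

    let mut slot = SingleWakerSlot::new();
    for counter in &counters {
        slot.register(&Waker::from(Arc::clone(counter)));
    }
    slot.notify();

    counters.iter().map(|c| c.0.load(Ordering::SeqCst)).collect()
}
```

Calling wake_counts(3) gives [0, 0, 1]: only the waker registered last is woken, which matches the output in the question, where only the task with value 2 is polled again.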

This also explains why this works:

let mut futs = Vec::new();
futs.push(Task::new(0, &socket, &server_0));
futs.push(Task::new(1, &socket, &server_1));
futs.push(Task::new(2, &socket, &server_2));

for n in join_all(futs).await {
    println!("Done: {:?}", n)
}

The join_all combinator polls every internal future every time it is polled, whereas FuturesUnordered keeps track of which underlying future the wake-up came from and polls only that one.

There's nothing wrong with the way you're using the FuturesUnordered with that loop. The issue is that the futures inside are not emitting wake-ups.

Thank you for the response. I suspected the same.
Is it possible to keep using FuturesUnordered? There can be many futures that want to run here, and in the actual code I want to call poll_next to get whichever one is ready, while also being able to add more futures during polling.

That's more or less what FuturesUnordered does? As long as the tasks notify correctly, you can definitely add more futures incrementally.

So in this case, what can I do to notify the other tasks?

You should probably create a separate socket for each task so that notifications are not cancelled.

I wanted to avoid that and use the same socket for all of them. Is there a way to do join_all like polling but also be able to add more futures to it? Perhaps by implementing a custom Stream?

Alternatively you can poll the socket in one place, and have that place inspect the address and hand it out to the correct receiver.

So one future would handle all reads and writes, and all the other futures would talk to it through async channels? That is the design I currently have, and it works. I thought it would be cleaner if I didn't have to spawn that first future.

There are many ways to transfer the message to the futures, but channels are indeed a good one. You can't poll the socket from several tasks, sorry.
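As a rough sketch of that shape: one owner is the only place that touches the socket, and everything else hands it requests over a channel. To stay dependency-free, this version uses std threads, std::sync::mpsc and the blocking std::net::UdpSocket. That is an assumption made for illustration, not the code from this thread; in the async version a spawned task and tokio::sync::mpsc would play the same roles. SendRequest, run_socket_owner and send_all are hypothetical names.

```rust
use std::net::{SocketAddr, UdpSocket};
use std::sync::mpsc;
use std::thread;

/// A send request handed to the socket owner (hypothetical type).
struct SendRequest {
    value: u32,
    addr: SocketAddr,
    /// Channel on which the owner reports the outcome of this request.
    done: mpsc::Sender<Option<u32>>,
}

/// The only place that touches the socket: it serves requests until every
/// request sender has been dropped.
fn run_socket_owner(socket: UdpSocket, requests: mpsc::Receiver<SendRequest>) {
    for req in requests {
        let result = socket.send_to(&req.value.to_be_bytes(), req.addr);
        // Mirror the Task output from the question: Some(value) on success, None on error.
        let _ = req.done.send(result.ok().map(|_| req.value));
    }
}

/// Sends every value to `target` through one shared socket and collects the replies.
fn send_all(values: &[u32], target: SocketAddr) -> std::io::Result<Vec<Option<u32>>> {
    let socket = UdpSocket::bind("127.0.0.1:0")?;
    let (req_tx, req_rx) = mpsc::channel();
    let owner = thread::spawn(move || run_socket_owner(socket, req_rx));

    let mut replies = Vec::new();
    for &value in values {
        let (done_tx, done_rx) = mpsc::channel();
        req_tx
            .send(SendRequest { value, addr: target, done: done_tx })
            .expect("owner thread is alive");
        replies.push(done_rx);
    }
    drop(req_tx); // closing the request channel lets the owner loop end

    let results: Vec<Option<u32>> = replies
        .into_iter()
        .map(|rx| rx.recv().ok().flatten())
        .collect();
    owner.join().expect("owner thread panicked");
    Ok(results)
}
```

Because each request carries its own reply channel, no two callers ever compete for the socket's single wake-up slot; each one waits only on its own channel.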
