Wrapping a non-blocking socket with an async Stream

use laminar::{Socket, Config, SocketEvent};
use std::net::ToSocketAddrs;
use crate::error::ConnectError;
use std::task::Context;
use log::info;
use futures::Stream;
use async_std::pin::Pin;
use futures::task::{Poll, Waker};

/// A futures-aware UdpListener
#[allow(dead_code)]
pub struct UdpListener {
    socket: Socket,
    waker: Option<Waker>
}

impl UdpListener {
    /// Creates a new listener at a specified port
    pub fn new<T: ToSocketAddrs>(bind_addr: T) -> Result<Self, ConnectError> {
        let socket = Socket::bind_with_config(bind_addr, Self::get_config()).map_err(|err| ConnectError::from(err))?;
        Ok(Self { socket, waker: None })
    }

    /// Returns the default config
    fn get_config() -> Config {
        let mut config = Config::default();
        config.blocking_mode = false;
        config
    }

    async fn run(self: Pin<&mut Self>) -> Result<(), ConnectError> {
        Ok(())
    }
}

impl Stream for UdpListener {
    type Item = SocketEvent;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.waker {
            None => {
                self.waker = Some(cx.waker().clone());
            },

            _ => {}
        }

        match self.socket.recv() {
            Some(event) => {
                info!("SocketEvent received");

                Poll::Ready(Some(event))
            },

            _ => {
                Poll::Pending
            }
        }
    }
}

As you can see, I manually set a socket to non-blocking. The next step, I figured, was to implement Stream for the type. Thereafter, I would check the internal socket to see if an event occured. If no event occurs, then I return Poll::Pending. Else if there exists an event, I return Poll::Ready(Some(event)).

Additionally, I figured that I might want the Waker from the context ctx that way it can be waked from "somewhere else" in order to poll the UdpListener. Ideally, I would fire-up an event look via:

let my_udp_listener = UdpListener::new("0.0.0.0:1234").unwrap();
block_on(async move || my_udp_listener.run().await);

However, this wouldn't work as-is. What do I need to do in order to achieve non-blocking on a checkable socket? Ideally, once a packet gets received, I would forward it internally with a channel for further handling. Thanks

Additionally: I want to stay away from an infinite loop of checking, as well as sleeping.

This is not trivial at all. In order to take some input from tcp, there would need to be some busy-waiting loop - there is no possibility to notify when the message is ready onstd socket. Mio streams are implemented using OS specific primitives, not standard library TCP socket.

Why do you need this? If it is pure learning, then I suggest you look how it's done in MIO/Tokio. If you want this on production - don't do it. Go for tokio, or async-std - you will need some runtime anyway.

1 Like

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.