How to have one thread wait for diverse events?

I need to have a single thread wait for several different kinds of events:

  • a sync_channel from other threads
  • a socket fd
  • signals from other processes or ctrl-C

Coming from C, I'd use libc::epoll_pwait - but that means doing all of the necessary wrapping myself with unsafe blocks, etc. I imagine that this heterogeneous-event wait pattern is common enough that it's already been implemented in some generic way in Rust - but where? Does anyone know of an example?

Oh - maybe I found the answer myself: calloop

I'd just use a queue with an enum representing the different kinds of events.

The producer threads send over their events wrapped in the matching enum variant.

The consumer thread just matches on that.
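Something like this minimal sketch, using std::sync::mpsc (the Event variants here are just placeholders for whatever your real events are):

use std::sync::mpsc;
use std::thread;

// Placeholder event type; the variants stand in for your real event kinds.
enum Event {
    Message(String),
    SocketReadable,
    Signal(i32),
}

fn main() {
    let (tx, rx) = mpsc::sync_channel::<Event>(16);

    // Each producer thread wraps its event in the matching variant.
    let tx2 = tx.clone();
    thread::spawn(move || {
        tx2.send(Event::Message("hello".to_string())).unwrap();
    });
    thread::spawn(move || {
        tx.send(Event::Signal(2)).unwrap();
    });

    // The consumer thread matches on the variant. The loop ends once every
    // sender has been dropped.
    for event in rx {
        match event {
            Event::Message(msg) => println!("message: {msg}"),
            Event::SocketReadable => println!("socket is readable"),
            Event::Signal(signum) => println!("got signal {signum}"),
        }
    }
}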


Only the sync_channel events come from other threads. The rest come in from the OS. I need one event loop to rule them all.

This is exactly what async Rust is. The fundamental async architecture, which separates executing tasks (Future::poll()) from waking up due to events (Context::waker() → Waker::wake()), provides the ability to wake a task upon any number of distinct events, without any required centralization of what kinds of events or what kinds of tasks exist.

I need one event loop to rule them all.

That's what an async executor is.

For clarification, I believe @erelde is suggesting the approach where, for each distinct type of event, you create an additional thread which waits only on that event and sends on a MPSC channel, and the events are processed by a single main-loop thread which receives from the MPSC channel.

This approach works, but async has similar outcomes without needing to create as many threads or merge all your event types into one channel with an enum type. (Additional threads are still necessary for any operations which are done using blocking system calls, or blocking foreign functions.)
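For comparison, here is a rough sketch of what the async version of the original question could look like, assuming tokio with its net, sync, signal, and macros features enabled: a single task waits on a channel, a listening socket, and ctrl-C at once via tokio::select!.

use tokio::net::TcpListener;
use tokio::signal;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (tx, mut rx) = mpsc::channel::<String>(32);
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    // Stand-in for the producer threads/tasks from the original question.
    tokio::spawn(async move {
        let _ = tx.send("hello".to_string()).await;
    });

    loop {
        tokio::select! {
            msg = rx.recv() => match msg {
                Some(msg) => println!("channel message: {msg}"),
                None => break, // every sender has been dropped
            },
            conn = listener.accept() => {
                let (_socket, addr) = conn?;
                println!("new connection from {addr}");
            }
            _ = signal::ctrl_c() => {
                println!("got ctrl-C, shutting down");
                break;
            }
        }
    }
    Ok(())
}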


I may be heading towards async. I have skimmed the tokio doc. It looks like it would require a much more extensive rewrite of the C app I'm porting to Rust than I anticipated. But it also looks like it offers many advantages. At this point, though, I am so close to a working Rust app that I don't think I want to back up and go full async instead. Maybe as my next project.

Tokio may be too huge for you. The simplest implementation of an executor is calling thread::park after receiving Poll::Pending and thread::unpark in the waker, with an AtomicUsize holding state to handle a couple of situations.

You can look at the pollster crate or block_on in futures::executor for this.
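For illustration, here is a minimal block_on along those lines (roughly what pollster does internally), sketched with std::task::Wake plus thread parking; it skips the AtomicUsize state handling mentioned above:

use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// A Waker that unparks the thread blocked inside `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Park until some event source calls the waker. A spurious unpark
            // only costs an extra poll, which futures must tolerate.
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    assert_eq!(block_on(async { 1 + 2 }), 3);
}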


Would someone be kind enough to implement a small example?

I am also interested in event-driven concepts. Is the idea that the event_listener waits for incoming notifications and blocks the task until an event notification arrives? But how do you block asynchronous code - is that even allowed in Rust?

The way “leaf futures” work — that is, futures that actually do some IO or something that needs to wait as a primitive action, rather than simply computation and calling other futures — is that when it is time to wait, they take the Waker from the context and store it somewhere, such as in an event callback of some description, then return Poll::Pending from their poll() method. Returning Pending is what “blocks the task” — nothing more, nothing less. Then, when the event actually happens, the Waker that was stored must be invoked to wake up, “unblock”, the task.

Exactly how the Waker is stored depends entirely on the specific kind of event being waited for. Common examples:

  • For file descriptors, there will usually be some kind of central data structure (the “reactor”) that has a list of file descriptors and Wakers, and uses epoll or similar to block the reactor’s thread(s) until something happens on at least one file descriptor, and then the appropriate Wakers are invoked.

    This is the primary job of an IO-focused async runtime like tokio. It is also possible to build a reactor that is completely separate from the async executor that polls tasks, because the reactor’s job is to invoke Wakers; it doesn't need to own or know about the tasks which those wakers wake (though that can lead to performance optimizations).

  • For receiving a message on a channel, the Waker will be stored in the channel itself, such that the sending side can invoke the Waker right after it has put an item in the channel.
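As a concrete illustration of that last point, here is a rough sketch of a hand-rolled one-shot channel (not any particular library's API): the receiving end is a leaf future that stores the Waker, and the sender invokes it right after depositing the value.

use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

// State shared between the sending and receiving halves.
struct Shared<T> {
    value: Option<T>,
    waker: Option<Waker>,
}

struct Sender<T>(Arc<Mutex<Shared<T>>>);
struct Receiver<T>(Arc<Mutex<Shared<T>>>);

fn oneshot<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(Mutex::new(Shared { value: None, waker: None }));
    (Sender(shared.clone()), Receiver(shared))
}

impl<T> Sender<T> {
    fn send(self, value: T) {
        let mut shared = self.0.lock().unwrap();
        shared.value = Some(value);
        // Push the wakeup: invoke the stored Waker right after putting the
        // item in the channel.
        if let Some(waker) = shared.waker.take() {
            waker.wake();
        }
    }
}

impl<T> Future for Receiver<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut shared = self.0.lock().unwrap();
        match shared.value.take() {
            // Pull the data: the value is already there, so finish.
            Some(value) => Poll::Ready(value),
            None => {
                // Store the Waker where the sender can find it, then "block
                // the task" by returning Pending.
                shared.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

Awaiting the Receiver from any executor (for instance the block_on sketch earlier in the thread) blocks the task until send is called from another thread; the data itself is then pulled out of the shared state on the next poll.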


Thank you very much - is this the reactor pattern? I had never heard of it before, but I also asked GPT to produce a code example so I can picture it better.

Is this the correct strategy?

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use std::os::unix::io::AsRawFd; // needed for socket.as_raw_fd() below
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::pin::Pin;
use std::future::Future;
use std::collections::HashMap;

struct Reactor {
    wakers: Mutex<HashMap<i32, Waker>>,
}

impl Reactor {
    fn new() -> Self {
        Reactor {
            wakers: Mutex::new(HashMap::new()),
        }
    }

    fn register(&self, fd: i32, waker: Waker) {
        let mut wakers = self.wakers.lock().unwrap();
        wakers.insert(fd, waker);
    }

    fn wake(&self, fd: i32) {
        let mut wakers = self.wakers.lock().unwrap();
        if let Some(waker) = wakers.remove(&fd) {
            waker.wake();
        }
    }
}

struct LeafFuture {
    reactor: Arc<Reactor>,
    fd: i32,
    polled: bool,
}

impl Future for LeafFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.polled {
            Poll::Ready(())
        } else {
            self.reactor.register(self.fd, cx.waker().clone());
            self.polled = true;
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let reactor = Arc::new(Reactor::new());

    // Example with file descriptor (simulated with a TcpListener)
    let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
    let reactor_clone = reactor.clone();
    tokio::spawn(async move {
        loop {
            let (socket, _) = listener.accept().await.unwrap();
            reactor_clone.wake(socket.as_raw_fd());
        }
    });

    // Example with message channel
    let (tx, mut rx) = mpsc::channel::<()>(32); // annotate the item type so it can be inferred
    let reactor_clone = reactor.clone();
    tokio::spawn(async move {
        while let Some(_) = rx.recv().await {
            reactor_clone.wake(1); // Simulated file descriptor for message channel
        }
    });

    // Wait for events
    let leaf_future = LeafFuture {
        reactor: reactor.clone(),
        fd: 1,
        polled: false,
    };
    leaf_future.await;

    println!("Event received!");
}
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.polled {
            Poll::Ready(())

This part is wrong, because it returns Ready when polled a second time, whether or not the intended event has actually occurred. A future must always tolerate being polled extra times even if it hasn't used its waker yet.
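A sketch of a more tolerant version, reusing the LeafFuture and Reactor types from the code above but assuming a hypothetical Reactor::is_ready(fd) method that reports whether the event has actually happened (the posted Reactor has no such method):

impl Future for LeafFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.reactor.is_ready(self.fd) {
            // Only report Ready once the event has really occurred.
            Poll::Ready(())
        } else {
            // Re-register on every poll so the reactor always holds a current
            // Waker; an extra poll just stores a fresh Waker and stays Pending.
            self.reactor.register(self.fd, cx.waker().clone());
            Poll::Pending
        }
    }
}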

impl Reactor {
    ...
    fn wake(&self, fd: i32) {

This method should not exist, because the whole point of the reactor is that its own loop waits for events (using epoll or similar) and dispatches them to wakers. The reactor does not take external requests to wake tasks — any time you could do that, using the task's Waker directly is the better strategy.


Is the waker the event handler?
I probably haven't understood the waker.

Is this implementation of a reactor correct?

impl Reactor {
    fn new() -> Self {
        Reactor {
            poll: Poll::new().unwrap(),
            events: Events::with_capacity(128),
            wakers: Mutex::new(HashMap::new()),
        }
    }

    fn register(&self, fd: i32, waker: Waker) {
        let mut wakers = self.wakers.lock().unwrap();
        wakers.insert(fd, waker);
    }

    fn run(&self) {
        loop {
            self.poll.poll(&mut self.events, None).unwrap();

            for event in &self.events {
                let token = event.token().0 as i32;
                let mut wakers = self.wakers.lock().unwrap();
                if let Some(waker) = wakers.remove(&token) {
                    waker.wake();
                }
            }
        }
    }

    fn add_fd(&self, fd: i32) {
        self.poll.registry().register(
            &mut mio::unix::SourceFd(&fd),
            Token(fd as usize),
            Interest::READABLE,
        ).unwrap();
    }
}

Is the waker the event handler?

In the typical meaning of “event handler”, when an event happens, an event handler is called by whatever the source of events is, and passed the event. Rust async splits that job into two separate roles. The Waker is called by the event source to notify the task that something should be done. The actual data of the event is not communicated through the Waker, but by the leaf future, separately, consulting the source. Wakeups are “pushed” and data is “pulled”.

Is this implementation of a reactor correct?

It does look like it has the right general idea (and some bugs).

I won't comment further, and I certainly can't say anything about correctness, because it has far too many undefined names that aren't part of std (struct Reactor, Poll, poll(), Events, token(), registry(), register(), mio, Token, Interest), and I get the impression that you are probably asking ChatGPT for code and then asking me about it. I am not interested in participating in that process. If you want to make use of ChatGPT, it is your job to take whatever it invents and make it into something useful to you (whether that is by fixing bugs, discarding parts and keeping others, or by reading the documentation for the things the code mentions).


I am sorry, and I can understand this.

Thank you for your explanations. I only want to understand better, with some code examples. Your explanation was good, but I am not very experienced in Rust or with event-driven concepts, so I am having a hard time. Please forgive me.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.