Validate that a tokio Sender and Receiver belong to the same channel

I have an API where I need a user to pass in the Receiver of a tokio::sync::mpsc::channel into the factory method of a struct.
Within the struct's private functions I also need the ability to send to that very receiver, so I also need a copy of the appropriate Sender.

I do not want to create the channel within the factory method, because that would make the API weird, since I'd then need to return (Self, Sender<T>) from it and also pass in the desired channel size, which I don't care about in the context of that struct.

However, just accepting a Sender<T> and a Receiver<T> does not guarantee that they belong to the same channel.

Sender has a same_channel() method, but it only accepts another Sender. I cannot use it to check whether it is on the same channel as a Receiver.

So my question is whether I can validate that somehow? I'm fine returning an appropriate Result from the factory method in question.

The only other option I can think of is to mark the factory function unsafe and require the user to uphold the invariant that Sender and Receiver are on the same channel.

PS: Another option, which is only half bad, is to require the user to curry the channel function so that I don't need to pass in the channel size:

impl Gestures {
    /// Create a new gesture handler.
    #[must_use]
    pub fn new<F>(
        f: F,
        gestures: Sender<Gesture>,
        double_click_timeout: Duration,
    ) -> (Self, Sender<Message>)
    where
        F: FnOnce() -> (Sender<Message>, Receiver<Message>),
    {
        let (messages_out, messages_in) = f();
        (
            Self {
                messages_out: messages_out.clone(),
                messages_in,
                gestures,
                cache: Mutex::new(HashSet::new()),
                double_click_timeout,
            },
            messages_out,
        )
    }
}

...

let (gestures_demux, messages_out) = Gestures::new(|| channel(42), ...);

But this still does not guarantee that the Sender and Receiver returned from the function belong to the same channel.

Based on Sender's same_channel method implementation, I don't see why we couldn't have the same for Receiver:

pub fn same_channel(&self, other: &Self) -> bool {
    self.chan.same_channel(&other.chan)
}

Perhaps you could create a PR and suggest implementing this on Receiver as well 🙂.

Edit: I see. This would require having a way to access the chan on Sender, which would require either making it public or having a getter method.

That aside, I don't see why you want to know about the Receiver on a struct that only cares about the Sender.

Could you perhaps add further context for the use case?

FWIW, I believe returning (Self, Sender) isn't too unusual.

You could instead have a sender method that returns a new one, or even wrap it up completely, as a "handle" or "proxy" type that has explicit methods for the messages.
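
A minimal sketch of that handle idea, assuming std::sync::mpsc as a stand-in for tokio's channel so it runs without a runtime. Gestures here is cut down to two fields, and GesturesHandle, the u32 payloads and shared_channel_demo are hypothetical names for illustration. Since new creates both halves itself, they cannot be mismatched:

```rust
use std::sync::mpsc::{channel, Receiver, SendError, Sender};

// Hypothetical message type; the payload is just an event id here.
#[derive(Debug, PartialEq)]
pub enum Message {
    Event(u32),
    Timeout(u32),
}

pub struct Gestures {
    messages_in: Receiver<Message>,
    // Loopback sender; on the same channel as `messages_in` by construction.
    messages_out: Sender<Message>,
}

// Cloneable handle with one explicit method per message the outside may send.
#[derive(Clone)]
pub struct GesturesHandle(Sender<Message>);

impl GesturesHandle {
    pub fn event(&self, id: u32) -> Result<(), SendError<Message>> {
        self.0.send(Message::Event(id))
    }
}

impl Gestures {
    pub fn new() -> Self {
        let (messages_out, messages_in) = channel();
        Self { messages_in, messages_out }
    }

    // Hands out a new handle; every handle feeds the actor's own inbox.
    pub fn handle(&self) -> GesturesHandle {
        GesturesHandle(self.messages_out.clone())
    }
}

// Sends one message through a handle and one through the loopback sender,
// then drains the inbox to show both arrived on the same channel.
pub fn shared_channel_demo() -> Vec<Message> {
    let actor = Gestures::new();
    actor.handle().event(7).unwrap();
    actor.messages_out.send(Message::Timeout(7)).unwrap();
    actor.messages_in.try_iter().collect()
}
```

With tokio's bounded channel the capacity would have to become a constant or a parameter of new, which is the trade-off of this approach.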

Otherwise, if you do want to accept a sender / receiver pair, surely it's not going to break anything in an Undefined Behavior sense if the caller does give you a mismatched pair; it will just misbehave? That doesn't seem to meet the bar for needing unsafe. It's more like a hash table given a misbehaving hash implementation: it might leak memory or give invalid results, but it won't corrupt memory.

Unfortunately the suggested wrapping will not work, because the respective channel is the communication channel. The struct itself acts as an actor which communicates via those channels. And it can also send messages to itself, hence the whole kerfuffle.

In my case, it will deadlock the communication, so...

It's an actor aggregating raw events encoded in messages to decide whether a double click was performed on certain event types:

                                   _______
Event -> Message::Event(Event) -> | Actor | -> Gesture
                                   -------
                                   ^     |
                                   |     |
                           Message::Timeout(Event)
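
A rough sketch of that loop, assuming std::sync::mpsc instead of tokio so it runs without a runtime. Event(u32), the Gesture variants, run and gesture_demo are made up for illustration, and the Timeout is looped back immediately rather than after double_click_timeout, purely to keep the sketch deterministic:

```rust
use std::collections::HashSet;
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical stand-in for a raw input event.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct Event(u32);

enum Message {
    Event(Event),
    Timeout(Event),
}

#[derive(Debug, PartialEq)]
enum Gesture {
    Click(Event),
    DoubleClick(Event),
}

// Drains the inbox; `messages_out` must feed the same channel as `messages_in`.
fn run(messages_in: &Receiver<Message>, messages_out: &Sender<Message>, gestures: &Sender<Gesture>) {
    let mut pending = HashSet::new();
    while let Ok(message) = messages_in.try_recv() {
        match message {
            Message::Event(event) => {
                if pending.remove(&event) {
                    // Second event arrived before the timeout: double click.
                    let _ = gestures.send(Gesture::DoubleClick(event));
                } else {
                    pending.insert(event);
                    // Loopback. A real actor would send this only after
                    // double_click_timeout has elapsed.
                    let _ = messages_out.send(Message::Timeout(event));
                }
            }
            Message::Timeout(event) => {
                // Still pending when the timeout fires: single click.
                if pending.remove(&event) {
                    let _ = gestures.send(Gesture::Click(event));
                }
            }
        }
    }
}

fn gesture_demo() -> Vec<Gesture> {
    let (messages_out, messages_in) = channel();
    let (gestures_tx, gestures_rx) = channel();
    for id in [1, 1, 2] {
        messages_out.send(Message::Event(Event(id))).unwrap();
    }
    run(&messages_in, &messages_out, &gestures_tx);
    drop(gestures_tx); // close the output so the iterator below terminates
    gestures_rx.iter().collect()
}
```

If messages_out pointed at a different channel, the Timeout messages would never come back and no single click would ever be emitted.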

I'm not sure I understand the problem with wrapping, the API for actors normally looks something like this if you really boil it down (outside the context of actor languages which hide all this):

impl Runtime {
    // Sketch only: spawns a task that feeds each received message to `handler`.
    fn spawn_actor<Message, Handler>(
        &self,
        handler: Handler,
    ) -> Sender<Message>
    where
        Message: Send,
        Handler: FnMut(Message) + Send,
    {
        todo!()
    }
}

#[derive(Clone)]
struct Foo(Sender<FooMessage>);

impl Foo {
    pub fn spawn(rt: &Runtime, bar: Bar) -> Self {
        // create internal state ...
        Self(rt.spawn_actor(move |message| {
            match message {
                // update state, send outgoing messages ...
            }
        }))
    }

    pub fn baz(&self, qux: Qux) {
        self.0.send(FooMessage::Baz(qux))
    }

    // etc
}

eg the caller never sees the "actual actor" containing the state, only a handle to it which just wraps up sending messages.

It doesn't sound like that's what you're doing here, so it's not clear to me what you mean by actors, or why the channel being "the communication channel" makes wrapping the Sender a problem. As far as I can tell, you would only need to change the above to have spawn_actor take the Receiver instead of returning it, then have each spawn function create the channel and the Self themselves before calling it?

I would instead prefer just passing a "context" parameter to the Handler which lets it directly send a Message to itself, kill itself, etc.
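
Something like the following, as a sketch only: it uses std threads and std::sync::mpsc rather than tokio, and Context, send_to_self, stop and countdown_demo are names invented here, not an existing API. The spawner creates the channel, so the loopback sender inside the context is always on the actor's own channel:

```rust
use std::cell::Cell;
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

// Hypothetical context handed to the handler on every message.
pub struct Context<M> {
    loopback: Sender<M>,
    stopped: Cell<bool>,
}

impl<M> Context<M> {
    // Queue a message to the actor's own inbox.
    pub fn send_to_self(&self, message: M) {
        let _ = self.loopback.send(message);
    }

    // Ask the actor loop to exit after the current message.
    pub fn stop(&self) {
        self.stopped.set(true);
    }
}

pub fn spawn_actor<M, H>(mut handler: H) -> (Sender<M>, JoinHandle<()>)
where
    M: Send + 'static,
    H: FnMut(&Context<M>, M) + Send + 'static,
{
    let (tx, rx) = channel();
    let ctx = Context { loopback: tx.clone(), stopped: Cell::new(false) };
    let handle = thread::spawn(move || {
        // The context keeps a sender alive, so the channel never closes on
        // its own; the loop ends only when the handler calls `stop`.
        while !ctx.stopped.get() {
            match rx.recv() {
                Ok(message) => handler(&ctx, message),
                Err(_) => break,
            }
        }
    });
    (tx, handle)
}

// An actor that counts down by sending itself the next number.
pub fn countdown_demo() -> Vec<u32> {
    let (out_tx, out_rx) = channel();
    let (actor, handle) = spawn_actor(move |ctx: &Context<u32>, n: u32| {
        out_tx.send(n).unwrap();
        if n == 0 {
            ctx.stop();
        } else {
            ctx.send_to_self(n - 1);
        }
    });
    actor.send(3).unwrap();
    handle.join().unwrap();
    out_rx.iter().collect()
}
```

One thing the sketch makes visible: an actor that holds a sender to itself never sees its inbox close, so it needs an explicit way to stop.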

Yes, and deadlocks are a perfectly safe misbehaviour. Unless you have some actually unsafe code that's going to assume the property you're requiring of the caller, you shouldn't require unsafe code to call it.
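
To illustrate with a mismatched pair, here is a small sketch using std::sync::mpsc in place of tokio's channel (an assumption made so it runs without a runtime; mismatched_pair_outcome is a made-up name). The message goes into one channel while the receiver waits on another, so the wait simply times out, with no unsafe anywhere:

```rust
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::time::Duration;

fn mismatched_pair_outcome() -> Result<u32, RecvTimeoutError> {
    // Two unrelated channels; the "actor" is handed tx_a together with rx_b.
    let (tx_a, _rx_a) = channel::<u32>();
    let (_tx_b, rx_b) = channel::<u32>();

    // The send succeeds, but the message lands in channel A.
    tx_a.send(1).expect("rx_a is still alive");

    // Nothing ever arrives on channel B. Without the timeout this would
    // block forever, which is a bug but not memory unsafety.
    rx_b.recv_timeout(Duration::from_millis(50))
}
```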

There's a distinction between "library unsafe" and "language unsafe" to be sure, but that line is generally about when, exactly, you invoke Undefined Behavior and invite the nasal demons into your process. For language unsafe, it's the moment you even build code that fails to meet the requirements, without it ever being called. Library unsafe is still sound if you failed to meet the requirements of one function but never called the other function that depended on them.

Thanks for the input. Maybe my approach to the actor model is a bit different than "usual".
I have structs that represent actors and encapsulate their respective state.
They usually contain one Receiver to handle incoming messages from other actors and have one (or more) Sender(s) to send messages to other actors. I.e. the actors are connected via mpsc channels.

So my usual pattern is:

struct Actor {
    incoming: Receiver<MsgIn>,
    outgoing: Sender<MsgOut>,
    ...
}

impl Actor {
    #[must_use]
    pub const fn new(
        incoming: Receiver<MsgIn>,
        outgoing: Sender<MsgOut>,
        ...
    ) -> Self {
        Self {
            incoming,
            outgoing,
            ...
        }
    }

    pub async fn run(mut self) {
        while let Some(message) = self.incoming.recv().await {
            let response = ...;
            self.outgoing.send(response).await;  // handle send errors
        }
    }
}

This way the different actors are completely decoupled, in the sense that they don't know anything about each other except for the type of message that they exchange.
Also I don't need a global runtime in the sense mentioned above (short of the tokio runtime of course, but that's different) or any kind of global state I need to pass around.

let (tx1, rx1) = channel(42); // tx1 = actor1 input
let (tx2, rx2) = channel(42); // communication between actor1 and actor2
let (tx3, rx3) = channel(42); // rx3 = actor2 output

let actor1 = spawn(Actor1::new(rx1, tx2).run());
let actor2 = spawn(Actor2::new(rx2, tx3).run());

tx1.send(stuff).await;
let resp = rx3.recv().await;

In the case in question, however, I also need another Sender for the incoming: Receiver above within the actor, since it must be able to send itself "incoming" messages. So I must somehow ensure that this Sender is on the same channel as the Receiver that was passed into the constructor.

Tokio is actually very much the equivalent of the Runtime in my API sketch; specifically, spawn is a call to the implicit thread-local Runtime, so you're pretty close with some shuffling.

In particular, if you factored out the loop in the run methods then the only real difference is if the state is explicit or the implicit captures of a handler lambda. I prefer the succinctness of capturing, but there's other trade-offs.

Do you have any use for the actor1, etc task handles? Actors are traditionally "fire and forget", so I'm not sure if there's much use for those, other than waiting for everything spawned to exit. That can be done implicitly by dropping a Runtime, or explicitly by spawning into a JoinSet, for example.

In my current setting, most of the actors are static, i.e. they get spawned and run for the entirety of the program. Other actors are dynamic (though not the one in question), so they can spawn and despawn at any time during the program's runtime.
So in the above example case, no, I don't really need the handle.
In the dynamic cases, which I will deal with later, there will be a static actor that dynamically spawns and despawns other actors internally, and I'll need their handles in order to abort them on demand.

I might be getting a bit far from your current design, but I played around with seeing if I could make a relatively ergonomic yet thin actor model using explicit state and self-sending and ended up with:

Note in particular that it demonstrates an interesting race that self-sending raises as a hazard:

button.set_text("Hello, world").await;
button.click().await; // should set text to "Count: 6"
let text = button.read_text().await; // still "Hello, world"!

The click() handler does queue a message to set the text, but only after the read_text() message was queued. This is an inherent issue with actor frameworks, though - you should avoid "read" methods, and instead use "subscription" type approaches which will eventually settle.
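
The ordering can be reproduced in a single thread. This is only a sketch with std::sync::mpsc, and Msg, self_send_race and a click counter starting at zero are assumptions of the sketch rather than the code discussed above. All three caller messages are queued before the actor processes any of them, so the self-sent SetText lands behind ReadText:

```rust
use std::sync::mpsc::{channel, Sender};

enum Msg {
    SetText(String),
    Click,
    ReadText(Sender<String>), // carries the channel to reply on
}

// Returns (text observed by ReadText, text after the queue has settled).
fn self_send_race() -> (String, String) {
    let (tx, rx) = channel::<Msg>();
    let (reply_tx, reply_rx) = channel::<String>();

    // The caller queues three messages back to back.
    tx.send(Msg::SetText("Hello, world".into())).unwrap();
    tx.send(Msg::Click).unwrap();
    tx.send(Msg::ReadText(reply_tx)).unwrap();

    // Actor state plus a loopback sender on the same channel.
    let mut text = String::new();
    let mut clicks = 0u32;
    let loopback = tx.clone();

    // Process until the inbox is empty, including self-sent messages.
    while let Ok(msg) = rx.try_recv() {
        match msg {
            Msg::SetText(new_text) => text = new_text,
            Msg::Click => {
                clicks += 1;
                // Queued behind the ReadText that is already waiting.
                loopback.send(Msg::SetText(format!("Count: {clicks}"))).unwrap();
            }
            Msg::ReadText(reply) => {
                let _ = reply.send(text.clone());
            }
        }
    }

    (reply_rx.recv().unwrap(), text)
}
```

The read observes the old text even though the click was handled first; only the final state reflects the click.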

Yeah, we might be diverging a bit too far from my original issue. But it's always good to have one's architecture sanity-checked to ensure that there's no XY problem there. I don't believe this to be the case in my current scenario.
Also, in my case the loopback sending is strictly that: sending. The actor never needs to wait for any output when sending to itself.

I'd like to add that, although it doesn't solve your immediate problem, given the precedent that same_channel exists on Sender, this might be worth opening an issue for, or even a PR.
