Parent/Child references with Traits and multi-threading


#1

Hey folks,

I’m currently working on building a library that implements a binary protocol that communicates over TCP. It works something like the following:

You open a connection to the server, and then you can create one or more channels from that connection. You then use the channel for sending messages over the protocol - this is the protocol’s way of handling multiplexing. The connection must be thread-safe, but the channel is not required to be (sort of - the implementation must be shareable across threads, but the trait exposed to the user is not). Generally, you’d create one channel per thread.

The connection is responsible for handling the TcpSocket, so the channel needs a reference to the connection to be able to send messages.

The connection is also responsible for reading from the TcpSocket and determining which channel should handle the message, so it needs access to all channels, and to spawn a separate thread to do that reading. The connection also sends regular heartbeat messages, so it needs to spawn a background thread for that.

The result of that is, potentially, three threads trying to access a single connection, and two of them trying to access the same channel:

  1. User thread calls send_message on channel, which delegates to the connection to handle writing to the TCP socket.
  2. Library thread reads a data frame from the TcpSocket and calls message_received on connection, and delegates to a channel based on header information from the data frame.
  3. Library thread periodically sends heartbeat messages by calling send_heartbeat on the channel / connection (not sure how I want to implement it - it technically uses a system-only channel).

So, the connection needs to know about the channel, and the channel needs to know about the connection.

Note: All following code should be treated as pseudo-Rust, as it either (a) was never intended to compile, (b) should compile, but hasn’t been tested, or (c) I’ve just forgotten the exact syntax of something.

I imagine the API to (ideally) look something like:

let connection = ConnectionFactory::new().create_connection();
// Connection started by factory
let channel = connection.create_channel();
channel.on_message_received(|message| {
  println!("{}", message);
});
thread::spawn(move || {
  // Do other stuff
  channel.send_message("foo");
  // Do some more stuff
  // Channel closed on drop
});

// Eventually

connection.shutdown();

In order for that to work, I need my structs and traits to look like the following, uncompilable, code (some functions left out to minimise complexity):

pub trait Connection {
  fn create_channel(&self) -> Result<Box<Channel>>;
}

pub struct ConnectionStruct {
  read_socket: TcpSocket,
  write_socket: TcpSocket,
  channels: HashMap<u16, ChannelStruct>
}

impl ConnectionStruct {
  fn transmit(&self, data: &[u8]) -> Result<()> {
    // Take a slice: a bare [u8] is unsized and can't be passed by value.
    self.write_socket.write_all(data)
  }

  fn start(&self) -> Result<()> {
    Thread::spawn(self.heartbeat);
    Thread::spawn(self.read_from_socket);
    .
    .
    .
    Ok(())
  }
}

impl Connection for ConnectionStruct {
  fn create_channel(&self) -> Result<Box<Channel>> {
     let channel = ChannelStruct { connection: self };
     self.channels.put(nextChannelNumber(), &channel);
     Ok(Box::new(channel))
  }
}

pub trait Channel {
  fn send_message(&self, message: &str) -> Result<()>;
  fn on_message_received(&self, ...) -> Result<()>;
}

pub struct ChannelStruct {
  connection: ConnectionStruct,
}

impl Channel for ChannelStruct {
  fn send_message(&self, message: &str) -> Result<()> {
    // Delegate to the connection, which owns the socket.
    self.connection.transmit(message.as_bytes())
  }
  fn on_message_received(&self, ...) -> Result<()> { ... }
}

Now, that obviously can’t work, because there’s an ownership cycle. I could change ChannelStruct to take connection: &ConnectionStruct, but, even ignoring how painful it would be to make the lifetimes work (and they might not), channels are supposed to be movable into other threads, and thread::spawn requires 'static data, so the compiler can’t guarantee that a borrowed reference is still valid there.

From what I’ve read, the preferred solution appears to be something like

connection: Weak<ConnectionStruct>

But that’s not really compatible with the function fn create_channel(&self) -> Result<Box<Channel>>; - how do I create the weak reference in the first place, if &self is a ConnectionStruct? Should I be doing something a bit different, like

impl Connection for Arc<ConnectionStruct> { ... }

And not bothering to implement Connection directly on ConnectionStruct? Note that users can’t create a ConnectionStruct directly, and instead go through a ConnectionFactory whose function signature looks like

fn create_connection(&self) -> impl Connection

So it won’t matter if the implementation of Connection is secretly an Arc, right? The function signature would end up being

fn create_connection(&self) -> impl Connection + Clone + Send + Sync

(Or I put those requirements on the Connection trait - not sure which is better).
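To compare the two options, here is a minimal sketch of the "requirements on the trait" variant, assuming the handle really is an Arc underneath. The id method and the value 7 are hypothetical and only exist to have something observable; they are not part of the protocol.

```rust
use std::sync::Arc;
use std::thread;

// Supertraits: every implementor must be Clone + Send + Sync, so a plain
// `impl Connection` return type already promises all three to the caller.
pub trait Connection: Clone + Send + Sync + 'static {
    fn id(&self) -> u32; // hypothetical method, for illustration only
}

// Hypothetical stand-in for the real connection state.
pub struct ConnectionStruct {
    id: u32,
}

// The trait is implemented on the Arc, not on the struct itself.
impl Connection for Arc<ConnectionStruct> {
    fn id(&self) -> u32 {
        self.id
    }
}

// Callers only see `impl Connection`; the Arc stays an implementation detail.
pub fn create_connection() -> impl Connection {
    Arc::new(ConnectionStruct { id: 7 })
}

// Works for any Connection: clone the handle and move it to another thread.
pub fn id_from_other_thread<C: Connection>(conn: &C) -> u32 {
    let handle = conn.clone();
    thread::spawn(move || handle.id()).join().unwrap()
}
```

One trade-off worth knowing: a trait with Clone as a supertrait can no longer be used as a trait object (Box<dyn Connection>), so this only fits if the factory keeps returning impl Connection.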

But that seems OK. This would mean that I could then implement create_channel as something like:

impl Connection for Arc<ConnectionStruct> {
  fn create_channel(&self) -> Result<Box<Channel>> {
     let channel = ChannelStruct { connection: Arc::downgrade(self) };
     self.channels.put(nextChannelNumber(), &channel);
     Ok(Box::new(channel))
  }
}

Of course, I’ve still got lifetime problems with self.channels.put(nextChannelNumber(), &channel);, so I guess I should wrap that in a Weak too? And do impl Channel for Arc<ChannelStruct> { ... }? So the code would now look like:

impl Connection for Arc<ConnectionStruct> {
  fn create_channel(&self) -> Result<Box<Channel>> {
     let channel = Arc::new(ChannelStruct { connection: Arc::downgrade(self) });
     // Arc::downgrade borrows the Arc, so channel can still be returned below.
     self.channels.put(nextChannelNumber(), Arc::downgrade(&channel));
     Ok(Box::new(channel))
  }
}
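For reference, a compilable sketch of this Arc/Weak layout, under a few assumptions: the socket is replaced by an in-memory log (sent), the map sits behind a Mutex so it can be mutated through &self, and the channel is returned as an Arc<ChannelStruct> rather than a boxed trait object. The numbering scheme and the helper methods (sent_count, live_channels, number) are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};

pub struct ConnectionStruct {
    // Mutex: create_channel only gets &self but has to mutate the map.
    channels: Mutex<HashMap<u16, Weak<ChannelStruct>>>,
    // Stand-in for the write half of the socket: records what was "sent".
    sent: Mutex<Vec<Vec<u8>>>,
}

pub struct ChannelStruct {
    number: u16,
    connection: Weak<ConnectionStruct>,
}

impl ConnectionStruct {
    pub fn new() -> Arc<ConnectionStruct> {
        Arc::new(ConnectionStruct {
            channels: Mutex::new(HashMap::new()),
            sent: Mutex::new(Vec::new()),
        })
    }

    fn transmit(&self, data: &[u8]) {
        self.sent.lock().unwrap().push(data.to_vec());
    }

    pub fn sent_count(&self) -> usize {
        self.sent.lock().unwrap().len()
    }

    // Counts channels whose Weak entry can still be upgraded.
    pub fn live_channels(&self) -> usize {
        let channels = self.channels.lock().unwrap();
        channels.values().filter(|weak| weak.upgrade().is_some()).count()
    }
}

pub trait Connection {
    fn create_channel(&self) -> Arc<ChannelStruct>;
}

impl Connection for Arc<ConnectionStruct> {
    fn create_channel(&self) -> Arc<ChannelStruct> {
        let mut channels = self.channels.lock().unwrap();
        let number = channels.len() as u16 + 1; // hypothetical numbering scheme
        let channel = Arc::new(ChannelStruct {
            number,
            // self is &Arc<ConnectionStruct>, which is what downgrade needs.
            connection: Arc::downgrade(self),
        });
        channels.insert(number, Arc::downgrade(&channel));
        channel
    }
}

impl ChannelStruct {
    pub fn number(&self) -> u16 {
        self.number
    }

    // Fails if every strong reference to the connection has been dropped.
    pub fn send_message(&self, message: &str) -> Result<(), String> {
        let connection = self
            .connection
            .upgrade()
            .ok_or_else(|| "connection closed".to_string())?;
        connection.transmit(message.as_bytes());
        Ok(())
    }
}
```

Since both directions are Weak, neither side keeps the other alive: the user's handles are the only strong references, and a channel outliving its connection gets an error instead of a dangling pointer.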

Of course, I’ve now got a lot of safety checks going on at runtime, so if I have to send a message in response to receiving a message, I might be in trouble. Plus I need to put a few impl Drops in there, to close the channel when it’s dropped, and to close the connection when that’s dropped, etc (whole separate thread - if I have to communicate over a network to properly close when something is dropped, should that go in the drop method?).
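On the drop question, one common pattern is an explicit close that can report errors, with Drop as a best-effort fallback, because drop cannot return anything. A rough sketch, assuming the network is replaced by an in-memory frame log; the "close N" frame text and every helper name here are made up for illustration.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex, Weak};

// Hypothetical stand-in for the connection: a registry plus a log of frames.
pub struct ConnectionStruct {
    open_channels: Mutex<HashSet<u16>>,
    frames: Mutex<Vec<String>>,
}

impl ConnectionStruct {
    pub fn new() -> Arc<ConnectionStruct> {
        Arc::new(ConnectionStruct {
            open_channels: Mutex::new(HashSet::new()),
            frames: Mutex::new(Vec::new()),
        })
    }

    pub fn open_count(&self) -> usize {
        self.open_channels.lock().unwrap().len()
    }

    pub fn frames(&self) -> Vec<String> {
        self.frames.lock().unwrap().clone()
    }
}

pub struct ChannelStruct {
    number: u16,
    connection: Weak<ConnectionStruct>,
    closed: bool,
}

impl ChannelStruct {
    pub fn open(connection: &Arc<ConnectionStruct>, number: u16) -> ChannelStruct {
        connection.open_channels.lock().unwrap().insert(number);
        ChannelStruct { number, connection: Arc::downgrade(connection), closed: false }
    }

    // Explicit close: the caller gets to see whether it worked.
    pub fn close(mut self) -> Result<(), String> {
        self.close_inner()
    }

    fn close_inner(&mut self) -> Result<(), String> {
        if self.closed {
            return Ok(()); // already closed, nothing to send twice
        }
        self.closed = true;
        let connection = self.connection.upgrade().ok_or("connection already dropped")?;
        connection.open_channels.lock().unwrap().remove(&self.number);
        // Stand-in for writing a close frame to the socket.
        connection.frames.lock().unwrap().push(format!("close {}", self.number));
        Ok(())
    }
}

impl Drop for ChannelStruct {
    fn drop(&mut self) {
        // Best effort only: there is nowhere to report a failure from here.
        let _ = self.close_inner();
    }
}
```

The closed flag keeps an explicit close followed by the automatic drop from sending the close frame twice.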

Does this look like a relatively sane way of doing things, given the constraints of the protocol? Have I missed something really obvious? I mean, I might (probably (definitely)) need a Mutex or ten in there to handle cross-thread mutation, but is there anything else I’ve overlooked?


#2

mpsc is possibly a better choice than trying with Arc/Mutex. (I assume from what you write that you don’t want to go the async route.)


#3

Derp - I did know about MPSC, but it never occurred to me to see if I could leverage it.

I’m not sure how that would work, though. Perhaps I would spawn another thread dedicated to writing, which reads from an MPSC channel (that terminology clash won’t be at all confusing), and the protocol channels write to it. That would hopefully solve the "channel must know about connection" problem, but the connection would still need to know about all channels it’s handed out, and they still need to be thread-safe.
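A rough sketch of how that could look, assuming a dedicated writer thread that owns the write half of the socket and drains an mpsc queue. Anything implementing Write stands in for the socket here, the frame layout (two-byte channel number, then payload) is invented, and dispatch is a hypothetical hook that the reader thread would call.

```rust
use std::collections::HashMap;
use std::io::Write;
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Mutex;
use std::thread::{self, JoinHandle};

// What a protocol channel hands to the writer thread: (channel number, payload).
type Frame = (u16, Vec<u8>);

pub struct Connection {
    outgoing: Sender<Frame>,
    // Routing table for incoming data: channel number -> that channel's inbox.
    inboxes: Mutex<HashMap<u16, Sender<Vec<u8>>>>,
}

pub struct Channel {
    number: u16,
    outgoing: Sender<Frame>,
    incoming: Receiver<Vec<u8>>,
}

// Spawns the writer thread. It owns the sink, so nothing else needs to lock it.
// The thread ends once every Sender<Frame> is dropped and gives the sink back.
pub fn start<W: Write + Send + 'static>(mut sink: W) -> (Connection, JoinHandle<W>) {
    let (tx, rx) = mpsc::channel::<Frame>();
    let writer = thread::spawn(move || {
        for (number, payload) in rx {
            // Invented framing; an I/O error simply stops the writer.
            if sink.write_all(&number.to_be_bytes()).is_err()
                || sink.write_all(&payload).is_err()
            {
                break;
            }
        }
        sink
    });
    let connection = Connection { outgoing: tx, inboxes: Mutex::new(HashMap::new()) };
    (connection, writer)
}

impl Connection {
    pub fn create_channel(&self, number: u16) -> Channel {
        let (inbox_tx, inbox_rx) = mpsc::channel();
        self.inboxes.lock().unwrap().insert(number, inbox_tx);
        Channel { number, outgoing: self.outgoing.clone(), incoming: inbox_rx }
    }

    // What the reader thread would call after parsing a frame header.
    // Returns false if no live channel has that number.
    pub fn dispatch(&self, number: u16, payload: Vec<u8>) -> bool {
        let inboxes = self.inboxes.lock().unwrap();
        inboxes.get(&number).map_or(false, |tx| tx.send(payload).is_ok())
    }
}

impl Channel {
    pub fn send_message(&self, message: &str) -> Result<(), String> {
        self.outgoing
            .send((self.number, message.as_bytes().to_vec()))
            .map_err(|e| e.to_string())
    }

    // Blocks until a message arrives; None once the connection side is gone.
    pub fn recv(&self) -> Option<Vec<u8>> {
        self.incoming.recv().ok()
    }
}
```

With this shape the channel holds only a Sender and a Receiver, so it never references the connection at all, and the connection's knowledge of its channels shrinks to a map of inbox senders.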

At this point, my primary concern is to get it working without making it painful to read. I have had a look at Tokio & Futures in the past, but I found them really hard to use. I will have another look once I’ve managed to get an API I’m reasonably happy with up and running.