Hey folks,
I'm currently working on building a library that implements a binary protocol that communicates over TCP. It works something like the following:
You open a `connection` to the server, and then you can create one or more `channel`s from that connection. You then use the `channel` for sending messages over the protocol - this is the protocol's way of handling multiplexing. The `connection` must be thread-safe, but the `channel` is not required to be (sort of - the implementation must be shareable across threads, but the trait exposed to the user is not). Generally, you'd create one `channel` per thread.
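A minimal sketch of how those thread-safety requirements could be expressed with Rust's `Send`/`Sync` marker traits. It assumes a hypothetical `ConnectionInner` shared through an `Arc`, and a hypothetical `Channel` that is moved (not shared) into its thread. The `Cell` field exists only to make `Channel` deliberately `!Sync`, and the byte buffer stands in for the socket.

```rust
use std::cell::Cell;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical shared connection state. Everything mutable sits behind a
// Mutex, so Arc<ConnectionInner> is both Send and Sync.
struct ConnectionInner {
    outgoing: Mutex<Vec<u8>>, // stand-in for the TCP write half
}

// Hypothetical channel. The Cell makes it Send but not Sync: it can be
// moved into another thread, but not shared between threads by reference.
struct Channel {
    connection: Arc<ConnectionInner>,
    id: u16,
    sent: Cell<u64>,
}

impl Channel {
    fn send_message(&self, message: &str) {
        let mut out = self.connection.outgoing.lock().unwrap();
        out.extend_from_slice(&self.id.to_be_bytes());
        out.extend_from_slice(message.as_bytes());
        self.sent.set(self.sent.get() + 1);
    }
}

// Compile-time checks: these calls only compile if the bounds hold.
fn assert_send_sync<T: Send + Sync>() {}
fn assert_send<T: Send>() {}

// Two threads, one channel each, sharing one connection.
// Returns the number of bytes queued on the shared connection.
fn demo() -> usize {
    assert_send_sync::<Arc<ConnectionInner>>();
    assert_send::<Channel>();

    let connection = Arc::new(ConnectionInner { outgoing: Mutex::new(Vec::new()) });
    let handles: Vec<_> = (1..=2u16)
        .map(|id| {
            let channel = Channel { connection: Arc::clone(&connection), id, sent: Cell::new(0) };
            // The channel is moved into the thread, which is what Send permits.
            thread::spawn(move || channel.send_message("foo"))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let queued = connection.outgoing.lock().unwrap().len();
    queued
}

fn main() {
    println!("{} bytes queued", demo());
}
```

Each send queues a 2-byte channel number plus the 3-byte payload, so two threads queue 10 bytes in total.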
The `connection` is responsible for handling the `TcpSocket`, so the `channel` needs a reference to the `connection` to be able to send messages.
The `connection` is also responsible for reading from the `TcpSocket` and determining which channel should handle each message, so it needs access to all channels, and it has to spawn a separate thread to do that reading. The `connection` also sends regular heartbeat messages, so it needs to spawn a background thread for that too.
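A rough sketch of the heartbeat thread. It assumes a hypothetical `ConnectionInner` whose `outgoing` buffer stands in for the socket, plus an `AtomicBool` flag for shutdown. The reader thread would be spawned the same way, with each background thread owning its own `Arc` clone so it satisfies `thread::spawn`'s `'static` bound.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Hypothetical shared state; `outgoing` stands in for the TCP write half.
struct ConnectionInner {
    outgoing: Mutex<Vec<&'static str>>,
    running: AtomicBool,
}

// Spawns a background thread that "sends" a heartbeat every `interval`
// until `running` is cleared. The thread owns its own Arc clone.
fn spawn_heartbeat(inner: &Arc<ConnectionInner>, interval: Duration) -> JoinHandle<()> {
    let inner = Arc::clone(inner);
    thread::spawn(move || {
        while inner.running.load(Ordering::Acquire) {
            inner.outgoing.lock().unwrap().push("HEARTBEAT");
            thread::sleep(interval);
        }
    })
}

// Runs the heartbeat for roughly `duration`, then shuts it down and
// returns how many heartbeats were queued.
fn run_for(duration: Duration) -> usize {
    let inner = Arc::new(ConnectionInner {
        outgoing: Mutex::new(Vec::new()),
        running: AtomicBool::new(true),
    });
    let handle = spawn_heartbeat(&inner, Duration::from_millis(5));
    thread::sleep(duration);
    // Shutdown: clear the flag, then wait for the thread to notice it.
    inner.running.store(false, Ordering::Release);
    handle.join().unwrap();
    let count = inner.outgoing.lock().unwrap().len();
    count
}

fn main() {
    println!("heartbeats sent: {}", run_for(Duration::from_millis(50)));
}
```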
The result of that is, potentially, three threads trying to access a single `connection`, and two of them trying to access the same `channel`:
- User thread calls `send_message` on `channel`, which delegates to the `connection` to handle writing to the TCP socket.
- Library thread reads a data frame from the `TcpSocket` and calls `message_received` on `connection`, which delegates to a `channel` based on header information from the data frame.
- Library thread periodically sends `heartbeat` messages by calling `send_heartbeat` on the `channel`/`connection` (not sure how I want to implement it - it technically uses a system-only `channel`).
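The "delegates to a channel based on header information" step could look something like this. The sketch assumes a hypothetical frame layout (a 2-byte big-endian channel number followed by the payload) and one `std::sync::mpsc` queue per channel. The `Dispatcher` and `parse_frame` names are made up for illustration.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};

// Hypothetical frame layout: 2-byte big-endian channel number, then payload.
fn parse_frame(frame: &[u8]) -> Option<(u16, &[u8])> {
    if frame.len() < 2 {
        return None;
    }
    let channel = u16::from_be_bytes([frame[0], frame[1]]);
    Some((channel, &frame[2..]))
}

// What the reader thread would do with each frame: look up the channel
// by number and hand the payload over. Unknown channels are reported.
struct Dispatcher {
    channels: HashMap<u16, Sender<Vec<u8>>>,
}

impl Dispatcher {
    fn register(&mut self, channel: u16) -> Receiver<Vec<u8>> {
        let (tx, rx) = mpsc::channel();
        self.channels.insert(channel, tx);
        rx
    }

    fn message_received(&self, frame: &[u8]) -> Result<(), String> {
        let (channel, payload) = parse_frame(frame).ok_or("frame too short")?;
        let tx = self
            .channels
            .get(&channel)
            .ok_or_else(|| format!("no channel {}", channel))?;
        tx.send(payload.to_vec()).map_err(|e| e.to_string())
    }
}

fn demo() -> (Vec<u8>, Vec<u8>, bool) {
    let mut dispatcher = Dispatcher { channels: HashMap::new() };
    let rx1 = dispatcher.register(1);
    let rx2 = dispatcher.register(2);
    // Frames as the reader thread might see them: [hi byte, lo byte, payload...]
    dispatcher.message_received(&[0, 2, b'y', b'o']).unwrap();
    dispatcher.message_received(&[0, 1, b'h', b'i']).unwrap();
    let unknown_rejected = dispatcher.message_received(&[0, 9, b'x']).is_err();
    (rx1.recv().unwrap(), rx2.recv().unwrap(), unknown_rejected)
}

fn main() {
    let (one, two, unknown_rejected) = demo();
    println!("{:?} {:?} {}", one, two, unknown_rejected);
}
```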
So, the `connection` needs to know about the `channel`, and the `channel` needs to know about the `connection`.
Note: all of the following code should be treated as pseudo-Rust, as it either (a) was never intended to compile, (b) should compile but hasn't been tested, or (c) I've just forgotten the exact syntax of something.
I imagine the API would (ideally) look something like:
```rust
use std::thread;

let connection = ConnectionFactory::new().create_connection();
// Connection started by factory
let channel = connection.create_channel()?;
channel.on_message_received(|message| {
    println!("{}", message);
});
thread::spawn(move || {
    // Do other stuff
    channel.send_message("foo");
    // Do some more stuff
    // Channel closed on drop
});
// Eventually
connection.shutdown();
```
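One way `on_message_received` could store the closure, assuming the handler is boxed as `Box<dyn Fn(&str) + Send>` so that it can later be invoked from the library's reader thread. `Channel::deliver` is a hypothetical hook the connection would call. Unlike the API above, this version takes `&mut self`, so the channel binding has to be `mut`.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical handler type: boxed so any closure fits, Send so the
// channel (and its handler) can still be moved to another thread.
type Handler = Box<dyn Fn(&str) + Send>;

struct Channel {
    handler: Option<Handler>,
}

impl Channel {
    fn new() -> Self {
        Channel { handler: None }
    }

    fn on_message_received<F>(&mut self, f: F)
    where
        F: Fn(&str) + Send + 'static,
    {
        self.handler = Some(Box::new(f));
    }

    // Called (via the connection) when a frame for this channel arrives.
    fn deliver(&self, message: &str) {
        if let Some(handler) = &self.handler {
            handler(message);
        }
    }
}

// Registers a handler that records every message, then delivers two.
fn demo() -> Vec<String> {
    let received = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&received);
    let mut channel = Channel::new();
    channel.on_message_received(move |message| {
        println!("{}", message);
        sink.lock().unwrap().push(message.to_string());
    });
    channel.deliver("foo");
    channel.deliver("bar");
    let out = received.lock().unwrap().clone();
    out
}

fn main() {
    println!("{:?}", demo());
}
```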
In order for that to work, I need my structs and traits to look like the following, uncompilable, code (some functions left out to minimise complexity):
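```rust
use std::collections::HashMap;
use std::io::Write;
use std::net::TcpStream;
use std::thread;

pub trait Connection {
    fn create_channel(&self) -> Result<Box<dyn Channel>>;
}

pub struct ConnectionStruct {
    read_socket: TcpStream, // std's connected TCP socket type
    write_socket: TcpStream,
    channels: HashMap<u16, ChannelStruct>,
}

impl ConnectionStruct {
    fn transmit(&self, data: &[u8]) -> Result<()> {
        // `Write` is implemented for `&TcpStream`, so `&self` is enough to write.
        (&self.write_socket).write_all(data)?;
        Ok(())
    }

    fn start(&self) -> Result<()> {
        // Won't compile: the closures borrow `self`, but thread::spawn needs 'static.
        thread::spawn(|| self.heartbeat());
        thread::spawn(|| self.read_from_socket());
        // ...
        Ok(())
    }
}

impl Connection for ConnectionStruct {
    fn create_channel(&self) -> Result<Box<dyn Channel>> {
        // Won't compile: the field expects an owned ConnectionStruct, not `&self`.
        let channel = ChannelStruct { connection: self };
        // Nor this: `channels` can't be mutated through `&self`, and it stores
        // owned ChannelStructs, not references.
        self.channels.insert(next_channel_number(), &channel);
        Ok(Box::new(channel))
    }
}

pub trait Channel {
    fn send_message(&self, message: &str) -> Result<()>;
    fn on_message_received(&self, ...) -> Result<()>;
}

pub struct ChannelStruct {
    connection: ConnectionStruct,
}

impl Channel for ChannelStruct {
    fn send_message(&self, message: &str) -> Result<()> {
        self.connection.transmit(message.as_bytes())
    }
    fn on_message_received(&self, ...) -> Result<()> { ... }
}
```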
Now, that obviously can't work, because there's an ownership cycle. I could change `ChannelStruct` to take `connection: &ConnectionStruct`, but, even ignoring how painful it would be to make the lifetimes work (and they might not), channels are supposed to be passable to other threads, and since `thread::spawn` requires a `'static` closure, the compiler won't be able to guarantee the reference is still valid.
From what I've read, the preferred solution appears to be something like
```rust
connection: Weak<ConnectionStruct>
```
But that's not really compatible with `fn create_channel(&self) -> Result<Box<dyn Channel>>;` - how do I create the `Weak` reference in the first place, if `&self` is a `&ConnectionStruct`? Should I be doing something a bit different, like
```rust
impl Connection for Arc<ConnectionStruct> { ... }
```
And not bother implementing `Connection` directly on `ConnectionStruct`? Note that users can't create a `ConnectionStruct` directly, and instead go through a `ConnectionFactory` whose function signature looks like
```rust
fn create_connection(&self) -> impl Connection
```
So it won't matter if the implementation of `Connection` is secretly an `Arc`, right? The function signature would end up being
```rust
fn create_connection(&self) -> impl Connection + Clone + Send + Sync
```
(Or I put those requirements on the `Connection` trait - not sure which is better.)
But that seems OK. This would mean that I could then implement `create_channel` as something like:
```rust
impl Connection for Arc<ConnectionStruct> {
    fn create_channel(&self) -> Result<Box<dyn Channel>> {
        // `self` is `&Arc<ConnectionStruct>`, so a Weak can be taken from it.
        let channel = ChannelStruct { connection: Arc::downgrade(self) };
        self.channels.insert(next_channel_number(), &channel);
        Ok(Box::new(channel))
    }
}
```
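On the "how do I get a `Weak` from `&self`" question, one alternative to implementing the trait on `Arc<ConnectionStruct>` is `Arc::new_cyclic` (stable since Rust 1.60). It hands the constructor a `Weak` to the `Arc` being built, so the struct can keep it in a field and `create_channel(&self)` works with a plain `&self`. A sketch under that assumption, with a `Vec<u8>` standing in for the socket, `String` errors for brevity, and the hypothetical field name `self_ref`:

```rust
use std::sync::{Arc, Mutex, Weak};

trait Connection {
    fn create_channel(&self) -> Result<Box<dyn Channel>, String>;
}

trait Channel: Send {
    fn send_message(&self, message: &str) -> Result<(), String>;
}

struct ConnectionStruct {
    // Weak pointer to the Arc that owns this struct, filled in by Arc::new_cyclic.
    self_ref: Weak<ConnectionStruct>,
    next_channel: Mutex<u16>,
    written: Mutex<Vec<u8>>, // stand-in for the TCP write half
}

struct ChannelStruct {
    id: u16,
    connection: Weak<ConnectionStruct>,
}

impl ConnectionStruct {
    fn new() -> Arc<ConnectionStruct> {
        Arc::new_cyclic(|self_ref| ConnectionStruct {
            self_ref: self_ref.clone(),
            next_channel: Mutex::new(1),
            written: Mutex::new(Vec::new()),
        })
    }

    fn transmit(&self, data: &[u8]) -> Result<(), String> {
        self.written.lock().unwrap().extend_from_slice(data);
        Ok(())
    }
}

impl Connection for ConnectionStruct {
    fn create_channel(&self) -> Result<Box<dyn Channel>, String> {
        let mut next = self.next_channel.lock().unwrap();
        let id = *next;
        *next += 1;
        // Plain `&self` is enough: the Weak was stored at construction time.
        Ok(Box::new(ChannelStruct { id, connection: self.self_ref.clone() }))
    }
}

impl Channel for ChannelStruct {
    fn send_message(&self, message: &str) -> Result<(), String> {
        // Fails cleanly once the last Arc<ConnectionStruct> is gone.
        let connection = self.connection.upgrade().ok_or("connection closed")?;
        connection.transmit(&self.id.to_be_bytes())?;
        connection.transmit(message.as_bytes())
    }
}

fn demo() -> (usize, bool) {
    let connection = ConnectionStruct::new();
    let channel = connection.create_channel().unwrap();
    channel.send_message("foo").unwrap();
    let written = connection.written.lock().unwrap().len();
    drop(connection);
    let closed = channel.send_message("bar").is_err();
    (written, closed)
}

fn main() {
    println!("{:?}", demo());
}
```

The trade-off is that every `ConnectionStruct` has to be built through `ConnectionStruct::new` (so it always lives in an `Arc`), which fits a factory-only constructor.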
Of course, I've still got lifetime problems with `self.channels.insert(next_channel_number(), &channel);`, so I guess I should wrap that in a `Weak` too? And do `impl Channel for Arc<ChannelStruct> { ... }`? So the code would now look like:
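```rust
impl Connection for Arc<ConnectionStruct> {
    fn create_channel(&self) -> Result<Box<dyn Channel>> {
        let channel = Arc::new(ChannelStruct { connection: Arc::downgrade(self) });
        // Assumes `channels: Mutex<HashMap<u16, Weak<ChannelStruct>>>`, since
        // the map has to be mutated through `&self`.
        self.channels.lock().unwrap().insert(next_channel_number(), Arc::downgrade(&channel));
        Ok(Box::new(channel))
    }
}
```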
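A runnable sketch of that two-way `Weak` layout, assuming the registry lives behind a `Mutex` and that a hypothetical `lookup` method is what the reader thread would call. It shows that the registry does not keep a dropped channel alive: the stale entry simply stops upgrading (pruning such entries is left out).

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};

trait Connection {
    fn create_channel(&self) -> Box<dyn Channel>;
}

trait Channel {
    fn id(&self) -> u16;
    fn connection_alive(&self) -> bool;
}

struct ConnectionStruct {
    // Weak entries: the registry never keeps a channel alive on its own.
    channels: Mutex<HashMap<u16, Weak<ChannelStruct>>>,
    next_id: Mutex<u16>,
}

struct ChannelStruct {
    id: u16,
    // Weak back-pointer, so there is no Arc cycle.
    connection: Weak<ConnectionStruct>,
}

impl Connection for Arc<ConnectionStruct> {
    fn create_channel(&self) -> Box<dyn Channel> {
        let mut next = self.next_id.lock().unwrap();
        *next += 1;
        let id = *next;
        drop(next);
        // `self` is `&Arc<ConnectionStruct>`, so downgrading works directly.
        let channel = Arc::new(ChannelStruct { id, connection: Arc::downgrade(self) });
        self.channels.lock().unwrap().insert(id, Arc::downgrade(&channel));
        Box::new(channel)
    }
}

impl Channel for Arc<ChannelStruct> {
    fn id(&self) -> u16 {
        self.id
    }
    fn connection_alive(&self) -> bool {
        self.connection.upgrade().is_some()
    }
}

impl ConnectionStruct {
    // What the reader thread would call: only live channels upgrade.
    fn lookup(&self, id: u16) -> Option<Arc<ChannelStruct>> {
        self.channels.lock().unwrap().get(&id).and_then(|weak| weak.upgrade())
    }
}

fn demo() -> (bool, bool, bool) {
    let connection = Arc::new(ConnectionStruct {
        channels: Mutex::new(HashMap::new()),
        next_id: Mutex::new(0),
    });
    let channel = connection.create_channel();
    let id = channel.id();
    let found_while_alive = connection.lookup(id).is_some();
    let sees_connection = channel.connection_alive();
    drop(channel);
    // The stale entry is still in the map, but it no longer upgrades.
    let found_after_drop = connection.lookup(id).is_some();
    (found_while_alive, sees_connection, found_after_drop)
}

fn main() {
    println!("{:?}", demo());
}
```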
Of course, I've now got a lot of safety checks going on at runtime, so if I have to send a message in response to receiving a message, I might be in trouble. Plus I need to put a few `impl Drop`s in there, to close the `channel` when it's dropped, to close the `connection` when that's dropped, etc. (That's a whole separate topic: if I have to communicate over the network to properly close something when it's dropped, should that go in the `drop` method?)
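Whether network I/O belongs in `drop` at all is the open question here. One pattern is an explicit `close()` that can report errors, with `Drop` as a best-effort fallback that ignores them. A sketch assuming a hypothetical `ConnectionInner` holding a set of open channel numbers and a queue of control frames (the `CLOSE n` strings are made up and stand in for real frames):

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

// Hypothetical shared state: open channel numbers, plus a queue of
// outgoing control frames standing in for the socket.
struct ConnectionInner {
    open_channels: Mutex<HashSet<u16>>,
    outgoing: Mutex<Vec<String>>,
}

struct Channel {
    id: u16,
    connection: Arc<ConnectionInner>,
    closed: bool,
}

impl Channel {
    // Explicit close: the caller gets to see (and handle) an error.
    fn close(mut self) -> Result<(), String> {
        self.closed = true;
        self.shutdown()
        // `self` is dropped here; Drop sees `closed == true` and does nothing.
    }

    fn shutdown(&self) -> Result<(), String> {
        self.connection
            .open_channels
            .lock()
            .map_err(|e| e.to_string())?
            .remove(&self.id);
        self.connection
            .outgoing
            .lock()
            .map_err(|e| e.to_string())?
            .push(format!("CLOSE {}", self.id));
        Ok(())
    }
}

impl Drop for Channel {
    fn drop(&mut self) {
        // Drop can't return an error, so this is only a best-effort
        // fallback for channels that weren't closed explicitly.
        if !self.closed {
            let _ = self.shutdown();
        }
    }
}

fn demo() -> (usize, Vec<String>) {
    let connection = Arc::new(ConnectionInner {
        open_channels: Mutex::new(HashSet::from([1, 2])),
        outgoing: Mutex::new(Vec::new()),
    });
    let a = Channel { id: 1, connection: Arc::clone(&connection), closed: false };
    let b = Channel { id: 2, connection: Arc::clone(&connection), closed: false };
    a.close().unwrap(); // explicit close, errors visible to the caller
    drop(b); // implicit close through Drop
    let open = connection.open_channels.lock().unwrap().len();
    let sent = connection.outgoing.lock().unwrap().clone();
    (open, sent)
}

fn main() {
    println!("{:?}", demo());
}
```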
Does this look like a relatively sane way of doing things, given the constraints of the protocol? Have I missed something really obvious? I mean, I might (probably (definitely)) need a `Mutex` or ten in there to handle cross-thread mutation, but is there anything else I've overlooked?
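On the "send a message in response to receiving a message" worry: `std::sync::Mutex` is not reentrant, and its documentation says locking it again on the thread that already holds it will not return normally (it may deadlock or panic). So if the reader thread held a registry lock while calling a handler that tries to take the same lock, it would hang. A sketch of one way around that, assuming hypothetical `writer` and `handlers` fields: clone the handler out, release the lock, then call it. The write lock is held for a whole frame so that concurrent sends don't interleave.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical handler type: an Arc so it can be cloned out of the registry.
type Handler = Arc<dyn Fn(&Connection, &str) + Send + Sync>;

struct Connection {
    writer: Mutex<Vec<u8>>,                 // stand-in for the TCP write half
    handlers: Mutex<HashMap<u16, Handler>>, // registry of open channels
}

impl Connection {
    fn send_message(&self, channel: u16, message: &str) -> Result<(), String> {
        // Takes the registry lock briefly to check the channel is open...
        if !self.handlers.lock().unwrap().contains_key(&channel) {
            return Err(format!("channel {} is not open", channel));
        }
        // ...then the writer lock for the whole frame, so frames from
        // different threads never interleave.
        let mut w = self.writer.lock().unwrap();
        w.extend_from_slice(&channel.to_be_bytes());
        w.extend_from_slice(message.as_bytes());
        Ok(())
    }

    // What the reader thread would call for each incoming frame.
    fn message_received(&self, channel: u16, message: &str) {
        // Clone the handler out; the registry guard is dropped at the end of
        // this statement, before the handler runs.
        let handler = self.handlers.lock().unwrap().get(&channel).cloned();
        if let Some(handler) = handler {
            // The handler may call send_message, which locks `handlers` again.
            handler(self, message);
        }
    }
}

fn demo() -> Vec<u8> {
    let connection = Connection {
        writer: Mutex::new(Vec::new()),
        handlers: Mutex::new(HashMap::new()),
    };
    // Channel 1 replies to every message from inside its handler.
    let echo: Handler = Arc::new(|conn: &Connection, msg: &str| {
        conn.send_message(1, &format!("re: {}", msg)).unwrap();
    });
    connection.handlers.lock().unwrap().insert(1, echo);
    connection.message_received(1, "ping");
    let out = connection.writer.lock().unwrap().clone();
    out
}

fn main() {
    println!("{:?}", demo());
}
```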