Hello! I'm interested in trying the Synchronous Core, Asynchronous Shell pattern.
My application has different types of connections to the outside world – Unix sockets, a file watcher, and peer-to-peer Iroh connections. In particular, not all protocols use bytes as the "smallest unit" – the file watcher might output a FileEvent::Changed(uri: Uri)
, for example.
The following interface for the sync core would seem nice:
trait Protocol {
type Input;
type Output;
}
trait Connectable<P: Protocol> {
// New connection established.
fn connect(&mut self, id: usize);
// Disconnect.
fn disconnect(&mut self, id: usize);
// Input from a certain connection.
fn input(&mut self, id: usize, data: P::Input);
// Called by the runtime, to poll for output for a certain connection.
fn poll_output(&mut self, id: usize) -> Option<P::Output>;
}
Now I'm pondering how to build the async "runtime" in a way that is extensible.
Here are some traits I'm considering:
#[async_trait]
trait Connection {
type Protocol: Protocol;
async fn send(&mut self, data: <Self::Protocol as Protocol>::Output) -> Result<()>;
async fn read(&mut self) -> Result<<Self::Protocol as Protocol>::Input>;
}
#[async_trait]
trait Listener {
type Protocol: Protocol;
// Called by the runtime, returns with a new connection when it is made.
async fn accept(&mut self) -> Result<Box<dyn Connection<Protocol = Self::Protocol>>>;
}
For the runtime, being able to add new connections and listeners would be nice. Storing them is nontrivial because of their different types. Here's what I came up with, using the anymap3 crate:
#[derive(Default)]
struct Runtime {
listeners: AnyMap,
connections: AnyMap,
daemon: Daemon, // This is the "sync core" implementing some Connectable<P> interfaces.
last_id: usize,
}
impl Runtime {
fn new() -> Self {
Runtime::default()
}
fn add_listener<P: Protocol + 'static>(&mut self, listener: Box<dyn Listener<Protocol = P>>) {
if !self
.listeners
.contains::<Vec<Box<dyn Listener<Protocol = P>>>>()
{
self.listeners
.insert::<Vec<Box<dyn Listener<Protocol = P>>>>(Vec::new());
}
let mut l = self
.listeners
.get_mut::<Vec<Box<dyn Listener<Protocol = P>>>>()
.expect("We just made sure it exists");
l.push(listener);
}
fn add_connection<P: Protocol + 'static>(
&mut self,
connection: Box<dyn Connection<Protocol = P>>,
) where
Daemon: Connectable<P>,
{
<Daemon as Connectable<P>>::connect(&mut self.daemon, self.last_id);
if !self
.connections
.contains::<HashMap<usize, Box<dyn Connection<Protocol = P>>>>()
{
self.connections
.insert::<HashMap<usize, Box<dyn Connection<Protocol = P>>>>(HashMap::new());
}
let mut l = self
.connections
.get_mut::<HashMap<usize, Box<dyn Connection<Protocol = P>>>>()
.expect("We just made sure it exists");
l.insert(self.last_id, connection);
self.last_id += 1;
}
}
Here's my question: How would you "drive" this runtime? I'd need a way to await the futures from the read()
methods of all connections, and from the accept()
methods of all listeners.
It seems that I'd need to make all the futures into the same type, to be able to put them into a FuturesUnordered
, for example. I've been trying to wrap my head around how to do that without introducing an enum, which would lessen modularity/extensibility.