Sync core, async shell pattern in practice

Hello! I'm interested in trying the Synchronous Core, Asynchronous Shell pattern.

My application has different types of connections to the outside world – Unix sockets, a file watcher, and peer-to-peer Iroh connections. In particular, not all protocols use bytes as the "smallest unit" – the file watcher might output a FileEvent::Changed(uri: Uri), for example.

The following interface for the sync core would seem nice:

trait Protocol {
    type Input;
    type Output;
}

trait Connectable<P: Protocol> {
    // New connection established.
    fn connect(&mut self, id: usize);
    // Disconnect.
    fn disconnect(&mut self, id: usize);
    // Input from a certain connection.
    fn input(&mut self, id: usize, data: P::Input);
    // Called by the runtime, to poll for output for a certain connection.
    fn poll_output(&mut self, id: usize) -> Option<P::Output>;
}
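
For instance, the file watcher as a Protocol could look something like this – a rough sketch, where FileEvent, Uri and FileWatchProtocol are made-up names, not from any crate:

// Hypothetical domain types for the file watcher: the "smallest unit"
// is a structured event rather than bytes.
struct Uri(String);

enum FileEvent {
    Changed(Uri),
    Removed(Uri),
}

// Marker type tying the two directions together.
struct FileWatchProtocol;

impl Protocol for FileWatchProtocol {
    // The watcher only produces events; the core never sends anything back.
    type Input = FileEvent;
    type Output = ();
}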

Now I'm pondering how to build the async "runtime" in a way that is extensible.

Here are some traits I'm considering:

use async_trait::async_trait;

#[async_trait]
trait Connection {
    type Protocol: Protocol;

    async fn send(&mut self, data: <Self::Protocol as Protocol>::Output) -> Result<()>;
    async fn read(&mut self) -> Result<<Self::Protocol as Protocol>::Input>;
}

#[async_trait]
trait Listener {
    type Protocol: Protocol;

    // Called by the runtime, returns with a new connection when it is made.
    async fn accept(&mut self) -> Result<Box<dyn Connection<Protocol = Self::Protocol>>>;
}
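
To make these traits concrete, here's a rough sketch of how a Unix-socket listener could implement them, assuming the async_trait, anyhow and tokio crates, with a made-up LineProtocol that frames everything as plain text lines:

use anyhow::Result;
use async_trait::async_trait;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::UnixListener;

// Hypothetical protocol for local clients: both directions are plain text lines.
struct LineProtocol;

impl Protocol for LineProtocol {
    type Input = String;
    type Output = String;
}

struct UnixLineConnection {
    reader: BufReader<OwnedReadHalf>,
    writer: OwnedWriteHalf,
}

#[async_trait]
impl Connection for UnixLineConnection {
    type Protocol = LineProtocol;

    async fn send(&mut self, data: String) -> Result<()> {
        self.writer.write_all(data.as_bytes()).await?;
        self.writer.write_all(b"\n").await?;
        Ok(())
    }

    async fn read(&mut self) -> Result<String> {
        // EOF and framing errors are glossed over here.
        let mut line = String::new();
        self.reader.read_line(&mut line).await?;
        Ok(line.trim_end().to_owned())
    }
}

struct UnixSocketListener {
    inner: UnixListener,
}

#[async_trait]
impl Listener for UnixSocketListener {
    type Protocol = LineProtocol;

    async fn accept(&mut self) -> Result<Box<dyn Connection<Protocol = LineProtocol>>> {
        let (stream, _addr) = self.inner.accept().await?;
        let (read_half, write_half) = stream.into_split();
        Ok(Box::new(UnixLineConnection {
            reader: BufReader::new(read_half),
            writer: write_half,
        }))
    }
}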

For the runtime, being able to add new connections and listeners would be nice. Storing them is nontrivial because of their different types. Here's what I came up with, using the anymap3 crate:

use anymap3::AnyMap;
use std::collections::HashMap;

#[derive(Default)]
struct Runtime {
    listeners: AnyMap,
    connections: AnyMap,
    daemon: Daemon, // This is the "sync core" implementing some Connectable<P> interfaces.
    last_id: usize,
}

impl Runtime {
    fn new() -> Self {
        Runtime::default()
    }

    fn add_listener<P: Protocol + 'static>(&mut self, listener: Box<dyn Listener<Protocol = P>>) {
        if !self
            .listeners
            .contains::<Vec<Box<dyn Listener<Protocol = P>>>>()
        {
            self.listeners
                .insert::<Vec<Box<dyn Listener<Protocol = P>>>>(Vec::new());
        }
        let l = self
            .listeners
            .get_mut::<Vec<Box<dyn Listener<Protocol = P>>>>()
            .expect("We just made sure it exists");
        l.push(listener);
    }

    fn add_connection<P: Protocol + 'static>(
        &mut self,
        connection: Box<dyn Connection<Protocol = P>>,
    ) where
        Daemon: Connectable<P>,
    {
        <Daemon as Connectable<P>>::connect(&mut self.daemon, self.last_id);

        if !self
            .connections
            .contains::<HashMap<usize, Box<dyn Connection<Protocol = P>>>>()
        {
            self.connections
                .insert::<HashMap<usize, Box<dyn Connection<Protocol = P>>>>(HashMap::new());
        }
        let l = self
            .connections
            .get_mut::<HashMap<usize, Box<dyn Connection<Protocol = P>>>>()
            .expect("We just made sure it exists");
        l.insert(self.last_id, connection);

        self.last_id += 1;
    }
}
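
Wiring up the hypothetical Unix-socket listener from the sketch above would then look roughly like this (assuming Daemon implements Default):

// Must be called from within a Tokio runtime, since UnixListener::bind
// registers the socket with the reactor.
fn setup() -> anyhow::Result<Runtime> {
    let mut runtime = Runtime::new();
    let listener: Box<dyn Listener<Protocol = LineProtocol>> =
        Box::new(UnixSocketListener {
            inner: tokio::net::UnixListener::bind("/tmp/example.sock")?,
        });
    runtime.add_listener(listener);
    Ok(runtime)
}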

Here's my question: How would you "drive" this runtime? I'd need a way to await the futures from the read() methods of all connections, and from the accept() methods of all listeners.

It seems that I'd need to make all the futures into the same type, to be able to put them into a FuturesUnordered, for example. I've been trying to wrap my head around how to do that without introducing an enum, which would lessen modularity/extensibility.
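
To illustrate: as far as I can tell, a single FuturesUnordered forces one concrete future type, so the reads would have to be boxed and mapped into a common type first – and that common type is exactly the enum I'm hoping to avoid. A self-contained sketch (Event and the dummy futures are made up):

use std::pin::Pin;
use futures::stream::{FuturesUnordered, StreamExt};
use futures::Future;

// The unified event type -- i.e. the enum I'd rather not introduce.
enum Event {
    Line(String),
    Number(u64),
}

async fn drain_reads() {
    // FuturesUnordered<F> holds exactly one future type F, so every read
    // future is boxed as a trait object with Event as its output.
    let mut reads: FuturesUnordered<Pin<Box<dyn Future<Output = Event> + Send>>> =
        FuturesUnordered::new();

    reads.push(Box::pin(async { Event::Line("hello".into()) }));
    reads.push(Box::pin(async { Event::Number(42) }));

    while let Some(event) = reads.next().await {
        match event {
            Event::Line(s) => println!("line: {s}"),
            Event::Number(n) => println!("number: {n}"),
        }
    }
}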

You're trying to cut an interface at too low a level for the "X-core, Y-shell" pattern to be useful. Don't worry though, it's easily done and I've been there before!

The core is concerned with logic specific to your application. The shell is all the messy code that does the "nitty gritty" side effects needed to actually get stuff done – communicating over the network, writing to disk, calling an external API, printing something.

The point of this exercise isn't to create a new interface for generically doing these side effects.

To emphasise: Do not create a generic interface for doing network connections

We already have those! You could just use tokio.

No, the point of the exercise is to create an interface tailored to your application. Only your core is going to use this interface, so its methods are dictated by what your application wants to achieve, at a higher level than "send bytes".

It would be more like "send an invite to player x to join this session", or "monitor the filesystem for changes to the config file". It really depends on what the core of your application is trying to achieve.
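
For example – a sketch only, with completely made-up names, because the real methods depend on your domain:

// A hypothetical application-level interface. The core is written against
// this; a tokio-based shell, an Iroh-based shell or a test double can all
// implement it. No sockets, no bytes.
struct PlayerId(u64);
struct SessionId(u64);

trait Shell {
    // "Send an invite to player x to join this session."
    fn invite_player(&mut self, player: PlayerId, session: SessionId);
    // "Monitor the filesystem for changes to the config file."
    fn watch_config_file(&mut self, path: std::path::PathBuf);
}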

The benefit is the boundary. If you wake up one day and decide to use a different, revolutionary p2p communication lib, you could start by creating a new implementation of the necessary interface(s) using that new lib. You could run the same core with either shell and compare. You shouldn't need to change the shell interface, or the core, because your core still wants to do the same stuff :slight_smile:

Thanks, @drmason13, definitely food for thought!

Maybe my problem is that I'm trying to squish all the connections I need into a single Tokio task. It would certainly be easier to have one task per "peer" and per "listener". But then I feel that I'd need an actor framework to orchestrate all of those, which seems inherently asynchronous, whereas I'd prefer to put as much of the application logic as possible into a sync core.

You say that the interface of the core should be higher-level than bytes. That makes a lot of sense to me! In my case, I need to do a couple of things at once:

  • Receive file watcher events
  • Accept new network connections from peers
  • Exchange synchronization messages with the peers
  • Accept new connections from local programs (via UNIX sockets)
  • Speak another synchronization protocol with them (again, preferably in terms of structured objects rather than raw bytes)

In my mind, that still results in the traits I mentioned – but the associated types of a Protocol would be a bit more high-level.

Maybe my main problem is: Given a shell that gives me Futures for all the things I'd like to react to (they'll have different output types), how can I best interface them with a sync core? What does the code that translates those Futures to method calls on my core look like?

If someone could point me to an example of the sync core/async shell pattern in the wild, that would be really helpful for me!

I think I may have had this core, shell idea a bit backwards.

I've just watched the screencast (functional core, imperative shell), and it seems like the shell depends on and imports the core, driving it.

So, since the shell is async and async code can call sync code, the shell can import and use the core (although take care not to block!).

The core simply shouldn't deal with any futures.
I don't think the core should ever be asking the shell to do anything (this was where I had it backwards). The core can transform domain objects in functional ways (data in, data out). The shell uses the core to do the business logic, and is entirely responsible for talking to the outside world in an imperative (and async) way.
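
Concretely, "data in, data out" can be as small as something like this (made-up domain types, just to show the core is pure and synchronous):

// Hypothetical domain types -- no I/O, no futures, just a transformation.
struct ConfigChanged {
    path: std::path::PathBuf,
}

struct SyncMessage {
    to_peer: u64,
    payload: String,
}

fn plan_sync(event: &ConfigChanged, peers: &[u64]) -> Vec<SyncMessage> {
    peers
        .iter()
        .map(|&peer| SyncMessage {
            to_peer: peer,
            payload: format!("changed: {}", event.path.display()),
        })
        .collect()
}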

Your core might end up being kind of tiny.

You could reuse the same core in completely different shells – one for a CLI, one for a web server, what have you – but they'd all share the core.

It's interesting that one way to do that is to define an interface for the shell that the core drives, and the other is to have the shell drive the core directly. I think the second approach solves the whole "futures in core how?" problem - there aren't any! :slight_smile:
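
To sketch what "the shell drives the core" could look like with tokio (all names hypothetical: a Unix socket listener plus a channel fed by some file watcher):

use std::path::PathBuf;
use tokio::net::UnixListener;
use tokio::sync::mpsc;

// --- sync core: plain data in, data out, no futures anywhere ---
#[derive(Default)]
struct Core {
    dirty: Vec<PathBuf>,
}

impl Core {
    fn file_changed(&mut self, path: PathBuf) -> Vec<String> {
        self.dirty.push(path.clone());
        vec![format!("please-sync {}", path.display())]
    }
}

// --- async shell: owns all the I/O and drives the core ---
async fn run(mut file_events: mpsc::Receiver<PathBuf>) -> std::io::Result<()> {
    let listener = UnixListener::bind("/tmp/example.sock")?;
    let mut core = Core::default();

    loop {
        tokio::select! {
            // A local program connected over the Unix socket.
            Ok((_stream, _addr)) = listener.accept() => {
                // Hand the stream off to a per-connection task, register it, etc.
            }
            // The file watcher reported a change; ask the core what to do.
            Some(path) = file_events.recv() => {
                for msg in core.file_changed(path) {
                    // The async sending of `msg` to peers lives here, in the shell.
                    println!("would send: {msg}");
                }
            }
            else => break,
        }
    }
    Ok(())
}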
