Pattern for long-lived concurrent tasks

Hey folks,

I'm trying to create an app with various long-lived processes. Imagine something that manages connections to Matrix, Mastodon, etc. and feeds them to a front-end. Each Matrix/Mastodon connection is called a session, and backed by a SessionProvider:

#[allow(unused_variables)]
#[clonable]
#[async_trait::async_trait]
pub trait SessionProvider: Clone {
    fn name(&self) -> &str;

    async fn create(
        store: SessionStore,
        directory: &PathBuf,
        values: &FieldValues,
    ) -> anyhow::Result<Session>
    where
        Self: Sized;

    async fn load(store: SessionStore, directory: &PathBuf) -> anyhow::Result<Session>
    where
        Self: Sized;

    async fn start(&mut self) -> anyhow::Result<Box<dyn Stream<Item = ()> + Send + Sync>>;
...

SessionProvider instances are stored in a Sessions struct which unfortunately needs to be Send + Sync.

I haven't quite worked out how bidirectional communication will work in this architecture, but for now I'm just trying to spin up individual session instances and get them polling regularly. I'd hoped to do this by having start return a future, then have the app maintain a list of all running sessions and poll them. Unfortunately I can't seem to get this working, because Tauri requires the struct I'm keeping state in to be Send + Sync which doesn't work for futures.

This particular SessionProvider iteration uses streams because I wanted to see if that got me anything, but I don't think this will work. Since most of the client libraries I'm using expose some sort of long-polling function returning a future, I'd need my own function to not block while continuing to keep the future alive.

Here are some things I've tried:

  • I can't store an actual future in either the struct alongside the rest of my data or in something like a static OnceCell because none of it is Sync.
  • I suspect returning a stream as I am now won't work because I need the start function not to block, which I can only achieve by not awaiting the future.
  • Likewise, I can't do something blocking in the function itself, so I suspect I want something future-like rather than iterator-like.

Here is an example of what I'm trying to run. In this case, my MatrixSession SessionProvider implementation takes values from my frontend API and creates a client. I then want to run sync in the background. But I don't know where to stash the future, or even if this is the right level of abstraction to be working on.

Here's an example of my current non-working start function.

    async fn start(&mut self) -> anyhow::Result<Box<dyn Stream<Item = ()> + Send + Sync>> {
        if self.client.is_none() {
            self.client = Some(Client::new(self.homeserver.clone()).await?);
        }
        if let Some(client) = self.client.as_mut() {
            client
                .restore_login(MatrixSdkSession {
                    access_token: self.access_token.clone(),
                    device_id: self.device_id.clone(),
                    refresh_token: self.refresh_token.clone(),
                    user_id: self.user_id.clone(),
                })
                .await?;
            client.sync_once(SyncSettings::default()).await?;
            client.sync(SyncSettings::default()).await?; // This would block and I don't want that.
            Ok(Box::new(stream::empty()))
        } else {
            Err(anyhow!("No client"))
        }
    }

Is there some better abstraction that I'm missing here, even if I can't store it alongside the rest of the state for each session instance?

Thanks.

So you want the start function on your trait to essentially perform arbitrary tasks without preventing the caller of the start function from continuing immediately? If that’s the case it sounds like you want to spawn the code inside your start function into it’s own top level task, and have some mechanism for the new task to communicate with the caller (which presumably is coordinating all of the providers).

The term “blocking” usually has a very specific meaning in the context of async code (when things are working correctly anyway), so I’m a bit confused about what you mean when you say client.sync(…).await would block. Does that call never exit and provide results via some other mechanism?

Sounds about right.

And you're right, I used my terminology incorrectly. What I meant was
that the await would prevent the function from returning until sync
resolved, and it continues running until explicitly cancelled.

Your async runtime should have some way to spawn a new task from a future. Tokio has tokio::spawn. You could either spawn the new task from inside the start method, or if all of your providers will be doing similar things you could have the caller do the spawning instead.

Generally something like an mpsc channel (or two) is used to communicate with the spawned task.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.