Hey folks,
I'm trying to create an app with various long-lived processes. Imagine something that manages connections to Matrix, Mastodon, etc. and feeds them to a front-end. Each Matrix/Mastodon connection is called a session, and backed by a SessionProvider
:
#[allow(unused_variables)]
#[clonable]
#[async_trait::async_trait]
pub trait SessionProvider: Clone {
fn name(&self) -> &str;
async fn create(
store: SessionStore,
directory: &PathBuf,
values: &FieldValues,
) -> anyhow::Result<Session>
where
Self: Sized;
async fn load(store: SessionStore, directory: &PathBuf) -> anyhow::Result<Session>
where
Self: Sized;
async fn start(&mut self) -> anyhow::Result<Box<dyn Stream<Item = ()> + Send + Sync>>;
...
SessionProvider
instances are stored in a Sessions
struct which unfortunately needs to be Send + Sync
.
I haven't quite worked out how bidirectional communication will work in this architecture, but for now I'm just trying to spin up individual session instances and get them polling regularly. I'd hoped to do this by having start
return a future, then have the app maintain a list of all running sessions and poll them. Unfortunately I can't seem to get this working, because Tauri requires the struct I'm keeping state in to be Send + Sync
which doesn't work for futures.
This particular SessionProvider
iteration uses streams because I wanted to see if that got me anything, but I don't think this will work. Since most of the client libraries I'm using expose some sort of long-polling function returning a future, I'd need my own function to not block while continuing to keep the future alive.
Here are some things I've tried:
- I can't store an actual future in either the struct alongside the rest of my data or in something like a static
OnceCell
because none of it isSync
. - I suspect returning a stream as I am now won't work because I need the
start
function not to block, which I can only achieve by notawait
ing the future. - Likewise, I can't do something blocking in the function itself, so I suspect I want something future-like rather than iterator-like.
Here is an example of what I'm trying to run. In this case, my MatrixSession
SessionProvider
implementation takes values from my frontend API and creates a client. I then want to run sync
in the background. But I don't know where to stash the future, or even if this is the right level of abstraction to be working on.
Here's an example of my current non-working start
function.
async fn start(&mut self) -> anyhow::Result<Box<dyn Stream<Item = ()> + Send + Sync>> {
if self.client.is_none() {
self.client = Some(Client::new(self.homeserver.clone()).await?);
}
if let Some(client) = self.client.as_mut() {
client
.restore_login(MatrixSdkSession {
access_token: self.access_token.clone(),
device_id: self.device_id.clone(),
refresh_token: self.refresh_token.clone(),
user_id: self.user_id.clone(),
})
.await?;
client.sync_once(SyncSettings::default()).await?;
client.sync(SyncSettings::default()).await?; // This would block and I don't want that.
Ok(Box::new(stream::empty()))
} else {
Err(anyhow!("No client"))
}
}
Is there some better abstraction that I'm missing here, even if I can't store it alongside the rest of the state for each session instance?
Thanks.