Process stream with combinators in async environment

Use case

Get a list of messages from database and send it to corresponding WebSocket client. If something fails with a message report it to console and continue execution

Code

tokio::spawn(send_commands(pool, clients));

async fn send_commands(pool: Pool<Postgres>, clients: Clients) {
    loop {
        tokio::time::sleep(Duration::from_secs(1)).await;
        let messages = sqlx::query_file_as!(CommandRecord, "messages.sql").fetch(&pool);
        messages
            .map(|record| -> Result<()> {
                let record = record?;
                let client_id = record.client_id;
                let record: protos::message::Message = record.data.try_into()?;
                let clients = match clients.lock() {
                    Ok(clients) => clients,
                    Err(err) => err.into_inner(),
                };
                if let Some(client) = clients.get(&client_id) {
                    client.unbounded_send(Message::Binary(message.data))?;
                };
                Ok(())
            })
            .map_err(|err| println!("{:?}", err))
            .for_each(|_| future::ready(()))
            .await;
    }
}

Questions

  • .for_each(|_| future::ready(())) looks very strange to me but it does not work otherwise. Is there a cleaner way?
  • How can I wait for result from unbounded_send to handle it in current thread?
  • is there a way to write acquire lock on the clients hashmap like this clients.lock()? to not continue execution if clients are poisoned?

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.