Processing a stream with combinators in an async environment

Use case

Get a list of messages from the database and send each one to the corresponding WebSocket client. If something fails for a message, report it to the console and continue execution.


// Hand the `send_commands` future to the Tokio runtime as its own task.
tokio::spawn(send_commands(pool, clients));

/// Loops forever, querying `CommandRecord` rows via `messages.sql` and
/// processing each fetched record through a chain of stream combinators.
///
/// For every record the visible code reads its `client_id`, locks `clients`,
/// and looks the client up by that id; errors produced by the per-record
/// closure are printed with `{:?}`.
///
/// NOTE(review): this snippet is truncated — several closing braces, the
/// right-hand side of the `protos::message::Message` conversion, and the body
/// of the `if let` are missing, so the actual send step is not shown here.
async fn send_commands(pool: Pool<Postgres>, clients: Clients) {
    loop {
        // NOTE(review): the trailing `;` on the next line detaches the
        // combinator chain below from `fetch`; presumably an extraction
        // artifact — confirm against the original code.
        let messages = sqlx::query_file_as!(CommandRecord, "messages.sql").fetch(&pool);
            .map(|record| -> Result<()> {
                // Propagate a failed row out of this closure as an `Err`.
                let record = record?;
                let client_id = record.client_id;
                // NOTE(review): conversion expression is missing in this snippet.
                let record: protos::message::Message =;
                // On a lock error, fall back to `err.into_inner()` instead of
                // bailing out (presumably recovering the guard from a poisoned
                // `std::sync::Mutex` — TODO confirm the type behind `Clients`).
                let clients = match clients.lock() {
                    Ok(clients) => clients,
                    Err(err) => err.into_inner(),
                // Only act when a client is registered under this id.
                if let Some(client) = clients.get(&client_id) {
            // Report the error on stdout; the mapped error value becomes `()`.
            .map_err(|err| println!("{:?}", err))
            // Consume every item, discarding the per-record result.
            // NOTE(review): no `.await` is visible in this snippet — confirm
            // the stream is actually driven.
            .for_each(|_| future::ready(()))


  • `.for_each(|_| future::ready(()))` looks very strange to me, but it does not work otherwise. Is there a cleaner way?
  • How can I wait for the result of `unbounded_send` so that I can handle it in the current thread?
  • Is there a way to acquire the lock on the clients hashmap like this: `clients.lock()?`, so that execution does not continue if the clients mutex is poisoned?

