Issues with mutability and futures

Hello,

In my application I use the etcd client API:

[dependencies]
etcd-client = "0.15.0"

This defines:

    pub async fn put(
        &mut self,
        key: impl Into<Vec<u8>>,
        value: impl Into<Vec<u8>>,
        options: Option<PutOptions>,
    ) -> Result<PutResponse> {
        self.kv.put(key, value, options).await
    }

I want to loop over a map of key/value pairs and call this method for each pair, but the compiler complains that the client can only be borrowed mutably once at a time.

Here is the code I wrote:

use std::vec;
use etcd_client::{Client, Error};
use std::collections::HashMap;
use futures;

async fn create_keys() -> Result<(), Error> {
    let mut dict = HashMap::new();
    dict.insert("k1", "vv1");
    dict.insert("k2", "vv2");
    dict.insert("k3", "vv3");

    let mut client = Client::connect(["localhost:2379"], None).await?;
    let mut futures = vec![];
    for (k,v) in dict.iter() {
        let fut = async {
            client.put(*k, *v, None).await;
        };
        futures.push(fut);
    }
    futures::future::join_all(futures).await;
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    create_keys().await?;
    Ok(())
}

Here is the compiler error:

error[E0499]: cannot borrow `client` as mutable more than once at a time
  --> mycrate/src/main.rs:15:19
   |
15 |         let fut = async {
   |                   ^^^^^ `client` was mutably borrowed here in the previous iteration of the loop
16 |             client.put(*k, *v, None).await;
   |             ------ borrows occur due to use of `client` in coroutine
...
20 |     futures::future::join_all(futures).await;
   |                               ------- first borrow used here, in later iteration of loop

For more information about this error, try `rustc --explain E0499`.
error: could not compile `node` (bin "mycrate") due to 1 previous error

I'm not sure how to use the API to do what I need. The key-value mappings I need to populate are not known at compile time (the hashmap is actually a stand-in for reading from a file), and I am quite new to Rust, let alone async.

Judging from the relevant docs, it sounds like you’re supposed to clone these kinds of client objects in order to make concurrent requests.

In the case of etcd_client::Client, it’s perhaps reasonable to save some of the overhead of this cloning by only duplicating the kv-protocol related parts/handles (e.g. with the .kv_client() method).

So maybe a small change like

async fn create_keys() -> Result<(), Error> {
    let mut dict = HashMap::new();
    dict.insert("k1", "vv1");
    dict.insert("k2", "vv2");
    dict.insert("k3", "vv3");

    let mut client = Client::connect(["localhost:2379"], None).await?;
    let mut futures = vec![];
    for (k,v) in dict.iter() {
+       let mut client = client.kv_client();
-       let fut = async {
+       let fut = async move {
            client.put(*k, *v, None).await;
        };
        futures.push(fut);
    }
    futures::future::join_all(futures).await;
    Ok(())
}

is enough.
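
To see why one handle per future satisfies the borrow checker, here is a minimal std-only sketch that needs neither etcd nor tokio. Everything in it is a hypothetical stand-in: `MockKv` only imitates a cloneable client with a `&mut self` method, and `block_on` is a toy executor that works only because the mock futures never actually wait. It is not the etcd-client API.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a cloneable client handle (NOT the etcd-client API).
#[derive(Clone, Default)]
struct MockKv {
    // Clones share one store, similar to handles sharing one connection.
    store: Arc<Mutex<Vec<(String, String)>>>,
}

impl MockKv {
    // Same shape as the method in question: async and taking `&mut self`.
    async fn put(&mut self, key: &str, value: &str) -> Result<(), String> {
        self.store
            .lock()
            .unwrap()
            .push((key.to_string(), value.to_string()));
        Ok(())
    }
}

/// Waker that does nothing; enough for futures that are ready on first poll.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls the future in a loop until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Builds one future per pair, each owning its own clone of the handle.
fn put_all(pairs: &[(&str, &str)]) -> Vec<(String, String)> {
    let client = MockKv::default();
    let mut futures = Vec::new();
    for (k, v) in pairs {
        // Each future gets its own handle, so no two futures borrow `client`.
        let mut handle = client.clone();
        futures.push(async move { handle.put(k, v).await });
    }
    // All futures exist at the same time here; that is what failed to compile
    // when every future held `&mut client`.
    for fut in futures {
        block_on(fut).expect("the mock put never fails");
    }
    let stored = client.store.lock().unwrap().clone();
    stored
}
```

Calling `put_all(&[("k1", "vv1"), ("k2", "vv2")])` returns the stored pairs. The sketch drives the futures one after another, which is not concurrent; with a real runtime you would hand the same vector to `join_all` as in the snippet above.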


Incidentally, the whole process of

  • let mut futures = vec![];
  • loop with futures.push(fut);
  • then futures::future::join_all(futures)

could probably be made more compact; maybe something like this can work? (untested code)

async fn create_keys() -> Result<(), Error> {
    let mut dict = HashMap::new();
    dict.insert("k1", "vv1");
    dict.insert("k2", "vv2");
    dict.insert("k3", "vv3");

    let mut client = Client::connect(["localhost:2379"], None).await?;
    futures::future::join_all(dict.iter().map(|(k, v)| {
        let mut client = client.kv_client();
        async move {
            client.put(*k, *v, None).await;
        }
    }))
    .await;
    Ok(())
}

…and that would still ignore the results (and notably the errors) from the .put calls, so perhaps rather:

async fn create_keys() -> Result<(), Error> {
    let mut dict = HashMap::new();
    dict.insert("k1", "vv1");
    dict.insert("k2", "vv2");
    dict.insert("k3", "vv3");

    let mut client = Client::connect(["localhost:2379"], None).await?;
    let responses /* : Vec<PutResponse> */ = futures::future::try_join_all(dict.iter().map(|(k, v)| {
        let mut client = client.kv_client();
        async move { client.put(*k, *v, None).await }
    }))
    .await?;
    Ok(())
}

(Still untested, so some remaining compilation errors aren’t unlikely; I didn’t want to install all the dependencies.)
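
A note on the trade-off, as far as I understand the futures docs: `try_join_all` gives up at the first error, while plain `join_all` lets every future finish and hands back one `Result` per call. If you would rather keep all outcomes, you can sort them yourself afterwards. A small std-only sketch of both ways to handle such a vector follows; `PutOutcome`, `u64`, and `String` are placeholders I made up, not the real `PutResponse` and `Error` types.

```rust
/// Placeholder for what `join_all` would hand back per put call.
/// `u64` and `String` stand in for the real response and error types.
type PutOutcome = Result<u64, String>;

/// Collapse the per-call results into one `Result`, keeping the first error.
fn all_or_first_error(outcomes: Vec<PutOutcome>) -> Result<Vec<u64>, String> {
    // Collecting an iterator of `Result`s stops at the first `Err`.
    outcomes.into_iter().collect()
}

/// Keep every outcome: successes in the first vector, errors in the second.
fn split_outcomes(outcomes: Vec<PutOutcome>) -> (Vec<u64>, Vec<String>) {
    let mut succeeded = Vec::new();
    let mut failed = Vec::new();
    for outcome in outcomes {
        match outcome {
            Ok(value) => succeeded.push(value),
            Err(message) => failed.push(message),
        }
    }
    (succeeded, failed)
}
```

The first helper is handy when any failure should abort the whole operation; the second is closer to what you want when reading many keys from a file and reporting every key that failed.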

I'm not familiar with etcd, but in general, there are several possibilities here:

  • The mutual exclusion implied by &mut self is necessary here; the actual protocol cannot handle more than one operation at a time. You will have to open multiple connections (use multiple clients) to achieve parallelism.
  • The mutual exclusion is not necessary; the etcd_client library is offering an overly restrictive API to you. Fixing this requires modifying etcd_client.
  • Perhaps etcd_client offers a way to do many operations at once more efficiently. Client::txn() sounds promising.

That said, @steffahn's post is more precise: you clone the client to get concurrent usage. Still, a transaction may be even better than separate put()s.