How to properly call a grpc endpoint in async? I have a Vec[Requests] which I want to send without waiting for the response

async fn batching_task(
    mut queue: Queue,
    client: GenerationServiceClient<LoadBalancedChannel>,
) {
    while let Some(batch) = queue.next_batch().await {
        let _ = batch.into_iter().map(|mut req| {
            let response = client.clone().generate(req.get_request());
            // TODO : async move to request
            tokio::spawn(async move{
                println!("Moving to async");
                match response.await {
                    Ok(resp) => {
                        let _ = req.send_final_response(Ok(resp.into_inner()));
                    },
                    Err(err) => {
                        println!("\tFailed with error {:?}", err)
                        // TODO : Send error back to server
                    },
                }
            });
        });
    }
}

I am getting error

 --> queuing_router/src/dispatcher.rs:52:28
   |
52 |             let response = client.clone().generate(req.get_request());
   |                            ^^^^^^^^^^^^^^----------------------------- temporary value is freed at the end of this statement
   |                            |
   |                            creates a temporary value which is freed while still in use
   |                            argument requires that borrow lasts for `'static`

I understand error, but how can I resolve this?

Can you show us the type definition for GenerationServiceClient?

In general, when you get an error along the lines of argument requires that borrow lasts for `'static, the simplest fix is to wrap the type into an Rc or an Arc.

GenerationServiceClient is created from protobuf object.

service GenerationService {
  // Generates text given a text prompt, for one or more inputs
  rpc Generate (BatchedGenerationRequest) returns (BatchedGenerationResponse) {}
}

Then wrap it on an Arc as I mentioned before.

I wrapped the Object around Arc>. Still facing issue.

async fn batching_task(
    mut queue: Queue,
    arc_client: Arc<Mutex<GenerationServiceClient<LoadBalancedChannel>>>,
) { 
    while let Some(batch) = queue.next_batch().await {
        batch.into_iter().map(|mut req| {
            let binding: Arc<Mutex<GenerationServiceClient<LoadBalancedChannel>>> = Arc::clone(&arc_client);
            let mut client = binding.lock().unwrap();
            let response = client.generate(req.get_request());
            tokio::spawn(async move{
                match  response.await{
                    Ok(resp) => {
                        let _ = req.send_final_response(Ok(resp.into_inner()));
                    },
                    Err(err) => {
                        println!("\tFailed with error {:?}", err)
                        // TODO : Send error back to server
                    },
                }
            });
        });
    }
}

Error message

error[E0597]: `binding` does not live long enough
  --> queuing_router/src/dispatcher.rs:55:30
   |
54 |             let binding: Arc<Mutex<GenerationServiceClient<LoadBalancedChannel>>> = Arc::clone(&arc_client);
   |                 ------- binding `binding` declared here
55 |             let mut client = binding.lock().unwrap();
   |                              ^^^^^^^ borrowed value does not live long enough
56 |             let response = client.generate(req.get_request());
   |                            ---------------------------------- argument requires that `binding` is borrowed for `'static`
...
68 |         });
   |         - `binding` dropped here while still borrowed

error[E0597]: `client` does not live long enough
  --> queuing_router/src/dispatcher.rs:56:28
   |
55 |             let mut client = binding.lock().unwrap();
   |                 ---------- binding `client` declared here
56 |             let response = client.generate(req.get_request());
   |                            ^^^^^^----------------------------
   |                            |
   |                            borrowed value does not live long enough
   |                            argument requires that `client` is borrowed for `'static`
...
68 |         });
   |         - `client` dropped here while still borrowed

I don't have the bigger picture, so this is just a guess.

You're locking a Mutex and then borrowing its contents, and that borrow is being captured by the spawned task. Mutexes should not be held across spawning of tasks or await points. (In addition they should only be held for a very short duration and no blocking operations should be performed while they are held.)

One thing to try is moving the following two lines inside the spawned task, just above the match. That way you lock and borrow inside the task, when you actually use the binding.

Do the types work out if you write it like this?

This is resolved by changing .into_iter() to for loop over the batch.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.