I have a multi-threaded application that makes a lot of gRPC calls, which tend to all go to just one or two servers. I understand that establishing a gRPC connection is expensive, so I'd rather reuse just one connection per server from many threads than to establish n connections, if that makes sense.
So as a very new rust programmer, I wrote the following grpc.rs
file to make acquiring a Channel
from any thread super-easy.
The problem is that my tests reveal a great deal of instability. I frequently get test failures with "transport error" even though the server is on the same machine as the gRPC client testing. Suspecting that the bug may be in the gRPC being shared for parallel use, I commented out the code in the file below related to sharing of Channel
values, instead giving each client its own new one. All the test failures went away. So surely, I'm doing something wrong. Can someone identify it?
use http::Uri;
use std::{cell::RefCell, collections::HashMap, sync::Mutex};
use tonic::transport::{Channel, ClientTlsConfig};
use zcash_client_backend::proto::service::compact_tx_streamer_client::CompactTxStreamerClient;
// We'll use a MUTEX to store the shareable gRPC channels, indexed by server URI.
// gRPC channels are expensive to create, cannot be used concurrently, but cheap to clone for each user.
lazy_static! {
static ref CHANNELS: Mutex<HashMap<Uri, RefCell<Channel>>> = Mutex::new(HashMap::new());
}
/// Return a gRPC channel for the given URI, creating one if necessary.
pub(crate) async fn get_grpc_channel(uri: Uri) -> Result<Channel, tonic::transport::Error> {
{
let clients = CHANNELS.lock().unwrap();
if let Some(client) = clients.get(&uri) {
let channel = &*client.borrow();
return Ok(channel.clone());
}
}
let tls = ClientTlsConfig::new().domain_name(uri.host().unwrap());
let channel = Channel::builder(uri.clone())
.tls_config(tls)?
.connect()
.await?;
let mut clients = CHANNELS.lock().unwrap();
clients.insert(uri, RefCell::new(channel.clone()));
Ok(channel)
}
/// Gets the CompactTxStreamerClient for the given URI for use with communicating with the lightwalletd server.
pub async fn get_client(
uri: Uri,
) -> Result<CompactTxStreamerClient<Channel>, tonic::transport::Error> {
let channel = get_grpc_channel(uri).await?;
Ok(CompactTxStreamerClient::new(channel))
}