I'm using tokio to deal with async I/O for writing bytestream of gRPC requests. I'm creating a blocking function to bridge the world between async and sync, since my entrypoint is synchronous. I followed the second pattern in the tokio briding with sync code and spawning each async I/O in as a separate task. The following is a sample code snippet:
impl NonBlockingClient {
pub fn new() -> Result<NonBlockingClient> {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
let bs_client = rt.block_on(create_bs_client())?;
Ok(Self {
rt: rt,
bs_client: bs_client,
handles: vec![],
})
}
/// writes a large file
pub fn write_file(&mut self, digest: &Digest, path: &Path) -> Result<()> {
let mut bs_client = self.bs_client.clone();
let digest = digest.clone();
let path = path.to_path_buf().clone();
let handle = self
.rt
.spawn(async move { bs_write_file(&mut bs_client, &digest, path).await });
self.handles.push(handle);
Ok(())
}
pub fn wait(&mut self) -> Result<()> {
for handle in self.handles.iter_mut() {
println!("wait on {:?}", handle);
let res = self.rt.block_on(handle);
if res.is_err() {
println!("failed to upload {}", res.unwrap_err());
}
}
Ok(())
}
}
pub(crate) async fn bs_write_file(
client: &mut BsClient,
digest: &Digest,
path: PathBuf,
) -> Result<()> {
println!("write_file: {:?} path: {:?}", digest, path);
let f = File::open(path).await?;
let stream = WriteRequestStream::new(f, digest);
client
.write(stream)
.await
.map(|_v| ())
.map_err(|e| anyhow::Error::msg(format!("failed to write file blob {}", e)))
}
I learned join_all is problematic from multiple sources (eg. this one), so I implemented the join for all handles by myself
However, the code could randomly stuck (but with high chance) to got stuck on wait on JoinHandle
.
It will be great if here could spare some advise on how to debug such issue? Or is there anything I'm doing is off?