tokio::task::spawn(async move {
    let mut writer = BufWriter::new(file);
    let shutdown = async move |mut writer: BufWriter<tokio::fs::File>| {
        if let Err(err) = writer.shutdown().await {
            log::error!("Unable to shutdown streamer: {}", err.to_string());
        }
    };
    while let Some(file_chunk) = stream_to_hd_rx.next().await {
        //log::info!("Async stream received: {:?}", &file_chunk);
        if let Some(file_chunk) = file_chunk {
            if let Err(err) = writer.write_all(&file_chunk[..]).await {
                log::error!("Unable to stream file to HD: {}", err.to_string());
                return (shutdown)(writer).await;
            }
        } else {
            log::warn!("File streamer has been signalled to shutdown");
            // flush and save
            return (shutdown)(writer).await;
        }
    }
});
This async task receives chunks of data from a Receiver and writes them to a file. What is a cleaner, more functional way to accomplish this? One important requirement: the task must finish when a None is passed to the Receiver.
tokio::task::spawn(async move {
    let mut writer = BufWriter::new(file);
    while let Some(file_chunk) = stream_to_hd_rx.next().await {
        //log::info!("Async stream received: {:?}", &file_chunk);
        if let Some(file_chunk) = file_chunk {
            if let Err(err) = writer.write_all(&*file_chunk).await {
                log::error!("Unable to stream file to HD: {}", err);
                break;
            }
        } else {
            log::warn!("File streamer has been signalled to shutdown");
            // flush and save
            break;
        }
    }
    // Single exit path: reached after a write error, a `None`, or the channel closing.
    if let Err(err) = writer.shutdown().await {
        log::error!("Unable to shutdown streamer: {}", err);
    }
});
In the above, write_all takes a &[u8]. Maybe I'd need to make a wrapper type that implements Sink<Item=Vec<u8>> and writes the bytes to the inner writer. How would cleanup (i.e., shutting down the writer) then be accomplished after the future finishes? Would I need to add one more function to the chain above (probably, though I'm not sure what it would be)?
I'm guessing shutdown gets called automatically once the channel drops then? Would shutdown be called for BufWriter<File> once the sender half of stream_to_hd_rx is dropped?
No, you still need to call shutdown yourself; the while loop simply exits when the channel is closed. In fact, your original code never calls shutdown in that case.
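To make the two exit conditions concrete, here is a minimal synchronous sketch that uses only the standard library. It is an analogue of the control flow, not the tokio code from this thread: `std::sync::mpsc` and `std::io::BufWriter` stand in for the async channel and writer, `flush` stands in for `shutdown`, and `drain_to_writer` is a hypothetical name. The iterator chain ends either at the first `None` or when every sender has been dropped, and the flush is called explicitly on the single exit path.

```rust
use std::io::{self, BufWriter, Write};
use std::sync::mpsc::{channel, Receiver};

/// Hypothetical helper (not from the thread): drains `rx` into `out` until a
/// `None` sentinel arrives or every sender has been dropped.
fn drain_to_writer<W: Write>(rx: Receiver<Option<Vec<u8>>>, out: W) -> io::Result<W> {
    let mut writer = BufWriter::new(out);
    let written = rx
        .iter() // ends once all senders are dropped
        .map_while(|chunk| chunk) // ends at the first `None`
        .try_for_each(|chunk| writer.write_all(&chunk)); // stops at the first write error
    // Single exit path: flush explicitly whether or not writing succeeded.
    let flushed = writer.flush();
    written.and(flushed)?;
    writer.into_inner().map_err(|e| e.into_error())
}

fn main() -> io::Result<()> {
    // Case 1: an explicit `None` ends the stream; the later chunk is never written.
    let (tx, rx) = channel();
    tx.send(Some(b"hello ".to_vec())).unwrap();
    tx.send(Some(b"world".to_vec())).unwrap();
    tx.send(None).unwrap();
    tx.send(Some(b"ignored".to_vec())).unwrap();
    let out = drain_to_writer(rx, Vec::new())?;
    assert_eq!(out, b"hello world".to_vec());

    // Case 2: no `None` is sent; dropping the sender ends the stream instead.
    let (tx, rx) = channel();
    tx.send(Some(b"abc".to_vec())).unwrap();
    drop(tx);
    let out = drain_to_writer(rx, Vec::new())?;
    assert_eq!(out, b"abc".to_vec());
    Ok(())
}
```

Both cases reach the same flush call, which mirrors the answer above, where the `break`s and the closed channel all lead to the one `shutdown` after the loop.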