Writing this cleanly

tokio::task::spawn(async move {
    let mut writer = BufWriter::new(file);
    let shutdown = async move |mut writer: BufWriter<tokio::fs::File>| {
        if let Err(err) = writer.shutdown().await {
            log::error!("Unable to shutdown streamer: {}", err.to_string());
        }
    };

    while let Some(file_chunk) = stream_to_hd_rx.next().await {
        //log::info!("Async stream received: {:?}", &file_chunk);
        if let Some(file_chunk) = file_chunk {
            if let Err(err) = writer.write_all(&file_chunk[..]).await {
                log::error!("Unable to stream file to HD: {}", err.to_string());
                return (shutdown)(writer).await;
            }
        } else {
            log::warn!("File streamer has been signalled to shutdown");
            // flush and save
            return (shutdown)(writer).await;
        }
    }
});

The purpose of this async task is to receive chunks of data from a Receiver and write each one to a file. What is a cleaner, more functional way of accomplishing this? One important requirement: the task must finish when a None is passed to the Receiver.

tokio::task::spawn(async move {
    let mut writer = BufWriter::new(file);

    while let Some(file_chunk) = stream_to_hd_rx.next().await {
        //log::info!("Async stream received: {:?}", &file_chunk);
        if let Some(file_chunk) = file_chunk {
            if let Err(err) = writer.write_all(&*file_chunk).await {
                log::error!("Unable to stream file to HD: {}", err);
                break;
            }
        } else {
            log::warn!("File streamer has been signalled to shutdown");
            // flush and save
            break;
        }
    }

    if let Err(err) = writer.shutdown().await {
        log::error!("Unable to shutdown streamer: {}", err);
    }
});

I was looking for something more like:

stream_to_hd_rx.take_while(|chunk| chunk.is_some()).map(|chunk| chunk.unwrap()).forward(writer [...])

In the chain above, the writer must take a &[u8] for write_all. Maybe I'd need to make a wrapper type that implements Sink<Vec<u8>> and writes the bytes to the inner writer. Then, how would cleanup (i.e., shutting down the writer) be accomplished after the future finishes? Would I need to add one more function to the chain above (probably, though I'm not sure what it would be)?

Do you really need the item type to be an option? Channels already have shutdown-detection built in.

I'm guessing shutdown gets called automatically once the channel drops then? Would shutdown be called for BufWriter<File> once the sender half of stream_to_hd_rx is dropped?

You still need to call shutdown yourself, but the while loop will simply exit when the channel is closed. In fact, your original code never calls shutdown in that case.
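To illustrate the built-in shutdown detection without pulling in tokio, here is a minimal synchronous sketch using std::sync::mpsc. It is only an analog of the async code in this thread: the helper name drain_to_writer is hypothetical, and flush stands in for the async shutdown call. The loop ends on its own once every Sender has been dropped, so no Option wrapper is needed, and the explicit flush afterwards runs on that path too.

```rust
use std::io::{self, BufWriter, Write};
use std::sync::mpsc::Receiver;

/// Hypothetical helper: writes every chunk received on `rx` to `out`
/// and returns the number of bytes written.
fn drain_to_writer<W: Write>(rx: Receiver<Vec<u8>>, out: &mut W) -> io::Result<usize> {
    let mut writer = BufWriter::new(out);
    let mut written = 0;

    // Iterating a Receiver blocks for the next item and stops once all
    // Senders are dropped and the queue is empty.
    for chunk in rx {
        writer.write_all(&chunk)?;
        written += chunk.len();
    }

    // Cleanup still has to be requested explicitly after the loop.
    writer.flush()?;
    Ok(written)
}
```

A producer would just send its chunks and then drop its Sender (or let it go out of scope) to signal that the file is complete.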

Without the option item type, you can do this:

tokio::spawn(async move {
    let mut writer = BufWriter::new(file);
    let mut reader = tokio::io::stream_reader(stream_to_hd_rx);

    match tokio::io::copy(&mut reader, &mut writer).await {
        Ok(_) => {},
        Err(err) => {
            log::error!("oops: {}", err);
        },
    }

    match writer.shutdown().await {
        Ok(()) => {},
        Err(err) => {
            log::error!("Unable to shut down streamer: {}", err);
        },
    };
});

Much better. By the way, is there a functional way of doing the same thing? Or, is it not possible because we need to recover the BufWriter?

Maybe you can do something to make it a single expression, but I don't think that would make it clearer or easier to read.
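For comparison, here is roughly what a chained version could look like in a synchronous setting, keeping the None sentinel from the original question. This is a sketch built on std iterators and std::sync::mpsc rather than on the futures Stream/Sink combinators, and write_until_none is a hypothetical name. The writer is only borrowed by the closure, so it is still available for cleanup once the chain finishes.

```rust
use std::io::{self, BufWriter, Write};
use std::sync::mpsc::Receiver;

/// Hypothetical helper: writes chunks until the first `None` arrives
/// (or the channel closes), then flushes the writer.
fn write_until_none<W: Write>(rx: Receiver<Option<Vec<u8>>>, out: &mut W) -> io::Result<()> {
    let mut writer = BufWriter::new(out);

    let result = rx
        .into_iter()
        // Unwrap each `Some(chunk)` and stop at the first `None`.
        .map_while(|chunk| chunk)
        // Stop at the first write error and report it.
        .try_for_each(|chunk| writer.write_all(&chunk));

    // The closure only borrowed `writer`, so cleanup can run here
    // whether or not the chain failed.
    let flushed = writer.flush();
    result.and(flushed)
}
```

Whether this reads better than the plain loop is a matter of taste; the control flow is the same.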


For future users:

To wrap a stream_reader around a Receiver that yields Vec<u8>, map each item into an Ok value of a type that implements Buf:

let mut reader = tokio::io::stream_reader(stream_to_hd_rx.map(|r| Ok(std::io::Cursor::new(r))));

According to the bytes::buf::Buf documentation:

impl<T: AsRef<[u8]>> Buf for Cursor<T>

Since Vec<u8> implements AsRef<[u8]>, just wrap a std::io::Cursor around it. (Unless I'm doing it the hard way.)
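To show what the Cursor buys you, here is a small std-only sketch of the same idea: a reader that pulls Vec<u8> chunks from a channel and uses a Cursor to remember how far into the current chunk it has read. ChannelReader is a hypothetical type written for this illustration, not part of tokio or bytes, and it uses the blocking std::sync::mpsc channel instead of an async stream.

```rust
use std::io::{self, Cursor, Read};
use std::sync::mpsc::Receiver;

/// Hypothetical synchronous analog of a stream reader over byte chunks.
struct ChannelReader {
    rx: Receiver<Vec<u8>>,
    current: Cursor<Vec<u8>>,
}

impl ChannelReader {
    fn new(rx: Receiver<Vec<u8>>) -> Self {
        Self { rx, current: Cursor::new(Vec::new()) }
    }
}

impl Read for ChannelReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        loop {
            // The Cursor tracks the read position inside the current chunk.
            let n = self.current.read(buf)?;
            if n > 0 {
                return Ok(n);
            }
            // Current chunk exhausted: block for the next one.
            match self.rx.recv() {
                Ok(chunk) => self.current = Cursor::new(chunk),
                // All Senders dropped: report end of file.
                Err(_) => return Ok(0),
            }
        }
    }
}
```

With a reader like this, std::io::copy(&mut reader, &mut writer) plays the role that tokio::io::copy plays in the async version.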

