Create stream from vector

Hi,

I'm using the example from the azure_storage_blobs crate to get a blob from storage:

use azure_core::error::{ErrorKind, ResultExt};
use azure_storage::prelude::*;
use azure_storage_blobs::prelude::*;
use bytes::Bytes;
use futures::stream::{self, Stream, StreamExt};

#[tokio::main]
async fn read_data() -> azure_core::Result<impl Stream<Item = Result<Bytes, OtherError>>> {
    let file_name = "azure_sdk_for_rust_stream_test.txt";

    // First we retrieve the account name and access key from environment variables.
    let account = std::env::var("STORAGE_ACCOUNT").expect("missing STORAGE_ACCOUNT");
    let access_key = std::env::var("STORAGE_ACCESS_KEY").expect("missing STORAGE_ACCESS_KEY");
    let container = std::env::var("STORAGE_CONTAINER").expect("missing STORAGE_CONTAINER");
    let blob_name = std::env::var("STORAGE_BLOB_NAME").expect("missing STORAGE_BLOB_NAME");

    let storage_credentials = StorageCredentials::access_key(account.clone(), access_key);
    let blob_client = ClientBuilder::new(account, storage_credentials).blob_client(&container, blob_name);

    blob_client.put_block_blob("hello world").content_type("text/plain").await?;

    let mut result: Vec<u8> = vec![];

    // The stream is composed of individual calls to the get blob endpoint
    let mut stream = blob_client.get().into_stream();
    while let Some(value) = stream.next().await {
        let mut body = value?.data;
        // For each response, we stream the body instead of collecting it all
        // into one large allocation.
        while let Some(value) = body.next().await {
            let value = value?;
            result.extend(&value);
        }
    }

    println!("result: {:?}", result);

    Ok(stream::once(async { Ok(Bytes::from(result)) }))
}

I can stream one item built from the result data, but I would like to stream the whole file. I've been trying to figure this out for a few hours, but as a Rust newbie I'm a bit lost. I've tried the suggestions from the Stack Overflow question "Convert Stream<Item = Result<Vec<T>, _>> to Stream<Item = Result<T>>", but they don't compile :).

Thanks

The main way to convert a Vec<T> into a Stream of Ts is futures::stream::iter(). However, that's not really what you want here: collecting everything into a Vec and then wrapping it in a Stream defeats most of the point of making it async. Instead, once you have stream, extract the individual elements inside the stream as the chunks come in, using the combinator methods on Stream. You want something like stream.map_ok(|x| x.data).try_flatten(). That should give you an impl Stream<Item=Result<bytes::Bytes, azure_core::error::Error>>, which appears to be what you want.

Also, I don't think you want read_data() annotated with #[tokio::main], since you will want to consume the stream in another function. Put that attribute on your top-level main function instead.


Hi, OK, thanks, I understand now. I've tried adding stream.map_ok(|x| x.data).try_flatten() in the following context:

let mut stream = blob_client.get().into_stream();
stream.map_ok(|x| x.data).try_flatten()

but the compiler complains:

  stream.map_ok(|x| x.data).try_flatten();
    |                ^^^^^^ method not found in `Pageable<GetBlobResponse, Error>`

Also, in the example code, the stream first yields a body and then the data is read from it. I've also tried adding it there, like this:

while let Some(value) = stream.next().await {
    let mut body = value?.data;
    // For each response, we stream the body instead of collecting it all
    // into one large allocation.
    body.map_ok(|x| x.data).try_flatten();
}

and it also complains:

body.map_ok(|x| x.data).try_flatten();
    |                  ^^^^^^ method not found in `ResponseBody`

And yes, the result I need is what you wrote: impl Stream<Item=Result<bytes::Bytes, azure_core::error::Error>>. Is there some other way? Thanks.

Ah, I think you may need to bring futures::stream::TryStreamExt into scope before the compiler can see the methods from it. Pageable does implement it.

Calling map_ok() on body inside the loop won't work because you don't have a stream of streams there, which is what try_flatten() resolves. With try_flatten() on the outer stream, the inner and outer streams get flattened into one, so an explicit loop isn't necessary (and the errors can be handled within the stream).
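
The flattening described above can be illustrated without any async machinery. The following is only a synchronous sketch that uses std iterators as a stand-in for streams; it is not the futures or Azure SDK API. Page and flatten_pages are hypothetical names made up for this illustration, and String stands in for the real error type. Each "page" plays the role of one response whose data is itself a fallible sequence of chunks, and the result is a single sequence of Result items with no explicit loop.

```rust
/// Hypothetical stand-in for one response: its `data` is itself a
/// fallible sequence of byte chunks (like a response body).
pub struct Page {
    pub data: Vec<Result<Vec<u8>, String>>,
}

/// Flattens a fallible sequence of pages into one fallible sequence of chunks.
/// An outer error becomes a single `Err` item; inner items pass through as-is.
pub fn flatten_pages<I>(pages: I) -> impl Iterator<Item = Result<Vec<u8>, String>>
where
    I: Iterator<Item = Result<Page, String>>,
{
    pages.flat_map(|page| match page {
        // A good page contributes all of its chunks, in order.
        Ok(p) => p.data.into_iter(),
        // A failed page contributes exactly one error item.
        Err(e) => vec![Err(e)].into_iter(),
    })
}
```

In the async version, stream.map_ok(|x| x.data).try_flatten() (with TryStreamExt in scope) plays the same role: the caller sees one sequence of Result chunks and decides how to handle each error as it arrives.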


Thanks, that did the trick. 👍
