I'm using the example from the azure_storage_blobs crate to get a blob from storage:
use azure_storage::prelude::*;
use azure_storage_blobs::prelude::*;
use bytes::Bytes;
use futures::stream::{self, Stream, StreamExt};

#[tokio::main]
async fn read_data() -> azure_core::Result<impl Stream<Item = azure_core::Result<Bytes>>> {
    // First we retrieve the account name and access key from environment variables.
    let account = std::env::var("STORAGE_ACCOUNT").expect("missing STORAGE_ACCOUNT");
    let access_key = std::env::var("STORAGE_ACCESS_KEY").expect("missing STORAGE_ACCESS_KEY");
    let container = std::env::var("STORAGE_CONTAINER").expect("missing STORAGE_CONTAINER");
    let blob_name = std::env::var("STORAGE_BLOB_NAME").expect("missing STORAGE_BLOB_NAME");

    let storage_credentials = StorageCredentials::access_key(account.clone(), access_key);
    let blob_client = ClientBuilder::new(account, storage_credentials).blob_client(&container, blob_name);

    // Upload a small test blob so there is something to read back.
    blob_client.put_block_blob("hello world").content_type("text/plain").await?;

    let mut result: Vec<u8> = vec![];
    // The stream is composed of individual calls to the get blob endpoint.
    let mut stream = blob_client.get().into_stream();
    while let Some(value) = stream.next().await {
        let mut body = value?.data;
        // For each response, we stream the body instead of collecting it all
        // into one large allocation.
        while let Some(value) = body.next().await {
            let value = value?;
            result.extend(&value);
        }
    }
    println!("result: {:?}", result);
    // Wrap the collected bytes in a single-item stream.
    Ok(stream::once(async move { Ok::<_, azure_core::Error>(Bytes::from(result)) }))
}
The main way to convert a Vec<T> into a Stream of Ts is futures::stream::iter(). However, that's not really what you want here: you are currently collecting everything into a Vec and then putting that into a Stream, which defeats most of the point of making it async. Instead, after obtaining stream, extract the individual chunks inside the stream as they come in, using the combinators that futures::stream::TryStreamExt provides for streams of Results. So you want something like stream.map_ok(|x| x.data).try_flatten(). That should give you an impl Stream<Item=Result<bytes::Bytes, azure_core::error::Error>>, which appears to be what you want.
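To illustrate what map_ok(|x| x.data).try_flatten() does without pulling in the Azure SDK, here is a std-only sketch that swaps async streams for plain iterators. Response and FakeError are hypothetical stand-ins for GetBlobResponse and azure_core::error::Error, and flat_map plays the role of try_flatten(): each Ok page is replaced by its chunks, while an Err page is passed through as a single Err (which, as far as I can tell, matches how futures' TryFlatten behaves).

```rust
// Std-only model of `stream.map_ok(|x| x.data).try_flatten()`.
// Iterators stand in for async streams; `Response` and `FakeError`
// are hypothetical stand-ins, not types from the Azure SDK.

#[derive(Debug, Clone, PartialEq)]
pub struct FakeError(pub String);

/// Stand-in for GetBlobResponse: `data` is itself a sequence of chunk results.
pub struct Response {
    pub data: Vec<Result<Vec<u8>, FakeError>>,
}

/// Flattens a sequence of page results into one sequence of chunk results.
pub fn flatten_pages<I>(pages: I) -> impl Iterator<Item = Result<Vec<u8>, FakeError>>
where
    I: IntoIterator<Item = Result<Response, FakeError>>,
{
    pages
        .into_iter()
        // Like `map_ok(|x| x.data)`: transform only the Ok values.
        .map(|page| page.map(|resp| resp.data))
        // Like `try_flatten()`: an Ok page yields its chunks in order,
        // an Err page yields a single Err, and iteration continues.
        .flat_map(|page| match page {
            Ok(chunks) => chunks,
            Err(e) => vec![Err(e)],
        })
}
```

With the real crates, the equivalent is blob_client.get().into_stream().map_ok(|x| x.data).try_flatten(), with futures::stream::TryStreamExt in scope; the caller then gets one flat stream of chunk results and no nested loop.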
Also, I don't think you want read_data() annotated with #[tokio::main], since you will want to consume the stream in another function. Put #[tokio::main] on your top-level main function instead.
Hi, OK, thanks, I understand now. I've tried adding stream.map_ok(|x| x.data).try_flatten() in the following context:
let mut stream = blob_client.get().into_stream();
stream.map_ok(|x| x.data).try_flatten()
but the compiler complains:
stream.map_ok(|x| x.data).try_flatten();
| ^^^^^^ method not found in `Pageable<GetBlobResponse, Error>`
Also, in the example code the stream first yields each response, and the data is then read from that response's body. So I also tried adding it like this:
while let Some(value) = stream.next().await {
    let mut body = value?.data;
    // For each response, we stream the body instead of collecting it all
    // into one large allocation.
    body.map_ok(|x| x.data).try_flatten();
}
and it also complains:
body.map_ok(|x| x.data).try_flatten();
| ^^^^^^ method not found in `ResponseBody`
And yes, what I need as the result is what you wrote: impl Stream<Item=Result<bytes::Bytes, azure_core::error::Error>>. Is there some other way to get it? Thanks.
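The second attempt won't work because body is already a stream of Bytes chunks, not a stream of streams, which is what try_flatten() resolves. Call it on the outer stream from into_stream() instead: the inner and outer streams get flattened into one, so an explicit loop isn't necessary (and the errors can be handled within the stream). The first error (method not found in Pageable) is because futures::stream::TryStreamExt, which provides map_ok() and try_flatten(), isn't in scope; your imports only bring in StreamExt, so add use futures::stream::TryStreamExt;.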
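On the consuming side, the flattened stream can be folded straight into a buffer instead of looping over responses and bodies by hand; in futures terms that is roughly TryStreamExt::try_fold(). The sketch below models this with a std iterator and Iterator::try_fold, with the chunk data and the error type E supplied by the caller (String in the usage below, a hypothetical choice): each Ok chunk is appended, and the first Err short-circuits the whole read.

```rust
// Std-only model of consuming a flattened stream in a single pass.
// `Iterator::try_fold` stands in for futures' `TryStreamExt::try_fold`;
// the chunk data and the error type `E` are whatever the caller supplies.

/// Appends every Ok chunk to one buffer and returns the first Err, if any.
pub fn concat_chunks<I, E>(chunks: I) -> Result<Vec<u8>, E>
where
    I: IntoIterator<Item = Result<Vec<u8>, E>>,
{
    chunks
        .into_iter()
        .try_fold(Vec::new(), |mut buf: Vec<u8>, chunk| -> Result<Vec<u8>, E> {
            // `?` stops the fold at the first Err and returns it.
            buf.extend(chunk?);
            Ok(buf)
        })
}
```

Because the error is returned from the fold itself, there is no need for the nested while let loops with a ? on every level.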