I have num_files
sorted files in S3 that I want to merge-sort into a single stream of lines.
let futures: Vec<_> = (0..num_files)
.map(|n| {
format!("files/part-{}", n)
})
.map(|key| {
client.get_object(GetObjectRequest {
bucket: String::from(bucket),
key: key,
..Default::default()
})
})
.map(|fut| fut.map(|res| res.expect("Failed during get object request")))
// [`rusoto_s3::GetObjectOutput`](https://docs.rs/rusoto_s3/0.45.0/rusoto_s3/struct.GetObjectOutput.html)
.map(|fut| {
fut.map(|get_object_output| get_object_output.body.expect("Object body is empty"))
})
.map(|fut| fut.map(|body| BufReader::new(body.into_async_read()).lines()))
.collect();
// res has type std::vec::Vec<tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>
let mut res: = join_all(futures).await;
tokio::io::Lines
implements tokio::stream::Stream
so I'm able to poll from one of the files like so:
while let Some(v) = res[0].next().await {
println!("GOT = {:?}", v);
}
However, when using tokio::stream::StreamMap
to poll from all the file streams as lines are being downloaded I get the following error I don't know how to read
error[E0599]: no method named `next` found for struct `tokio::stream::StreamMap<std::string::String, &tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>` in the current scope
|
= note: the method `next` exists but the following trait bounds were not satisfied:
`tokio::stream::StreamMap<std::string::String, &tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>: futures::Stream`
which is required by `tokio::stream::StreamMap<std::string::String, &tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>: tokio::stream::StreamExt`
I don't understand what trait bound is not being implemented and why, if the previous example proves the values I'm inserting in the StreamMap
are indeed Stream
s (?)
Follow-up question: is there a stream combinator that waits for one line from each stream and produces the minimum, to ensure its output is sorted?
Thanks!