I have num_files sorted files in S3 that I want to merge-sort into a single stream of lines.
let futures: Vec<_> = (0..num_files)
    .map(|n| format!("files/part-{}", n))
    .map(|key| {
        client.get_object(GetObjectRequest {
            bucket: String::from(bucket),
            key,
            ..Default::default()
        })
    })
    .map(|fut| fut.map(|res| res.expect("Failed during get object request")))
    // [`rusoto_s3::GetObjectOutput`](https://docs.rs/rusoto_s3/0.45.0/rusoto_s3/struct.GetObjectOutput.html)
    .map(|fut| {
        fut.map(|get_object_output| get_object_output.body.expect("Object body is empty"))
    })
    .map(|fut| fut.map(|body| BufReader::new(body.into_async_read()).lines()))
    .collect();
// res has type std::vec::Vec<tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>
let mut res = join_all(futures).await;
tokio::io::Lines implements tokio::stream::Stream so I'm able to poll from one of the files like so:
while let Some(v) = res[0].next().await {
    println!("GOT = {:?}", v);
}
However, when I use tokio::stream::StreamMap to poll all the file streams as lines are downloaded, I get the following error, which I don't know how to read:
error[E0599]: no method named `next` found for struct `tokio::stream::StreamMap<std::string::String, &tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>` in the current scope
|
= note: the method `next` exists but the following trait bounds were not satisfied:
`tokio::stream::StreamMap<std::string::String, &tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>: futures::Stream`
which is required by `tokio::stream::StreamMap<std::string::String, &tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>: tokio::stream::StreamExt`
I don't understand which trait bound is not satisfied, or why, given that the previous example shows the values I'm inserting into the StreamMap are indeed Streams.
Follow-up question: is there a stream combinator that waits for one line from each stream and produces the minimum, to ensure its output is sorted?
If you take a look at the link for StreamMap, it mentions:
Because the StreamMap API moves streams during runtime, both streams and keys must be Unpin. In order to insert a !Unpin stream into a StreamMap, use pin! to pin the stream to the stack or Box::pin to pin the stream in the heap.
Thus, when mapping the futures, try wrapping the stream produced by the last map in a Pin<Box<T>> with Box::pin(..).
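To illustrate the Unpin point above, here is a minimal std-only sketch. NotUnpin and require_unpin are hypothetical names made up for this example (they are not part of tokio), and NotUnpin merely stands in for a stream type that is !Unpin. It shows that wrapping such a value with Box::pin gives you something that satisfies an Unpin bound:

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// Hypothetical stand-in for a stream type that is !Unpin.
// PhantomPinned opts the struct out of the auto-implemented Unpin.
struct NotUnpin {
    _pin: PhantomPinned,
}

// Hypothetical helper: only compiles when called with a T that is Unpin.
fn require_unpin<T: Unpin>(_value: &T) -> bool {
    true
}

fn main() {
    // This line would not compile, because NotUnpin is !Unpin:
    // require_unpin(&NotUnpin { _pin: PhantomPinned });

    // Box<T> is Unpin for every T, so Pin<Box<T>> is Unpin as well.
    let boxed: Pin<Box<NotUnpin>> = Box::pin(NotUnpin { _pin: PhantomPinned });
    assert!(require_unpin(&boxed));
    println!("Pin<Box<NotUnpin>> satisfies an Unpin bound");
}
```

Note that this only addresses the Unpin requirement; the value still has to be an owned stream (not a reference to one) for the Stream bound itself to hold.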
error[E0599]: no method named `next` found for struct `tokio::stream::StreamMap<std::string::String, &std::pin::Pin<std::boxed::Box<tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>>>` in the current scope
|
= note: the method `next` exists but the following trait bounds were not satisfied:
`tokio::stream::StreamMap<std::string::String, &std::pin::Pin<std::boxed::Box<tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>>>: futures::Stream`
which is required by `tokio::stream::StreamMap<std::string::String, &std::pin::Pin<std::boxed::Box<tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>>>: tokio::stream::StreamExt`
If I box the stream just before inserting it into the StreamMap, I get a slightly different error:
let mut map = StreamMap::new();
for (i, x) in res.iter().enumerate() {
    map.insert(format!("{}", i), Box::pin(x));
}
// Read twice
for _ in 0..2 {
    let (key, val) = map.next().await.unwrap();
    println!("key = {}, val = {}", key, val);
}
error[E0599]: no method named `next` found for struct `tokio::stream::StreamMap<std::string::String, std::pin::Pin<std::boxed::Box<&tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>>>` in the current scope
|
= note: the method `next` exists but the following trait bounds were not satisfied:
`tokio::stream::StreamMap<std::string::String, std::pin::Pin<std::boxed::Box<&tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>>>: futures::Stream`
which is required by `tokio::stream::StreamMap<std::string::String, std::pin::Pin<std::boxed::Box<&tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>>>: tokio::stream::StreamExt`
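OK, so the problem was, as those pesky ampersands in the error messages indicate, that I was passing references to Streams as the values to StreamMap: res.iter() yields &Lines<..>, and a shared reference to a stream is not itself a Stream. Switching to res.into_iter(), which yields the owned streams, fixed it.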
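On the follow-up question (take one line from each stream and emit the minimum): that is a k-way merge. I'm not aware of a ready-made tokio combinator for it, so treat the following as a rough sketch of the idea rather than a drop-in solution. To keep it std-only and runnable it uses plain iterators instead of async streams; merge_sorted is a hypothetical helper name, and an async version would await each stream's next line where this calls next(). Like the fix above, it takes ownership of the sources via into_iter():

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Hypothetical helper: merges already-sorted sources into one sorted Vec.
// Synchronous stand-in for the async case; each source plays the role of
// one file's stream of lines.
fn merge_sorted<I>(mut sources: Vec<I>) -> Vec<String>
where
    I: Iterator<Item = String>,
{
    // Min-heap (via Reverse) holding the current head line of each source,
    // tagged with the index of the source it came from.
    let mut heap = BinaryHeap::new();
    for (idx, src) in sources.iter_mut().enumerate() {
        if let Some(line) = src.next() {
            heap.push(Reverse((line, idx)));
        }
    }

    let mut out = Vec::new();
    // Pop the smallest head, then refill from the same source.
    while let Some(Reverse((line, idx))) = heap.pop() {
        out.push(line);
        if let Some(next) = sources[idx].next() {
            heap.push(Reverse((next, idx)));
        }
    }
    out
}

fn main() {
    let a = vec!["apple".to_string(), "cherry".to_string()];
    let b = vec!["banana".to_string(), "date".to_string()];
    // into_iter() hands over owned iterators, not references.
    let merged = merge_sorted(vec![a.into_iter(), b.into_iter()]);
    println!("{:?}", merged); // prints ["apple", "banana", "cherry", "date"]
}
```

The heap never holds more than one line per source, so memory stays proportional to the number of files rather than their total size; the output Vec here is only for demonstration and would be replaced by yielding lines one at a time.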