Merge-sorting tokio::io::Lines streams using StreamMap not compiling

I have num_files sorted files in S3 that I want to merge-sort into a single stream of lines.

let futures: Vec<_> = (0..num_files)
    .map(|n| {
        format!("files/part-{}", n)
    })
    .map(|key| {
        client.get_object(GetObjectRequest {
            bucket: String::from(bucket),
            key,
            ..Default::default()
        })
    })
    .map(|fut| fut.map(|res| res.expect("Failed during get object request")))
    // [`rusoto_s3::GetObjectOutput`](https://docs.rs/rusoto_s3/0.45.0/rusoto_s3/struct.GetObjectOutput.html)
    .map(|fut| {
        fut.map(|get_object_output| get_object_output.body.expect("Object body is empty"))
    })
    .map(|fut| fut.map(|body| BufReader::new(body.into_async_read()).lines()))
    .collect();

// res has type std::vec::Vec<tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>
let mut res = join_all(futures).await;

tokio::io::Lines implements tokio::stream::Stream, so I'm able to poll one of the files like so:

while let Some(v) = res[0].next().await {
    println!("GOT = {:?}", v);
}

However, when I use tokio::stream::StreamMap to poll all the file streams as lines are being downloaded, I get the following error, which I don't know how to read:

error[E0599]: no method named `next` found for struct `tokio::stream::StreamMap<std::string::String, &tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>` in the current scope
    |
    = note: the method `next` exists but the following trait bounds were not satisfied:
            `tokio::stream::StreamMap<std::string::String, &tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>: futures::Stream`
            which is required by `tokio::stream::StreamMap<std::string::String, &tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>: tokio::stream::StreamExt`

I don't understand which trait bound is not satisfied, or why, given that the previous example shows the values I'm inserting into the StreamMap are indeed Streams.

Follow-up question: is there a stream combinator that waits for one line from each stream and produces the minimum, to ensure its output is sorted?

Thanks!

If you take a look at the documentation for StreamMap, it mentions:

Because the StreamMap API moves streams during runtime, both streams and keys must be Unpin. In order to insert a !Unpin stream into a StreamMap, use pin! to pin the stream to the stack or Box::pin to pin the stream in the heap.

So, when mapping the futures, try pinning the stream in the last step by wrapping it in Box::pin(..), which gives a Pin<Box<T>>:

[...] .map(|fut| fut.map(|body| Box::pin(BufReader::new(body.into_async_read()).lines())))
    .collect();
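
To illustrate the part of the docs quoted above, here is a minimal std-only sketch of why Box::pin satisfies an Unpin bound. NotUnpin and requires_unpin are hypothetical names made up for this example; they only mimic the Unpin requirement that the StreamMap docs describe and are not part of tokio.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// Hypothetical stand-in for a stream type that is not `Unpin`.
struct NotUnpin {
    value: u32,
    _pin: PhantomPinned,
}

// Accepts only `Unpin` values, mimicking the bound described in the docs.
fn requires_unpin<T: Unpin>(value: T) -> T {
    value
}

fn boxed_value() -> u32 {
    let raw = NotUnpin { value: 7, _pin: PhantomPinned };
    // `requires_unpin(raw)` would be rejected here: `NotUnpin` is `!Unpin`.
    let pinned: Pin<Box<NotUnpin>> = Box::pin(raw);
    // `Pin<Box<T>>` is `Unpin` for any `T`: moving the box moves only the
    // pointer, never the heap value it points to.
    let pinned = requires_unpin(pinned);
    pinned.value
}

fn main() {
    println!("{}", boxed_value());
}
```

Note that this only addresses the Unpin requirement; it does nothing about what type sits inside the box.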

Your proposed change yields a similar error.

error[E0599]: no method named `next` found for struct `tokio::stream::StreamMap<std::string::String, &std::pin::Pin<std::boxed::Box<tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>>>` in the current scope
    |
    = note: the method `next` exists but the following trait bounds were not satisfied:
            `tokio::stream::StreamMap<std::string::String, &std::pin::Pin<std::boxed::Box<tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>>>: futures::Stream`
            which is required by `tokio::stream::StreamMap<std::string::String, &std::pin::Pin<std::boxed::Box<tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>>>: tokio::stream::StreamExt`

If I box the stream just before inserting it into the StreamMap, I get a slightly different error.

let mut map = StreamMap::new();

for (i, x) in res.iter().enumerate() {
    map.insert(format!("{}", i), Box::pin(x));
}

// Read twice
for _ in 0..2 {
    let (key, val) = map.next().await.unwrap();
    println!("key = {}, val = {}", key, val);
}

error[E0599]: no method named `next` found for struct `tokio::stream::StreamMap<std::string::String, std::pin::Pin<std::boxed::Box<&tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>>>` in the current scope
    |
    = note: the method `next` exists but the following trait bounds were not satisfied:
            `tokio::stream::StreamMap<std::string::String, std::pin::Pin<std::boxed::Box<&tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>>>: futures::Stream`
            which is required by `tokio::stream::StreamMap<std::string::String, std::pin::Pin<std::boxed::Box<&tokio::io::Lines<tokio::io::BufReader<impl std::marker::Send+std::marker::Sync+tokio::io::AsyncRead>>>>>: tokio::stream::StreamExt`

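OK, so the problem was, as those pesky ampersands in the error messages indicate, that I was passing references to Streams as the values of the StreamMap. A shared reference to a Stream is not itself a Stream (polling needs mutable access), so the StreamMap holding those references wasn't a Stream either. Switching from res.iter() to res.into_iter(), which yields the streams by value, fixed it.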
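
On the follow-up question about producing the minimum across all inputs: the usual technique is a k-way merge over a min-heap. Below is a rough sketch using only std and plain synchronous iterators, so it is an analogue of the idea rather than a tokio combinator. The function name merge_sorted is made up for this example, and it assumes every input is already sorted. An async version would have to await one line from each stream where this code calls next().

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge already-sorted sources into one sorted Vec.
/// The sources are taken by value: advancing an iterator needs mutable
/// access, just as polling a stream does.
fn merge_sorted<I>(mut sources: Vec<I>) -> Vec<String>
where
    I: Iterator<Item = String>,
{
    // Min-heap of (line, index of the source it came from).
    let mut heap = BinaryHeap::new();

    // Prime the heap with the first line of every source.
    for (idx, src) in sources.iter_mut().enumerate() {
        if let Some(line) = src.next() {
            heap.push(Reverse((line, idx)));
        }
    }

    let mut merged = Vec::new();
    // Pop the smallest line, then refill from the source that produced it.
    while let Some(Reverse((line, idx))) = heap.pop() {
        merged.push(line);
        if let Some(next) = sources[idx].next() {
            heap.push(Reverse((next, idx)));
        }
    }
    merged
}

fn main() {
    let a = vec!["apple", "cherry", "fig"];
    let b = vec!["banana", "date"];
    let c = vec!["elderberry"];
    let sources: Vec<_> = vec![a, b, c]
        .into_iter()
        .map(|v| v.into_iter().map(String::from))
        .collect();
    println!("{:?}", merge_sorted(sources));
}
```

The heap never holds more than one pending line per source, so memory stays proportional to the number of files rather than the total number of lines (apart from the output Vec, which a streaming version would not need).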
