Trait Objects and Dynamic Dispatch with Futures

Hello Rustaceans!

I'm having some trouble storing a type that implements Stream inside a struct. I would greatly appreciate any help I can get on this issue. I've been stuck for quite a long time now, and I've tried many different approaches.

Executive Summary: I have a struct which stores a Stream, and I want to write a method on that struct which processes the values in that stream.

Here's the permalink to my gist. The gist includes my main.rs source file, along with the Cargo.toml to show I'm using futures-preview. I'm compiling on nightly. It also includes my error message, which is fairly cryptic to me (I'll explain below).

From what I can tell, I'm getting this error because

struct StreamWrapper {
    wrapped: Box<dyn CharacterStream>,
}

stores wrapped as a trait object, and for whatever reason dynamic dispatch cannot be performed on the CharacterStream type. To the best of my knowledge, CharacterStream follows the rules of trait objects, but I suspect that supposition is incorrect.

However, the help message throws me.

help: another candidate was found in the following trait, perhaps add a `use` for it:
   |
3  | use crate::futures::StreamExt;
   |

But I'm importing crate::futures::StreamExt! Is the for_each method coming from another type? I can't think of another type it could possibly be.

I would try adding impl Stream for StreamWrapper.

There is an implementation that makes for_each usable with the type Box<dyn Stream>, but you have a different type, and there is no automatic conversion.

This is because a boxed stream is only a stream when it is Unpin (through this impl). Since you don’t explicitly say that the trait object must be Unpin, rustc must be conservative and assume it isn’t. You should instead store a Pin<Box<dyn CharacterStream>>, which will be a Stream through this delegating impl.
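To make the two impls mentioned above concrete, here is a minimal std-only sketch. It assumes a hypothetical `MiniStream` trait standing in for `Stream`, with a boxed impl and a delegating `Pin` impl written to mirror the shape of the ones in futures as I understand them. `Chars`, `NoopWake`, `drain`, and `collect_wrapped` are illustrative names that are not part of the original gist.

```rust
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the `Stream` trait, so the sketch needs only std.
trait MiniStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Boxed impl: `Box<S>` is a stream only when `S: Unpin`.
impl<S: ?Sized + MiniStream + Unpin> MiniStream for Box<S> {
    type Item = S::Item;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut **self).poll_next(cx)
    }
}

// Delegating impl: `Pin<P>` is a stream whenever its target is, with no
// `Unpin` requirement on the target itself.
impl<P> MiniStream for Pin<P>
where
    P: DerefMut + Unpin,
    P::Target: MiniStream,
{
    type Item = <<P as Deref>::Target as MiniStream>::Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.get_mut().as_mut().poll_next(cx)
    }
}

// Illustrative stream that is always ready, loosely modeled on CharVector.
struct Chars(std::vec::IntoIter<char>);

impl MiniStream for Chars {
    type Item = char;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<char>> {
        Poll::Ready(self.0.next())
    }
}

// The trait object carries no `Unpin` bound, yet the pinned box is a stream.
struct StreamWrapper {
    wrapped: Pin<Box<dyn MiniStream<Item = char>>>,
}

// Waker that does nothing; enough for streams that never return Pending.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls an Unpin stream until it ends (or returns Pending, which the
// streams in this sketch never do).
fn drain<S: MiniStream + Unpin>(mut stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    while let Poll::Ready(Some(item)) = Pin::new(&mut stream).poll_next(&mut cx) {
        out.push(item);
    }
    out
}

fn collect_wrapped(input: &str) -> String {
    let chars: Vec<char> = input.chars().collect();
    let wrapper = StreamWrapper { wrapped: Box::pin(Chars(chars.into_iter())) };
    // Passing a plain `Box<dyn MiniStream<Item = char>>` to `drain` would be
    // rejected, because the boxed impl needs the trait object to be `Unpin`.
    drain(wrapper.wrapped).into_iter().collect()
}

fn main() {
    println!("{}", collect_wrapped("abcd"));
}
```

The point of the sketch is only the trait bounds: the boxed impl asks for `S: Unpin`, while the `Pin` impl asks only that the pointer be `Unpin`, which `Box` always is.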


Thank you @Nemo157 and @jonh! I was able to make progress after jonh's solution, but I understand it better now thanks to Nemo's reply. I really appreciate the help!

Here's what I ended up with:

extern crate futures; // 0.3.0-alpha.17
extern crate tokio; // =0.2.0-alpha
use futures::prelude::Stream;
use futures::Poll;
use futures::future;
use tokio::runtime::Runtime;
use std::pin::Pin;
use crate::futures::StreamExt;

pub trait CharacterStream: Stream<Item=char> + Unpin {}

struct StreamWrapper<S> where S: Stream + Unpin + ?Sized {
    wrapped: Box<S>,
}

impl <S> StreamWrapper<S> where S: Stream + Unpin + Sized {
    fn new(elem: S) -> Self {
        StreamWrapper{wrapped: Box::new(elem)}
    }
}

#[derive(Clone)]
struct CharVector {
    my_stream: Vec<char>,
}


impl CharVector {
    fn new(my_string: String) -> Self {
        let mut chars: Vec<char> = my_string.chars().collect();
        chars.reverse();
        CharVector{my_stream: chars}
    }
}

impl Stream for CharVector {
    type Item = char;
    fn poll_next(mut self: Pin<&mut Self>, _ctx: &mut std::task::Context) -> Poll<Option<Self::Item>> {
        let c = self.my_stream.pop();
        Poll::Ready(c)
    }
}

fn main() {
    println!("{}", "Starting Rust example!");
    let stream = StreamWrapper::new(
        CharVector::new(
            "abcd".to_owned()
        )
    );

    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        println!("{}", "Executing from inside the future!");
        stream.wrapped.for_each(|c| {
            print!("{}", c);
            future::ready(())
        }).await;
    });
}

My takeaway was that I needed to specify that the stream inside my wrapper was Unpin. This made the Stream implementation available on my boxed stream.
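For the trait-object form from the original question, there are two ways to state that a trait object is Unpin: name it on the object type (`dyn Trait + Unpin`) or make it a supertrait, as the `CharacterStream` definition above does. A small std-only sketch of both spellings follows. `Plain`, `WithUnpin`, `Unit`, `pin_it`, and `labels` are hypothetical names used only for illustration.

```rust
use std::pin::Pin;

// Hypothetical traits for illustration only.
trait Plain {
    fn label(&self) -> &'static str;
}

// Unpin as a supertrait: every `dyn WithUnpin` is then known to be Unpin.
trait WithUnpin: Unpin {
    fn label(&self) -> &'static str;
}

struct Unit;

impl Plain for Unit {
    fn label(&self) -> &'static str {
        "plain + Unpin"
    }
}

impl WithUnpin for Unit {
    fn label(&self) -> &'static str {
        "Unpin supertrait"
    }
}

// Like `Pin::new`, this only accepts targets that are Unpin.
fn pin_it<T: ?Sized + Unpin>(value: &mut T) -> Pin<&mut T> {
    Pin::new(value)
}

fn labels() -> Vec<&'static str> {
    // Spelling 1: add the auto trait to the trait object type itself.
    let mut a: Box<dyn Plain + Unpin> = Box::new(Unit);
    // Spelling 2: rely on the supertrait bound.
    let mut b: Box<dyn WithUnpin> = Box::new(Unit);
    // A bare `Box<dyn Plain>` would be rejected by `pin_it`, since
    // `dyn Plain` alone is not known to be Unpin.
    vec![pin_it(&mut *a).label(), pin_it(&mut *b).label()]
}

fn main() {
    for label in labels() {
        println!("{}", label);
    }
}
```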
