I'm having some trouble with capturing a trait that implements Stream in a structural type. I would greatly appreciate any help I can get on this issue. I've been stuck for quite a long time now, and I've tried many different approaches.
Executive Summary: I have a struct which stores a Stream, and I want to write a method on that struct which processes the values in that streams.
Here's the permalink to my gist. The gist includes my main.rs source file, along with the Cargo.toml to show I'm using futures-preview. I'm compiling on nightly. It also includes my error message, which is fairly cryptic to me (I'll explain below).
From what I can tell, I'm getting this error because
stores wrapped as a trait object, and for whatever reason dynamic dispatch cannot be performed on the CharacterStream type. To the best of my knowledge, CharacterStream follows the rules of trait objects, but I suspect that supposition is incorrect.
However, the help message throws me.
help: another candidate was found in the following trait, perhaps add a `use` for it:
|
3 | use crate::futures::StreamExt;
|
But I'm importing crate::futures::StreamExt! Is the for_each method coming from another type?? I can't think of another type it could possible be.
This is because a boxed stream is only a stream when it is Unpin (through this impl). Since you don’t explicitly say that the trait object must be Unpin rustc must be conservative and assume it isn’t. You should instead store a Pin<Box<dyn CharacterStream>> which will be a Streamthrough this delegating impl.
Thank you @Nemo157 and @jonh ! I was able to make progress after Jonh's solution, but I understand it better now that's to Nemo's reply. I really appreciate the help!
Here's what I ended up with:
extern crate futures; // 0.3.0-alpha.17
extern crate tokio; // =0.2.0-alpha
use futures::prelude::Stream;
use futures::Poll;
use futures::future;
use tokio::runtime::Runtime;
use std::pin::Pin;
use crate::futures::StreamExt;
pub trait CharacterStream: Stream<Item=char> + Unpin {}
struct StreamWrapper<S> where S: Stream + Unpin + ?Sized {
wrapped: Box<S>,
}
impl <S> StreamWrapper<S> where S: Stream + Unpin + Sized {
fn new(elem: S) -> Self {
StreamWrapper{wrapped: Box::new(elem)}
}
}
#[derive(Clone)]
struct CharVector {
my_stream: Vec<char>,
}
impl CharVector {
fn new(my_string: String) -> Self {
let mut chars: Vec<char> = my_string.chars().collect();
chars.reverse();
CharVector{my_stream: chars}
}
}
impl Stream for CharVector {
type Item = char;
fn poll_next(mut self: Pin<&mut Self>, _ctx: &mut std::task::Context) -> Poll<Option<Self::Item>> {
let c = self.my_stream.pop();
Poll::Ready(c)
}
}
fn main() {
println!("{}", "Starting Rust example!");
let stream = StreamWrapper::new(
CharVector::new(
"abcd".to_owned()
)
);
let rt = Runtime::new().unwrap();
rt.block_on(async {
println!("{}", "Executing from inside the future!");
stream.wrapped.for_each(|c| {
print!("{}", c);
future::ready(())
}).await;
});
}
My takeaway was that I needed to specify that my Wrapper was Unpin. This let me implement the Stream interface on the my boxed stream.