Storing `impl Stream` and then using at later time

Here is a minimal example (I couldn't use Rust Playground as it didn't appear to have tokio_stream and as_any crates installed.

Essentially I am struggling with being able to store (into a variable) something that impls Stream and then use that later on. The reality of it is I want to pass it out of a DLL into a C# app as a pointer, and then have C# pass that pointer back into the DLL by calling a function that will use that pointer to read (or at least try_recv()) on the stream pointer and pass back the item that was read (if there was one) or an indication that there weren't any items to read.

use as_any::AsAny;
use std::pin::Pin;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio_stream::Stream;

struct StoreStream {
    stream: Pin<Box<dyn Stream<Item = u32>>>,
}

pub fn create_stream() -> impl Stream<Item = u32> {
    let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<u32>();
    // tx is sent to another part of the system here
    //...
    //...
    UnboundedReceiverStream::new(rx)
}

fn main() {
    let mut store_stream = StoreStream {
        stream: Box::pin(create_stream()),
    };

    // now I want to read the stream
    let binding = store_stream.stream.as_any_mut();
    let concrete_stream: &mut UnboundedReceiver<u32> = binding
        .downcast_mut::<UnboundedReceiver<u32>>()
        .unwrap();
    // This unwrap blows up on it being None
    match concrete_stream.try_recv() {
        Ok(item) => {
            // item read, do something with it
        }
        Err(_err) => {
            // Err will be either TryRecvError::Empty or
            // TryRecvError::Disconnected
        }
    }
}

Any ideas as to what I'm doing wrong, and why the unwrap on the final downcast_mut is None?
If as @H2CO3 indicates that the code isn't going to work, as I am trying to downcast to something of a different type, does anyone have any ideas as to how to store that stream pointer and then use it at a later time (somewhere else in the code)?
I hope that made sense.

Please provide a minimal example, preferably on the Playground. The error is obviously subtle, and a high-level description with scattered, incomplete code examples is not going to help us.

One idea I have off the top of my head is: are you creating an UnboundedReceiverChannel and trying to downcast it to an UnboundedReceiver? Those are not the same type, so this isn't going to work.

Unfortunately, Rust Playground doesn't allow for tokio crate use. I will recraft the question as a minimal example

The Playground does have tokio installed.

1 Like

You can use rustexplorer. This works.

I don't know about tokio_stream, but I think UnboundedReceiverStream cannot be downcast to UnboundedReceiver, can they?

No, it can't, since impl Trait type can't be cast back to the original type. So you don't need to cast it at all.

Well, the problem seems to be exactly what I predicted:

  • StoreStream::stream is a Pin<Box<dyn Stream>> created from a value of type UnboundedReceiverStream
  • But you are trying to downcast it to UnboundedReceiver.

Why is that? What are you trying to achieve? Why aren't you casting the value to its original type?

I wanted the function that reads the stream to be non-blocking (i.e. to not do a next, but to issue a try_read that returns Item/NotReady/Disconnected.)
As I still consider myself to be a Rust newbie, I have been struggling to get the syntax right for what I want to do. I will have a further play with your suggestion. Thanks.

Further clarification - I have a tokio runtime (single-threaded) being started up by a function to connect to something, and all that code is async. I wish to talk to the Rust code through C# and as async functions can't be called by C#, I have developed some bridge functions (between the Rust async and the C# FFI) that are non-async and blocking. And in the case of some stream functionality that I want, I don't want to be blocking on the stream waiting for values to be read, all I am after is a try_read() that will return quickly with the value (or not).
try_read appears to be a method on UnboundedReceiver (which is what the original unbounded_channel::<u32>() returned. This was subsequently converted to UnboundedReceiverStream which doesn't have that method.