Async_std: How to handle different future types with one stream

Hi,

I'm playing a bit with the following:

use async_std::stream::StreamExt;
use async_std::stream::{self};

#[derive(Clone,Copy)]
struct Type1 {
    a: u32,
}

#[derive(Clone)]
struct Type2 {
    a: String,
}

async fn foo1() -> Type1 {
    Type1 { a: 42 }
}

async fn foo2() -> Type2 {
    Type2 {
        a: "test".to_string(),
    }
}

// Create a shared output enum, used by all stream.
enum Message {
    Num(Type1),
    Text(Type2),
}

#[tokio::main]
async fn main() {

    let a = foo1();
    let b = foo2();

    let a1 = stream::once(a).map(Message::Num);
    let b1 = stream::once(b).map(Message::Text);

    let mut s = a1.merge(b1);

    while let Some(msg) = s.next().await {
       match msg {
            crate::Message::Num(n) => println!("received a u32:: {}", n.a),
            crate::Message::Text(s) => println!("received a string: {}", s.a),
            //_ => break
        }
    }
}

However it looks like that the type matching is a bit more complicated here.

error[E0631]: type mismatch in function arguments
   --> src/main.rs:36:34
    |
26  |     Num(Type1),
    |     --- found signature defined here
...
36  |     let a1 = stream::once(a).map(Message::Num);
    |                              --- ^^^^^^^^^^^^ expected due to this
    |                              |
    |                              required by a bound introduced by this call
    |
    = note: expected function signature `fn(impl Future<Output = Type1>) -> _`
               found function signature `fn(Type1) -> _`
note: required by a bound in `async_std::stream::StreamExt::map`
   --> /home/torsten.may/.cargo/registry/src/index.crates.io-6f17d22bba15001f/async-std-1.12.0/src/stream/stream/mod.rs:552:12

Does anybody has an idea how to fix this?

Thanks a lot T.

You've created a stream that returns a future that outputs Type1, not a stream that outputs Type1s.

This might work, if available to you (I'm not very familiar with the Stream ecosystem).

let a1 = stream::once(a).then(|a| a).map(Message::Num);
1 Like

There's also once from the futures crate does what you expected: once in futures::stream - Rust

async-std isn't on the playground so here's one that only uses futures, swapping out async_std::stream::StreamExt::merge for futures::stream::select: Rust Playground

2 Likes

Wow thanks a lot for your answers awesome!

Especially that solution without using unstable async_std functionality. Iā€˜m still a rust rookie and it helped me a lot.

Many thanks,
Torsten

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.