Strange Compile Error, async-std + futures, one type is more general than the other

It compiles if I change parts of the code (see the comments marked COMPILE and FAIL).

use futures;
use futures::{StreamExt};
use async_std::prelude::*;
use async_std::task;
use std::time::Duration;
use async_std::sync::RwLock;
use futures::channel::mpsc::{UnboundedSender, UnboundedReceiver, unbounded};


async fn test() {
    let state = &RwLock::new(5i32);

    let (query_tx, query_rx) = unbounded::<i32>();
    let (tx, rx) = unbounded::<i32>();

    let task_write = async {
        for i in 1..5 {
            query_tx.unbounded_send(i).unwrap();
            task::sleep(Duration::from_secs(1)).await;
        }
    };
    
    let task_read = query_rx
        .filter_map(|tick| async move {
            let state_read = state.read().await; //FAIL, remove this line => COMPILE
            Some(tick)
        })
        .map(Ok).forward(tx); //FAIL
        //.for_each(|tick| async move { println!("{}", tick); }); //COMPILE

    let task_print = rx.for_each(|tick| async move {
        println!("{}", tick);
    });
    
    futures::pin_mut!(task_write, task_read, task_print);
    let task1 = futures::future::select(task_write, task_read);
    let task2 = futures::future::select(task1, task_print);
    task2.await;
}


fn main() {
    // A: COMPILE
    // task::block_on(test());

    // B: FAIL
    task::block_on(async {
        task::spawn(test()).await
    })
}
  |
7 | use futures::channel::mpsc::{UnboundedSender, UnboundedReceiver, unbounded};
  |                              ^^^^^^^^^^^^^^^  ^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(unused_imports)]` on by default

error[E0308]: mismatched types
   --> src/main.rs:48:9
    |
24  |           .filter_map(|tick| async move {
    |  _______________________________________-
25  | |             let state_read = state.read().await;
26  | |             Some(tick)
27  | |         })
    | |         -
    | |         |
    | |_________the expected generator
    |           the found generator
...
48  |           task::spawn(test()).await
    |           ^^^^^^^^^^^ one type is more general than the other
    |
    = note: expected opaque type `impl core::future::future::Future`
               found opaque type `impl core::future::future::Future`

error: aborting due to previous error

For more information about this error, try `rustc --explain E0308`.
error: could not compile `mo_harvester`.

To learn more, run the command again with --verbose.

Hi,

async-std maintainer here. I'm having a hard time quickly pinning down exactly where that error message comes from, but I can point you in a direction: if you remove use async_std::prelude::*; and take all async-std traits out of scope, the error is the same. So the problem seems to be purely in futures.

Florian

I find the error quite obscure too, but I also found it hard to work out what you are trying to do, as you use filter_map without actually filtering anything, seemingly to get the state.read() as a side effect.

After figuring out what the code does exactly, I wondered how I would have written that and it would be:

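// Needs `use futures::SinkExt;` for `send`, and both `query_rx` and `tx`
// must be declared `mut`, since `next` and `send` take `&mut self`.
let task_read = async move
{
   while let Some(tick) = query_rx.next().await
   {
      let _ = state.read().await;

      tx.send( tick ).await.expect( "send on tx" );
   }
};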

I feel it's simpler to reason about than the combinators and, as a benefit, it compiles just fine!

So yeah, there is some strange issue with the combinators and a suboptimal diagnostic message here.

As you can see there is an expect in my version. The send on the UnboundedSender is fallible and so you have to convert things to TryStream along the way to forward. I wonder if there is not something going on where the error type in the final task_read becomes incompatible with one of the other tasks you select on? But that's the problem with the combinators, it becomes quite hard to trace all the types through a more complicated operation.

That being said, the two versions in your main should be functionally equivalent, so something is definitely fishy here.


If you replace your main with this, you still get the same error:

pub fn assert_send<S: Send>(x: S) -> S { x }

fn main() {
    assert_send(test());
}
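
To make the assert_send trick concrete, here is a minimal std-only sketch of how I would use it to narrow things down, wrapping one sub-future at a time. Everything besides assert_send is an assumption for illustration: filter_it here uses std::sync::RwLock instead of the async-std one, and block_on is a tiny hypothetical busy-polling executor, not the async-std function. It does not reproduce the error from this thread; it only shows the technique.

```rust
use std::future::Future;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll, Wake, Waker};

/// Compiles only if `x` is `Send`; hands `x` back unchanged.
pub fn assert_send<S: Send>(x: S) -> S {
    x
}

// Hypothetical stand-in for the filter step (std RwLock, not async-std).
async fn filter_it(tick: i32, state: &RwLock<i32>) -> Option<i32> {
    // The guard is a temporary dropped at the end of this statement,
    // so it is not held across the await below.
    let limit = *state.read().unwrap();
    std::future::ready(()).await;
    if tick <= limit { Some(tick) } else { None }
}

// Waker that does nothing; enough for futures that never really suspend.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical minimal executor: polls in a loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

pub fn demo_assert_send() -> Option<i32> {
    let state = RwLock::new(5i32);
    // Check the innermost future first, then each wrapper around it.
    let inner = assert_send(filter_it(3, &state));
    let outer = assert_send(async move { inner.await });
    block_on(outer)
}
```

If one of the assert_send calls is rejected, the first failing one points at the layer that loses Send.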

So in order to dissect difficult compiler errors, something I tend to do is make the types more explicit. Here, in order to see exactly what the closure does, I did this:

async fn filter_it(tick: i32, state: &RwLock<i32>) -> Option<i32>
{
  let _ = state.read().await; //FAIL, remove this line => COMPILE
  Some(tick)
}

let task_read = query_rx
    .filter_map( |tick| filter_it( tick, state) )
    .map(Ok).forward(tx);

So here we can see that the closure is capturing a reference to the RwLock. Since you use combinators to go all the way to the forward, the reference stays in the future that is task_read.

One key difference between block_on and spawn here is that spawn requires the future to be 'static, whereas block_on does not. I suspect that the "one type is more general than the other" error has to do with the difference in lifetimes.

It's more likely the requirement that the spawned future must be Send than that it must be 'static, as my main function above does not fail if you replace the assert with something that asserts it is 'static.
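
For reference, this is roughly what I mean by checking the two bounds separately. It is a std-only sketch under assumptions: assert_static is a hypothetical helper analogous to assert_send, the two async fns are made-up stand-ins using std::sync::RwLock, and block_on is a tiny hypothetical busy-polling executor rather than the async-std one.

```rust
use std::future::Future;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll, Wake, Waker};

/// Compiles only if `T` contains no non-'static borrows.
fn assert_static<T: 'static>(x: T) -> T {
    x
}

/// Compiles only if `T` is `Send`.
fn assert_send<T: Send>(x: T) -> T {
    x
}

// Like `test()` in the thread: the future owns the lock and only borrows
// from itself, so the future as a whole is still 'static.
async fn owns_its_state() -> i32 {
    let state = RwLock::new(5i32);
    let state_ref = &state;
    std::future::ready(()).await; // the reference lives across this await
    let value = *state_ref.read().unwrap();
    value
}

// Borrows from the caller, so the returned future is tied to that borrow.
async fn borrows_from_caller(state: &RwLock<i32>) -> i32 {
    std::future::ready(()).await;
    let value = *state.read().unwrap();
    value
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical minimal executor: polls in a loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

pub fn demo_bounds() -> i32 {
    // Passes both checks: Send and 'static.
    let a = block_on(assert_send(assert_static(owns_its_state())));

    let state = RwLock::new(a);
    // Send is fine here, but wrapping this one in assert_static would be
    // rejected, because `state` is a local and the borrow is not 'static.
    block_on(assert_send(borrows_from_caller(&state)))
}
```

The two bounds are independent, which is why swapping one assert for the other is a cheap way to see which requirement of spawn is the one being violated.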


Maybe one of the futures combinator types is !Send?

It's quite weird because it seems like both the RwLock and the read guard implement both Send and Sync. But perhaps it's the future returned by the read method?

I can't check as I'm on my phone.

The while loop I posted above compiles fine. It has all the same types in it, except for the combinator types. The RwLockReadGuard from async-std is both Send + Sync.

Oh, I see what you mean. I'll test it.

No, assert_send claims it's Send.

also,

    let task_read = assert_send( task_read );
    let task1 = assert_send( task1 );
    let task2 = assert_send( task2 );

all compile fine.

It's probably worth linking this here as well:
https://github.com/rust-lang/rust/issues/64650

Ok, after reading through this thread, and this one, I figured this works:

async fn filter_it<'a>(tick: i32, state: &'a RwLock<i32>) -> Option<i32>
{
   let _ = state.read().await; //FAIL, remove this line => COMPILE
   Some(tick)
}

let task_read = query_rx
   .filter_map( |tick| filter_it( tick, state) )
   .map(Ok).forward(tx);

So it's a lifetime issue after all...

You're right. I've rewritten it with a single for_each. This error occurred when I tried to add some side effect based on the template code (e.g. filtering using a HashSet which may be modified by another task).

I got the same compile error with your async fn filter_it.

Note the <'a>. That's what made the difference.

I've copied your code, the error still exists.

error[E0308]: mismatched types
  --> src/main.rs:58:9
   |
10 | async fn filter_it<'a>(tick: i32, state: &'a RwLock<i32>) -> Option<i32>
   |                                                              -----------
   |                                                              |
   |                                                              the `Output` of this `async fn`'s expected opaque type
   |                                                              the `Output` of this `async fn`'s found opaque type
...
58 |         task::spawn(test()).await
   |         ^^^^^^^^^^^ one type is more general than the other
   |
   = note: expected opaque type `impl core::future::future::Future`
              found opaque type `impl core::future::future::Future`

error: aborting due to previous error

For more information about this error, try `rustc --explain E0308`.
error: could not compile `mo_harvester`.

To learn more, run the command again with --verbose.

and I do not quite understand why <'a> helps.

oh boy, I'm sorry. I must have had the wrong part in main commented out when I tested it. You are right. It still doesn't work.
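
In hindsight that makes sense: as far as I understand it, lifetime elision already gives the reference parameter its own lifetime and ties the returned future to it, so spelling out <'a> should not change the type at all. A std-only sketch of the three spellings I believe are roughly equivalent follows. The function names are made up for illustration, std::sync::RwLock stands in for the async-std lock, and block_on is a tiny hypothetical busy-polling executor.

```rust
use std::future::Future;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll, Wake, Waker};

// Elided lifetime, as in the first version of filter_it.
async fn filter_elided(tick: i32, state: &RwLock<i32>) -> Option<i32> {
    let limit = *state.read().unwrap();
    std::future::ready(()).await;
    if tick <= limit { Some(tick) } else { None }
}

// Explicit lifetime, as in the second version; same signature after elision.
async fn filter_named<'a>(tick: i32, state: &'a RwLock<i32>) -> Option<i32> {
    let limit = *state.read().unwrap();
    std::future::ready(()).await;
    if tick <= limit { Some(tick) } else { None }
}

// Roughly what an async fn with a reference parameter expands to:
// a plain fn returning a future that is bounded by the borrow.
fn filter_desugared<'a>(
    tick: i32,
    state: &'a RwLock<i32>,
) -> impl Future<Output = Option<i32>> + 'a {
    async move {
        let limit = *state.read().unwrap();
        std::future::ready(()).await;
        if tick <= limit { Some(tick) } else { None }
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical minimal executor: polls in a loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

pub fn demo_lifetimes() -> [Option<i32>; 3] {
    let state = RwLock::new(5i32);
    let a = block_on(filter_elided(3, &state));
    let b = block_on(filter_named(3, &state));
    let c = block_on(filter_desugared(3, &state));
    [a, b, c]
}
```

All three behave the same when called, so the named lifetime by itself is not expected to help with the error from spawn.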
