Cycle detecting using tokio's spawn

Hello,
I have encountered another tokio error, unfortunately blocking me from further work. Here is the code and an explanation below:

async fn get_ingredients_for_menu(&self, mut _menu: Menu) -> Result<Menu, Error> {
    let _page_configs: Vec<PageConfig> = 
        self.sub_pages
            .iter()
            .cloned()
            .filter(|_sub_page_config| _sub_page_config.is_for_dish_type(_menu._dish_type))
            .collect();

    let _sub_page_config: PageConfig= 
        _page_configs
            .get(0)
            .unwrap()
            .clone();

    let _sub_page_details_provider: Arc<KwestiasmakuDataProvider<KwestiasmakuClient>> = Arc::new(PageDataProvider::new(
        _sub_page_config,
        self.client.clone(),
        0
    ));

    let _dishes: Vec<Arc<Mutex<MenuItem>>> = 
        _menu._dishes.into_iter().map(|_dish| Arc::new(Mutex::new(_dish))).collect();

    let handlers: Vec<tokio::task::JoinHandle<()>> = _dishes.iter()
        .map(|_d|{
            let _provider = _sub_page_details_provider.clone();
            let _dish = _d.clone();

            tokio::spawn(async move {
                let _dish = _dish.lock().unwrap();
                let _ingredients = _provider.get_ingredients_for_dish(&_dish).await.unwrap();
                _dish.update_with_ingredients(_ingredients);
            })
        }).collect();

    for _each_h in handlers {
        let _ = _each_h.await;
    }

    Ok(_menu)
}

and here is an error:

Compiling hungry v0.3.0 (/home/mszopa/Desktop/hungry)
error[E0391]: cycle detected when processing `data_sources::kwestiasmaku::<impl at src/data_sources/kwestiasmaku.rs:31:1: 183:2>::get_ingredients_for_menu::__get_ingredients_for_menu`
   --> src/data_sources/kwestiasmaku.rs:140:5
    |
140 |     async fn get_ingredients_for_menu(&self, mut _menu: Menu) -> Result<Menu, Error> {
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |
note: ...which requires processing `data_sources::kwestiasmaku::<impl at src/data_sources/kwestiasmaku.rs:31:1: 183:2>::get_ingredients_for_menu::__get_ingredients_for_menu::{{closure}}#0::{{closure}}#2::{{closure}}#0`...
   --> src/data_sources/kwestiasmaku.rs:167:41
    |
167 |                   tokio::spawn(async move {
    |  _________________________________________^
168 | |                     let _provider = _sub_page_details_provider.clone();
169 | |                     let _dish = _d.clone();
170 | |                     let _dish = _dish.lock().unwrap();
...   |
173 | |                     print!("asdasd");
174 | |                 })
    | |_________________^
    = note: ...which again requires processing `data_sources::kwestiasmaku::<impl at src/data_sources/kwestiasmaku.rs:31:1: 183:2>::get_ingredients_for_menu::__get_ingredients_for_menu`, completing the cycle
note: cycle used when processing `data_sources::kwestiasmaku::<impl at src/data_sources/kwestiasmaku.rs:31:1: 183:2>::get_ingredients_for_menu::__get_ingredients_for_menu::{{closure}}#0`
   --> src/data_sources/kwestiasmaku.rs:140:86
    |
140 |       async fn get_ingredients_for_menu(&self, mut _menu: Menu) -> Result<Menu, Error> {
    |  ______________________________________________________________________________________^
141 | |         let _page_configs: Vec<PageConfig> = 
142 | |             self.sub_pages
143 | |                 .iter()
...   |
181 | |         Ok(_menu)
182 | |     }
    | |_____^

I completely don't get that. I don't see any cycle here. I saw in tokio's docs that such error may occur because I am holding Send! values before async call, but as you can see in my example it is not possible to change that... Could you please help me with that? I have no idea what I am doing wrong...

Ah.. That's a bad error.. It's probably related how to you can't have recursive cycles in async fn.

It will probably work if you define a non-async function containing just the call to tokio::spawn and use that instead of spawning directly in the async function.

let handlers: Vec<tokio::task::JoinHandle<()>> = _dishes.iter()
    .map(|_d|{
        let provider = _sub_page_details_provider.clone();
        let dish = _d.clone();

        spawn_update_with_ingredients(dish, provider)
    }).collect();


fn spawn_update_with_ingredients(
    dish: Arc<Mutex<MenuItem>>,
    provider: Arc<KwestiasmakuDataProvider<KwestiasmakuClient>>
) -> JoinHandle<()> {
    tokio::spawn(async move {
        let dish = dish.lock().unwrap();
        let ingredients = provider.get_ingredients_for_dish(&dish).await.unwrap();
        dish.update_with_ingredients(ingredients);
    })
}

I imagine the issue is that

may be calling an async fn which itself may be recusrively calling .get_ingredients_for_menu() again?

If so, can you try doing?

+ use ::futures::future::FutureExt:

            tokio::spawn(async move {
                let _dish = _dish.lock().unwrap();
                let _ingredients = _provider.get_ingredients_for_dish(&_dish).await.unwrap();
                _dish.update_with_ingredients(_ingredients);
-           })
+           }.boxed())

and see if that solves the issue? This change will introduce the necessary pointer indirection for a recursive type.


If it doesn't solve the issue, maybe it will lead to an improved error message :slightly_smiling_face:

Feel free to share the whole code snippet, through the playground, github, etc. (you get the gist :wink: )

Hello, thanks for your replay. I don't have a good news :frowning: Unfortunatelly rust is still unable to compile that and keeps returning same error:

error[E0391]: cycle detected when processing `data_sources::kwestiasmaku::<impl at src/data_sources/kwestiasmaku.rs:31:1: 188:2>::spawn_update_with_ingredients::{{closure}}#0`
   --> src/data_sources/kwestiasmaku.rs:144:33
    |
144 |           tokio::spawn(async move {
    |  _________________________________^
145 | |             let dish = dish.lock().unwrap();
146 | |             let ingredients = provider.get_ingredients_for_dish(&dish).await.unwrap();
147 | |             dish.update_with_ingredients(ingredients);
148 | |         })
    | |_________^
    |
note: ...which requires processing `data_sources::kwestiasmaku::<impl at src/data_sources/kwestiasmaku.rs:31:1: 188:2>::spawn_update_with_ingredients`...
   --> src/data_sources/kwestiasmaku.rs:140:5
    |
140 | /     fn spawn_update_with_ingredients(
141 | |         dish: Arc<Mutex<MenuItem>>,
142 | |         provider: Arc<KwestiasmakuDataProvider<KwestiasmakuClient>>
143 | |     ) -> tokio::task::JoinHandle<()> {
...   |
148 | |         })
149 | |     }
    | |_____^
    = note: ...which again requires processing `data_sources::kwestiasmaku::<impl at src/data_sources/kwestiasmaku.rs:31:1: 188:2>::spawn_update_with_ingredients::{{closure}}#0`, completing the cycle
note: cycle used when processing `data_sources::kwestiasmaku::<impl at src/data_sources/kwestiasmaku.rs:31:1: 188:2>::spawn_update_with_ingredients::{{closure}}#0`
   --> src/data_sources/kwestiasmaku.rs:144:33
    |
144 |           tokio::spawn(async move {
    |  _________________________________^
145 | |             let dish = dish.lock().unwrap();
146 | |             let ingredients = provider.get_ingredients_for_dish(&dish).await.unwrap();
147 | |             dish.update_with_ingredients(ingredients);
148 | |         })
    | |_________^

error: aborting due to previous error

For more information about this error, try `rustc --explain E0391`.
error: could not compile `hungry`.

Can you try to cut away parts to get a minimal example that produces the error? Then we should try to file a bug report to the compiler.

I will try to create something. I can also share full code if you want?

The more irrelevant details you can remove, the easier it would be for us to diagnose the issue.

Here is a simplified example. I have replaced all 'domain' logic and replaced real HTTP calls with simulation but error is the same as in real example. Please, take a look:
https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=30d2ce6c21a3f2b85b57b925a85a4557

When I click run, I get this error:

error: future cannot be sent between threads safely
   --> src/main.rs:173:17
    |
173 |                 tokio::spawn(async move {
    |                 ^^^^^^^^^^^^ future created by async block is not `Send`
    | 
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, MenuItem>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:175:40
    |
174 |                     let _dish = _dish.lock().unwrap();
    |                         ----- has type `std::sync::MutexGuard<'_, MenuItem>` which is not `Send`
175 |                     let _ingredients = provider.get_ingredients_for_dish(&_dish).await.unwrap();
    |                                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `_dish` maybe used later
176 |                     _dish.update_with_ingredients(_ingredients);
177 |                 })
    |                 - `_dish` is later dropped here

That is not the cycle detected error from before.

Note that the Tokio tutorial has a section about this error.

WHAT?! That's strange... Why while running that locally on my machine I keep getting cycle error whereas here I get error related to Send?

Are you using the latest stable compiler?

Ehhh, no. I was using rustc 1.40.0 (73528e339 2019-12-16), but don't you think such change in error is more than strange?

The Rust compiler considers bad error messages bugs, so I don't find it surprising that they fix errors like that one.

OK, problem fixed. Here is the final solution:
https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=4560786295eb51e64fc2e681372446ce

Thanks for your help. And btw, is my solution with 'enriched' vector fine? Is it the right way to solve cane in which i pass ownership of 'menu' object, then create new 'enriched' vector based on original one and then return 'menu' ownership with updated enriched 'menu_items'?

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.