Call FnMut from within async FnMut from for_each

In this situation, what is the recommended solution in Rust?

  • Using Arc?
  • Using Channels?
  • Cloning?
  • Boxing?
  • Doing some lifetime hacks?
  • Doing some esoteric impl stuff?
  • Avoiding FnMut completely?
  • Some other secret esoteric pattern not written in Rust books?

I encounter many problems for which there seems to be no reading material and only a few individuals know the solution, either because they have been into Rust from the very beginning or because they contributed to the creation of the language itself. Beginners are very dependent on these few individuals, as we can't decipher the compiler messages. I bought a book which has only a small section on closures, and my case is not covered.

Writing Rust seems to be a lot about secret recipes. The standard library provides tools for most situations, and where you can't solve a problem with them, you have to use unsafe code. I get it, but you can't write them all in a book. This can only be passed down through countless Stack Overflow questions.

I have an async closure passed to for_each from futures_util::stream::StreamExt, and I am trying to repeatedly call an FnMut, provided through the function's arguments, from within that async closure.

Now, beginners already read a lot online about avoiding anything async in Rust. But I have chosen async-tungstenite and have been writing a lot of async code for many years now, so it's hard to think of any other way at this point. Before promises, doing coroutines in JavaScript involved heavy use of callbacks (callback hell), and I feel like going down that road is going to be even harder in Rust.

I would be glad to avoid closures, FnMut and async completely, but what would be the other way?

use std::collections::HashMap;
use std::error::Error;

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

use async_tungstenite::WebSocketStream;
use async_tungstenite::tokio::{connect_async, TokioAdapter};
use async_tungstenite::tungstenite::Message;
use async_tungstenite::tungstenite::protocol::CloseFrame;
use async_tungstenite::tungstenite::protocol::frame::coding::CloseCode;

use tokio::pin;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::Duration;

use futures::stream::ForEach;
use futures::stream::SplitStream;

use futures::StreamExt;
use futures::SinkExt;

use serde::Deserialize;
use serde::de::Deserializer;
use serde_json::Value;

use reqwest;

async fn test<C>(&mut self, callback: C, symbol:&str) -> Result<(), Box<dyn std::error::Error>> where C: FnMut() {

        let Websocket { ref mut connections, .. } = self;

        let (stream, response) = 
        connect_async("wss://somestream")
        .await
        .expect("connect_async_error");

        let (write, read) = stream.split();
        
        let _ = read.for_each(|message| async {

            let _ = match message.unwrap() {
                
                Message::Text(message) => {
                    
                    if let Ok(message) = serde_json::from_str::<SomeType>(&message) {

                        callback(); // ERROR
                    };
                },
                Message::Ping(message) => { println!("{} ping", String::from_utf8(message).unwrap()); },
                Message::Close(x) => { dbg!("closed"); },
                _ => println!("not implemented")
            };

        }).await;

        Ok(())

    }

Error Message

1242 |     async fn test<C>(&mut self, callback: C, symbol:&str) -> Result<(), Box<dyn std::error::Error>> where C: FnMut() {
     |                                 -------- help: consider changing this to be mutable: `mut callback`
...
1275 |                         callback();
     |                         ^^^^^^^^ cannot borrow as mutable

error: captured variable cannot escape `FnMut` closure body
    --> src\main.rs:1265:41
     |
1242 |       async fn test<C>(&mut self, callback: C, symbol:&str) -> Result<(), Box<dyn std::error::Error>> where C: FnMut() {
     |                                   -------- variable defined here
...
1265 |           let _ = read.for_each(|message| async {
     |  _______________________________________-_^
     | |                                       |
     | |                                       inferred to be a `FnMut` closure
1266 | |
1267 | |
1268 | |             let _ = match message.unwrap() {
...    |
1275 | |                         callback();
     | |                         -------- variable captured here
...    |
1293 | |
1294 | |         }).await;
     | |_________^ returns an `async` block that contains a reference to a captured variable, which then escapes the closure body
     |
     = note: `FnMut` closures only have access to their captured variables while they are executing...
     = note: ...therefore, they cannot allow references to captured variables to escape

I really liked your introduction about the difficulties of Rust and wanted to try to compile this but I don't know what crates and imports you're using -- can you post a bit more code or a link to the repo?

use std::collections::HashMap;
use std::error::Error;

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

use async_tungstenite::WebSocketStream;
use async_tungstenite::tokio::{connect_async, TokioAdapter};
use async_tungstenite::tungstenite::Message;
use async_tungstenite::tungstenite::protocol::CloseFrame;
use async_tungstenite::tungstenite::protocol::frame::coding::CloseCode;

use tokio::pin;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::Duration;

use futures::stream::ForEach;
use futures::stream::SplitStream;

use futures::StreamExt;
use futures::SinkExt;

use serde::Deserialize;
use serde::de::Deserializer;
use serde_json::Value;

use reqwest;

" Look, Rust is important and influential. It will have a respected place in industry - not like you have to wait for that, it does now. But it seems to me that it's always going to the province of a small number of highly paid & educated programmers. I'm fine with that. But language that makes Rust sound like the next JavaScript seems pretty silly to me, as it's never going to have a comparable userbase. "

Thanks, could you also copy paste your dependencies section so I can add all the crate features you're using?

[package]
name = "x"
version = "0.2.1"
edition = "2021"

[dependencies]
reqwest = { version = "0.11.24", features = ["json"] }
dotenv = "*"
rand = "*"
time = { version = "*", features = ["macros"] }
serde = { version = "*", features = ["derive"] }
serde_json = "*"
futures = "*"
tokio = { version = "*", features = ["full"] }
async-tungstenite = { version = "0.24.0", features = ["tokio-runtime", "tokio-native-tls"] }

"What's going on here? Why is the language causing so much pain voted as "the most loved language" for like 7 years in a row in the Stack Overflow survey. Is it the Stockholm syndrome? Do Rust devs like pain? Or maybe it's a huge cult: you get in and you try to lure in as many people as you can so they feel the pain too."

Do you have a definition for Websocket I can use?

                let Websocket { ref mut connections, .. } = self;

struct Websocket {

    url: &'static str,
    connections: HashMap<String, Option<()>>,
    max_connection_streams: i16,

}


Tbh it's such a prototype that I didn't bother posting the whole code; there are lots of things that are not relevant to the actual problem.

Thanks, I'm getting the error you posted now.

Nah, it's much more boring, we just need maximum performance + memory safety and there isn't a better way to get it right now.

Yeah, I see. I'm pretty much willing to learn the Rust secrets, but some articles and books just show some problem, pull out some absolutely obscure pattern, and say it's as simple as that. Where do they even get this knowledge from? Seeing how programming in Rust feels, programming in high-level languages is an illusion. It shouldn't even be considered programming.

Based on the error

     = note: `FnMut` closures only have access to their captured variables while they are executing...
     = note: ...therefore, they cannot allow references to captured variables to escape

I changed the bound on the callback type C from FnMut to Fn:

                C: Fn(),

and the error went away. Do you need an FnMut?
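If you do need an FnMut, here is one possible workaround as a minimal sketch. The reason for_each rejects it is that its closure returns a new future on every call, and that future would have to hold a mutable borrow of callback after the closure call has already returned. A plain loop avoids this because the whole loop body is part of a single future. The sketch below is std-only so that it runs on its own: handle_all, block_on and fnmut_loop_demo are hypothetical names, a Vec stands in for the websocket stream, and block_on is a toy executor for illustration. In the original code the analogous loop would presumably be `while let Some(message) = read.next().await { ... }` with `read` declared as `mut`, though I have not tested that against the full project.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; good enough for the toy executor below.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Toy executor (illustration only): polls the future in a busy loop.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical stand-in for the `test` method: a Vec replaces the stream.
async fn handle_all<C: FnMut(&str)>(messages: Vec<String>, mut callback: C) {
    for message in messages {
        // An await point inside the loop body is fine: the loop is part of
        // this one future, so no borrow of `callback` escapes a closure.
        std::future::ready(()).await;
        callback(&message);
    }
}

pub fn fnmut_loop_demo() -> Vec<String> {
    let mut seen = Vec::new();
    // The closure mutates `seen`, so it is FnMut but not Fn.
    block_on(handle_all(
        vec!["a".to_string(), "b".to_string()],
        |m| seen.push(m.to_uppercase()),
    ));
    seen
}
```

Note that `callback` is declared `mut` in the parameter list, which is also what the first compiler message in the question suggests.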

The documentation says that if the function will be called multiple times, FnMut should be used:

Instances of FnMut can be called repeatedly and may mutate state.

Reading so much on the internet, sometimes I don't know if I'm underestimating or overestimating Rust's difficulty. For this example, your solution was astonishingly simple. I just don't know if FnMut was even needed. Do you know why FnMut didn't work, though?

Do I understand it correctly that FnMut should only be used if I want to mutate the inputs of that function?

Yeah, that is confusing. Because the doc for Fn says:

Instances of Fn can be called repeatedly without mutating state.

The "inheritance" hierarchy is Fn : FnMut : FnOnce, meaning every Fn is also an FnMut and every FnMut is also an FnOnce. So both Fn and FnMut can be called more than once.
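A small sketch of that hierarchy, in case it helps. The helper names (call_twice_fn, call_twice_fn_mut, call_once, hierarchy_demo) are made up for this example. A closure that only reads its captures satisfies all three bounds, while one that mutates a capture satisfies FnMut and FnOnce but not Fn.

```rust
// Each helper accepts a different closure bound.
fn call_twice_fn<F: Fn() -> u32>(f: F) -> u32 {
    f() + f()
}

fn call_twice_fn_mut<F: FnMut() -> u32>(mut f: F) -> u32 {
    f() + f()
}

fn call_once<F: FnOnce() -> u32>(f: F) -> u32 {
    f()
}

pub fn hierarchy_demo() -> (u32, u32, u32, u32) {
    let base: u32 = 10;

    // Only reads `base`, so it implements Fn, and therefore FnMut and FnOnce.
    // It captures just a shared reference, which makes the closure Copy,
    // so it can be passed by value several times.
    let pure = || base + 1;
    let a = call_twice_fn(pure);
    let b = call_twice_fn_mut(pure);
    let c = call_once(pure);

    // Mutates `count`, so it implements FnMut and FnOnce, but not Fn.
    let mut count: u32 = 0;
    let counting = || {
        count += 1;
        count
    };
    // `call_twice_fn(counting)` would be rejected by the compiler here.
    let d = call_twice_fn_mut(counting);

    (a, b, c, d)
}
```

Both the Fn and the FnMut closure are called twice, which is the point: the difference between the two bounds is about mutation, not about how often the closure may be called.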

The best doc on this I've seen is the Fn doc.

Thanks a lot. Now on to the next compiler error :wink:

FnMut is used when mutating "state", from the FnMut doc:

Use FnMut as a bound when you want to accept a parameter of function-like type and need to call it repeatedly, while allowing it to mutate state. If you don’t want the parameter to mutate state, use Fn as a bound; if you don’t need to call it repeatedly, use FnOnce.

By "state" they mean mutable references to captured variables, which is implied by this in the FnMut doc and probably explained elsewhere as well:

FnMut is implemented automatically by closures which take mutable references to captured variables
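If the callback has to keep some state but the bound stays Fn, interior mutability is one option. This is only a sketch with invented names (run_three_times, count_calls_with_cell): the closure below captures a shared reference to a Cell, so it still counts as Fn even though the value inside changes.

```rust
use std::cell::Cell;

// Accepts only Fn, like the changed `C: Fn()` bound.
fn run_three_times<F: Fn()>(f: F) {
    for _ in 0..3 {
        f();
    }
}

pub fn count_calls_with_cell() -> u32 {
    // The counter lives in a Cell, so it can be updated through `&hits`.
    let hits = Cell::new(0u32);
    // No mutable borrow is captured, so this closure implements Fn.
    run_three_times(|| hits.set(hits.get() + 1));
    hits.get()
}
```

One caveat: a reference to a Cell is not Send, so if the surrounding future has to be sent to another thread (for example when spawning it on a multi-threaded runtime), an atomic or a Mutex would be the usual replacement.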

You're welcome, best of luck.