Push values received by websocket into vector passed by reference

Hello

I am trying to pass a mutable vector by reference to a function so that the function can insert values into it. The issue is that the function runs forever: it opens a websocket connection that delivers one value per second. I want to push each value the websocket sends into the vector.

My current attempt:

    // Call to function
    let values: Arc<Mutex<std::vec::Vec<f64>>> = Arc::new(Mutex::new(std::vec::Vec::new()));
    func(&values).await;

    // Function
    async fn func(&self, values: &Arc<Mutex<std::vec::Vec<f64>>>) {
        let url = url::Url::parse("wss://example.com/ws/getdata").unwrap();


        let (stdin_tx, stdin_rx) = futures_channel::mpsc::unbounded();
        tokio::spawn(read_stdin(stdin_tx));

        let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
        println!("WebSocket handshake has been successfully completed");

        let (write, read) = ws_stream.split();

        let stdin_to_ws = stdin_rx.map(Ok).forward(write);
        let values_arc = Arc::clone(values);
        let ws_to_stdout = {
            read.for_each(|message| async {
                let values_arc2 = Arc::clone(&values_arc);

                let data = message.unwrap().into_data();
                
                let json = serde_json::from_str::<serde_json::Value>(&*String::from_utf8(data).unwrap()).unwrap();
                let value_str = &json["k"]["c"].as_str().unwrap().replace("\"", "");
                println!("c {}", value_str);
                values_arc2.lock().unwrap().push(value_str.parse::<f64>().unwrap());
            })
        };

        pin_mut!(stdin_to_ws, ws_to_stdout);
        future::select(stdin_to_ws, ws_to_stdout).await;

        async fn read_stdin(tx: futures_channel::mpsc::UnboundedSender<Message>) {
            let mut stdin = tokio::io::stdin();
            loop {
                let mut buf = vec![0; 1024];
                let n = match stdin.read(&mut buf).await {
                    Err(_) | Ok(0) => break,
                    Ok(n) => n,
                };
                buf.truncate(n);
                tx.unbounded_send(Message::binary(buf)).unwrap();
            }
        }
    }


I already had to wrap the vector in Arc and Mutex, because passing it as a plain `&mut` reference gave this error:

    error: captured variable cannot escape `FnMut` closure body
       --> src/api/file:111:37
        |
    97  |       async fn func(&self, values: &mut...
        |                                               ------ variable defined here
    ...
    111 |               read.for_each(|message| async {
        |  ___________________________________-_^
        | |                                   |
        | |                                   inferred to be a `FnMut` closure
    112 | |                 let data = message.unwrap().into_data();
    113 | |
    114 | |                 let json = serde_json::from_str::<serde_...
    ...   |
    118 | |                 prices.push(value);
        | |                 ------ variable captured here
    119 | |             })
        | |_____________^ returns an `async` block that contains a reference to a captured variable, which then escapes the closure body
        |
        = note: `FnMut` closures only have access to their captured variables while they are executing...
        = note: ...therefore, they cannot allow references to captured variables to escape

The code posted above, using Arc and Mutex, compiles. The problem is that I'd like to print the contents of the vector at the call site every time a value is added, but func never returns. Maybe I need to work with a separate thread?

It sounds like Vec isn't the right thing to use here.

You probably want something that implements the futures::Sink trait, such as a futures::channel::mpsc channel, so that func() can send items while another task processes them concurrently.
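To show the shape of that idea, here is a minimal sketch. To keep it runnable without extra crates I've swapped in `std::sync::mpsc` and a plain thread instead of `futures::channel::mpsc` and a task, so treat it as an analogy rather than drop-in code. `produce` and `collect_prices` are hypothetical names, and the hard-coded strings stand in for the websocket messages.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for the websocket loop: instead of pushing into a
// shared Vec, it sends each successfully parsed value over the channel.
fn produce(raw: Vec<&'static str>, tx: mpsc::Sender<f64>) {
    for text in raw {
        // Skip anything that does not parse as a number.
        if let Ok(value) = text.parse::<f64>() {
            // Stop early if the receiving side has gone away.
            if tx.send(value).is_err() {
                break;
            }
        }
    }
    // `tx` is dropped here, which closes the channel and ends the
    // receiver's loop below.
}

// The calling side owns the Vec and updates it as values arrive, so it can
// print (or otherwise react) on every update without any Mutex.
fn collect_prices(raw: Vec<&'static str>) -> Vec<f64> {
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || produce(raw, tx));

    let mut values = Vec::new();
    for value in rx {
        values.push(value);
        println!("values so far: {:?}", values);
    }

    producer.join().unwrap();
    values
}

fn main() {
    let values = collect_prices(vec!["1.5", "2.25", "oops", "3.0"]);
    println!("final: {:?}", values);
}
```

In the async version, func() would take the sending half of the channel and call something like `unbounded_send` inside the `for_each` closure, while the caller spawns func() as a task and awaits values on the receiving half. The point is the same either way: the vector stays owned by the code that wants to read it.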
