Why can't await be executed here?

src/lib.rs

#![warn(unused_must_use)]
#![warn(non_snake_case)]
#![feature(async_closure)]
#[macro_use]
extern crate lazy_static;

use std::sync::{Arc, Mutex};
use std::io::{Error, ErrorKind, Bytes};
use futures_util::stream::{StreamExt, SplitSink};
use futures_util::{SinkExt, TryFutureExt};
use futures_util::sink::Sink;
use tokio::runtime::Builder;
use async_tungstenite::tokio::{connect_async, ConnectStream};
use async_tungstenite::WebSocketStream;
use async_tungstenite::tungstenite::protocol::Message;
use async_tungstenite::tungstenite::handshake::client::Request;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI32, Ordering};
use std::option::Option::Some;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver};
use std::borrow::{BorrowMut, Borrow, Cow};
use std::boxed::Box;
use std::ops::{Deref, DerefMut};
use std::rc::Rc;
use std::cell::RefCell;


/// A message queued on a connection's internal channel.
///
/// `ws_send_text` pushes `Text` values into the channel whose receiving half is
/// created in `inner_ws_connect`.
///
/// The binary payload is an owned byte buffer. The previous `std::io::Bytes<u8>`
/// payload was a reader adaptor that cannot be built over `u8`, so the `Binary`
/// variant could never be constructed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WebSocketNotifyMessage {
    /// A UTF-8 text payload.
    Text(String),
    /// A raw binary payload.
    Binary(Vec<u8>),
    /// A request to close the connection.
    Close,
}


/// Kind of WebSocket frame passed as the second argument of the
/// `WebSocketOnReceive` callback.
///
/// The enum is `#[repr(C)]` with explicit discriminants.
#[repr(C)]
pub enum WebSocketMessageType {
    /// Text frame.
    Text = 1,
    /// Binary frame.
    Binary = 2,
    /// Ping frame.
    Ping = 3,
    /// Pong frame.
    Pong = 4,
    /// Close frame.
    Close = 5
}

/// A key/value pair; `inner_ws_connect` adds each pair as a header on the
/// handshake request.
///
/// NOTE(review): the struct is `#[repr(C)]`, but `&'static str` is a Rust fat
/// pointer (pointer + length), not a C string — confirm what layout the foreign
/// caller actually passes.
#[repr(C)]
pub struct KVPair {
    // Header name.
    key: &'static str,
    // Header value.
    value: &'static str
}


/// Write half of a split client WebSocket stream.
type WebSocketHandle = SplitSink<WebSocketStream<ConnectStream>, Message>;
/// Callback invoked with the connection id once the handshake has succeeded.
type WebSocketOnConnection = extern "stdcall" fn(i32);
/// Callback invoked for each received frame with
/// `(connection id, frame kind, payload, payload length)`.
type WebSocketOnReceive = extern "stdcall" fn(i32, WebSocketMessageType, &[u8], usize);
/// Callback invoked with the connection id when connecting fails.
type WebSocketOnConnectFail = extern "stdcall" fn(i32);
/// Callback invoked with the connection id when the connection ends.
type WebSocketOnClose= extern "stdcall" fn(i32);

/// Both halves of the per-connection message channel, stored in `MANAGER`.
struct WebsocketNotify {
    // Sending half; `ws_send_text` clones this handle and pushes messages into it.
    tx: Arc<Mutex<UnboundedSender<WebSocketNotifyMessage>>>,
    // Receiving half; read by a task spawned in `inner_ws_connect`.
    rx: Arc<Mutex<UnboundedReceiver<WebSocketNotifyMessage>>>
}


lazy_static! {
    /// Global connection table keyed by connection id.
    ///
    /// An entry holds `None` while the id is reserved but not yet connected and
    /// `Some(WebsocketNotify)` once `inner_ws_connect` has created the channel.
    static ref MANAGER: Arc<Mutex<HashMap<i32, Option<WebsocketNotify>>>> =
        Arc::new(Mutex::new(HashMap::new()));
}

/// Counter from which `inner_generate_conn_id` draws connection ids.
static ID: AtomicI32 = AtomicI32::new(0);


/// Reserves a fresh connection id and registers it in `MANAGER` with an empty
/// (`None`) slot.
///
/// The id `0` is never handed out. An id that is still present in `MANAGER`
/// (possible once the counter has wrapped around) is skipped so that a live
/// connection is never overwritten.
///
/// The previous version had no exit from its `loop`, so it never returned and
/// kept inserting entries forever.
fn inner_generate_conn_id() -> i32 {
    loop {
        let candidate = ID.fetch_add(1, Ordering::Relaxed);
        if candidate == 0 {
            continue;
        }
        let mut manager = MANAGER.lock().unwrap();
        if let std::collections::hash_map::Entry::Vacant(slot) = manager.entry(candidate) {
            slot.insert(None);
            return candidate;
        }
    }
}

/// Drops the `MANAGER` entry for `conn_id`; does nothing when the id is unknown.
fn inner_delete_conn_id(conn_id: i32) {
    // `HashMap::remove` is already a no-op for a missing key.
    MANAGER.lock().unwrap().remove(&conn_id);
}

async fn inner_ws_connect(conn_id: i32, url: &'static str, headers: &[KVPair], extensions: &'static [&str], on_connect: WebSocketOnConnection,
                          on_receive: WebSocketOnReceive)
                          -> Result<bool, Error> {

    let mut requestBuilder = Request::get(url);
    for val in headers {
        requestBuilder = requestBuilder.header(val.key, val.value);
    }
    for val in extensions {
        requestBuilder = requestBuilder.extension(val);
    }
    let request = requestBuilder.body(()).unwrap();

    let connection = connect_async(request)
        .await;
    if let Err(_) = connection {
        return Err(Error::from(ErrorKind::ConnectionRefused));
    }
    let (ws, _) = connection.unwrap();
    let (mut write, mut read) = ws.split();
    let (tx, rx) = unbounded_channel::<WebSocketNotifyMessage>();
    let (tx, rx) = (
        Arc::new(Mutex::new(tx)),
        Arc::new(Mutex::new(rx))
    );
    MANAGER.lock().unwrap().entry(conn_id).and_modify(|x| {
        *x = Some(WebsocketNotify {
           tx, rx
        });
    });

    tokio::spawn(async move {
        let mut rx;
        {
            let manager = MANAGER.lock().unwrap();
            let mut kv = manager.get_key_value(&conn_id).unwrap();
            let mut notify = kv.1.as_ref().unwrap();
            rx = notify.rx.clone();
        };
        let mut rx = rx.lock().unwrap();
        let mut rx = rx.borrow_mut();
        let a = rx.recv();
        a.await;
    });

    tokio::spawn(async move {
        write.send(Message::Text("123".to_string())).await;
    });

    tokio::spawn(async move {
        on_connect(conn_id);
    });

    while let Ok(msg) = read.next().await.unwrap() {
        match msg {
            Message::Text(text) => {
                tokio::spawn(async move {
                    let msg = text.as_bytes();
                    let msg_size = msg.len();
                    on_receive(conn_id, WebSocketMessageType::Text, msg, msg_size);
                });
            },
            Message::Binary(binary) => {
                tokio::spawn(async move {
                    let msg = binary.as_slice();
                    let msg_size = msg.len();
                    on_receive(conn_id, WebSocketMessageType::Binary, msg, msg_size);
                });
            },Message::Ping(_) => {

                // write.send(Message::Pong(vec![])).await;
            }
            _ => {}
        }
    }
    Ok(true)
}

/// Builds a multi-threaded tokio runtime and runs `on_run` inside it, blocking
/// the calling thread until `on_run` has returned.
///
/// * `worker_threads` - number of runtime worker threads.
/// * `thread_stack_size` - stack size configured for the runtime's threads.
/// * `on_run` - callback executed within the runtime context.
///
/// Panics if the runtime cannot be built.
#[no_mangle]
pub extern "stdcall" fn scheduler_run(worker_threads: usize, thread_stack_size: usize, on_run: extern "stdcall" fn()) {
    let mut builder = Builder::new_multi_thread();
    builder.worker_threads(worker_threads);
    builder.thread_stack_size(thread_stack_size);
    let runtime = builder.build().unwrap();

    runtime.block_on(async move { on_run() });
}

/// Spawns a detached tokio task that calls `task_func(arg)`.
///
/// The join handle is discarded, so the task's completion is not observed.
#[no_mangle]
pub extern "stdcall" fn scheduler_spawn(task_func: extern "stdcall" fn(i32), arg: i32) {
    let job = async move {
        task_func(arg);
    };
    drop(tokio::spawn(job));
}


/// Reserves a connection id, starts the connection on a spawned task and
/// returns the id immediately.
///
/// When `inner_ws_connect` finishes, the task reports the outcome:
/// * success or `ErrorKind::ConnectionAborted` -> `on_close(conn_id)`
/// * `ErrorKind::ConnectionRefused` -> `on_fail(conn_id)`
/// * any other error kind -> no callback
///
/// Afterwards the id is removed from the connection table.
#[no_mangle]
pub extern "stdcall" fn ws_connect(url: &'static str, headers: &'static [KVPair], extensions: &'static [&str], on_connect: WebSocketOnConnection,
                                   on_receive: WebSocketOnReceive, on_close: WebSocketOnClose, on_fail: WebSocketOnConnectFail) -> i32 {
    let conn_id = inner_generate_conn_id();
    tokio::spawn(async move {
        let outcome = inner_ws_connect(conn_id, url, headers, extensions, on_connect, on_receive).await;
        match outcome.map_err(|err| err.kind()) {
            Ok(_) | Err(ErrorKind::ConnectionAborted) => on_close(conn_id),
            Err(ErrorKind::ConnectionRefused) => on_fail(conn_id),
            Err(_) => {}
        }
        inner_delete_conn_id(conn_id);
    });
    conn_id
}

/// Queues `text` on the message channel of connection `conn_id`.
///
/// The call is a silent no-op when `conn_id` is unknown, when the connection
/// has not finished connecting yet (its slot is still `None`), when a lock is
/// poisoned, or when the receiving half of the channel has been dropped. This
/// function is called across the FFI boundary, so it must not panic.
#[no_mangle]
pub extern "stdcall" fn ws_send_text(conn_id: i32, text: &str) {
    // Clone the sender handle inside a short scope so the global lock is
    // released before the message is sent.
    let tx = {
        let manager = match MANAGER.lock() {
            Ok(guard) => guard,
            Err(_) => return,
        };
        let handle = manager
            .get(&conn_id)
            .and_then(Option::as_ref)
            .map(|notify| Arc::clone(&notify.tx));
        match handle {
            Some(tx) => tx,
            None => return,
        }
    };
    let sender = match tx.lock() {
        Ok(guard) => guard,
        Err(_) => return,
    };
    // `send` only fails when the receiver is gone; nothing useful can be done then.
    let _ = sender.send(WebSocketNotifyMessage::Text(text.to_string()));
}

/// Placeholder for sending a payload on connection `conn_id`.
///
/// The body is currently empty: both arguments are ignored and nothing is sent.
/// The block comment below preserves an earlier, disabled draft.
#[no_mangle]
pub extern "stdcall" fn ws_send_binrary(conn_id: i32, text: &str) {
    /*
    MANAGER.lock().unwrap().entry(conn_id).and_modify(|x| {
        if let Some(t) = x {
            tokio::task::spawn(async move {
                t.send(Message::Text(text.to_string())).await;
            });
        }
    });

     */
}

Cargo.toml

[package]
name = "rs-ws"
version = "0.1.0"
authors = [""]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
name = "rs_ws"
path = "src/lib.rs"
crate-type = ["cdylib"]


[dependencies]
url = "2.2.1"
http = "0.2.3"
libc = "0.2.88"
regex = "1"
lazy_static = "1.4.0"
futures-util = "0.3.13"
tokio = { version = "1.3.0", features = ["full"] }
async-tungstenite ={ version = "0.13.0", features = ["tokio-native-tls"] }


tokio-native-tls = { version = "^0.3.0", optional = true}

If I write `a.await` at line 137 of src/lib.rs, the code no longer compiles. How should I change it? Please help me, thank you.

This is covered in the Tokio tutorial here. In the future, please make sure to include the error message.

As a word of advice: putting senders and receivers into mutexes is a bad idea. Putting them into refcells is an even worse idea.

Thank you, I understand now: I should store only tx, because only tx can be moved to another thread.

Just a few comments on the code:

You don't need to import std::option::Option::Some or std::boxed::Box, they're in the prelude and so can just be used directly.

Since 2018 edition you don't need #[macro_use] extern crate lazy_static; - just use lazy_static::lazy_static; works fine, and is a better idea since it isn't a glob import. And you don't need #![feature(async_closure)] - async || {} works almost exactly the same as || async {} currently, but the latter is available on stable and so is preferable.

MPSC stands for "multi-producer single consumer". This means that the sender can be cloned - so instead of having an Arc<Mutex<UnboundedSender>> you can simply store an UnboundedSender which supports cloning equally well. It looks like you need an MPMC (multi-producer multi-consumer) channel here though, for which you can use async-channel or flume.

The contains_key check is redundant; it is totally OK to remove a non-existent entry from a HashMap.

Instead of a Mutex<HashMap>, you might wish to use a concurrent map like dashmap, it would be more suited for this use case. It doesn't look like the Arc around the MANAGER is necessary. It's a global so it can be accessed from anywhere, whereas Arc is just used for sharing a non-global generally.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.