I'm using tokio and tokio_tungstenite, and I want to send data messages from inside a callback function, but the program panics with the error "there is no reactor running, must be called from the context of a Tokio 1.x runtime" (followed by the usual note: run with `RUST_BACKTRACE=1`).

static GLOBAL_WS_SENDER: Lazy<
    Arc<Mutex<Option<futures_util::stream::SplitSink<WebSocketStream<TcpStream>, Message>>>>,
> = Lazy::new(|| Arc::new(Mutex::new(None)));
async fn main() {
    let addr = "127.0.0.1:9000";
    let listener = TcpListener::bind(addr).await.expect("Failed to bind");
    println!("WebSocket server running on: ws://{}", addr);

    while let Ok((stream, _)) = listener.accept().await {
        tokio::spawn(handle_connection(stream));
    }
}
async fn handle_connection(stream: tokio::net::TcpStream) {
    let ws_stream = match accept_async(stream).await {
        Ok(ws) => ws,
        Err(e) => {
            eprintln!("Error during WebSocket handshake: {}", e);
            return;
        }
    };

    println!("WebSocket connection established");
    let (write, read) = ws_stream.split();
    let sender = GLOBAL_WS_SENDER.clone();
    let mut lock = sender.lock().await;
    *lock = Some(write);
}
async fn send_global_message(message: String) {
    let sender = GLOBAL_WS_SENDER.clone();
    let mut lock = sender.lock().await;
    if let Some(ref mut write) = *lock {
        if let Err(e) = write.send(Message::Text(message)).await {
            eprintln!("Error sending global message: {}", e);
        }
    } else {
        eprintln!("WebSocket sender not available.");
    }
}
extern "C" fn my_cbf_stm(_l_data: *mut c_void, _l_paramm: *mut c_void) -> c_int {
    let frame_header = unsafe { &*(_l_data as *const Frame) };
    let data_size = (frame_header.width as usize) * (frame_header.height as usize);
    let data_ptr = unsafe { (_l_data.add(std::mem::size_of::<Frame>()) as *const u8) };
    let index_data = unsafe { slice::from_raw_parts(data_ptr, data_size) };
    let mut img: RgbImage = ImageBuffer::new(frame_header.width as u32, frame_header.height as u32);
    for (i, &index) in index_data.iter().enumerate() {
        let x = (i % frame_header.width as usize) as u32;
        let y = (i / frame_header.width as usize) as u32;
        let color = COLOR_PALETTE[index as usize];
        img.put_pixel(x, y, image::Rgb([color.0, color.1, color.2]));
    }
    let base64_data = encode_image_to_base64(img);
    // println!("Base64 encoded image data: {}", base64_data);
    tokio::spawn(async move {
        send_global_message(base64_data.to_string()).await;
    });
    0
}
tokio::spawn(async move {
            send_global_message(base64_data.to_string()).await;
        });

The callback function is invoked automatically, and when it reaches the tokio::spawn call shown above, the program reports the error: there is no reactor running, must be called from the context of a Tokio 1.x runtime

Firstly, please read the pinned post on code formatting and put your code in a code block so that it is more readable.

Secondly, it looks like your main function isn't annotated with #[tokio::main], so it isn't running under any runtime (and tokio functions will fail if they aren't run under a tokio runtime). Adding that annotation to your main function should fix the problem.

Thank you very much for your answer. This is my first time asking a question here. Thanks for the advice; I have fixed my code formatting. I added #[tokio::main] to fn main(), but the error still occurs.

Thanks for formatting the code. With the formatting applied, I can see that there is an extern "C" function that calls tokio::spawn(), and that seems to be where the error is occurring. How is that function used? Is there C code calling it? If that function gets called without a tokio runtime being created first then that would cause the error.


What I want to build is a service that connects to a camera and passes the frames it captures to the web side. The camera's SDK is written in C, so I link against the camera's DLL. My current sequence is: declare the external C functions, listen for WebSocket connections in the main function, wait for a connection to be established in handle_connection, and then call the C function that connects to the camera. Once the camera is connected, the SDK calls my_cbf_stm automatically, passing it the data frames captured by the camera. The error is reported when tokio::spawn and send_global_message are called inside that callback.

Ah, okay, so my_cbf_stm is a C callback, but I can't see where it is registered, so I'm guessing that the top-level control flow may be outside the code shown here. Basically something needs to construct a tokio runtime and ensure that it is entered before any tokio functions are called. The usual pattern for this is to have an async main function that is called by constructing a runtime and then running it in that runtime using block_on() (#[tokio::main] does all this for you), and then doing everything else from within that main function.

If the main function here isn't handling the control flow, then you will need to ensure the runtime is available for spawning on. That may involve manually constructing a Runtime, storing it somewhere, and then spawning onto that in the functions that aren't already running under it (in the C callback for example). I'm not particularly familiar with doing this sort of thing though.

While this would work, it would be unnecessarily inconvenient. You don't need to manipulate the Runtime; you only need a Handle, which can be gotten from Handle::current() anywhere the runtime is already registered (or from runtime.handle() explicitly, if you prefer).

