When does tokio::net::TcpStream become ready for reading?

First, I started a TCP server with nc -l 8899.
When I connect to that server with telnet 127.0.0.1 8899, the telnet client receives the message immediately after I type anything in the server terminal.

Like this:

ape@apedeMac-mini database % nc -l 8899
hello there

ape@apedeMac-mini database % telnet 127.0.0.1 8899
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello there

The problem: when I use tokio::net::TcpStream::connect to get a TcpStream and check whether it is ready to read, I always get false, even though I have typed many words in the server terminal.

let ready = tcp
    .ready(Interest::READABLE | Interest::WRITABLE)
    .await
    .expect("ready");

println!(
    "ready for reading:{}, ready for writing:{}",
    ready.is_readable(),
    ready.is_writable()
);

The problem persists until I write something with TcpStream::write(). I want to know why. Thanks to friends in the community.

Did you press enter in the telnet terminal? It does not send any data until you do.

Yes, I did press Enter.

Have you polled the future yet? For example by spawning it on the runtime or awaiting it?

Since futures in Rust are lazy, nothing will happen until it’s polled at least once.
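To make the laziness concrete, here is a small sketch that uses only the standard library. The names `NoopWaker` and `laziness_demo` are made up for this illustration. It creates an async block, checks that its body has not run, then polls it once by hand and checks again.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough to poll a future by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Returns whether the async body had run (before the first poll, after it).
pub fn laziness_demo() -> (bool, bool) {
    let ran = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&ran);

    // Creating the future does not execute its body.
    let fut = async move {
        flag.store(true, Ordering::SeqCst);
    };
    let before = ran.load(Ordering::SeqCst);

    // Polling once drives the body to completion (it has no await points).
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    let poll = fut.as_mut().poll(&mut cx);
    assert!(matches!(poll, Poll::Ready(())));

    (before, ran.load(Ordering::SeqCst))
}
```

In a real program the runtime does the polling for you, which is why a future has to be awaited or spawned before anything happens.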

This is all the code I have.

#[macro_use]
extern crate napi_derive;
use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode};
use napi::{bindgen_prelude::Result, Error, JsFunction};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{io, thread};
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tokio::sync::mpsc::{channel, Receiver, Sender};

#[napi(object)]
pub struct CmdMessage {
    pub cmd: u32,
    pub body_str: String,
}

#[derive(Clone)]
struct StreamHandle {
    sender: Sender<CmdMessage>,
}

lazy_static::lazy_static! {
    static ref STREAM_HANDLE: Arc<Mutex<Option<StreamHandle>>> = Arc::new(Mutex::new(None));
}

/// create tcpclient and listen
#[napi]
pub fn create_tcp_stream(callback: JsFunction) -> Result<()> {
    let tsfn: ThreadsafeFunction<CmdMessage, ErrorStrategy::CalleeHandled> = callback
        .create_threadsafe_function(
            0, // Initial thread count
            |ctx: napi::threadsafe_function::ThreadSafeCallContext<CmdMessage>| {
                let cmd = ctx.value;

                Ok(vec![cmd])
            },
        )?;

    tokio::spawn(prepare_tcp_stream(tsfn));
    Ok(())
}

#[napi]
pub fn send_msg(cmd: u32) -> u32 {
    let guard = STREAM_HANDLE.lock().expect("get locker fail");

    if let Some(tcp) = guard.clone() {
        println!("get tcp");
        tokio::spawn(async move {
            let resp = tcp
                .sender
                .send(CmdMessage {
                    cmd,
                    body_str: String::from("whatever"),
                })
                .await;

            if let Err(e) = resp {
                clear_tcp();
                println!("sender fail {}", e);
            }
        });

        1
    } else {
        println!("no tcp");
        0
    }
}

async fn prepare_tcp_stream(ts: ThreadsafeFunction<CmdMessage>) {
    let mut guard = STREAM_HANDLE.lock().expect("get locker fail");

    let (sender, recv) = channel::<CmdMessage>(8);
    let handle = StreamHandle { sender };
    *guard = Some(handle);

    tokio::spawn(start_tcp_loop(recv, ts));
}

async fn start_tcp_loop(mut rx: Receiver<CmdMessage>, ts: ThreadsafeFunction<CmdMessage>) {
    let tcp: std::prelude::v1::Result<TcpStream, std::io::Error> =
        TcpStream::connect("127.0.0.1:8899").await;
    let mut tcp = match tcp {
        Ok(stream) => stream,
        Err(_e) => {
            ts.call(
                Err(Error::from_reason("tcp connnect failed")),
                ThreadsafeFunctionCallMode::Blocking,
            );
            return;
        }
    };

    let mut buf = [0; 1024];
    loop {
        let mut slee_this_tick = true;
        let resp = tcp.try_read(&mut buf);
        if let Ok(size) = resp {
            slee_this_tick = false;
            if size == 0 {
                ts.call(
                    Err(Error::from_reason("tcp connnect closed")),
                    ThreadsafeFunctionCallMode::Blocking,
                );
                clear_tcp();
                break;
            } else {
                // process_buf(&mut buf[..size], &mut status);
            }
            let msg = String::from("get msg");

            let cmd = CmdMessage {
                cmd: 1,
                body_str: msg,
            };
            ts.call(Ok(cmd), ThreadsafeFunctionCallMode::Blocking);
        } else if let Err(e) = resp {
            if e.kind() != io::ErrorKind::WouldBlock {
                eprintln!("connect failed, error:{}", e);
                ts.call(Err(e.into()), ThreadsafeFunctionCallMode::Blocking);
            }
        }

        let handle_msg = rx.try_recv();
        if let Ok(msg) = handle_msg {
            slee_this_tick = false;
            println!("get msg cmd {}", msg.cmd);
            let req_buf = "hello".as_bytes();
            tcp.write(req_buf).await.expect("wriete ok");
        }

        if slee_this_tick {
            thread::sleep(Duration::from_millis(1000));
        }
    }
}
fn clear_tcp() {
    let mut guard = STREAM_HANDLE.lock().expect("get locker fail");
    *guard = None;
}

After building with npm run build:debug (napi-rs), this is the JS side:

const wdw = require('./index')
wdw.createTcpStream(console.log)
// At this point I type something in the nc terminal and press Enter. The client terminal doesn't get anything from the nc server.
wdw.sendMsg(1)
// After sending the message, the client terminal starts to print `get msg cmd 1`
// null { cmd: 1, bodyStr: 'get msg' }

Thanks to friends in the community

Thanks for the reply. My code is above.

Okay, so this is a little bit more complex. I think it would be wise to try to isolate the behavior somewhat to try to nail down if the problem is with how Tokio is used or with napi. But I can try to give a few pointers based on what I see, and you can see if it helps to point you in the right direction.

The most obvious thing here is that nothing is running the Tokio runtime (at least not from what I see). Napi is supposed to ship with an optional Tokio runtime, but I would think that you need to pass it an async function for that to work as shown in the documentation.

That means you should probably have:

#[napi]
pub async fn create_tcp_stream(callback: JsFunction) -> Result<()> {

...and expect a promise in return.

That would also remove the need for spawning all the futures below, and run them as one single task by awaiting them instead. I don't know exactly how async is handled in napi, but I would guess that each call to wdw.createTcpStream in the JS API is treated as one top-level task and you work with them as you do with other promises in JS.

Next, in the function start_tcp_loop you're essentially busy-looping, polling the TcpStream for readiness all the time. If you look at the documentation for TcpStream::ready you'll see an example of how you're supposed to work with the Tokio runtime to avoid busy-looping. If you call TcpStream::ready(...).await it will only poll if there is a read event on the event queue.

You will need to rewrite your function to make this work and probably select! on the futures returned by TcpStream::ready and rx.recv and handle each case that way.

The last and probably most important thing is the way you handle the results from trying to read from the TcpStream.

let resp = tcp.try_read(&mut buf);
if let Ok(size) = resp {
    slee_this_tick = false;
    if size == 0 {
        ts.call(
            Err(Error::from_reason("tcp connnect closed")),
            ThreadsafeFunctionCallMode::Blocking,
        );
        clear_tcp();
        break;
    } else {
        // process_buf(&mut buf[..size], &mut status);
    }
    let msg = String::from("get msg");

    let cmd = CmdMessage {
        cmd: 1,
        body_str: msg,
    };
    ts.call(Ok(cmd), ThreadsafeFunctionCallMode::Blocking);
} else if let Err(e) = resp {
    if e.kind() != io::ErrorKind::WouldBlock {
        eprintln!("connect failed, error:{}", e);
        ts.call(Err(e.into()), ThreadsafeFunctionCallMode::Blocking);
    }
}

Since you don't wait for a read event and go straight to polling the TcpStream, an ErrorKind::WouldBlock is the expected result, not an error. WouldBlock signals that there is no data to read from the stream yet (or that a read would have to block the thread).
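The same behaviour can be seen without Tokio. The sketch below uses a non-blocking `std::net::TcpStream` on a loopback listener that stands in for `nc -l 8899`; `would_block_demo` is a name made up for this example. The first read happens before the peer has sent anything, so it reports `WouldBlock`; once data has been written, a later read succeeds.

```rust
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::time::Duration;

/// Returns the error kind of a read on an idle socket, then the bytes read
/// once the peer has written something.
pub fn would_block_demo() -> io::Result<(io::ErrorKind, Vec<u8>)> {
    // Loopback listener on an OS-assigned port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let mut client = TcpStream::connect(listener.local_addr()?)?;
    let (mut server, _) = listener.accept()?;
    client.set_nonblocking(true)?;

    let mut buf = [0u8; 1024];

    // Nothing has been sent yet, so the read cannot complete.
    let idle_kind = match client.read(&mut buf) {
        Ok(n) => {
            let msg = format!("unexpected read of {n} bytes");
            return Err(io::Error::new(io::ErrorKind::Other, msg));
        }
        Err(e) => e.kind(),
    };

    server.write_all(b"hello there\n")?;

    // Retry until the data arrives; WouldBlock just means "not yet".
    // This is plain synchronous code, so sleeping between attempts is fine here.
    let received = loop {
        match client.read(&mut buf) {
            Ok(n) => break buf[..n].to_vec(),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                thread::sleep(Duration::from_millis(10));
            }
            Err(e) => return Err(e),
        }
    };

    Ok((idle_kind, received))
}
```

In async code, awaiting readiness takes the place of the sleep-and-retry loop.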

Given that you treat that as an error I'm kind of surprised that your call to wdw.createTcpStream(console.log) doesn't return an error immediately. It might have something to do with the fact that napi doesn't know that it's running an async function and therefore doesn't handle the errors correctly. The first suggestion above should fix that.

Again, see the tokio example on how to handle the result from trying to read from the TcpStream, and don't treat WouldBlock as an error.

You should, however, treat other errors as errors so they're handled correctly (possibly with the exception of ErrorKind::Interrupted, but at minimum do what they do in the Tokio docs).

let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;

if ready.is_readable() {
    let mut data = vec![0; 1024];
    // Try to read data, this may still fail with `WouldBlock`
    // if the readiness event is a false positive.
    match stream.try_read(&mut data) {
        Ok(n) => {
            println!("read {} bytes", n);
        }
        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
            continue;
        }
        Err(e) => {
            return Err(e.into());
        }
    }
}
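If you also want to cover the optional Interrupted case, the decision can be pulled into a small helper. This is just one possible shape; `ReadAction` and `classify_read_error` are hypothetical names, not something Tokio provides.

```rust
use std::io;

/// What a read loop should do after `try_read` returns an error.
#[derive(Debug, PartialEq, Eq)]
pub enum ReadAction {
    /// No data yet: go back to waiting for readiness.
    WaitAgain,
    /// The call was interrupted: retry it right away.
    Retry,
    /// A real failure: report it and stop.
    Fail,
}

pub fn classify_read_error(err: &io::Error) -> ReadAction {
    match err.kind() {
        io::ErrorKind::WouldBlock => ReadAction::WaitAgain,
        io::ErrorKind::Interrupted => ReadAction::Retry,
        _ => ReadAction::Fail,
    }
}
```

A read loop would call this in its `Err(e)` arm and only surface the error to the caller when it gets `ReadAction::Fail`.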

There might be more issues here, but since I don't know enough about napi, and I don't have time to copy your whole setup this is the best I could do.

Thanks for your reply, but

#[napi]
pub async fn create_tcp_stream(callback: JsFunction) -> Result<()> {

produces an error that the trait Send is not implemented for napi_env. That is because we must make sure the JS object is only accessed on the main thread, which is where the JS code runs.

After I separated napi from Tokio, the TcpStream read fine from the nc server. So my guess is that, when wrapped by napi, the ready event behaves a little strangely, and the TcpStream::write call puts the ready event back on track.
