Okay, so this is a little more complex. I think it would be wise to isolate the behavior to pin down whether the problem lies in how Tokio is used or in napi. Still, I can give a few pointers based on what I see, and you can check whether they point you in the right direction.
The most obvious thing here is that nothing is running the Tokio runtime (at least not as far as I can see). Napi is supposed to ship with an optional Tokio runtime, but I would think that you need to pass it an async function for that to work, as shown in the documentation.
That means you should probably have:
#[napi]
pub async fn create_tcp_stream(callback: JsFunction) -> Result<()> {
...and expect a promise in return.
That would also remove the need for spawning all the futures below; you could run them as one single task by awaiting them instead. I don't know exactly how async is handled in napi, but I would guess that each call to wdw.createTcpStream in the JS API is treated as one top-level task, and you work with them as you do with other promises in JS.
Next, in the function start_tcp_loop you're essentially busy-looping, polling the TcpStream for readiness all the time. If you look at the documentation for TcpStream::ready, you'll see an example of how you're supposed to work with the Tokio runtime to avoid busy-looping. If you call TcpStream::ready(...).await, it will only poll if there is a read event on the event queue.
You will need to rewrite your function to make this work, probably with select! on the futures returned by TcpStream::ready and rc.recv, handling each case that way.
The last and probably most important thing is the way you handle the results from trying to read from the TcpStream.
    let resp = tcp.try_read(&mut buf);
    if let Ok(size) = resp {
        slee_this_tick = false;
        if size == 0 {
            ts.call(
                Err(Error::from_reason("tcp connnect closed")),
                ThreadsafeFunctionCallMode::Blocking,
            );
            clear_tcp();
            break;
        } else {
            // process_buf(&mut buf[..size], &mut status);
        }
        let msg = String::from("get msg");
        let cmd = CmdMessage {
            cmd: 1,
            body_str: msg,
        };
        ts.call(Ok(cmd), ThreadsafeFunctionCallMode::Blocking);
    } else if let Err(e) = resp {
        if e.kind() != io::ErrorKind::WouldBlock {
            eprintln!("connect failed, error:{}", e);
            ts.call(Err(e.into()), ThreadsafeFunctionCallMode::Blocking);
        }
    }
Since you don't wait for a read event and go straight to polling the TcpStream, an ErrorKind::WouldBlock is the expected result, not an error. WouldBlock signals that there is no data to read from the stream yet (or that a read would have to block the thread).
Given that you treat that as an error, I'm kind of surprised that your call to wdw.createTcpStream(console.log) doesn't return an error immediately. It might have something to do with the fact that napi doesn't know that it's running an async function and therefore doesn't handle the errors correctly. The first suggestion above should fix that.
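To see that WouldBlock is simply what a non-blocking socket reports when nothing has arrived yet, here is a minimal standard-library sketch. It does not use Tokio or napi; the function name idle_then_data and the loopback setup are made up for illustration, and the short sleep is only a stand-in for awaiting readiness.

```rust
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: reads once from an idle non-blocking socket, then
/// keeps reading after the peer has written `payload`.
/// Returns the error kind of the first read (if it failed) and the bytes
/// that were eventually received.
fn idle_then_data(payload: &[u8]) -> io::Result<(Option<io::ErrorKind>, Vec<u8>)> {
    // Port 0 asks the OS for any free local port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let mut client = TcpStream::connect(listener.local_addr()?)?;
    let (mut server, _) = listener.accept()?;
    client.set_nonblocking(true)?;

    let mut buf = [0u8; 64];
    // Nothing has been sent yet, so this read cannot complete right now.
    let first = client.read(&mut buf).err().map(|e| e.kind());

    server.write_all(payload)?;

    let mut received = Vec::new();
    while received.len() < payload.len() {
        match client.read(&mut buf) {
            Ok(0) => break, // the peer closed the connection
            Ok(n) => received.extend_from_slice(&buf[..n]),
            // Not an error: the data just has not arrived yet. Sleeping
            // stands in for waiting on a readiness event.
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                thread::sleep(Duration::from_millis(1));
            }
            // Anything else is a real failure and is passed on.
            Err(e) => return Err(e),
        }
    }
    Ok((first, received))
}
```

Calling idle_then_data(b"hello") should report WouldBlock for the first read and then hand back the payload once it has arrived.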
Again, see the Tokio example for how to handle the result of trying to read from the TcpStream, and don't treat WouldBlock as an error.
You should, however, treat other errors as errors so they're handled correctly (possibly with the exception of ErrorKind::Interrupted, but at minimum do what they do in the Tokio docs).
    let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
    if ready.is_readable() {
        let mut data = vec![0; 1024];
        // Try to read data, this may still fail with `WouldBlock`
        // if the readiness event is a false positive.
        match stream.try_read(&mut data) {
            Ok(n) => {
                println!("read {} bytes", n);
            }
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                continue;
            }
            Err(e) => {
                return Err(e.into());
            }
        }
    }
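The decision made in that match can also be pulled out into a small helper, which keeps the read loop itself short. This is only a sketch using the standard library: ReadOutcome and classify_read are hypothetical names, and treating Interrupted the same way as WouldBlock is a judgment call rather than something the Tokio example does.

```rust
use std::io;

/// What the read loop should do next (hypothetical type for illustration).
#[derive(Debug, PartialEq)]
enum ReadOutcome {
    /// `n` bytes were read into the buffer.
    Data(usize),
    /// A read of 0 bytes into a non-empty buffer: the peer closed the stream.
    Closed,
    /// Nothing to read right now; wait for readiness again and retry.
    NotReady,
}

/// Sorts the result of a `try_read`-style call into "keep going" cases
/// and real errors.
fn classify_read(result: io::Result<usize>) -> io::Result<ReadOutcome> {
    match result {
        Ok(0) => Ok(ReadOutcome::Closed),
        Ok(n) => Ok(ReadOutcome::Data(n)),
        // WouldBlock is expected on a non-blocking stream. Interrupted is
        // handled the same way here, which is an assumption of this sketch.
        Err(e) if matches!(
            e.kind(),
            io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted
        ) => Ok(ReadOutcome::NotReady),
        // Every other error is propagated to the caller.
        Err(e) => Err(e),
    }
}
```

In the loop you would then continue on NotReady, break on Closed, process the buffer on Data, and return the error otherwise.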
There might be more issues here, but since I don't know enough about napi and don't have time to copy your whole setup, this is the best I can do.