Long story short, I need to port the tokio_kcp crate to async_std, so the result will probably be called something like async_std_kcp.
My Rust knowledge is still fairly limited, and I came across this piece of code (from the GitHub source):
tokio::select! {
    // recv() then input()
    // Drives the KCP machine forward
    recv_result = udp_socket.recv(&mut input_buffer), if is_client => {
        match recv_result {
            Err(err) => {
                error!("[SESSION] UDP recv failed, error: {}", err);
            }
            Ok(n) => {
                let input_buffer = &input_buffer[..n];
                let input_conv = kcp::get_conv(input_buffer);
                trace!("[SESSION] UDP recv {} bytes, conv: {}, going to input {:?}",
                       n, input_conv, ByteStr::new(input_buffer));
                let mut socket = session.socket.lock();
                // Server may allocate another conv for this client.
                if !socket.waiting_conv() && socket.conv() != input_conv {
                    trace!("[SESSION] UDP input conv: {} replaces session conv: {}", input_conv, socket.conv());
                    socket.set_conv(input_conv);
                }
                match socket.input(input_buffer) {
                    Ok(true) => {
                        trace!("[SESSION] UDP input {} bytes and waked sender/receiver", n);
                    }
                    Ok(false) => {}
                    Err(err) => {
                        error!("[SESSION] UDP input {} bytes error: {}, input buffer {:?}",
                               n, err, ByteStr::new(input_buffer));
                    }
                }
            }
        }
    }
and this is what I have ported it to so far:
futures::select! {
    // recv() then input()
    // Drives the KCP machine forward
    recv_result = udp_socket.recv(&mut input_buffer) => {
        if is_client {
            match recv_result {
                Err(err) => {
                    error!("[SESSION] UDP recv failed, error: {}", err);
                }
                Ok(n) => {
                    let input_buffer = &input_buffer[..n];
                    let input_conv = kcp::get_conv(input_buffer);
                    trace!("[SESSION] UDP recv {} bytes, conv: {}, going to input {:?}",
                           n, input_conv, ByteStr::new(input_buffer));
                    let mut socket = session.socket.lock();
                    // Server may allocate another conv for this client.
                    if !socket.waiting_conv() && socket.conv() != input_conv {
                        trace!("[SESSION] UDP input conv: {} replaces session conv: {}", input_conv, socket.conv());
                        socket.set_conv(input_conv);
                    }
                    match socket.input(input_buffer) {
                        Ok(true) => {
                            trace!("[SESSION] UDP input {} bytes and waked sender/receiver", n);
                        }
                        Ok(false) => {}
                        Err(err) => {
                            error!("[SESSION] UDP input {} bytes error: {:?}, input buffer {:?}",
                                   n, err, ByteStr::new(input_buffer));
                        }
                    }
                }
            }
        }
    }
    // bytes received from listener socket
    input_opt = input_rx.recv() => {
        if let Some(input_buffer) = input_opt {
            let mut socket = session.socket.lock();
            match socket.input(&input_buffer) {
                Ok(waked) => {
                    // trace!("[SESSION] UDP input {} bytes from channel {:?}",
                    //        input_buffer.len(), ByteStr::new(&input_buffer));
                    trace!("[SESSION] UDP input {} bytes from channel, waked? {} sender/receiver",
                           input_buffer.len(), waked);
                }
                Err(err) => {
                    error!("[SESSION] UDP input {} bytes from channel failed, error: {:?}, input buffer {:?}",
                           input_buffer.len(), err, ByteStr::new(&input_buffer));
                }
            }
        }
    }
The problem is that input_rx is now of type futures::channel::mpsc::Receiver instead of tokio::sync::mpsc::Receiver, and the former does NOT have a recv function. As far as I can tell, the only way to read the data is try_next. However, that would not work with futures::select!, because try_next is not async.
So the way I see it, I have 3 options:
- Implement recv() for futures::channel::mpsc::Receiver through a custom trait.
- Instead of using select, write a loop that constantly checks the polling status of async_std::net::UdpSocket::recv, which I honestly do not know how to do.
- Still use a loop, but implement some kind of try_recv for async_std::net::UdpSocket through a custom trait, which is probably the most difficult option.
Am I missing something, or am I overcomplicating things and there is an easier solution?
Thanks in advance.