I am new to Rust (and to TCP networking applications in general) and am trying to implement a server with Tokio that spawns two tasks for each accepted connection. The first task reads incoming bytes, parses them, submits the parsed data to an external API, sends related messages over an mpsc::channel, and writes a response back to the client. The second task receives the messages from the channel and processes them (so that requests to another API can be sent concurrently).
The main processing task seems to work, but the second task only receives the first message sent by the Sender. Subsequent messages sent over the channel are never read, even though the send() calls do not return a SendError.
This is the general structure of the code:
use std::error::Error;
use std::io;

use tokio::io::Interest;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::{self, Receiver, Sender};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
    loop {
        let (socket, _) = listener.accept().await.unwrap();
        let (sender, mut receiver) = mpsc::channel::<ChannelElement>(32);
        tokio::spawn(async move { process(socket, sender).await.unwrap() });
        tokio::spawn(async move {
            process_channel_data(&mut receiver)
                .await
                .unwrap()
        });
    }
    Ok(())
}
async fn process(socket: TcpStream, sender: Sender<ChannelElement>) -> Result<(), Box<dyn Error>> {
    loop {
        let ready = socket
            .ready(Interest::READABLE | Interest::WRITABLE)
            .await?;
        if ready.is_readable() {
            let mut data = vec![0; 1024];
            match socket.try_read(&mut data) {
                Ok(0) => {
                    break;
                }
                Ok(n) => {
                    data.truncate(n);
                    println!("received {} bytes", n);
                    // do some stuff with the data
                    // to get String processed_data
                    // some helper function that continues to process the data in some way
                    // and also sends messages to the channel for the other task to process
                    if let Ok(resp_str) = helper_function(&processed_data, &sender).await {
                        loop {
                            if ready.is_writable() {
                                match socket.try_write(resp_str.as_bytes()) {
                                    Ok(n) => {
                                        println!("wrote {} bytes", n);
                                        break;
                                    }
                                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                                        continue;
                                    }
                                    Err(e) => {
                                        return Err(e.into());
                                    }
                                }
                            }
                        }
                    }
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    continue;
                }
                Err(e) => {
                    return Err(e.into());
                }
            }
        }
    }
    Ok(())
}
async fn helper_function(processed_data: &str, sender: &Sender<ChannelElement>) -> Result<String, Box<dyn Error>> {
    // there are multiple send blocks like:
    match sender
        .send(ChannelElement {
            // data
        })
        .await
    {
        Ok(_) => println!("sent message to channel"),
        Err(e) => println!("{e:?}"),
    }
    // do some other stuff, producing the response String resp_str
    Ok(resp_str)
}
async fn process_channel_data(receiver: &mut Receiver<ChannelElement>) -> Result<(), Box<dyn Error>> {
    loop {
        let message = receiver.recv().await;
        match message {
            // the channel has been closed, due to the sender being dropped after all sends
            None => {
                break;
            }
            Some(message) => {
                println!("received message: {message:?}");
                // do something with the message
            }
        }
    }
    Ok(())
}
I'm sure this implementation is not idiomatic and there are definite improvements to be made, but I want to fully understand why this initial naive solution doesn't work as I expect.