Why the client of socket can not receive message immediately?

I wrote a server and a client using tokio., I found that when the client sends a message to the server, the server replies to the message but the client doesn't receive it immediately. The client can only receive the message after the server thread finishes, for example, when the server process is forcibly interrupted. Does anyone know why this is happening?

The Server Code :

 
use std::process::Command;
use std::thread;
use serde_json::{json,Value};

use std::net::SocketAddr;
use tokio::sync::broadcast::{Sender};

use tokio::net::{TcpSocket, TcpStream};
use tokio::io::{Error, AsyncReadExt, AsyncWriteExt};
 

async fn process(tcp_stream:&mut TcpStream,tx: &Sender<(String,SocketAddr)>)-> bool{
    let (mut read_half,_write_half) = tcp_stream.split();
    
    let mut buf = vec![0;1024];
    
    match read_half.read_buf(&mut buf).await {
        
        Ok(_n) => {
            let res = String::from_utf8(buf).unwrap();
            //println!("size:{},content:{}",n,res);
            let peer_addr = tcp_stream.peer_addr().unwrap();
            tx.send((res,peer_addr)).unwrap();
            return true;
        }
        Err(err) => {   
            println!("err : {:?}",err);
            return false;
        }
    }
}
 
 

#[tokio::main]
async fn main() -> Result<(),Error> {

    let addr = "127.0.0.1:5555".parse().unwrap();

    let socket = TcpSocket::new_v4().unwrap();
    let _ = socket.bind(addr);
    let listen = socket.listen(1024).unwrap();

    println!("Server start to listenning port:  5555");

    let (tx,_rx) = tokio::sync::broadcast::channel(1024);

    loop {
        let (mut tcp_stream,_) = listen.accept().await.unwrap();

        let tx = tx.clone();
        let mut rx = tx.subscribe();
        tokio::spawn(async move{
            loop {
                tokio::select! {
                    result = process(&mut tcp_stream,&tx) => {
                        if !result {
                            break;
                        }
                    }
                    result = rx.recv() => {
                        let (mut msg,addr) = result.unwrap();
                        //println!("receive msg {:?} from cliet ", msg);
                        let jsonstr = msg.replace("\0",""); 
                        //println!("receive msg {:?} from cliet ", jsonstr);
                        //let json_data: Value = serde_json::from_str(&jsonstr).expect("Failed parse json"); 
                        if jsonstr.is_empty() {
                            break; 
                        } else {
                            match serde_json::from_str::<Value>(&jsonstr) {
                                Ok(json_data) => {                                   
                                    let command =  &json_data["command"].as_str().expect("command field not found").to_string();
                                    println!("receive command : {:?}   ", command);
                                    let output = Command::new("bash")   
                                                 .arg("-c")
                                                 .arg(command)
                                                 .output()
                                                 .expect("Failed to execute command ");
                                    if output.status.success() {
                                        let result = String::from_utf8_lossy(&output.stdout).to_string();
                                        println!(" {}", result);
                                        let response = get_json_msg(&result);
                                        
                                        if addr == tcp_stream.peer_addr().unwrap(){
                                            println!("send to back client : {:?}   ", response);
                                            let _ = tcp_stream.write_all(response.as_bytes()).await; 
                                            let _ =tcp_stream.flush();
                                           // break;
                                        }
                                    } 
                                }
                                Err(error) => {
                                    println!("Failed parse json {:?}   ", jsonstr);
                                }
                            }
                        }
                    }
                }
            }
            let ip = tcp_stream.peer_addr().unwrap();
            //println!("{:?}:disconnected ",ip);
        });
        
    }  
 
}
fn get_json_msg(value:&str) ->String {
    let json_data = json!({
        "value":value,
    });
    json_data.to_string()
 }

The client Code:

use tokio::io::{AsyncReadExt,BufReader, AsyncBufReadExt};
use tokio::{net::TcpSocket, io::AsyncWriteExt,net::TcpStream};
use tokio::sync::broadcast::{self};
//use std::net::{TcpListener, TcpStream};
use std::thread;
use std::io::Result;

use serde_json::{json,Value};

use serde_json::Result as JsonResult;
use std::env;

 

async fn send_command(
    command: &str,
    tx: &broadcast::Sender<String>,
) -> Result<String> {
    let msg = get_json_msg(&command).trim().to_string();
    let socket = TcpSocket::new_v4().unwrap();
    let addr = "127.0.0.1:5555".parse().unwrap();
    let mut tcp_stream = socket.connect(addr).await.unwrap();
    println!("connected :{:?}", addr);
    let _ = tcp_stream.write_all(msg.as_bytes()).await?;
    let _ = tcp_stream.flush();

    let (read_half, _write_half) = tcp_stream.split();
    let mut reader = BufReader::new(read_half);
    let mut content = String::new();

    let mut buf = String::new();
    while reader.read_line(&mut buf).await? > 0 {
        content.push_str(buf.as_str());
        buf.clear();
        if let Ok(json_data) = serde_json::from_str::<Value>(&content) {
            if let Some(response) = json_data["value"].as_str() {
                println!("Received message from server:\n{}", response);
                tx.send(response.to_string()).unwrap();
                break; 
            }
        }
    }

    Ok(content.replace("\0", ""))
}

fn get_json_msg(command:&str) ->String {
    let json_data = json!({
        "command":command,
    });
    json_data.to_string()
 }


#[tokio::main]
async fn main() -> Result<()> {
   let (mut tx, mut rx) = broadcast::channel(1024);
   
   tokio::spawn(async move {
    let response  = send_command("ps -auc",&tx).await;
   
   });

   while let Ok(msg) = rx.recv().await {
    println!("Received message: {}", msg);
}
  // println!(" response: {:?}",response);
  Ok(())
} 

Here in the server you are sending only JSON.

let response = get_json_msg(&result);
                                        
if addr == tcp_stream.peer_addr().unwrap(){
    println!("send to back client : {:?}   ", response);
    let _ = tcp_stream.write_all(response.as_bytes()).await; 
    let _ =tcp_stream.flush();
   // break;
}

Here in the client you are asking to read a line.

    while reader.read_line(&mut buf).await? > 0 {

A line is ended by either a newline or EOF. JSON may or may not contain newlines, but in this case it doesn't, so the client waits until EOF.

Your problem can be understood as inconsistent framing — a definition of where one message ends and the next starts. If you only want to send one thing per connection, as your program currently does, then just using read_to_end() instead will do.[1] If you want to send more than one JSON message, you need to mark where one ends and the next begins — for example, by actually sending a newline after each message (this is fairly popular and sometimes called “NDJSON”).

(It's technically possible to identify the end of a piece of JSON directly by reading until all brackets and quotes are balanced, but that's more trouble to set up and can't reliably detect/recover framing errors.)


  1. But both of these let the server DoS the client by sending arbitrarily long data. ↩︎

2 Likes

Thanks for your reply!! I have already changed the reading method from read_line() to read_to_end() , but the issue still exists. My code:

  let (read_half, _write_half) = tcp_stream.split();
    let mut reader = BufReader::new(read_half);
    //let mut content = String::new();

    let mut buf = String::new();

    let mut buffer = Vec::new();
    reader.read_to_end(&mut buffer).await?;
    let content = String::from_utf8_lossy(&buffer).to_string();
    if let Ok(json_data) = serde_json::from_str::<Value>(&content) {
        if let Some(response) = json_data["value"].as_str() {
            println!("Received message from server:\n{}", response);
            tx.send(response.to_string()).unwrap();
        }
    }

Sorry, I missed a step. If you take this approach, then the server should close the connection right after sending the data and flushing. Then read_to_end gets the EOF it is expecting.

That's where my confusion lies. I have to close the server-side connection in order to receive messages, but if I write the client code like below, where the client and server exchange messages, I don't need to close the connection and can still receive messages immediately.

 
use std::io::{Read, Write};
//use std::net::{TcpListener, TcpStream};
use std::thread;
use serde_json::{json,Value,Result};
use std::env;
use std::os::unix::io::{AsRawFd, FromRawFd};
use libc::{fcntl, F_SETFL, O_NONBLOCK};

use tokio::io::AsyncReadExt;
use tokio::{net::TcpSocket, io::AsyncWriteExt,net::TcpStream};
use tokio::sync::broadcast::{self};

fn get_json_msg(command:&str) ->String {
    let json_data = json!({
        "command":command,
    });
    json_data.to_string()
 }

 

 #[tokio::main]
 async fn main() {

   /*let args: Vec<String> = env::args().collect();
   if args.len() == 2 {
      let command = args.get(1).unwrap().to_string();
      get_value_from_socket(&command);
   }*/
    let socket = TcpSocket::new_v4().unwrap();
    let addr = "127.0.0.1:5555".parse().unwrap();
    let mut tcp_stream = socket.connect(addr).await.unwrap();

    println!("Connected Success:{:?}",addr);
    
    let ( tx,mut rx) = broadcast::channel(1024);
   

    tx.send(String::new()).unwrap();

    tokio::spawn(async move {
        loop {
            let (mut read_half,mut write_half) = tcp_stream.split();
          
            let mut buf = vec![0;1024];
            tokio::select! {
                result = read_half.read_buf(&mut buf) => {
                    match result {
                        Ok(num) => {
                            if num != 1024 {
                                let mut content = String::from_utf8(buf).unwrap();
                               // print!("size = {},receive = {:?}",num,content);
                                if num == 0 { 
                                    std::process::exit(0);
                                }
                                let msg = content.replace("\0","");
                                //print!("received msg : {:?}",msg);
                                if msg.is_empty() {
                                   //empty msg do nothing
                                } else {
                                    //let json_data: Value = serde_json::from_str(&msg).expect("Failed to parse json");
                                    match serde_json::from_str::<Value>(&msg) { 
                                        Ok(json_data) => {
                                            let response =  &json_data["value"].as_str().expect("response field not found").to_string();
                                            println!("receive msg from server:\n {}", response); 
                                            //tx.send(response).unwrap(); 
                                            //std::process::exit(0);
                                        }  
                                        Err(error) => {
                                            println!("Failed parse json {:?} ", msg);
                                        }
                                    }

                                }
                            }
                        }
                        Err(err) => {
                            println!("disconnected!err={:?}",err);

                            std::process::exit(0);
                        }
                    }
                    
                }

                result = rx.recv() => {
                    let send = result.unwrap();
                    let _ = write_half.write_all(send.as_bytes()).await;
                    let _ = write_half.flush();
                }
            }
        }
    });

    loop {
        let mut send = String::new();       
        std::io::stdin().read_line(&mut send).unwrap();
        let input = send.replace("\n", "");
        println!("input {:?}", input);
        if input.is_empty() {
           continue;
        } 
        let msg = get_json_msg(&input).trim().to_string();
        println!("send msg to server {:?}", msg);
        tx.send(msg).unwrap();
    } 

}
    

However, this doesn't quite fit my needs. I only want to write a one-time client that sends a command, reads the message returned by the server, and then closes the client. However, the server needs to keep running in the background, listening for client connections.

In this version, your code is assuming that it can read data until it has data that parses as a complete JSON value. This assumption is true in practice because the server won't be sending multiple values, but if it did, your code would potentially get stuck with multiple messages in the same buffer (which will never successfully parse).
And, it's very inefficient because it tries to parse every time it gets some more data, and the parse will read all the existing data again.

Then, as I said before, the server should close the connection to the client. That is not the same as the server stopping listening for new connections.

1 Like

In addition to what kpreid said: You can close just the write end of a socket to signal you've sent the entire payload and still wait for the response on the read end.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.