How to build a stream server? (Edit: a minimal reproduction)

I'm developing a ChatGPT CLI or TUI tool, so I need a mock server to simulate the API's streaming response. This is my attempt:

use std::{
    io::{Read, Write},
    net::{SocketAddr, TcpListener, TcpStream},
    time::Duration,
};

/// Starts the mock server on a background thread, sends it one bare HTTP
/// request over a raw TCP connection, and prints the last `\r\n`-separated
/// segment of every chunk received until the server closes the connection.
fn main() {
    let server = std::thread::spawn(run_server);
    // Give the server thread a moment to bind its listening socket.
    std::thread::sleep(Duration::from_secs(1));

    let mut conn = TcpStream::connect("127.0.0.1:3000").unwrap();
    conn.write_all(b"POST / HTTP/1.1\r\n\r\n").unwrap();

    let mut chunk = [0; 1024];
    loop {
        // A zero-length read means the peer closed; a read error also ends the loop.
        let received = match conn.read(&mut chunk) {
            Ok(0) | Err(_) => break,
            Ok(n) => n,
        };
        let text = String::from_utf8_lossy(&chunk[..received]);
        // Only the part after the final CRLF (the body) is interesting here.
        let tail = text.split("\r\n").last().unwrap();
        println!("Client Receive: {}", tail);
    }

    // The server loops forever, so this join keeps the process alive.
    server.join().unwrap();
}

fn run_server() {
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    let listener = TcpListener::bind(addr).unwrap();
    listener.set_nonblocking(true).unwrap();
    println!("Listening on: {}", addr);

    loop {
        match listener.accept() {
            Ok((mut stream, _)) => {
                let mut buf = [0; 1024];
                match stream.read(&mut buf) {
                    Ok(len) => {
                        let buf = String::from_utf8_lossy(&buf[..len]);
                        println!("Server Receive: {}", buf.split("\r\n").last().unwrap());
                        let resp = "This is response from mock server.".split_whitespace();
                        for word in resp {
                            let chunk = gen_resp(word);
                            stream.write_all(chunk.as_bytes()).unwrap();
                            std::thread::sleep(Duration::from_millis(100));
                        }
                    }
                    Err(e) => {
                        println!("Error reading from stream: {}", e);
                    }
                }
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                // This is expected; we just continue the loop
                std::thread::sleep(Duration::from_millis(100)); // Slight delay to reduce CPU usage
            }
            Err(e) => {
                println!("Error accepting connection: {}", e);
            }
        }
    }
}

/// Wraps `msg` in a minimal `200 OK` HTTP/1.1 response whose `Content-Length`
/// header is the byte length of `msg`.
fn gen_resp(msg: &str) -> String {
    let body_len = msg.len();
    // Status line and headers first, then the body appended verbatim.
    let mut response = format!("HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", body_len);
    response.push_str(msg);
    response
}

This successfully prints:

Listening on: 127.0.0.1:3000
Server Receive:
Client Receive: This
Client Receive: is
Client Receive: response
Client Receive: from
Client Receive: mock
Client Receive: server.

However, it's inconvenient to use TcpStream directly in practice, so I use reqwest instead.

use futures_util::StreamExt;
use serde_json::json;
use std::{
    io::{Read, Write},
    net::{SocketAddr, TcpListener},
    time::Duration,
};

/// Starts the mock server on a background thread, POSTs a small JSON body to
/// it with reqwest, and prints the last `\r\n`-separated segment of every
/// body chunk as it arrives.
#[tokio::main]
async fn main() {
    let server = std::thread::spawn(run_server);
    // Give the server thread a moment to bind its listening socket.
    std::thread::sleep(Duration::from_secs(1));

    let request = reqwest::Client::new()
        .post("http://127.0.0.1:3000")
        .json(&json!({"msg": "hello"}));
    let response = request.send().await.unwrap();
    let mut body = response.bytes_stream();

    while let Some(chunk) = body.next().await {
        let bytes = chunk.unwrap();
        let text = String::from_utf8_lossy(&bytes);
        let tail = text.split("\r\n").last().unwrap();
        println!("Client Receive: {}", tail);
    }

    // The server loops forever, so this join keeps the process alive.
    server.join().unwrap();
}

/// Runs the mock streaming server on `127.0.0.1:3000`, polling for
/// connections forever.
///
/// The listener is non-blocking, so the loop sleeps briefly whenever no
/// connection is pending instead of spinning at full CPU. Per-connection
/// failures are logged rather than silently dropped or turned into a panic.
///
/// # Panics
///
/// Panics if the listening socket cannot be bound or configured.
fn run_server() {
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    let listener = TcpListener::bind(addr).unwrap();
    listener.set_nonblocking(true).unwrap();
    println!("Listening on: {}", addr);
    loop {
        match listener.accept() {
            Ok((stream, _)) => {
                if let Err(e) = serve_client(stream) {
                    println!("Error serving client: {}", e);
                }
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                // No pending connection; back off to avoid a busy loop.
                std::thread::sleep(Duration::from_millis(100));
            }
            Err(e) => println!("Error accepting connection: {}", e),
        }
    }
}

/// Reads one request from `stream` and answers with a single HTTP response
/// whose body is a sequence of `data:` events, one per word, 100 ms apart.
///
/// The status line and headers are written exactly once. The response carries
/// no `Content-Length`, so its body is delimited by the server closing the
/// connection, which happens when `stream` is dropped on return.
///
/// # Errors
///
/// Returns any I/O error raised while configuring, reading from, or writing
/// to the stream (for example when the client disconnects early).
fn serve_client(mut stream: std::net::TcpStream) -> std::io::Result<()> {
    // On some platforms (e.g. BSD/macOS) an accepted socket inherits the
    // listener's non-blocking flag; force blocking mode so the read below
    // waits for the request instead of failing with `WouldBlock`.
    stream.set_nonblocking(false)?;

    let mut buf = [0; 1024];
    let len = stream.read(&mut buf)?;
    println!("Server Receive: {}", String::from_utf8_lossy(&buf[..len]));

    stream.write_all(
        b"HTTP/1.1 200 OK\r\nContent-Type: text/event-stream\r\nConnection: close\r\n\r\n",
    )?;
    for word in "This is response from mock server.".split_whitespace() {
        stream.write_all(gen_resp(word).as_bytes())?;
        stream.flush()?;
        std::thread::sleep(Duration::from_millis(100));
    }
    Ok(())
}

/// Formats `msg` as one `data:` event terminated by a blank line.
///
/// Only the event payload is produced; the HTTP status line and headers are
/// sent once per connection by `serve_client`, not once per event.
fn gen_resp(msg: &str) -> String {
    format!("data: {}\n\n", msg)
}

With this change it no longer works at all: not only does the server read nothing, but a Broken Pipe error may also be raised.

The reason is that I didn't write the HTTP header correctly, so the client thought the communication was over and shut down the connection.

Server:

use std::{
    io::Write,
    net::{SocketAddr, TcpListener, TcpStream},
    time::Duration,
};

use crate::{data::Chunk, error::Result};

/// A mock server that answers every connection with a canned streamed
/// response (see `handle_stream`).
pub(crate) struct Mock {}

impl Mock {
    /// Creates a new mock server handle.
    pub(crate) fn new() -> Self {
        Self {}
    }

    /// Listens on `127.0.0.1:port` and serves connections one at a time until
    /// no connection has been accepted for longer than `close_idle`.
    ///
    /// The listener is polled in non-blocking mode so the idle timeout can be
    /// checked between accepts; the loop sleeps briefly between polls instead
    /// of spinning at full CPU.
    ///
    /// # Errors
    ///
    /// Returns an error if the listening socket cannot be bound or switched
    /// to non-blocking mode. Failures on individual connections are logged
    /// and do not stop the server.
    pub(crate) fn run(&self, port: u16, close_idle: Duration) -> Result<()> {
        let addr = SocketAddr::from(([127, 0, 0, 1], port));
        let listener = TcpListener::bind(addr)?;
        listener.set_nonblocking(true)?;
        let mut last_activity = std::time::Instant::now();
        loop {
            match listener.accept() {
                Ok((stream, _)) => {
                    // On some platforms (e.g. BSD/macOS) the accepted socket
                    // inherits the listener's non-blocking flag; the handler
                    // expects ordinary blocking writes.
                    match stream.set_nonblocking(false) {
                        Ok(()) => handle_stream(stream),
                        Err(e) => eprintln!("Mock server failed to configure a connection: {}", e),
                    }
                    last_activity = std::time::Instant::now();
                    continue;
                }
                // No pending connection: fall through to the idle check.
                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
                Err(e) => eprintln!("Mock server failed to accept a connection: {}", e),
            }
            if last_activity.elapsed() > close_idle {
                println!("Mock server closed.");
                break;
            }
            // Short back-off so an idle server does not busy-loop.
            std::thread::sleep(Duration::from_millis(10));
        }
        Ok(())
    }
}

/// Serves one client: streams the canned response and then shuts the
/// connection down.
///
/// I/O failures (typically the client hanging up early) are logged instead of
/// panicking, so one misbehaving client cannot take the mock server down.
fn handle_stream(mut stream: TcpStream) {
    if let Err(e) = stream_response(&mut stream) {
        eprintln!("Mock server failed to stream response: {}", e);
    }
    // The peer may already have closed the socket, in which case shutdown can
    // fail (e.g. with `NotConnected`); there is nothing useful to do about it.
    let _ = stream.shutdown(std::net::Shutdown::Both);
}

/// Writes the HTTP status line once, then one `data:` event per word (each a
/// JSON-serialized `Chunk`), 100 ms apart, followed by a final `[DONE]` event.
///
/// # Errors
///
/// Returns the first I/O error from the socket, or a serialization error
/// converted into an `io::Error`.
fn stream_response(stream: &mut TcpStream) -> std::io::Result<()> {
    stream.write_all(b"HTTP/1.1 200 OK\r\n\r\n")?;
    for word in ["Response ", "from ", "mock ", "server."] {
        let chunk = Chunk::new(word);
        let event = gen_resp(serde_json::to_string(&chunk)?);
        stream.write_all(event.as_bytes())?;
        stream.flush()?;
        std::thread::sleep(Duration::from_millis(100));
    }
    let done = gen_resp("[DONE]".to_string());
    stream.write_all(done.as_bytes())?;
    stream.flush()
}

/// Frames `data` as a single `data:` event terminated by a blank line.
fn gen_resp(data: String) -> String {
    ["data: ", data.as_str(), "\n\n"].concat()
}

Client:

/// Sends this request to the configured endpoint and writes the streamed
/// answer to `output` as it arrives, followed by a trailing newline.
///
/// The response body is treated as a sequence of events separated by a blank
/// line (`\n\n`). Each event starting with `data: ` carries either a JSON
/// `Chunk`, whose content (if any) is written and flushed immediately, or the
/// `[DONE]` sentinel, which ends processing.
///
/// Network chunks do not necessarily line up with event boundaries, so bytes
/// are buffered and only complete events are parsed. Data left over when the
/// stream ends without a final blank line is still processed as one event.
///
/// # Errors
///
/// Returns an error if the request fails, a body chunk cannot be read, an
/// event's JSON cannot be parsed, or writing to `output` fails.
///
/// # Panics
///
/// Panics if an event is not valid UTF-8.
pub async fn ask(&self, config: Config, output: &mut impl io::Write) -> Result<()> {
    /// Handles one event (without its `\n\n` terminator). Returns `Ok(true)`
    /// once the `[DONE]` sentinel has been seen.
    fn handle_event(event: &[u8], output: &mut impl io::Write) -> Result<bool> {
        let event = std::str::from_utf8(event).expect("Invalid UTF-8 sequence");
        if let Some(data) = event.strip_prefix("data: ") {
            // Check if the stream is done...
            if data == "[DONE]" {
                return Ok(true);
            }
            // Parse the json data...
            let d = serde_json::from_str::<Chunk>(data)?;
            if let Some(c) = d.content() {
                output.write_all(c.as_bytes())?;
                output.flush()?;
            }
        }
        Ok(false)
    }

    let mut stream = Self::client()
        .post(config.endpoint().clone())
        .header(
            header::AUTHORIZATION,
            format!("Bearer {}", config.api_key()),
        )
        .json(&self)
        .send()
        .await?
        .bytes_stream();

    // Bytes received so far that do not yet form a complete event. Splitting
    // on the raw bytes `\n\n` is safe for UTF-8: a newline byte never occurs
    // inside a multi-byte sequence.
    let mut pending: Vec<u8> = Vec::new();
    let mut done = false;
    'events: while let Some(item) = stream.next().await {
        pending.extend_from_slice(&item?);
        while let Some(end) = pending.windows(2).position(|w| w == b"\n\n") {
            let event: Vec<u8> = pending.drain(..end + 2).collect();
            if handle_event(&event[..end], &mut *output)? {
                // Leave the whole stream loop, not just the inner one.
                done = true;
                break 'events;
            }
        }
    }
    // An unterminated final event is still handled.
    if !done && !pending.is_empty() {
        handle_event(&pending, &mut *output)?;
    }
    output.write_all(b"\n")?;
    Ok(())
}

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.