I want to pass the streaming response to client in streaming

how can i send a streaming response to directly client in streaming way

currently give code is in non-streaming getting complete response and send to the client


#[post("/chat")]
async fn chat(
    req: HttpRequest,
    body: web::Json<ChatData>,
    state: web::Data<AppState>,
) -> impl Responder {
    let db_connection = state.db.lock().unwrap();
  
    let client = reqwest::Client::new();
    let res = client
        .post("https://api.groq.com/openai/v1/chat/completions")
        .bearer_auth(groq_api)
        .header(CONTENT_TYPE, "application/json")
        .json(&json!({
            "messages": [
                {
                    "role": "user",
                    "content": body.message.clone()
                }
            ],
            "model": "mixtral-8x7b-32768",
            "stream":true
        }))
        .send()
        .await
        .unwrap();

    let response: Result<Value, _> = res.json().await;

    match response {
        Ok(json) => {
            let content = json
                .get("choices")
                .and_then(|choices| choices.get(0))
                .and_then(|choice| choice.get("message"))
                .and_then(|message| message.get("content"))
                .and_then(Value::as_str)
                .unwrap_or("No content found")
                .to_string();


            HttpResponse::Ok().json(json!( {"content":content}))

        }
        Err(e) => HttpResponse::InternalServerError().into(),
    }
}

This looks like Rocket.

Check out this issue for examples of how to do this. The up to date ones are at the bottom.

The ones with ReaderStream! generators look especially nice. Just be aware of the feature flags you need to enable in Cargo.toml

You can use HttpResponseBuilder::streaming for streaming responses to your client in Actix-web.

I can stream the response, but sometimes the chunks are incomplete, causing issues with parsing the JSON. This results in missing letters.

I have added code, result and logs.

How can I fix this?

code

 let client = reqwest::Client::new();
    let res = match client
        .post("https://api.groq.com/openai/v1/chat/completions")
        .bearer_auth(groq_api)
        .header(CONTENT_TYPE, "application/json")
        .json(&json!({
            "messages": [
                {
                    "role": "user",
                    "content": body.message.clone()
                }
            ],
            "model": "mixtral-8x7b-32768",
            "stream":true
        }))
        .send()
        .await
    {
        Ok(response) => response,
        Err(e) => {
            eprintln!("Request error: {}", e);
            return HttpResponse::InternalServerError().finish();
        }
    };

if res.status().is_success() {
        let mut stream = res.bytes_stream();
        let response_stream = stream! {
            let mut content_buffer = String::new();
            while let Some(item) = stream.next().await {
                match item {
                    Ok(bytes) => {
                        let s = match std::str::from_utf8(&bytes) {
                            Ok(v) => v,
                            Err(e) => {
                                eprintln!("Invalid UTF-8 sequence: {}", e);
                                continue;
                            }
                        };

                        // Process each line of the streamed data
                        for line in s.split("\n\n") {
                            if let Some(json_str) = line.strip_prefix("data: ") {
                                if json_str == "[DONE]" {
                                    // Handle stream end
                                    break;
                                }
                                 println!("{:#?}",s)
                                // Parse JSON data
                                match serde_json::from_str::<Value>(json_str) {
                                    Ok(json_value) => {
                                        // Extract content from JSON
                                        let extracted_content = json_value
                                            .get("choices")
                                            .and_then(|choices| choices.get(0))
                                            .and_then(|choice| choice.get("delta"))
                                            .and_then(|delta| delta.get("content"))
                                            .and_then(Value::as_str)
                                            .unwrap_or("");

                                        // Append the extracted content to the buffer
                                        content_buffer.push_str(extracted_content);
                                    }
                                    Err(e) => {
                                        eprintln!("Failed to parse JSON: {}", e);
                                    }
                                }
                            }
                        }

                        // Yield the accumulated content
                        yield Ok(Bytes::from(content_buffer.clone()));
                        content_buffer.clear(); // Clear the buffer for the next chunk
                    }
                    Err(e) => {
                        eprintln!("Stream error: {:?}", e);
                        yield Err(ErrorInternalServerError(e))
                    }
                }
            }
        };
        HttpResponse::Ok()
            .content_type("text/plain")
            .streaming(response_stream)
    } else {
        // Return an error response if the request was not successful
        HttpResponse::InternalServerError()
            .body(format!("Request failed with status: {}", res.status()))
    }
}

Response:

! How can I you today? If you have any about Python or programming in general, feel free to ask. I'll do my best to assist you.

Expected Respons:

Hello! How can I you today? If you have any about Python or programming in general, feel free to ask. I'll do my best to assist you.


log

"data: {"id":"chatcmpl-6cfd1a63-5bdd-4f23-b8c1-ac48ee570196","object":"chat.completion.chunk","created":1724281226,"model":"mixtral-8x7b-32768","system_fingerprint":"fp_c5f20b5bb1","choices":[{"index":0,"delta":{"role":"assistant","content":""},"logprobs":null,"finish_reason":null}],"x_groq":{"id":"req_01j5vjq1nyff0r1stf17z5qh63"}}\n\ndata: {"id":"chatcmpl-6cfd1a63-5bdd-4f23-b8c1-ac48ee570196","object":"chat.completion.chunk","created":1724281226,"model":"mixtral-8x7b-32768","system_fingerprint":"fp_c5f20b5bb1","choices":[{"index":0,"delta":{"content":""
Failed to parse JSON: EOF while parsing a string at line 1 column 216
"Hello"},"logprobs":null,"finish_reason":null}]}\n\ndata: {"id":"chatcmpl-9f878f20-0ea3-484b-b600-ae7e88f66d7f","object":"chat.completion.chunk","created":1724282899,"model":"mixtral-8x7b-32768","system_fingerprint":"fp_c5f20b5bb1","choices":[{"index":0,"delta":{"content":"!"},"logprobs":null,"finish_reason":null}]}\n\ndata: {"id":"chatcmpl-9f878f20-0ea3-484b-b600-ae7e88f66d7f","object":"chat.completion.chunk","created":1724282899,"model":"mixtral-8x7b-32768","system_fingerprint":"fp_c5f20b5bb1","choices":[{"index":0,"delta":{"content":" How"},"logprobs":null,"finish_reason":null}]}\n\ndata: {"id":"chatcmpl-9f878f20-0ea3-484b-b600-ae7e88f66d7f","object":"chat.completion.chunk","created":1724282899,"model":"mixtral-8x7b-32768","system_fingerprint":"fp_c5f20b5bb1","choices":[{"index":0,"delta":{"content":" can"},"logprobs":null,"finish_reason":null}]}\n\ndata: {"id":"chatcmpl-9f878f20-0ea3-484b-b600-ae7e88f66d7f","object":"chat.completion.chunk","created":1724282899,"model":"mixtral-8x7b-32768","system_fingerprint":"fp_c5f20b5bb1","choices":[{"index":0,"delta":{"content":" I"},"logprobs":null,"finish_reason":null}]}\n\ndata: {"id":"chatcmpl-9f878f20-0ea3-484b-b600-ae7e88f66d7f","object":"chat.completion.chunk","created":1724282899,"model":"mixtral-8x7b-32768","system_fingerprint":"fp_c5f20b5bb1","choices":[{"index":0,"delta":{"content":" help"},"logprobs":null,"
"ello"},"logprobs":null,"finish_reason":null}]}

This defeats the purpose of streaming the response. You should stream while you deserialize the response. Take a look at this response in SO for reference.

On the other hand, you are streaming plain text, but the client consuming the stream expects JSON.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.