Hello all,
I am trying to implement an Actix API where I will receive the stream response from OpenAI and stream it back to the user.
If the key is not provided, I want to send a plain HTTP response instead. However, I am confused about how to return two different concrete types (`HttpResponse` and `Sse`) from a single handler whose return type is `impl Responder`.
Thanks in advance
Code:
use actix_web::{get, web, App, HttpResponse, HttpServer, Responder};
use actix_web_lab::sse;
use futures_util::stream::StreamExt;
use reqwest;
use reqwest_eventsource::{Event, EventSource};
use serde::{Deserialize, Serialize};
use serde_json;
use std::{convert::Infallible, time::Duration};
#[get("/")]
async fn hello() -> impl Responder {
let open_ai_completion_url = "";
let open_ai_key = "";
if open_ai_key == "" {
return HttpResponse::Ok().body("Please set the OPENAI_KEY environment variable");
}
let response = reqwest::Client::new()
.post(open_ai_completion_url)
.header("api-key", open_ai_key)
.json(&serde_json::json!({
"messages": [
{"role": "system", "content": "Capital of USA?"},
],
"max_tokens": 10,
"temperature": 0,
"stream": true,
}));
let mut es = EventSource::new(response).expect("Failed to create EventSource");
let mut event_stream =
futures_util::stream::iter([Ok::<_, Infallible>(sse::Event::Data(sse::Data::new("foo")))]);
let event_stream = async_stream::stream! {
while let Some(event) = es.next().await {
match event {
Ok(Event::Open) => {
println!("Connection Open!");
}
Ok(Event::Message(message)) => {
let chunk = message.data;
if chunk == "[DONE]" {
println!("Done!");
es.close();
break;
}
let openai_chunk = chunk.split(" ").collect::<Vec<&str>>()[0];
let parsed: Result<OpenAIData, serde_json::Error> = serde_json::from_str(&chunk);
match parsed {
Ok(data) => {
let event = sse::Event::Data(sse::Data::new_json(data).unwrap());
yield Ok::<_, Infallible>(event);
}
Err(err) => {
println!("Error: {:?} - {:?}", err, chunk);
}
}
}
Err(err) => {
println!("Error: {}", err);
es.close();
}
}
}
};
sse::Sse::from_stream(event_stream).with_keep_alive(Duration::from_secs(5))
}
Error:
error[E0308]: mismatched types
--> src/main.rs:67:5
|
34 | let event_stream = async_stream::stream! {
| ________________________-
35 | | while let Some(event) = es.next().await {
36 | | match event {
37 | | Ok(Event::Open) => {
... |
65 | | }
66 | | };
| |_____- the found `async` block
67 | sse::Sse::from_stream(event_stream).with_keep_alive(Duration::from_secs(5))
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected struct `HttpResponse`, found struct `Sse`