I am working with Polars DataFrames in Rust and need to send such data in the body of an HTTP response.
Polars can serialize a DataFrame in IPC (inter-process communication) format for network transfer to anything that implements the std::io::Write trait. This works perfectly with a raw std::net::TcpStream: the IPC data is written directly to the socket without first being collected into an in-memory byte buffer.
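What makes this work is that the serializer is generic over any `std::io::Write` sink. Below is a minimal std-only sketch of the same idea. It does not use Polars: the hypothetical `write_payload` function stands in for `IpcWriter::finish`, and a loopback `TcpListener` plays the role of the client connection. The peer receives exactly the bytes written, and no intermediate `Vec` is involved on the sending side.

```rust
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

/// Hypothetical stand-in for a serializer such as IpcWriter: it only needs `W: Write`.
fn write_payload<W: Write>(out: &mut W) -> std::io::Result<()> {
    for chunk in [&b"header;"[..], &b"row1;"[..], &b"row2"[..]] {
        out.write_all(chunk)?; // each chunk goes straight to the sink
    }
    out.flush()
}

fn main() -> std::io::Result<()> {
    // Bind to an OS-assigned port on the loopback interface.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    // Server side: serialize directly into the socket.
    let server = thread::spawn(move || -> std::io::Result<()> {
        let (mut stream, _) = listener.accept()?;
        write_payload(&mut stream)
        // `stream` is dropped here, closing the connection (EOF for the client).
    });

    // Client side: read everything until EOF.
    let mut client = TcpStream::connect(addr)?;
    let mut received = Vec::new();
    client.read_to_end(&mut received)?;
    server.join().expect("server thread panicked")?;
    assert_eq!(received, b"header;row1;row2");

    // The same generic function also works with an in-memory sink.
    let mut buf: Vec<u8> = Vec::new();
    write_payload(&mut buf)?;
    assert_eq!(buf, received);
    println!("received {} bytes", received.len());
    Ok(())
}
```

Because the function is generic, the choice between "buffer first" and "write to the socket" is made entirely by the caller.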
use polars::prelude::*;
use polars_io::ipc::IpcWriter;
use std::net::TcpStream;

fn handle_connection(mut stream: TcpStream, raw_df: &mut DataFrame) {
    IpcWriter::new(&mut stream)
        .finish(raw_df)
        .expect("IPC should not fail");
}
However, the Rust web frameworks I have looked at (e.g. actix_web, rocket) all seem to require building the HTTP response body from bytes that are already in memory. Question: is there a recommended way to write an HTTP response body in Rust through the std::io::Write trait?
Otherwise, my code needs one extra byte-vector allocation and copy, which I am trying to avoid.
use actix_web::{get, post, web, App, Error, HttpRequest, HttpResponse, HttpServer};
use futures::{future::ok, stream::once};

#[get("/")]
async fn respond_df(data: web::Data<AppState>) -> HttpResponse {
    let mut df = data.df.lock().unwrap();
    let mut _mem_bytes: Vec<u8> = Vec::new();
    IpcWriter::new(&mut _mem_bytes)
        .finish(&mut df)
        .expect("IPC should not fail");
    let body = once(ok::<_, Error>(web::Bytes::from(_mem_bytes)));
    HttpResponse::Ok()
        .streaming(body)
}
I'm still new to the Rust ecosystem and would appreciate any feedback.
Web frameworks in general invert the program's control flow so that instead of your code writing bytes to some output, you prepare the data for the framework to read and it does all the remaining work for you.
So you have to turn IpcWriter (a synchronous writer) into some kind of asynchronous reader stream that Actix can poll after your handler function has returned. Note the two important conversions:
Writer to Reader
Synchronous to Asynchronous
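The writer-to-reader conversion can be illustrated without Tokio. This is a std-only sketch in which a bounded `std::sync::mpsc::sync_channel` stands in for the in-memory pipe. The hypothetical `ChannelWriter` implements `Write` by sending each buffer as an owned chunk, and the consumer side is simply an iterator over chunks, which is a reader-like stream. It covers only the first conversion, not the synchronous-to-asynchronous one, and it copies each buffer into a new `Vec`, which is exactly the kind of overhead mentioned below.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Hypothetical adapter: a `Write` sink whose output is consumed elsewhere as chunks.
struct ChannelWriter {
    tx: SyncSender<Vec<u8>>,
}

impl Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Blocks while the channel is full, giving backpressure like a bounded pipe.
        self.tx
            .send(buf.to_vec())
            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "reader dropped"))?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Runs `produce` on a new thread, writing through a `ChannelWriter`, and returns
/// the receiving end, which the caller consumes as a stream of chunks.
fn spawn_producer<F>(capacity: usize, produce: F) -> Receiver<Vec<u8>>
where
    F: FnOnce(&mut ChannelWriter) -> io::Result<()> + Send + 'static,
{
    let (tx, rx) = sync_channel(capacity);
    thread::spawn(move || {
        let mut w = ChannelWriter { tx };
        // In this sketch an error (e.g. the consumer went away) just stops production.
        let _ = produce(&mut w);
        // `w` and its sender are dropped here, which ends the receiver's iteration.
    });
    rx
}

fn main() {
    let rx = spawn_producer(2, |w| {
        for i in 0..5 {
            write!(w, "chunk{};", i)?;
        }
        Ok(())
    });
    // Consumer side: pull chunks as they arrive, like a framework reading a body stream.
    let body: Vec<u8> = rx.into_iter().flatten().collect();
    assert_eq!(body, b"chunk0;chunk1;chunk2;chunk3;chunk4;");
    println!("{}", String::from_utf8(body).unwrap());
}
```

Tokio's DuplexStream plays the same role in the answer's code, with the difference that its reading end can be polled asynchronously.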
What you want to do is possible, but given that all conversions involve some overhead (memory / CPU), you'll need to work out whether it's worth doing in your particular case.
One simple method would be to use a Tokio DuplexStream in respond_df. Something like:
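// Clone the DataFrame so it can be moved to another thread; a MutexGuard cannot be.
// Cloning is cheap because the columns are reference-counted.
let mut df = data.df.lock().unwrap().clone();
let (async_writer, async_reader) = tokio::io::duplex(256 * 1024);
let stream_reader = tokio_util::io::ReaderStream::new(async_reader);
let handle = tokio::runtime::Handle::current();
// Run the synchronous IpcWriter on the blocking thread pool so it does not stall the async executor
tokio::task::spawn_blocking(move || {
    let mut m = MyConverter::new(async_writer, handle);
    IpcWriter::new(&mut m).finish(&mut df).expect("IPC should not fail");
});
HttpResponse::Ok().streaming(stream_reader)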
Then define your converter:
struct MyConverter {
    async_writer: tokio::io::DuplexStream,
    handle: tokio::runtime::Handle,
}

impl MyConverter {
    fn new(async_writer: tokio::io::DuplexStream, handle: tokio::runtime::Handle) -> Self {
        MyConverter { async_writer, handle }
    }
}
// implementing this trait makes MyConverter compatible with IpcWriter
impl std::io::Write for MyConverter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        use tokio::io::AsyncWriteExt; // provides write_all() on DuplexStream
        let writer_handle = &mut self.async_writer;
        let runtime_handle = &self.handle;
        std::thread::scope(|s| {
            // Spawning a thread per buffer write is very inefficient.
            // Only doing this for illustrative purposes
            s.spawn(move || {
                // Using Handle::block_on to run async code in the new thread.
                // write_all (unlike write) sends the whole buffer, waiting while the duplex buffer is full.
                runtime_handle.block_on(writer_handle.write_all(buf))
            })
            .join()
            .expect("writer thread panicked")
        })?; // propagate I/O errors, e.g. when the client has disconnected
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}