Web server performance issue

Hi :wave:

I'm new to Rust and have some software that I want to port to Rust for many reasons.

But I have a performance problem with the way I serve large files using Hyper and Tokio.
The current software is twice as fast as my Rust code.

I serve each file as a Stream, as it may be too big to fit in a single HTTP response, using Body::wrap_stream. So I implemented a Stream object like this:

struct TokioStream {
    file: tokio::fs::File,
    buf: Vec<u8>,
}
impl Stream for TokioStream {
    type Item = Vec<u8>;
    type Error = Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match self.file.poll_read(&mut self.buf) {
            // 0 bytes read means end of file: end the stream.
            Ok(Async::Ready(0)) => Ok(Async::Ready(None)),
            // Copy only the bytes that were actually read into a fresh Vec for this chunk.
            Ok(Async::Ready(size)) => Ok(Async::Ready(Some(Vec::from(&self.buf[..size])))),
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(_) => Err(Error::new(ErrorKind::Other, "😭")),
        }
    }
}

I'm worried about allocating a new vector each time poll is called. How could I reuse the same vector for every chunk?
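
Maybe something like BytesMut (from the bytes 0.4 crate, which hyper 0.12 already uses) could help? Here is an untested sketch, I'm not sure it is correct: each poll hands out a cheap Bytes view of the filled part instead of copying it into a fresh Vec, and once the previous chunk has been written out and dropped, reserve should be able to reclaim the allocation instead of allocating again.

use bytes::{Bytes, BytesMut};
use futures::{Async, Poll, Stream};
use tokio::io::AsyncRead;

struct TokioStream {
    file: tokio::fs::File,
    buf: BytesMut,
}

impl Stream for TokioStream {
    // hyper's Chunk implements From<Bytes>, so Body::wrap_stream still accepts this.
    type Item = Bytes;
    type Error = std::io::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        // Make sure there is spare capacity to read into; reserve tries to
        // reclaim the old allocation before making a new one.
        self.buf.reserve(BUF_SIZE);
        match self.file.read_buf(&mut self.buf) {
            Ok(Async::Ready(0)) => Ok(Async::Ready(None)), // EOF
            Ok(Async::Ready(_)) => {
                // Hand out the filled part without another copy;
                // the spare capacity stays in self.buf for the next poll.
                let filled = self.buf.len();
                Ok(Async::Ready(Some(self.buf.split_to(filled).freeze())))
            }
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(e) => Err(e),
        }
    }
}

In main it would be built with buf: BytesMut::with_capacity(BUF_SIZE) instead of buf: vec!(0; BUF_SIZE). Is that the right direction?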

Another thing that could explain the performance problem is how I use Tokio and Hyper. But nothing fancy here, IMHO:

fn main() {
    let addr = "[::1]:1337".parse().unwrap();

    let service = || {
        service_fn_ok( |_| {
            let task = tokio::fs::File::open("big")
                .and_then( |file|  {
                    Ok(TokioStream{ file: file,
                        buf: vec!(0; BUF_SIZE)
                    })
                });
            match task.wait() {
                Err(_) => Response::new(Body::from("Please (╥_╥)")),
                Ok(s) => Response::new(Body::wrap_stream(s)),
            }
        })
    };

    let server = Server::bind(&addr)
        .serve(service)
        .map_err(|e| eprintln!("Error: {}", e));

    hyper::rt::run(server);
}

The last thing that I think may be the source of the performance problem is the size of the buffer. But after some tests, it seems that 1<<22 is a good size on my machine. I don't know which one is used in the other solution.

const BUF_SIZE : usize = 1<<22;

How can I improve this code? I'm pretty sure I can do better with Rust.

To benchmark the solution, I simply download the file to /dev/null with wget "http://localhost:1337/big" -O /dev/null. The file is about 7 GiB.

$ wget "http://localhost:8080/big" -O /dev/null # The old one
2019-04-20 08:30:12 (420 MB/s) - '/dev/null' saved [7038594877/7038594877]
$ wget "http://localhost:1337/big" -O /dev/null # The Rust code
2019-04-20 09:10:45 (230 MB/s) - '/dev/null' saved [7038594877/7038594877]

Any help would be appreciated. :pray:
Have a nice day.

Hello ache, welcome to the Rust community.

Performance debugging is usually not as easy as it looks.
Your stream implementation is inefficient; I'd guess your program's bottleneck is the code quoted below.

I recommend using a buffer slice instead of a vector: example

It's better to use combinators instead of wait(); however, in this case you probably don't need an additional combinator.

            let task = tokio::fs::File::open("big")
                .and_then( |file|  {
                    Ok(TokioStream{ file: file,
                        buf: vec!(0; BUF_SIZE)
                    })
                });
            match task.wait() {
                Err(_) => Response::new(Body::from("Please (╥_╥)")),
                Ok(s) => Response::new(Body::wrap_stream(s)),
            }

This can become something like the following (you'd also switch from service_fn_ok to service_fn, so the future can be returned directly):

            let task = tokio::fs::File::open("big")
                .and_then(|file| {
                    Ok(Response::new(Body::wrap_stream(TokioStream {
                        file: file,
                        buf: vec!(0; BUF_SIZE),
                    })))
                });

Thank you! :flushed:

That's what I thought. But I can't optimize it even a little! I tried to use a slice :confounded: and failed to do it correctly:

const BUF_SIZE : usize = 1<<21;

struct TokioStream {
    file: tokio::fs::File,
    buf: Box<[u8]>,
}
impl Stream for TokioStream {
    type Item = Vec<u8>;
    type Error = Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
            match self.file.poll_read(&mut self.buf) {
                Ok(Async::Ready(0)) => {
                    Ok(Async::Ready(None))
                },
                Ok(Async::Ready(size)) => {
                    Ok(Async::Ready(Some(self.buf[..size].to_vec())))
                },
                Ok(Async::NotReady) => {
                    Ok(Async::NotReady)
                },
                Err(_) => {
                    Err(Error::new(ErrorKind::Other, "😭"))
                },
            }
    }
}
fn main() {
    let addr = "[::1]:1337".parse().unwrap();

    let service = || {
        service_fn( |_| {
            tokio::fs::File::open("big")
                .and_then( |file|  {
                    // allocate the buffer directly on the heap instead of boxing a large stack array
                    let fs = TokioStream{ file: file, buf: vec![0u8; BUF_SIZE].into_boxed_slice() };
                    Ok(Response::new(Body::wrap_stream(fs)))
            })
        })
    };

    let server = Server::bind(&addr)
        .serve(service)
        .map_err(|e| eprintln!("Error: {}", e));

    hyper::rt::run(server);
}

But it doesn't change anything. Finally, I tried this ugly piece of code, which convinced me that my implementation is still inefficient:

const BUF_SIZE : usize = 1<<22;
// One shared mutable buffer for every request: unsound if two requests run at once.
static mut BUF : [u8; BUF_SIZE] = [0; BUF_SIZE];

struct TokioStream {
    file: tokio::fs::File,
}
impl Stream for TokioStream {
    type Item = Chunk;
    type Error = Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        unsafe {
            match self.file.poll_read(&mut BUF[..]) {
                Ok(Async::Ready(0)) => {
                    Ok(Async::Ready(None))
                },
                Ok(Async::Ready(size)) => {
                    Ok(Async::Ready(Some(Chunk::from(&BUF[..size]))))
                },
                Ok(Async::NotReady) => {
                    Ok(Async::NotReady)
                },
                Err(_) => {
                    Err(Error::new(ErrorKind::Other, "😭"))
                },
            }
        }
    }
}
fn main() {
    let addr = "[::1]:1337".parse().unwrap();

    let service = || {
        service_fn( |_| {
            tokio::fs::File::open("big")
                .and_then( |file|  {
                    let fs = TokioStream{ file: file };
                    Ok(Response::new(Body::wrap_stream(fs)))
            })
        })
    };

    let server = Server::bind(&addr)
        .serve(service)
        .map_err(|e| eprintln!("Error: {}", e));

    hyper::rt::run(server);
}

I'm not using Rust to write ugly unsafe code like that, but it goes up to 290 MB/s! It's still below the previous software, but it's way better than my previous implementation of TokioStream.

If I could improve the quality of the previous code, it would be awesome. But there is definitely something else slowing down the code.

Thanks for your post!

I'm now pretty sure that the problem is how I read the file. I would like to confirm or rule that out with a profiler, but I'm new to Rust, so that seems a bit complicated. Like every little thing, actually. ¯\_(ツ)_/¯

Each time the Stream needs a chunk, I read it. But I shouldn't wait until a chunk is needed to start reading the next one.
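
One way to get that kind of read-ahead might be to move the reads into their own spawned task and push chunks through a small bounded channel, so the next read can already be in flight while hyper is still writing the previous chunk to the socket. This is only an untested sketch (still futures 0.1 / tokio 0.1 / hyper 0.12 and the BUF_SIZE constant from above); file_body and READAHEAD are just names I made up, and I don't know yet how much it would actually gain, since tokio::fs already offloads the reads to a blocking pool.

use futures::{stream, sync::mpsc, Async, Future, Poll, Sink, Stream};
use std::io::{Error, ErrorKind};
use tokio::io::AsyncRead;

const READAHEAD: usize = 2; // chunks buffered between the reader task and the response body

fn file_body(mut file: tokio::fs::File) -> hyper::Body {
    let (tx, rx) = mpsc::channel::<Result<Vec<u8>, Error>>(READAHEAD);

    // The reading side: a stream of Result<chunk, io::Error>. Errors travel
    // inside the items so they survive the trip through the channel.
    let mut buf = vec![0u8; BUF_SIZE];
    let chunks = stream::poll_fn(move || -> Poll<Option<Result<Vec<u8>, Error>>, ()> {
        match file.poll_read(&mut buf) {
            Ok(Async::Ready(0)) => Ok(Async::Ready(None)), // EOF: end the stream
            Ok(Async::Ready(n)) => Ok(Async::Ready(Some(Ok(buf[..n].to_vec())))),
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(e) => Ok(Async::Ready(Some(Err(e)))),
        }
    });

    // Spawn the reader; it stops on its own once the body is dropped,
    // because sending into a closed channel fails.
    let reader = tx
        .sink_map_err(|_| ()) // receiver gone: just stop reading
        .send_all(chunks)
        .map(|_| ());
    tokio::spawn(reader);

    // The response body pulls the chunks back out of the channel.
    hyper::Body::wrap_stream(
        rx.map_err(|()| Error::new(ErrorKind::Other, "reader task gone"))
            // unwrap the Result items, surfacing io::Errors to hyper
            .and_then(|res| res),
    )
}

The service would then just be tokio::fs::File::open("big").and_then(|file| Ok(Response::new(file_body(file)))).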

From my tests, I can read the file at about 425 MB/s, and the network can do virtually 600 MB/s (on the loopback interface, I mean). So I should be able to send the file at about 400 MB/s.

Here are my tests:

extern crate tokio;
extern crate futures;
extern crate hyper;
extern crate time;

use futures::{Poll, Async, Stream, Future};
use std::io::Error;
use hyper::{Body, Response, Server, service::service_fn};
use time::PreciseTime;
use tokio::io::AsyncRead;


const BUF_SIZE : usize = 1<<19;
static mut BUF : [u8; BUF_SIZE] = [0; BUF_SIZE];

// A stream that just re-sends the same static buffer forever,
// to measure how fast hyper can push bytes over the loopback alone.
struct TokioStream {
}
impl Stream for TokioStream {
    type Item = &'static[u8];
    type Error = Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        unsafe {
            /*
            match self.file.poll_read(&mut BUF) {
                Ok(Async::Ready(0)) => {
                    Ok(Async::Ready(None))
                },
                Ok(Async::Ready(size)) => {
                    Ok(Async::Ready(Some(&BUF[..])))
                },
                Ok(Async::NotReady) => {
                    Ok(Async::NotReady)
                },
                Err(_) => {
                    Err(Error::new(ErrorKind::Other, "😭"))
                },
            }
            */
            Ok(Async::Ready(Some(&BUF[..])))
        }
    }
}
static mut a: i64 = 0; // total bytes read, for the throughput measurement
fn main() {
    // First: measure the raw read speed of the file, without hyper.
    let start = PreciseTime::now();
    let task = tokio::fs::File::open("big")
        .and_then(|mut file| unsafe {
            loop {
                let r = file.poll_read(&mut BUF);
                match r {
                    Ok(Async::Ready(0)) => {
                        break r
                    },
                    Ok(Async::Ready(size)) => {
                        a += size as i64;
                        //Ok(Async::Ready(Some(&BUF[..])));
                        continue
                    },
                    Ok(Async::NotReady) => {
                        break r
                    },
                    Err(_) => {
                        break r
                    },
                }
            }
        }
        ).map(|_| {
            ;
        }).map_err(|err| eprintln!("IO error: {:?}", err));

    tokio::run(task);
    let end = PreciseTime::now();
    unsafe {
        let dur : f64 = start.to(end).num_milliseconds() as f64 / 1000.;
        let size : f64 = a as f64;
        println!("{:.3}MB/s", (size / dur)/(1024.*1024.));
    }

    let addr = "[::1]:1337".parse().unwrap();

    let service = || {
        service_fn( |_| {
            tokio::fs::File::open("big")
                .and_then( |_|  {
                    let fs = TokioStream{};
                    Ok(Response::new(Body::wrap_stream(fs)))
                })
        })
    };

    let server = Server::bind(&addr)
        .serve(service)
        .map_err(|e| eprintln!("Error: {}", e));

    hyper::rt::run(server);
}

I will keep you informed if I make any progress. :wave:

PS: Sorry about my English, my English level is ridiculous ("it's butter" :rofl:). I'm pretty sure I made a lot more mistakes.
