Question regarding TcpStream, Flate2 and Bincode

Hi there,

Before getting into a long explanation of the problem, I should state up front that I want to stream compressed messages continuously over a bidirectional channel (without breaking the channel). If someone can give an example of that, I will be grateful.

I am developing a server-client application whose requirements are very low latency, a bidirectional channel, and efficient usage of that channel. The server and client communicate over the channel continuously.

I chose TcpStream as the communication channel. Serialization and deserialization of the messages are done with bincode. In this setup, with TCP_NODELAY enabled on the socket, I can achieve low-latency message passing over the network. However, the payload sent with a message can become huge, something like 300 KB, so I decided to apply compression while streaming messages over the channel. I chose flate2 for this. Here is the general overview of the pipeline (a short note on enabling nodelay follows the diagram):


Client----------------------------------------------
Message -> bincode -> flate2 -> tcp
----------------------------------------------------

Server--------------------------------------------
tcp -> flate2 -> bincode -> Message
--------------------------------------------------
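For reference, this is roughly how the nodelay option mentioned above is enabled; it is left out of the examples below to keep them short:

let stream = std::net::TcpStream::connect("127.0.0.1:4002").unwrap();
// disable Nagle's algorithm so small writes are sent out immediately
stream.set_nodelay(true).unwrap();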

The problem is that, with flate2 in the pipeline, sent messages are not available on the other side immediately, whereas without flate2 they are. Here is example code without flate2 that you can run on your local machine.

/*
[dependencies]
bincode = { version = "2.0.0-rc.3", features = ["serde"] }
flate2 = "1.0.25"
rand = "0.8.5"
serde = { version = "1.0.160", features = ["derive"] }

[profile.dev]
opt-level = "z"
debug = false
*/

use serde::{Deserialize, Serialize};

pub const CONFIG: bincode::config::Configuration = bincode::config::standard();
pub const COMPRESSION: flate2::Compression = flate2::Compression::new(6);

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Vec3 {
    pub x: f32,
    pub y: f32,
    pub z: f32,
}

impl Default for Vec3 {
    fn default() -> Self {
        Vec3 {
            x: rand::random(),
            y: rand::random(),
            z: rand::random(),
        }
    }
}

fn main() {
    let is_server = std::env::args()
        .nth(1)
        .map(|s| s == "server")
        .unwrap_or(false);

    let size = 1024 * 1024;

    if is_server {
        server(size);
    } else {
        client(size);
    }
}

pub fn server(size: usize) {
    let srv = std::net::TcpListener::bind("127.0.0.1:4002").unwrap();

    let (mut stream, _) = srv.accept().unwrap();

    for i in 0..10 {
        let vec = receive(&mut stream);
        println!("received {}", vec.len());

        send(&mut stream, size);
        println!("sent {i}");

        std::thread::sleep(std::time::Duration::from_secs(1));
    }
}

pub fn client(size: usize) {
    let mut stream = std::net::TcpStream::connect("127.0.0.1:4002").unwrap();

    for i in 0..10 {
        send(&mut stream, size);
        println!("sent {i}");

        let vec = receive(&mut stream);
        println!("received {}", vec.len());
    }
}

fn send(stream: &mut std::net::TcpStream, size: usize) {
    bincode::serde::encode_into_std_write(
        vec![Vec3::default(); size],
        stream,
        CONFIG,
    )
    .unwrap();
}

fn receive(stream: &mut std::net::TcpStream) -> Vec<Vec3> {
    bincode::serde::decode_from_std_read(stream, CONFIG).unwrap()
}

When you run the server and client side by side with cargo run server and cargo run, you can see that messages are deserialized immediately when the other side sends them. The output is

Server              Client
----------------    ----------------
received 1048576    sent 0
sent 0              received 1048576
received 1048576    sent 1
sent 1              received 1048576
received 1048576    sent 2
sent 2              received 1048576
received 1048576    sent 3
sent 3              received 1048576
received 1048576    sent 4
sent 4              received 1048576
received 1048576    sent 5
sent 5              received 1048576
received 1048576    sent 6
sent 6              received 1048576
received 1048576    sent 7
sent 7              received 1048576
received 1048576    sent 8
sent 8              received 1048576
received 1048576    sent 9
sent 9              received 1048576

Here is the example code with flate2 (only the send and receive functions changed):

fn send(stream: &std::net::TcpStream, size: usize) {
    let mut write = flate2::write::DeflateEncoder::new(stream, COMPRESSION);

    bincode::serde::encode_into_std_write(
        vec![Vec3::default(); size],
        &mut write,
        CONFIG,
    )
    .unwrap();

    write.flush_finish().unwrap();
}

fn receive(stream: &std::net::TcpStream) -> Vec<Vec3> {
    bincode::serde::decode_from_std_read(&mut flate2::read::DeflateDecoder::new(stream), CONFIG).unwrap()
}

This causes both the client and the server to get stuck in the receive part. The output is

Server              Client
----------------    ----------------
                    sent 0

The interesting part is the size value defined in the main function. When size is small, e.g. 32, the first output (the desired one) is obtained. I started to believe that compressing data over a continuous stream confuses the decoder at the other end, so I tried constructing the encoder and decoder only once and using them for the lifetime of the channel. Here is the example I tried (the server, client, send and receive functions changed):

pub fn server(size: usize) {
    let srv = std::net::TcpListener::bind("0.0.0.0:4002".parse::<std::net::SocketAddrV4>().unwrap()).unwrap();

    let (stream, _) = srv.accept().unwrap();
    let mut write = flate2::write::DeflateEncoder::new(&stream, COMPRESSION);
    let mut read = flate2::read::DeflateDecoder::new(&stream);

    for i in 0..10 {
        let vec = receive(&mut read);
        println!("received {}", vec.len());

        send(&mut write, size);
        println!("sent {i}");

        std::thread::sleep(std::time::Duration::from_secs(1));
    }
}

pub fn client(size: usize) {
    let stream = std::net::TcpStream::connect("127.0.0.1:4002").unwrap();
    let mut write = flate2::write::DeflateEncoder::new(&stream, COMPRESSION);
    let mut read = flate2::read::DeflateDecoder::new(&stream);

    for i in 0..10 {
        send(&mut write, size);
        println!("sent {i}");

        let vec = receive(&mut read);
        println!("received {}", vec.len());
    }
}

fn send(stream: &mut flate2::write::DeflateEncoder<&std::net::TcpStream>, size: usize) {
    use std::io::Write;

    bincode::serde::encode_into_std_write(
        vec![Vec3::default(); size],
        stream,
        CONFIG,
    )
    .unwrap();

    stream.flush().unwrap();
}

fn receive(stream: &mut flate2::read::DeflateDecoder<&std::net::TcpStream>) -> Vec<Vec3> {
    bincode::serde::decode_from_std_read(stream, CONFIG).unwrap()
}

The output is

Server              Client
----------------    ----------------
received 1048576    sent 0
sent 0              received 1048576
                    sent 1

If someone can give an example of streaming compressed messages over a channel, I will be grateful.

Thanks

I suspect the flate2 decompressor may be trying to see what the next byte is after whatever block it has already decompressed (perhaps it fills an internal buffer for efficiency, or just isn't careful about not peeking more than it needs), and ends up waiting for the next message to arrive or for the connection to close.
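One quick way to check that hypothesis (a throwaway debugging sketch, not part of any solution) is to wrap the socket in a Read adapter that logs every read() the decoder issues; if the decoder keeps asking for more bytes after the current message's compressed data has been consumed, that read is what blocks on the socket:

struct LoggingReader<R>(R);

impl<R: std::io::Read> std::io::Read for LoggingReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let n = self.0.read(buf)?;
        eprintln!("decoder requested {} bytes, got {}", buf.len(), n);
        Ok(n)
    }
}

// usage: bincode::serde::decode_from_std_read(
//     &mut flate2::read::DeflateDecoder::new(LoggingReader(&stream)), CONFIG)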

You could try decoding with a lower-level zlib API that isn't a stream, and instead explicitly takes byte buffers. This will let you read only what is available on the socket without blocking.
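A rough sketch of that approach with flate2's low-level Decompress type (raw deflate, to match the DeflateEncoder above; the buffer sizes are just illustrative): read whatever bytes the socket currently has, feed them to the decompressor, and accumulate the output.

fn pump(
    stream: &mut std::net::TcpStream,
    decomp: &mut flate2::Decompress, // created with flate2::Decompress::new(false) for raw deflate
    out: &mut Vec<u8>,
) -> usize {
    use std::io::Read;

    let mut in_buf = [0u8; 8 * 1024];
    // a blocking read returns as soon as *some* bytes are available
    let n = stream.read(&mut in_buf).unwrap();
    let mut input = &in_buf[..n];
    let mut chunk = [0u8; 32 * 1024];

    while !input.is_empty() {
        let (in_before, out_before) = (decomp.total_in(), decomp.total_out());
        decomp
            .decompress(input, &mut chunk, flate2::FlushDecompress::None)
            .unwrap();
        let consumed = (decomp.total_in() - in_before) as usize;
        let produced = (decomp.total_out() - out_before) as usize;
        out.extend_from_slice(&chunk[..produced]);
        input = &input[consumed..];
        if consumed == 0 && produced == 0 {
            break; // the decompressor needs more input than we currently have
        }
    }
    n
}

Once out holds a complete message it can be decoded with bincode::serde::decode_from_slice; knowing where one message ends still needs some framing, which is what the next suggestion gives you.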

Alternatively, wrap your compressed messages in a <length><bytes> protocol, so that you explicitly know how much to read from the stream. Rust's io::Read supports .by_ref().take(bytes) to give you a temporary stream that signals EOF after a given number of bytes.
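A minimal sketch of that framing, reusing Vec3, CONFIG and COMPRESSION from the code above (the u32 length prefix and the unwraps are just for brevity):

fn send(stream: &mut std::net::TcpStream, size: usize) {
    use std::io::Write;

    // compress the serialized message into a buffer so its length is known up front
    let mut enc = flate2::write::DeflateEncoder::new(Vec::new(), COMPRESSION);
    bincode::serde::encode_into_std_write(vec![Vec3::default(); size], &mut enc, CONFIG).unwrap();
    let compressed = enc.finish().unwrap();

    stream.write_all(&(compressed.len() as u32).to_le_bytes()).unwrap();
    stream.write_all(&compressed).unwrap();
}

fn receive(stream: &mut std::net::TcpStream) -> Vec<Vec3> {
    use std::io::Read;

    let mut len = [0u8; 4];
    stream.read_exact(&mut len).unwrap();
    let len = u32::from_le_bytes(len) as u64;

    // `take` reports EOF after `len` bytes, so the decompressor cannot
    // block waiting for bytes that belong to the next message
    let mut dec = flate2::read::DeflateDecoder::new(stream.by_ref().take(len));
    bincode::serde::decode_from_std_read(&mut dec, CONFIG).unwrap()
}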

Thanks for the suggestions @kornel. I was also planning to use lower-level APIs from the flate2 crate unless an easier way exists.

In my opinion, the alternative solution you proposed requires first storing the serialized and compressed data in a buffer before the send operation can start. Allocating a buffer is what I was trying to avoid; with unknown payload sizes, this may not be as efficient as writing compressed data to the stream as it is produced.

Yes, the extra protocol adds buffering and data overhead. You could tune it, though: e.g. instead of sending the whole message as one chunk, split it into many smaller <length><bytes> chunks, with a bit indicating which chunk is the last one (a rough sketch follows). You can also get creative with variable-length encodings of the length and state, e.g. a 1-byte special header for the common case of a standard buffer size you use.
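Here is what that chunk framing could look like (the <u8 flags><u16 len><bytes> layout and the sizes are just illustrative), with bit 0 of the flags byte marking the last chunk of a message:

use std::io::{Read, Write};

fn write_chunk(stream: &mut std::net::TcpStream, data: &[u8], last: bool) -> std::io::Result<()> {
    let flags: u8 = if last { 1 } else { 0 };
    stream.write_all(&[flags])?;
    stream.write_all(&(data.len() as u16).to_le_bytes())?;
    stream.write_all(data)
}

fn read_chunk(stream: &mut std::net::TcpStream) -> std::io::Result<(Vec<u8>, bool)> {
    let mut hdr = [0u8; 3];
    stream.read_exact(&mut hdr)?;
    let last = hdr[0] & 1 == 1;
    let len = u16::from_le_bytes([hdr[1], hdr[2]]) as usize;
    let mut data = vec![0u8; len];
    stream.read_exact(&mut data)?;
    Ok((data, last))
}

A Write adapter over the socket could emit write_chunk frames as the encoder produces output, so the whole compressed message never has to sit in one buffer.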

BTW, if you choose to deploy a streaming protocol secured with TLS, it will run into TLS adding its own chunking, typically in records of 4 KB to 16 KB.
