Hi there,
Before getting into long explanation of the problem, I should state that I want to stream compressed messages over a bidirectional channel continously (without breaking the channel). If someone can give an example for that, I will be grateful.
I am developing a server-client application where the communication between these two components have very low latency, bidirectional channel, and efficient usage of the channel as requirements. Server and client communicates over the channel continously.
I chose the TcpStream as the communication channel. Serialization and deserialization of the messages are done by bincode. In this setup, with nodelay optimization applied to the Tcp, I can achieve a message passing system over network with low latency. However, the payload sent with messages can become huge, something like 300KB. Then I decided to apply some compression while streaming the messages over the channel. I choose the flate2 for this reason. Here is the general overview of the pipeline:
Client----------------------------------------------
Message -> bincode -> flate2 -> tcp
----------------------------------------------------
Server--------------------------------------------
tcp -> flate2 -> bincode -> Message
--------------------------------------------------
The problem is that the sent messages are not available on the other side immediately when flate2 is included in the pipeline compared to without flate2. When the flate2 is removed from the pipeline, any message sent is available immediately. Here is an example code without flate2 that can be run on your local machine.
/*
[dependencies]
bincode = { version = "2.0.0-rc.3", features = ["serde"] }
flate2 = "1.0.25"
rand = "0.8.5"
serde = { version = "1.0.160", features = ["derive"] }
[profile.dev]
opt-level = "z"
debug = false
*/
use serde::{Deserialize, Serialize};
pub const CONFIG: bincode::config::Configuration = bincode::config::standard();
pub const COMPRESSION: flate2::Compression = flate2::Compression::new(6);
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Vec3 {
pub x: f32,
pub y: f32,
pub z: f32,
}
impl Default for Vec3 {
fn default() -> Self {
Vec3 {
x: rand::random(),
y: rand::random(),
z: rand::random(),
}
}
}
fn main() {
let is_server = std::env::args()
.into_iter()
.take(2)
.collect::<Vec<String>>()
.get(1)
.map(|s| s.as_str() == "server")
.unwrap_or(false);
let size = 1024 * 1024;
if is_server {
server(size);
} else {
client(size);
}
}
pub fn server(size: usize) {
let srv = std::net::TcpListener::bind("127.0.0.1:4002".parse::<std::net::SocketAddrV4>().unwrap()).unwrap();
let (mut stream, _) = srv.accept().unwrap();
for i in 0..10 {
let vec = receive(&mut stream);
println!("received {}", vec.len());
send(&mut stream, size);
println!("sent {i}");
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
pub fn client(size: usize) {
let mut stream = std::net::TcpStream::connect("127.0.0.1:4002").unwrap();
for i in 0..10 {
send(&mut stream, size);
println!("sent {i}");
let vec = receive(&mut stream);
println!("received {}", vec.len());
}
}
fn send(stream: &mut std::net::TcpStream, size: usize) {
bincode::serde::encode_into_std_write(
vec![Vec3::default(); size],
stream,
CONFIG,
)
.unwrap();
}
fn receive(stream: &mut std::net::TcpStream) -> Vec<Vec3> {
bincode::serde::decode_from_std_read(stream, CONFIG).unwrap()
}
When you run the server and client side by side with cargo run server
and cargo run
, you can see that messages are immediately deserialized when other side sends the message. The output is
Server Client
---------------- ----------------
received 1048576 sent 0
sent 0 received 1048576
received 1048576 sent 1
sent 1 received 1048576
received 1048576 sent 2
sent 2 received 1048576
received 1048576 sent 3
sent 3 received 1048576
received 1048576 sent 4
sent 4 received 1048576
received 1048576 sent 5
sent 5 received 1048576
received 1048576 sent 6
sent 6 received 1048576
received 1048576 sent 7
sent 7 received 1048576
received 1048576 sent 8
sent 8 received 1048576
received 1048576 sent 9
sent 9 received 1048576
Here is the example code with flate2 (only send and receive functions changed)
fn send(stream: &std::net::TcpStream, size: usize) {
let mut write = flate2::write::DeflateEncoder::new(stream, COMPRESSION);
bincode::serde::encode_into_std_write(
vec![Vec3::default(); size],
&mut write,
CONFIG,
)
.unwrap();
write.flush_finish().unwrap();
}
fn receive(stream: &std::net::TcpStream) -> Vec<Vec3> {
bincode::serde::decode_from_std_read(&mut flate2::read::DeflateDecoder::new(stream), CONFIG).unwrap()
}
This causes both client and server to stuck at receive part. The output is
Server Client
---------------- ----------------
sent 0
The interesting part is the size
value defined in the main
function. When the value of size
is small, e.g 32, first output, the desired one, is obtained. I started to believe that compressing data over continous stream confuses the decoder at the other end. So I tried to construct an encoder
and decoder
only once and use them for the lifetime of the channel. Here is the example I have tried (server, client, send and receive functions are changed)
pub fn server(size: usize) {
let srv = std::net::TcpListener::bind("0.0.0.0:4002".parse::<std::net::SocketAddrV4>().unwrap()).unwrap();
let (stream, _) = srv.accept().unwrap();
let mut write = flate2::write::DeflateEncoder::new(&stream, COMPRESSION);
let mut read = flate2::read::DeflateDecoder::new(&stream);
for i in 0..10 {
let vec = receive(&mut read);
println!("received {}", vec.len());
send(&mut write, size);
println!("sent {i}");
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
pub fn client(size: usize) {
let stream = std::net::TcpStream::connect("127.0.0.1:4002").unwrap();
let mut write = flate2::write::DeflateEncoder::new(&stream, COMPRESSION);
let mut read = flate2::read::DeflateDecoder::new(&stream);
for i in 0..10 {
send(&mut write, size);
println!("sent {i}");
let vec = receive(&mut read);
println!("received {}", vec.len());
}
}
fn send(stream: &mut flate2::write::DeflateEncoder<&std::net::TcpStream>, size: usize) {
use std::io::Write;
bincode::serde::encode_into_std_write(
vec![Vec3::default(); size],
stream,
CONFIG,
)
.unwrap();
stream.flush().unwrap();
}
fn receive(stream: &mut flate2::read::DeflateDecoder<&std::net::TcpStream>) -> Vec<Vec3> {
bincode::serde::decode_from_std_read(stream, CONFIG).unwrap()
}
The output is
Server Client
---------------- ----------------
received 1048576 sent 0
sent 0 received 1048576
sent 1
If someone can give an example for streaming compressed messages over a channel, I will be grateful.
Thanks