I have a library called OxideDB that I have been working on for a while. It was my first attempt at a large Rust project, so the code is a little rough, but it works. The idea is to act as a translation layer that speaks the MongoDB protocol in front of a PostgreSQL backend, so that in the end PostgreSQL emulates a MongoDB server.
I am now rewriting it based on another 6-9 months of experience with Rust. The first thing I wanted to implement was a way to receive MongoDB requests on a port I am listening on and bifurcate each request: one copy goes directly to MongoDB, and the other goes to my translation layer and finally to PostgreSQL.
I want to capture the incoming message in its binary format so I can use it as a fixture for my tests, and also compare the responses I get from both servers and investigate the differences.
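A minimal sketch of the framing this capture relies on, using only the standard library. It assumes the standard MongoDB wire protocol header (four little-endian int32 fields: messageLength, requestID, responseTo, opCode). The names `MsgHeader`, `parse_header`, `take_frame` and `write_fixture` are hypothetical and not part of OxideDB or Tokio. Because every message carries its own length, a fixture file can simply be raw messages written back to back.

```rust
use std::io::{self, Write};

/// Size of the standard header at the start of every wire message.
const HEADER_LEN: usize = 16;

/// The four little-endian int32 fields of the message header.
#[derive(Debug, PartialEq)]
struct MsgHeader {
    message_length: i32, // total message size, header included
    request_id: i32,
    response_to: i32,
    op_code: i32,
}

/// Parses the header if at least 16 bytes are available.
fn parse_header(buf: &[u8]) -> Option<MsgHeader> {
    if buf.len() < HEADER_LEN {
        return None;
    }
    let field = |i: usize| {
        let o = i * 4;
        i32::from_le_bytes([buf[o], buf[o + 1], buf[o + 2], buf[o + 3]])
    };
    Some(MsgHeader {
        message_length: field(0),
        request_id: field(1),
        response_to: field(2),
        op_code: field(3),
    })
}

/// Removes and returns the first complete message from `buf`.
/// Returns Ok(None) while more bytes are still needed.
fn take_frame(buf: &mut Vec<u8>) -> io::Result<Option<Vec<u8>>> {
    let header = match parse_header(&buf[..]) {
        Some(h) => h,
        None => return Ok(None),
    };
    if header.message_length < HEADER_LEN as i32 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "message length shorter than header",
        ));
    }
    let len = header.message_length as usize;
    if buf.len() < len {
        return Ok(None);
    }
    Ok(Some(buf.drain(..len).collect()))
}

/// Appends one raw message to a fixture sink (a file, a Vec<u8>, ...).
fn write_fixture<W: Write>(sink: &mut W, frame: &[u8]) -> io::Result<()> {
    sink.write_all(frame)
}
```

In a proxy, bytes read from the client socket would be appended to a buffer and `take_frame` called in a loop until it returns `Ok(None)`. The same functions can replay a fixture file in tests.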
To implement that, I tried to bend and change the proxy example that Tokio provides, but I haven't been able to, again due to my utter lack of experience in this realm.
I was wondering if someone could point me in the right direction, at least with the mental model you would follow to make this code 1) optionally log the raw request to a file; 2) bifurcate the incoming TCP traffic to two servers; 3) log both responses; and finally 4) elect one of the two downstream responses to return to the client.
Below is the code I wanted to modify, but couldn't. Feel free to suggest changes to this code or a different approach altogether.
https://github.com/tokio-rs/tokio/blob/master/examples/proxy.rs
//! A proxy that forwards data to another server and forwards that server's
//! responses back to clients.
//!
//! Because the Tokio runtime uses a thread pool, each TCP connection is
//! processed concurrently with all other TCP connections across multiple
//! threads.
//!
//! You can showcase this by running this in one terminal:
//!
//! cargo run --example proxy
//!
//! This in another terminal
//!
//! cargo run --example echo
//!
//! And finally this in another terminal
//!
//! cargo run --example connect 127.0.0.1:8081
//!
//! This final terminal will connect to our proxy, which will in turn connect to
//! the echo server, and you'll be able to see data flowing between them.
#![warn(rust_2018_idioms)]

use tokio::io;
use tokio::io::AsyncWriteExt;
use tokio::net::{TcpListener, TcpStream};

use futures::FutureExt;
use std::env;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let listen_addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:8081".to_string());
    let server_addr = env::args()
        .nth(2)
        .unwrap_or_else(|| "127.0.0.1:8080".to_string());

    println!("Listening on: {}", listen_addr);
    println!("Proxying to: {}", server_addr);

    let listener = TcpListener::bind(listen_addr).await?;

    while let Ok((inbound, _)) = listener.accept().await {
        let transfer = transfer(inbound, server_addr.clone()).map(|r| {
            if let Err(e) = r {
                println!("Failed to transfer; error={}", e);
            }
        });

        tokio::spawn(transfer);
    }

    Ok(())
}

async fn transfer(mut inbound: TcpStream, proxy_addr: String) -> Result<(), Box<dyn Error>> {
    let mut outbound = TcpStream::connect(proxy_addr).await?;

    let (mut ri, mut wi) = inbound.split();
    let (mut ro, mut wo) = outbound.split();

    let client_to_server = async {
        io::copy(&mut ri, &mut wo).await?;
        wo.shutdown().await
    };

    let server_to_client = async {
        io::copy(&mut ro, &mut wi).await?;
        wi.shutdown().await
    };

    tokio::try_join!(client_to_server, server_to_client)?;

    Ok(())
}
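For reference, the four steps listed earlier (log the request, send it to both servers, log both replies, elect one) can be sketched as a single request/response exchange. This is a rough, blocking, standard-library-only sketch rather than a change to the Tokio example. It assumes that each request gets exactly one reply and that each message starts with a little-endian int32 total length, as in the MongoDB wire protocol header. All names (`Elect`, `mirror_exchange`, `read_frame`, `log_record`, `replies_match`) and the one-byte log tags are hypothetical.

```rust
use std::io::{self, Read, Write};

/// Which upstream's reply is returned to the client.
#[derive(Clone, Copy)]
enum Elect {
    Primary,
    Shadow,
}

/// Reads one length-prefixed message (the length includes the 16-byte header).
fn read_frame<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let mut len_bytes = [0u8; 4];
    r.read_exact(&mut len_bytes)?;
    let len = i32::from_le_bytes(len_bytes);
    if len < 16 {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "bad length"));
    }
    let mut frame = vec![0u8; len as usize];
    frame[..4].copy_from_slice(&len_bytes);
    r.read_exact(&mut frame[4..])?;
    Ok(frame)
}

/// Writes a one-byte tag followed by the raw message to the log sink.
fn log_record<W: Write>(log: &mut W, tag: u8, frame: &[u8]) -> io::Result<()> {
    log.write_all(&[tag])?;
    log.write_all(frame)
}

/// One exchange: 1) log the request, 2) send it to both upstreams,
/// 3) log both replies, 4) return the elected reply.
fn mirror_exchange<P, S, L>(
    request: &[u8],
    primary: &mut P,
    shadow: &mut S,
    log: &mut L,
    elect: Elect,
) -> io::Result<Vec<u8>>
where
    P: Read + Write,
    S: Read + Write,
    L: Write,
{
    log_record(log, b'Q', request)?;
    primary.write_all(request)?;
    shadow.write_all(request)?;
    let from_primary = read_frame(primary)?;
    let from_shadow = read_frame(shadow)?;
    log_record(log, b'P', &from_primary)?;
    log_record(log, b'S', &from_shadow)?;
    Ok(match elect {
        Elect::Primary => from_primary,
        Elect::Shadow => from_shadow,
    })
}

/// Compares two replies while ignoring bytes 4..8, the requestID that
/// each server picks for its own message.
fn replies_match(a: &[u8], b: &[u8]) -> bool {
    a.len() == b.len() && a.len() >= 16 && a[..4] == b[..4] && a[8..] == b[8..]
}
```

`std::net::TcpStream` implements both `Read` and `Write`, so two connected streams could be passed as `primary` and `shadow`. In an async version the same shape would apply with Tokio's `AsyncReadExt` and `AsyncWriteExt` in place of the blocking traits. Real replies from two different servers may also differ in other fields, so `replies_match` is only a starting point for investigating differences.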
Thank you very much!