TCP proxy for multiple backends using Tokio

I have a library called OxideDB that I have been working on for a while. It was my first attempt at a large Rust project, so the code is a little rough, but it works. The idea is to act as a translation layer between the MongoDB wire protocol and a PostgreSQL backend, so that in the end PostgreSQL emulates a MongoDB server.

I am now rewriting it based on another 6-9 months of experience working with Rust. The first thing I wanted to implement was a way to receive requests from a MongoDB client on a port I'm listening on and bifurcate each request: send it to MongoDB directly and also to my translation layer (and from there to PostgreSQL).

I want to capture the incoming message in binary format so I can use it as a fixture for my tests, and also compare the responses I get from both servers and investigate the differences.

To implement that, I tried to bend and change the proxy example that Tokio provides, but I haven't been able to make it work, again due to my utter lack of experience in this realm.

I was wondering if someone could point me in the right direction, with at least the mental model you'd follow to make this code: 1) add an option to log the raw request to a file; 2) bifurcate the incoming TCP traffic to two servers; 3) log both responses; and finally 4) elect one of the two downstream responses to return to the client.

Below is the code I wanted to modify, but couldn't. Feel free to suggest changes to this code or to propose a new path altogether.

https://github.com/tokio-rs/tokio/blob/master/examples/proxy.rs

//! A proxy that forwards data to another server and forwards that server's
//! responses back to clients.
//!
//! Because the Tokio runtime uses a thread pool, each TCP connection is
//! processed concurrently with all other TCP connections across multiple
//! threads.
//!
//! You can showcase this by running this in one terminal:
//!
//!     cargo run --example proxy
//!
//! This in another terminal
//!
//!     cargo run --example echo
//!
//! And finally this in another terminal
//!
//!     cargo run --example connect 127.0.0.1:8081
//!
//! This final terminal will connect to our proxy, which will in turn connect to
//! the echo server, and you'll be able to see data flowing between them.

#![warn(rust_2018_idioms)]

use tokio::io;
use tokio::io::AsyncWriteExt;
use tokio::net::{TcpListener, TcpStream};

use futures::FutureExt;
use std::env;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let listen_addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:8081".to_string());
    let server_addr = env::args()
        .nth(2)
        .unwrap_or_else(|| "127.0.0.1:8080".to_string());

    println!("Listening on: {}", listen_addr);
    println!("Proxying to: {}", server_addr);

    let listener = TcpListener::bind(listen_addr).await?;

    while let Ok((inbound, _)) = listener.accept().await {
        let transfer = transfer(inbound, server_addr.clone()).map(|r| {
            if let Err(e) = r {
                println!("Failed to transfer; error={}", e);
            }
        });

        tokio::spawn(transfer);
    }

    Ok(())
}

async fn transfer(mut inbound: TcpStream, proxy_addr: String) -> Result<(), Box<dyn Error>> {
    let mut outbound = TcpStream::connect(proxy_addr).await?;

    let (mut ri, mut wi) = inbound.split();
    let (mut ro, mut wo) = outbound.split();

    let client_to_server = async {
        io::copy(&mut ri, &mut wo).await?;
        wo.shutdown().await
    };

    let server_to_client = async {
        io::copy(&mut ro, &mut wi).await?;
        wi.shutdown().await
    };

    tokio::try_join!(client_to_server, server_to_client)?;

    Ok(())
}

Thank you very much!

An extremely simplified way of doing something like this, which assumes there's only a single outgoing message and a single response from each backend, might look something like this:

use std::error::Error;

use tokio::io::{self, AsyncReadExt};
use tokio::net::TcpStream;

async fn transfer(
    mut inbound: TcpStream,
    backend_one: String,
    backend_two: String,
) -> Result<(), Box<dyn Error>> {
    let mut backend_one = TcpStream::connect(backend_one).await?;
    let mut backend_two = TcpStream::connect(backend_two).await?;

    let mut buf = Vec::new();

    // Read the whole incoming message into memory for simplicity
    inbound.read_to_end(&mut buf).await?;

    let backend_one_fut = async {
        // Create a slice so we don't need to capture an `&mut` to `buf`
        let mut buf = &buf[..];
        // Copy the whole incoming message to the first backend
        io::copy(&mut buf, &mut backend_one).await?;

        let mut output = Vec::new();
        // Collect the whole response from the first backend
        io::copy(&mut backend_one, &mut output).await?;

        Ok::<_, io::Error>(output)
    };

    let backend_two_fut = async {
        let mut buf = &buf[..];
        io::copy(&mut buf, &mut backend_two).await?;

        let mut output = Vec::new();
        io::copy(&mut backend_two, &mut output).await?;

        Ok::<_, io::Error>(output)
    };

    let (backend_one_response, backend_two_response) =
        tokio::try_join!(backend_one_fut, backend_two_fut)?;

    // Do something with the responses

    Ok(())
}

You should probably send the incoming message to each backend incrementally instead of collecting the whole thing, but it should be pretty easy to adjust the code to do that.
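
In case it's useful, here's a rough sketch of what the incremental version might look like (untested; forward_incrementally is a made-up helper name):

use std::error::Error;

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

// Sketch: stream the client's bytes to both backends chunk by chunk,
// instead of buffering the whole message with read_to_end.
async fn forward_incrementally(
    inbound: &mut TcpStream,
    backend_one: &mut TcpStream,
    backend_two: &mut TcpStream,
) -> Result<(), Box<dyn Error>> {
    let mut buf = [0u8; 4096];
    loop {
        let n = inbound.read(&mut buf).await?;
        if n == 0 {
            break; // the client closed its write side
        }
        // Forward the same chunk to both backends before reading more.
        backend_one.write_all(&buf[..n]).await?;
        backend_two.write_all(&buf[..n]).await?;
    }
    Ok(())
}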


Thank you very much @semicoleon; as always, your responses are full of good insights.

I have now played with your code for a couple of hours, trying to make it work for this case in particular, but I am stuck again and any outside insight would be extremely helpful.

I am sure I am messing things up in more than one place.

For reference, below is the current state of the code. I am only running it with one backend, using cargo run 127.0.0.1:27018 127.0.0.1:27017. My strategy is to make it work with one backend first and then take it from there.

use tokio::io;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};

use futures::FutureExt;
use std::env;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let listen_addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:27018".to_string());

    let server_addrs = env::args().skip(2).collect::<Vec<String>>();

    println!("Listening on: {}", listen_addr);
    println!("Proxying to: {:?}", server_addrs);

    let listener = TcpListener::bind(listen_addr).await?;

    while let Ok((inbound, _)) = listener.accept().await {
        println!("Accepted connection from: {}", inbound.peer_addr()?);
        let transfer = transfer(inbound, server_addrs.clone()).map(|r| {
            if let Err(e) = r {
                println!("Failed to transfer; error={}", e);
            }
        });

        tokio::spawn(transfer);
    }

    Ok(())
}

async fn transfer(
    mut inbound: TcpStream,
    backend_addresses: Vec<String>,
) -> Result<(), Box<dyn Error>> {
    let client_addr = inbound.peer_addr()?;

    let mut backends = Vec::new();
    for address in backend_addresses {
        let backend = TcpStream::connect(address).await?;
        backends.push(backend);
    }

    let server_addr = backends[0].peer_addr()?;

    let mut buf = Vec::new();

    println!("\n[{}] -------", client_addr);
    println!("[{}] Starting the read...", client_addr);
    let n = inbound.read_to_end(&mut buf).await?;
    println!("[{}] Read {} bytes from client", client_addr, n);

    let mut outputs = Vec::new();
    for backend in backends.iter_mut() {
        let addr = backend.peer_addr()?;
        println!("\n[{}=>{}] -------", client_addr, addr);
        let mut buf = &buf[..];
        println!(
            "[{}=>{}] Sending client request to backend...",
            client_addr, addr
        );
        let (mut ri, mut wi) = backend.split();
        match io::copy(&mut buf, &mut wi).await {
            Ok(n) => println!("[{}=>{}] Wrote {} bytes to backend", client_addr, addr, n),
            Err(e) => println!(
                "[{}=>{}] Error writing to backend: {}",
                client_addr, addr, e
            ),
        }
        wi.shutdown().await?;

        let mut output = Vec::new();
        println!("\n[{}] -------", addr);
        println!("[{}] Reading backend response...", addr);
        match ri.read_to_end(&mut output).await {
            Ok(n) => println!("[{}] Read {} bytes from backend", addr, n),
            Err(e) => println!("[{}] Error reading from backend: {}", addr, e),
        }
        // match io::copy(&mut ri, &mut output).await {
        //     Ok(n) => println!("[{}] Read {} bytes from backend", addr, n),
        //     Err(e) => println!("[{}] Error reading from backend: {}", addr, e),
        // }
        outputs.push(output);
    }

    println!("\n[{}=>{}] -------", server_addr, client_addr);
    println!(
        "[{}=>{}] Sending backend response to client...",
        server_addr, client_addr
    );
    let mut buf = &outputs[0][..];
    match io::copy(&mut buf, &mut inbound).await {
        Ok(n) => println!(
            "[{}=>{}] Wrote {} bytes back to client",
            server_addr, client_addr, n
        ),
        Err(e) => println!(
            "[{}=>{}] Error writing to client: {}",
            server_addr, client_addr, e
        ),
    }

    Ok(())
}

As you can see there's a lot of instrumentation (a fancy word for println debugging :) ), and I think MongoDB does a lot of parallel communication when it first attempts to connect.

I tried both the approach you suggested and reading data into a buffer of size 1024, and neither worked.

Some hypotheses I considered were: 1) I'm only able to transfer the first message I receive; 2) the client is somehow not waiting for a response before sending new messages; or 3) my response to the client is never actually reaching it.

Here's a trace of what's happening when I try to connect (I am running with only one backend, and that's the "real" MongoDB backend):

Listening on: 127.0.0.1:27018
Proxying to: ["127.0.0.1:27017"]
Accepted connection from: 127.0.0.1:62760

[127.0.0.1:62760] -------
[127.0.0.1:62760] Starting the read...
[127.0.0.1:62760] Read 367 bytes from client

[127.0.0.1:62760=>127.0.0.1:27017] -------
[127.0.0.1:62760=>127.0.0.1:27017] Sending client request to backend...
[127.0.0.1:62760=>127.0.0.1:27017] Wrote 367 bytes to backend

[127.0.0.1:27017] -------
[127.0.0.1:27017] Reading backend response...
[127.0.0.1:27017] Read 329 bytes from backend

[127.0.0.1:27017=>127.0.0.1:62760] -------
[127.0.0.1:27017=>127.0.0.1:62760] Sending backend response to client...
[127.0.0.1:27017=>127.0.0.1:62760] Wrote 329 bytes back to client

And here's what mongosh (the MongoDB CLI) reports when I connect to our proxy:

> $ mongosh 127.0.0.1:27018
Current Mongosh Log ID: 63ffc33a99c6750faaccaf3c
Connecting to:          mongodb://127.0.0.1:27018/?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+1.8.0
MongoServerSelectionError: Server selection timed out after 2000 ms

Versus what it does when I connect to the real server listening to port 27017:

> $ mongosh 127.0.0.1:27017
Current Mongosh Log ID: 63ffc42bc952da79b87fe149
Connecting to:          mongodb://127.0.0.1:27017/?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+1.8.0
Using MongoDB:          4.4.18
Using Mongosh:          1.8.0

For mongosh info see: https://docs.mongodb.com/mongodb-shell/

------
   The server generated these startup warnings when booting
   2023-03-01T17:49:43.136+00:00: Using the XFS filesystem is strongly recommended with the WiredTiger storage engine. See http://dochub.mongodb.org/core/prodnotes-filesystem
   2023-03-01T17:49:43.822+00:00: Access control is not enabled for the database. Read and write access to data and configuration is unrestricted
------

------
   Enable MongoDB's free cloud-based monitoring service, which will then receive and display
   metrics about your deployment (disk utilization, CPU, operation statistics, etc).
   
   The monitoring data will be available on a MongoDB website with a unique URL accessible to you
   and anyone you share the URL with. MongoDB may use this information to make product
   improvements and to suggest MongoDB products and deployment options to you.
   
   To enable free monitoring, run the following command: db.enableFreeMonitoring()
   To permanently disable this reminder, run the following command: db.disableFreeMonitoring()
------

test> 

Thank you!

I imagine the MongoDB client is expecting to make multiple requests on the same connection. The original example code is oriented towards an HTTP-like protocol that only has a single request and response per connection.
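
If you want to handle that properly, it helps that the MongoDB wire protocol is length-prefixed: every message starts with a 16-byte header whose first field is the total message length (header included) as a little-endian i32, so you can read exactly one message without waiting for EOF. A rough sketch (untested, and the helper name read_one_message is made up):

use std::io::{Error, ErrorKind};

use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;

// Sketch: read exactly one MongoDB wire-protocol message from a stream.
async fn read_one_message(stream: &mut TcpStream) -> std::io::Result<Vec<u8>> {
    // The first four bytes are the total message length, little-endian.
    let mut len_bytes = [0u8; 4];
    stream.read_exact(&mut len_bytes).await?;
    let len = i32::from_le_bytes(len_bytes) as usize;

    // A valid message is at least the 16-byte header.
    if len < 16 {
        return Err(Error::new(ErrorKind::InvalidData, "message length too small"));
    }

    // Keep the length prefix in the returned buffer and read the rest.
    let mut msg = vec![0u8; len];
    msg[..4].copy_from_slice(&len_bytes);
    stream.read_exact(&mut msg[4..]).await?;
    Ok(msg)
}

With framing like that, the proxy loop can read one complete request from the client, forward it, read one complete response from each backend, and repeat.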


That makes sense, I'll try to make some changes with that in mind!

Oh, I should also note that your code as written won't send the requests to each backend in parallel: you're calling await inside the loop on each of the operations. You'll need something like FuturesUnordered to run all of the requests in parallel; see the sketch below.
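
Something along these lines, as a rough sketch (the fan_out helper is made up, and it assumes request is one complete message and that a single 4 KB read captures each response):

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

// Sketch: send the same request to every backend concurrently and collect
// the first chunk each backend sends back.
async fn fan_out(
    backends: &mut [TcpStream],
    request: &[u8],
) -> std::io::Result<Vec<Vec<u8>>> {
    let mut futures = backends
        .iter_mut()
        .map(|backend| async move {
            backend.write_all(request).await?;

            let mut buf = vec![0u8; 4096];
            let n = backend.read(&mut buf).await?;
            buf.truncate(n);
            Ok::<_, std::io::Error>(buf)
        })
        .collect::<FuturesUnordered<_>>();

    // Responses arrive in completion order, not backend order; track which
    // backend each response came from if that matters for the comparison.
    let mut responses = Vec::new();
    while let Some(response) = futures.next().await {
        responses.push(response?);
    }
    Ok(responses)
}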


Great point, thank you! I will keep that in mind and start investigating that API when I get to a point where at least one backend is working correctly.

Thank you so much, yet again!

OK, I managed to get a working version going. It's probably not the best incarnation of what I wanted, but it's working beautifully for my use case.

Here's the full code for future reference:

use std::env;
use std::error::Error;

use futures::{
    stream::{self, StreamExt},
    FutureExt,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let listen_addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:27018".to_string());
    let server_addrs = env::args().skip(2).collect::<Vec<_>>();

    println!("Listening on: {}", listen_addr);
    println!("Proxying to: {:?}", server_addrs);

    let listener = TcpListener::bind(listen_addr).await?;

    while let Ok((inbound, _)) = listener.accept().await {
        let transfer = transfer(inbound, server_addrs.clone()).map(|r| {
            if let Err(e) = r {
                println!("Failed to transfer; error={}", e);
            }
        });

        tokio::spawn(transfer);
    }

    Ok(())
}

async fn transfer(mut inbound: TcpStream, proxy_addrs: Vec<String>) -> Result<(), Box<dyn Error>> {
    let tcp_futures = proxy_addrs
        .iter()
        .map(|addr| TcpStream::connect(addr))
        .collect::<Vec<_>>();
    let mut outbounds_stream = stream::iter(tcp_futures).buffer_unordered(10);

    let mut outbounds = Vec::new();
    while let Some(outbound) = outbounds_stream.next().await {
        let outbound = outbound?;
        outbounds.push(outbound);
    }

    let mut outbound_halves = outbounds
        .into_iter()
        .map(|outbound| outbound.into_split())
        .collect::<Vec<_>>();

    let (mut ir, mut iw) = inbound.split();

    loop {
        // read from inbound
        let mut buf = [0u8; 4096];
        let n = ir.read(&mut buf).await?;
        if n == 0 {
            break;
        }

        let mut responses = Vec::new();
        for (or, ow) in &mut outbound_halves {
            // write to outbound
            ow.write_all(&buf[..n]).await?;
            ow.flush().await?;

            // read from outbound
            let mut buf = [0u8; 4096];
            let n = or.read(&mut buf).await?;
            if n == 0 {
                continue;
            }

            responses.push(buf[..n].to_vec());
        }

        println!(
            "Got {} responses with sizes {:?}",
            responses.len(),
            responses.iter().map(|r| r.len()).collect::<Vec<_>>()
        );

        // write to inbound
        let Some(buf) = responses.first() else {
            break;
        };
        iw.write_all(buf).await?;
        iw.flush().await?;
    }

    Ok(())
}
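
For point 1 from the original question (logging the raw request to a file), a minimal sketch of one way to do it (the helper name and path are placeholders):

use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;

// Sketch: append raw request bytes to a capture file so they can be
// replayed later as test fixtures.
async fn log_raw_bytes(path: &str, bytes: &[u8]) -> std::io::Result<()> {
    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)
        .await?;
    file.write_all(bytes).await?;
    file.flush().await?;
    Ok(())
}

In practice you'd probably open the file once per connection and reuse the handle rather than reopening it for every chunk.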
