Custom Tor (Arti) transport and sync bridge for the ureq library

To add Tor support to ureq via Arti, I set out to write a custom ureq `Transport`. The main difficulty was implementing the `Transport` trait and bridging ureq's synchronous I/O with Arti's asynchronous I/O.

My first step was to study `lib.rs` from arti-hyper and try to copy it, but the code was a bit overwhelming and I didn't fully understand it. So I took a step back and tried to implement it my own way, using what I know about TCP and HTTP. Step by step this produced a working implementation, and with some guidance and support from this community I was able to get it working.

I now have a fully working (very basic) implementation of what I set out to build. It is not complete yet (I still have to implement the TLS layer and timeouts), but the core is already functional: I can send a non-TLS HTTP request over Tor using the standard ureq library.

I'd like some feedback on my progress so far. Either way, this was extremely fun to do and very educational. By building it step by step from my own knowledge, I ended up using the same principles as arti-hyper's `lib.rs`, and thanks to this experiment I now fully understand the arti-hyper implementation, which felt overwhelming at the start.

What do you think of this code?

use tor_proto::stream::{DataReader, DataWriter};
use tor_rtcompat::{PreferredRuntime, Runtime};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::sync::{Arc, Mutex};
use ureq::unversioned::{
    transport::{
        ConnectionDetails,
        Transport,
        NextTimeout,
        Connector,
        Buffers,
        LazyBuffers
    },
    resolver::DefaultResolver,
};
use arti_client::{
    TorClient, 
    TorClientConfig, 
    config::Reconfigure
};
use educe::Educe;

/// A ureq [`Transport`] that carries HTTP bytes over one Arti (Tor) data stream.
///
/// ureq drives transports synchronously while Arti's stream halves are async;
/// `rt` is the runtime whose `block_on` bridges the two.
#[derive(Educe)]
#[educe(Debug)]
pub struct ArtiHttpConnection<R: Runtime> {
    /// Read half of the Tor stream.
    // NOTE(review): the `Mutex` presumably exists so this type is `Sync`; the `Arc`
    // is never handed to anyone else and could likely be dropped — confirm.
    r: Arc<Mutex<DataReader>>,
    /// Write half of the Tor stream.
    w: Arc<Mutex<DataWriter>>,
    /// ureq's input/output buffers (left out of the `Debug` output).
    #[educe(Debug(ignore))]
    buffer: LazyBuffers,
    /// Runtime used to block on the async stream operations.
    rt: R,
}

impl<R: Runtime> Transport for ArtiHttpConnection<R> {
    fn buffers(&mut self) -> &mut dyn Buffers {
        &mut self.buffer
    }

    fn transmit_output(
        &mut self,
        amount: usize,
        _timeout: NextTimeout,
    ) -> Result<(), ureq::Error> {
        let writer = Arc::clone(&self.w);
        let buffer = self.buffer.output(); 
        let data_to_write = &buffer[..amount];
        println!("Writing: {:?}", String::from_utf8_lossy(data_to_write));
        self.rt.block_on(async {
            let mut writer = match writer.lock() {
                Ok(w) => w,
                Err(_) => return Err(ureq::Error::Io(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    "Mutex lock failed",
                ))),
            };

            writer.write_all(data_to_write).await.map_err(|e| ureq::Error::Io(e))?;
            writer.flush().await?;
            Ok(())
        })
    }

    fn await_input(&mut self, _timeout: NextTimeout) -> Result<bool, ureq::Error> {
        let reader = Arc::clone(&self.r);
        self.rt.block_on(async {
            let mut reader = match reader.lock() {
                Ok(r) => r,
                Err(_) => return Err(ureq::Error::Io(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    "Mutex lock failed",
                ))),
            };

            let buf = self.buffer.input_append_buf();
            let mut temp_buf = vec![0; buf.len()];
            let read_result = reader.read(&mut temp_buf).await;

            match read_result {
                Ok(size) if size > 0 => {
                    buf[..size].copy_from_slice(&temp_buf[..size]);
                    self.buffer.input_appended(size);
                    Ok(true)
                }
                Ok(_) => Ok(false), 
                Err(e) => Err(ureq::Error::Io(e)),
            }
        })
    }

    fn is_open(&mut self) -> bool {
        let _r = match self.r.lock() {
            Ok(r) => r,
            Err(_) => { return false; },
        };
        (*_r).ctrl().is_open()
    }
}

/// A ureq [`Connector`] that opens every connection through a shared Arti [`TorClient`].
#[derive(Educe)]
#[educe(Debug)]
pub struct ArtiHttpConnector<R: Runtime> {
    /// Tor client used to open streams (left out of the `Debug` output).
    #[educe(Debug(ignore))]
    client: Arc<TorClient<R>>,
    /// Runtime used to block on Arti's async connect; a clone goes into each connection.
    rt: R,
}

impl<R: Runtime> ArtiHttpConnector<R> {
    pub fn new(rt: R, config: &TorClientConfig) -> Self {
        let client = TorClient::with_runtime(rt.clone()).create_unbootstrapped().expect("Error creating Tor Client.");
        client.reconfigure(&config, Reconfigure::AllOrNothing).expect("Error applying config to Tor Client.");
        ArtiHttpConnector { client: Arc::new(client), rt: rt.clone() }
    }
}

impl<R: Runtime> Connector for ArtiHttpConnector<R> {
    fn connect(
        &self,
        details: &ConnectionDetails,
        chained: Option<Box<dyn Transport>>,
    ) -> Result<Option<Box<dyn Transport>>, ureq::Error> {
        if chained.is_some() {
            return Ok(chained);
        }

        let result = self.rt.block_on(async {
            let port = match details.uri.scheme().expect("Could not extract scheme from uri.").as_str() {
                "http" => 80,
                "https" => 443,
                _ => {
                    return Err(ureq::Error::BadUri("Unsupported scheme".to_string()));
                }
            };

            let stream =
            match self.client.connect(
                (details.uri.authority().expect("Could not extract authority from uri.").as_str(), port)
            ).await {
                Ok(stream) => stream,
                Err(e) => {
                    return Err(ureq::Error::Io(std::io::Error::new(
                        std::io::ErrorKind::Other,
                        format!("Error creating stream: {:?}", e),
                    )));
                }
            };

            let (r, w) = stream.split();
            let buffers = LazyBuffers::new(2048, 2048);
            let transport = ArtiHttpConnection {
                r: Arc::new(Mutex::new(r)),
                w: Arc::new(Mutex::new(w)),
                buffer: buffers,
                rt: self.rt.clone(),
            };

            Ok::<_, ureq::Error>(Some(Box::new(transport) as Box<dyn Transport>))
        });

        result
    }
}

fn main() -> Result<(), String> {
    #[allow(non_snake_case)] let URI = "http://httpbin.org/ip";

    let rt = PreferredRuntime::create().expect("Failed to create runtime."); 
    let agent = ureq::Agent::with_parts(
        ureq::config::Config::default(),
        ArtiHttpConnector::new(rt.clone(), &TorClientConfig::default()),
        DefaultResolver::default(),
    );
    
    let mut request = match agent.get(URI).call() {
        Ok(res) => res,
        Err(ureq::Error::StatusCode(code)) => {
            return Err(format!("Error: {:?}", code));
        },
        Err(e) => {
            return Err(format!("Error: {:?}", e));
        }
    };

    if request.status().is_success() {
        let body = request.body_mut().read_to_string().expect("Failed to read body from response.");
        println!("Body: {}", body);
    } else {
        return Err(format!("Error: Request failed with status {}", request.status()));
    }

    Ok(())
}

[package]
name = "arti-ureq-test"
version = "0.1.0"
edition = "2021"

[dependencies]
arti-client = "0.25.0"
tor-proto = { version = "0.25.0", features = ["stream-ctrl", "tokio"] }
tor-rtcompat= {version =  "0.25.0", features = ["async-std"] }
ureq = { version = "3.0.0-rc1", features = ["charset", "gzip"]}
tokio = { version = "1.42.0", features = ["full"] }
educe = "=0.4.23"