Output from ureq not written to buffer

arti-client has an implementation for hyper (arti-hyper), and I tried to do the same for ureq. This is quite difficult because ureq is sync/blocking and Arti is async. This is a follow-up to this question, and I think I am now getting very close.

The following (very messy) code compiles, and it does send a request over the Tor network. The only problem is that the response is always empty: \0\0\0\0\0\0\0\0\0\0...

I think the problem lies in reader2.read_to_end(&mut tmp_buffer) not actually writing to tmp_buffer.
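A detail that may explain the zeros: read_to_end (both std::io::Read::read_to_end and Tokio's AsyncReadExt::read_to_end) appends to the vector instead of overwriting it. Since tmp_buffer starts out as vec![0; 1024], anything that is read ends up after 1024 zero bytes, so the printed output begins with a long run of \0 no matter what. The std-only sketch below uses a byte slice in place of the Tor stream (an assumption so it runs without Arti); read_all_into is a hypothetical helper.

```rust
use std::io::Read;

/// Hypothetical helper: reads all of `src` into a Vec that starts with `prefill` zero bytes.
fn read_all_into(mut src: &[u8], prefill: usize) -> Vec<u8> {
    let mut buf = vec![0u8; prefill];
    // read_to_end appends after whatever the Vec already contains.
    src.read_to_end(&mut buf).expect("reading from a slice cannot fail");
    buf
}

fn main() {
    // A byte slice implements Read; here it stands in for the Tor DataReader.
    let response: &[u8] = b"HTTP/1.1 200 OK\r\n\r\nHello";

    // Like vec![0; 1024] in await_input: the response lands AFTER the zeros.
    let prefilled = read_all_into(response, 1024);
    assert!(prefilled[..1024].iter().all(|&b| b == 0));
    assert_eq!(&prefilled[1024..], response);

    // Starting from an empty Vec, the buffer holds only the response.
    let empty = read_all_into(response, 0);
    assert_eq!(empty, response);

    println!("{} vs {} bytes", prefilled.len(), empty.len());
}
```

Starting from Vec::new() (or reading into a fixed slice with read and using the returned byte count) avoids the leading zeros.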

    fn await_input(&mut self, timeout: NextTimeout) -> Result<bool, ureq::Error> {

        let fake_response = b"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 13\r\n\r\nHello, World!";
        
        
        let (tx, rx) = std::sync::mpsc::channel();  // We use channels to send the buffer outside the spawn
        let reader = Arc::clone(&self.r);           // The reader of our DataStream
       
        
        let tmp_buffer = Arc::new(Mutex::new(vec![0; 1024]));
        std::thread::spawn(move || {
            let mut tmp_buffer = tmp_buffer.lock().unwrap();
            let mut reader2 = reader.lock().unwrap();
            
            // We need to mess around with Tokio runtimes because Arti is async, but ureq isn't.
            let result = tokio::runtime::Runtime::new().unwrap().block_on(async {

                // The idea is that the reader outputs the response to tmp_buffer
                // For some reason tmp_buffer is ALWAYS \0\0\0\0\0\0\0\0\0...
                //match (*reader2).write_all(&mut tmp_buffer).await {
                match reader2.read_to_end(&mut tmp_buffer).await {
                    Ok(_) => {
                        println!("\n\nRead: {:?}", String::from_utf8_lossy(&tmp_buffer));
                    },
                    Err(e) => {
                        println!("Error reading reader 2: {:?}", e);
                    }
                };
                tmp_buffer.clone()
            });

            println!("Sending buffer outside spawn...");

            // Send buffer content which is supposed to contain response to outside spawn.
            tx.send(result).unwrap();
        });

        match rx.recv() {
            Ok(mut response_buffer_content) => {
                println!("Success: {:?}", String::from_utf8_lossy(&response_buffer_content));

                let available_data = response_buffer_content.len();
                // buffer.output() should provide a mut handle to write to the buffer
                let mut final_buffer = self.buffer.output();
                let bytes_to_consume = available_data.min(1024);

                final_buffer = &mut response_buffer_content[..bytes_to_consume];
                //self.buffer.input_consume(bytes_to_consume); for some reason gives the error assertion failed: self.consumed <= self.filled

                Ok(available_data > 0)
            },
            Err(_) => Ok(false), 
        }
    }
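Separately, final_buffer = &mut response_buffer_content[..bytes_to_consume]; only rebinds the local variable final_buffer to a different slice; it copies no bytes into the buffer returned by self.buffer.output(). Writing into a borrowed slice needs something like copy_from_slice. Also, if I read ureq 3's Buffers trait correctly, output() is the outgoing (request) side, and incoming bytes belong in the slice from input_append_buf(), followed by input_appended(n) to record how many were written. That would also fit the input_consume assertion (consumed <= filled), since nothing was ever marked as filled; please check this against the docs for the exact rc version in use. The sketch below uses a hypothetical ToyBuffers type (not ureq's LazyBuffers) and hypothetical rebind_only/copy_in functions to show the difference between rebinding and copying.

```rust
/// Hypothetical stand-in for a transport's input buffer (NOT ureq's LazyBuffers).
struct ToyBuffers {
    input: Vec<u8>,
    filled: usize,
}

impl ToyBuffers {
    fn new(capacity: usize) -> Self {
        ToyBuffers { input: vec![0; capacity], filled: 0 }
    }

    /// Writable space after the filled part (loosely modeled on input_append_buf).
    fn append_buf(&mut self) -> &mut [u8] {
        &mut self.input[self.filled..]
    }

    /// Marks `n` more bytes as filled (loosely modeled on input_appended).
    fn appended(&mut self, n: usize) {
        self.filled += n;
    }

    /// The bytes that have been filled so far.
    fn filled_bytes(&self) -> &[u8] {
        &self.input[..self.filled]
    }
}

/// Mirrors `final_buffer = &mut response[..]`: only the local reference changes.
fn rebind_only(bufs: &mut ToyBuffers, data: &mut [u8]) {
    let mut target: &mut [u8] = bufs.append_buf();
    target = data; // `target` now points at `data`; bufs is untouched
    let _ = target;
}

/// Copies the bytes into the buffer itself and records how many were written.
fn copy_in(bufs: &mut ToyBuffers, data: &[u8]) -> usize {
    let target = bufs.append_buf();
    let n = data.len().min(target.len());
    target[..n].copy_from_slice(&data[..n]);
    bufs.appended(n);
    n
}

fn main() {
    let mut response = b"HTTP/1.1 200 OK\r\n\r\nHello".to_vec();

    let mut a = ToyBuffers::new(1024);
    rebind_only(&mut a, &mut response);
    assert!(a.filled_bytes().is_empty()); // nothing was copied

    let mut b = ToyBuffers::new(1024);
    let n = copy_in(&mut b, &response);
    assert_eq!(b.filled_bytes(), &response[..n]);
    println!("copied {} bytes", n);
}
```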

I think I am getting very close. Any help would be greatly appreciated. Full code:

    use arti_client::{TorClient, TorClientConfig};
    use tor_proto::stream::{DataReader, DataWriter};
    use std::sync::Arc;
    use std::sync::Mutex;
    use ureq::unversioned::transport::ConnectionDetails;
    use ureq::unversioned::transport::Transport;
    use ureq::unversioned::transport::NextTimeout;
    use ureq::unversioned::transport::Connector;
    use ureq::unversioned::transport::Buffers;
    use ureq::unversioned::transport::LazyBuffers;
    use ureq::unversioned::resolver::DefaultResolver;
    use tokio::io::AsyncReadExt;
    use tokio::io::AsyncWriteExt;
    use http::Uri;
    use tokio::runtime;

    pub struct ArtiHttpConnection {
        r: Arc<Mutex<DataReader>>,
        w: Arc<Mutex<DataWriter>>,
        buffer: LazyBuffers,
    }

    impl std::fmt::Debug for ArtiHttpConnection {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "LazyBuffers")
        }
    }

    unsafe impl Send for ArtiHttpConnection {}
    unsafe impl Sync for ArtiHttpConnection {}

    impl Transport for ArtiHttpConnection {
        fn buffers(&mut self) -> &mut dyn Buffers {
            &mut self.buffer
        }

        fn transmit_output(
            &mut self,
            amount: usize,
            timeout: NextTimeout,
        ) -> Result<(), ureq::Error> {

            let mut writer = match self.w.lock() {
                Ok(w) => w,
                Err(_) => return Err(ureq::Error::Io(std::io::Error::new(std::io::ErrorKind::Other, "Error locking writer"))),
            };
            let mut buffer = self.buffer.output().to_vec();
            println!("WriteA: {:?}", String::from_utf8_lossy(&buffer));
            if let Some(pos) = buffer.iter().rposition(|&x| x != 0) {
                buffer.truncate(pos + 1); // Keep everything up to the last non-zero byte
            } else {
                buffer.clear(); // No non-zero bytes found, clear the buffer
            }
            let result = writer.write_all(&buffer);
            println!("WriteB: {:?}", String::from_utf8_lossy(&buffer));
            println!("Write: {:?}", result);

            Ok(())
        }

        /*
        fn await_input(&mut self, timeout: NextTimeout) -> Result<bool, ureq::Error> {
            let mut reader = match self.r.lock() {
                Ok(r) => r,
                Err(_) => return Err(ureq::Error::Io(std::io::Error::new(std::io::ErrorKind::Other, "Error locking reader"))),
            };
            println!("lol4");
            let buffer = &mut self.buffer.input().to_vec();
            let result = tokio::runtime::Handle::current().block_on(async {
                reader.read_to_end(buffer).await
            });

            match result {
                Ok(_) => Ok(buffer.len() > 0),
                Err(_) => Ok(false),
            }
        }*/

        fn await_input(&mut self, timeout: NextTimeout) -> Result<bool, ureq::Error> {

            let fake_response = b"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 13\r\n\r\nHello, World!";

            let (tx, rx) = std::sync::mpsc::channel();  // We use channels to send the buffer outside the spawn
            let reader = Arc::clone(&self.r);           // The reader of our DataStream

            let tmp_buffer = Arc::new(Mutex::new(vec![0; 1024]));
            std::thread::spawn(move || {
                let mut tmp_buffer = tmp_buffer.lock().unwrap();
                let mut reader2 = reader.lock().unwrap();

                // We need to mess around with Tokio runtimes because Arti is async, but ureq isn't.
                let result = tokio::runtime::Runtime::new().unwrap().block_on(async {

                    // The idea is that the reader outputs the response to tmp_buffer
                    // For some reason tmp_buffer is ALWAYS \0\0\0\0\0\0\0\0\0...
                    //match (*reader2).write_all(&mut tmp_buffer).await {
                    match reader2.read_to_end(&mut tmp_buffer).await {
                        Ok(_) => {
                            println!("\n\nRead: {:?}", String::from_utf8_lossy(&tmp_buffer));
                        },
                        Err(e) => {
                            println!("Error reading reader 2: {:?}", e);
                        }
                    };
                    tmp_buffer.clone()
                });

                println!("Sending buffer outside spawn...");

                // Send buffer content which is supposed to contain response to outside spawn.
                tx.send(result).unwrap();
            });

            match rx.recv() {
                Ok(mut response_buffer_content) => {
                    println!("Success: {:?}", String::from_utf8_lossy(&response_buffer_content));

                    let available_data = response_buffer_content.len();
                    // buffer.output() should provide a mut handle to write to the buffer
                    let mut final_buffer = self.buffer.output();
                    let bytes_to_consume = available_data.min(1024);

                    final_buffer = &mut response_buffer_content[..bytes_to_consume];
                    //self.buffer.input_consume(bytes_to_consume); for some reason gives the error assertion failed: self.consumed <= self.filled

                    Ok(available_data > 0)
                },
                Err(_) => Ok(false),
            }
        }

        fn is_open(&mut self) -> bool {
            let _r = match self.r.lock() {
                Ok(r) => r,
                Err(_) => {
                    println!("Error locking reader 2");
                    return false;
                },
            };
            (*_r).ctrl().is_open()
        }
    }

    #[derive(Debug)]
    pub struct ArtiHttpConnector(());

    impl Connector for ArtiHttpConnector {
        fn connect(
            &self,
            details: &ConnectionDetails,
            chained: Option<Box<dyn Transport>>,
        ) -> Result<Option<Box<dyn Transport>>, ureq::Error> {
            if chained.is_some() {
                return Ok(chained);
            }

            // Use a Tokio spawn to offload the async code
            let (result_tx, result_rx) = std::sync::mpsc::channel();

            tokio::spawn(async move {
                let config = TorClientConfig::default();
                let tor_client = match TorClient::create_bootstrapped(config).await {
                    Ok(client) => client,
                    Err(e) => {
                        println!("Error creating Tor client: {:?}", e);
                        return;
                    }
                };

                // Create the stream asynchronously
                let stream = match tor_client.connect(("example.com", 80)).await {
                    Ok(stream) => stream,
                    Err(e) => {
                        println!("Error creating stream: {:?}", e);
                        return;
                    }
                };

                let (r, w) = stream.split();

                result_tx.send((r, w)).unwrap();
            });

            let (r, w) = result_rx.recv().unwrap();

            let buffers = LazyBuffers::new(2048, 2048);
            let transport = ArtiHttpConnection {
                r: Arc::new(Mutex::new(r)),
                w: Arc::new(Mutex::new(w)),
                buffer: buffers,
            };

            Ok(Some(Box::new(transport)))
        }
    }

    fn main() -> Result<(), String> {
        let rt = tokio::runtime::Runtime::new().unwrap();

        rt.block_on(async {
            let config = ureq::config::Config::default();
            let resolver = DefaultResolver::default();

            let connector = ArtiHttpConnector(());

            let agent = ureq::Agent::with_parts(config, connector, resolver);
            let mut x = match agent.get("http://example.com").call() {
                Ok(x) => x,
                Err(ureq::Error::StatusCode(code)) => {
                    println!("Error code: {:?}", code);
                    return Err("Error".to_string());
                },
                Err(e) => {
                    println!("Error: {:?}", e);
                    return Err("Error".to_string());
                }
            };

            let body = x.body_mut().read_to_string().unwrap();
            println!("Body: {}", body);

            Ok(())
        })
    }

    /*
    #[tokio::main]
    async fn main() -> Result<(), String> {
        let config = TorClientConfig::default();

        // Start the Arti client, and let it bootstrap a connection to the Tor network.
        // (This takes a while to gather the necessary directory information.
        // It uses cached information when possible.)
        let tor_client = TorClient::create_bootstrapped(config).await.unwrap();

        let mut stream = tor_client.connect(("google.com", 80)).await.unwrap();

        // Write out an HTTP request.
        stream
            .write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n")
            .await.unwrap();

        // IMPORTANT: Make sure the request was written.
        // Arti buffers data, so flushing the buffer is usually required.
        stream.flush().await.unwrap();

        // Read and print the result.
        let mut buf = Vec::new();
        stream.read_to_end(&mut buf).await.unwrap();

        println!("{}", String::from_utf8_lossy(&buf));

        Ok(())
    }*/

Cargo.toml:

    [package]
    name = "arti-ureq-test"
    version = "0.1.0"
    edition = "2021"

    [dependencies]
    arti-client = "0.25.0"
    tor-proto = { version = "0.25.0", features = ["stream-ctrl", "tokio"] }
    tor-rtcompat = "0.25.0"
    ureq = { version = "3.0.0-rc1", features = ["charset", "gzip", "rustls", "native-tls", "socks-proxy"]}
    tokio = { version = "1.42.0", features = ["full"] }
    futures = "0.3.31"
    thiserror = "1.0"
    tls-api = "0.3"
    educe = "=0.4.23"
    pin-project = "1.1.8"
    http = "1.2.0"

A few problems I noticed:

You're using a blocking operation in async code. That's not okay, because it stalls the runtime's worker thread. Use the Tokio mpsc channel instead.

Ouch. This creates a new multi-threaded runtime, so on an 8-core machine you're spawning 9 threads to do a single thing. You should probably use the original runtime instead. You can do so by obtaining a tokio::runtime::Handle and moving it into the new thread.


Other than that, there's a lot going on that I don't really understand. You're doing blocking operations in async code, and in the one place where you do spawn a dedicated thread, the actual operation looks async anyway? I don't get it.


Alright, thanks for the tips; maybe those are already the issue.

It's no surprise you don't understand the code, because it isn't very logical at all. Maybe this ticket on the Arti project explains my intentions better: Integration or example with ureq (#1519), The Tor Project / Core / Arti on GitLab.

The Arti developers want it to be possible to integrate Arti into popular HTTP libraries, in this case ureq, as has already been done for hyper. Plugging a custom transport into ureq has only been possible since ureq 3.x.

The difficulty is that ureq's traits are synchronous, so the implementation has to be synchronous as well. But because Arti only offers async functions, it is hard to call them from the methods that ureq's synchronous trait requires.

await_input should really only look something like this (pseudocode). The only reason it gets so complex is that await_input must be implemented as blocking, while Arti is non-blocking and uses await.

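    fn await_input(&mut self, timeout: NextTimeout) -> Result<bool, ureq::Error> {
        let mut tmp_buffer = Vec::new();
        let mut reader = self.r.lock().unwrap();
        // Not allowed: await_input is a sync fn, but Arti's read is async
        reader.read_to_end(&mut tmp_buffer).await?;

        // Copy (not just rebind) the received bytes into ureq's buffer
        let final_buffer = self.buffer.output();
        final_buffer[..tmp_buffer.len()].copy_from_slice(&tmp_buffer);
        Ok(!tmp_buffer.is_empty())
    }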
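The pseudocode above has to await the read because calling an async method in Rust only builds a future; nothing happens until the future is polled, for example by .await or by a runtime's block_on. The same applies to transmit_output in the full code: with tokio::io::AsyncWriteExt in scope, writer.write_all(&buffer) returns a future that is never awaited, so as far as I can tell the request bytes are never written (and, as the commented-out main notes, Arti also needs a flush). If I read ureq's Transport docs correctly, the amount argument says how many bytes of output() to send, which may be more reliable than trimming trailing zeros. The std-only sketch below demonstrates the laziness with a hypothetical pretend_write future and a no-op waker; it is not Arti code.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough to poll a future that completes immediately.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for an async write: it records that its body actually ran.
async fn pretend_write(sent: &Cell<bool>) {
    sent.set(true);
}

/// Builds the future but never polls it, like an un-awaited write_all.
fn create_without_await(sent: &Cell<bool>) {
    let _fut = pretend_write(sent); // dropped here without ever running
}

/// Polls the future once; since it never waits, one poll runs it to completion.
fn poll_once(sent: &Cell<bool>) -> bool {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let fut = pin!(pretend_write(sent));
    matches!(fut.poll(&mut cx), Poll::Ready(()))
}

fn main() {
    let sent = Cell::new(false);

    create_without_await(&sent);
    assert!(!sent.get()); // the body never executed

    assert!(poll_once(&sent));
    assert!(sent.get()); // polling ran it
    println!("sent after polling: {}", sent.get());
}
```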

Are you able to clarify whether ureq is calling arti methods, or whether arti is calling ureq methods?

ureq calls Arti methods. This is the ureq trait in question: Transport in ureq::transport.

The idea was to implement that trait with methods that call Arti methods to complete a request.

Once this works, the custom transport will provide buffers backed by Arti for ureq to use.

In that case I recommend providing a synchronous interface to arti similar to the mini-redis one you can find here:

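As an outline of that pattern: mini-redis's blocking client keeps the async client together with a current-thread Tokio runtime in one struct, and every blocking method calls block_on on the matching async method. The sketch below copies that shape but, to stay dependency-free, swaps in a minimal hand-written block_on (poll, park the thread, repeat) for Tokio's runtime, and uses a hypothetical FakeAsyncStream in place of Arti's DataStream. Real Arti streams need an actual runtime with an I/O reactor, so in a real version BlockingStream would also own that runtime (or a Handle to it) instead of calling the toy executor.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the blocked thread by unparking it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal block_on: poll, park until woken, repeat. Stand-in for Runtime::block_on.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // park may return spuriously; the loop simply polls again
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical async stream standing in for Arti's DataStream.
struct FakeAsyncStream {
    data: Vec<u8>,
}

impl FakeAsyncStream {
    async fn read_all(&mut self) -> Vec<u8> {
        std::mem::take(&mut self.data)
    }
}

/// Synchronous facade in the style of mini-redis's blocking client.
struct BlockingStream {
    inner: FakeAsyncStream,
}

impl BlockingStream {
    /// Blocking method: drives the async call to completion before returning.
    fn read_all(&mut self) -> Vec<u8> {
        block_on(self.inner.read_all())
    }
}

fn main() {
    let mut stream = BlockingStream {
        inner: FakeAsyncStream { data: b"HTTP/1.1 200 OK\r\n\r\n".to_vec() },
    };
    let bytes = stream.read_all();
    assert!(bytes.starts_with(b"HTTP/1.1 200"));
    assert!(stream.read_all().is_empty()); // nothing left on the second read
    println!("{}", String::from_utf8_lossy(&bytes));
}
```

A ureq Transport built this way would call such blocking methods from transmit_output and await_input, instead of creating a new runtime on every call.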

Thanks for the suggestion. Should I use a Tokio runtime, or a tor_rtcompat::Runtime as arti-hyper does (see its lib.rs source)?