arti-client has an implementation for hyper (arti-hyper), and I am trying to do the same for ureq. This is quite difficult because ureq is sync/blocking while arti is async. As a follow-up to this question, I think I am getting very close, though.
The following (very messy) code compiles, and it does send a request over the Tor network. The only problem I have is that my response is always empty: \0\0\0\0\0\0\0\0\0\0...
I think the problem lies in the fact that `reader2.read_to_end(&mut tmp_buffer)` does not actually write to `tmp_buffer`.
// Excerpt of the `Transport::await_input` implementation from the full listing.
// It spawns an OS thread, reads the Arti stream inside a freshly created Tokio runtime,
// sends a copy of the temporary buffer back over a channel, and reports whether that
// buffer is non-empty.
fn await_input(&mut self, timeout: NextTimeout) -> Result<bool, ureq::Error> {
// Canned response; it is not referenced anywhere below.
let fake_response = b"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 13\r\n\r\nHello, World!";
let (tx, rx) = std::sync::mpsc::channel(); // We use channels to send the buffer outside the spawn
let reader = Arc::clone(&self.r); // The reader of our DataStream
// NOTE(review): the Vec starts out holding 1024 zero bytes. `read_to_end` is documented to
// append after the existing contents, so anything read would land behind those zeros --
// TODO confirm this is where the leading \0\0\0... comes from.
let tmp_buffer = Arc::new(Mutex::new(vec![0; 1024]));
std::thread::spawn(move || {
// Both locks are held for the whole lifetime of the thread, including the blocking read.
let mut tmp_buffer = tmp_buffer.lock().unwrap();
let mut reader2 = reader.lock().unwrap();
// We need to mess around with Tokio runtimes because Arti is async, but ureq isn't.
let result = tokio::runtime::Runtime::new().unwrap().block_on(async {
// The idea is that the reader outputs the response to tmp_buffer
// For some reason tmp_buffer is ALWAYS \0\0\0\0\0\0\0\0\0...
//match (*reader2).write_all(&mut tmp_buffer).await {
// NOTE(review): `read_to_end` presumably returns only once the peer closes the stream,
// so a keep-alive connection could block here indefinitely -- verify.
match reader2.read_to_end(&mut tmp_buffer).await {
Ok(_) => {
println!("\n\nRead: {:?}", String::from_utf8_lossy(&tmp_buffer));
},
Err(e) => {
// A read error is only printed; the (possibly untouched) buffer is still sent below.
println!("Error reading reader 2: {:?}", e);
}
};
tmp_buffer.clone()
});
println!("Sending buffer outside spawn...");
// Send buffer content which is supposed to contain response to outside spawn.
tx.send(result).unwrap();
});
// Blocks the calling thread until the helper thread sends its buffer or drops the sender.
match rx.recv() {
Ok(mut response_buffer_content) => {
println!("Success: {:?}", String::from_utf8_lossy(&response_buffer_content));
let available_data = response_buffer_content.len();
// buffer.output() should provide a mut handle to write to the buffer
let mut final_buffer = self.buffer.output();
let bytes_to_consume = available_data.min(1024);
// NOTE(review): this assignment only re-points the local `final_buffer` reference at
// `response_buffer_content`; no bytes are copied into `self.buffer`, and it is the
// output buffer (not the input buffer) that was fetched above.
final_buffer = &mut response_buffer_content[..bytes_to_consume];
//self.buffer.input_consume(bytes_to_consume); for some reason gives the error assertion failed: self.consumed <= self.filled
// With the 1024-byte initial Vec the length is never zero, so this reports `true`
// whenever the helper thread delivered a buffer.
Ok(available_data > 0)
},
// The sender was dropped without sending (e.g. the helper thread panicked).
Err(_) => Ok(false),
}
}
I think I am getting very close. Help would be greatly appreciated. Full code:
use arti_client::{TorClient, TorClientConfig};
use tor_proto::stream::{DataReader, DataWriter};
use std::sync::Arc;
use std::sync::Mutex;
use ureq::unversioned::transport::ConnectionDetails;
use ureq::unversioned::transport::Transport;
use ureq::unversioned::transport::NextTimeout;
use ureq::unversioned::transport::Connector;
use ureq::unversioned::transport::Buffers;
use ureq::unversioned::transport::LazyBuffers;
use ureq::unversioned::resolver::DefaultResolver;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use http::Uri;
use tokio::runtime;
/// ureq transport built from the two halves of a split Arti data stream.
pub struct ArtiHttpConnection {
/// Read half of the stream, shared behind a mutex.
r: Arc<Mutex<DataReader>>,
/// Write half of the stream, shared behind a mutex.
w: Arc<Mutex<DataWriter>>,
/// Buffers exposed to ureq through `Transport::buffers`.
buffer: LazyBuffers,
}
/// Debug output for the transport.
///
/// Only the type name is printed; the stream halves and buffers are elided.
/// The previous output was the literal `LazyBuffers`, which named the wrong type.
impl std::fmt::Debug for ArtiHttpConnection {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArtiHttpConnection").finish_non_exhaustive()
    }
}
// SAFETY: NOTE(review) -- no invariant is documented for these manual impls. The struct's
// fields are `Arc<Mutex<DataReader>>`, `Arc<Mutex<DataWriter>>` and `LazyBuffers`; if those
// types are already `Send`/`Sync`, the auto traits apply and these impls are redundant and
// could be deleted -- TODO confirm before relying on them.
unsafe impl Send for ArtiHttpConnection {}
unsafe impl Sync for ArtiHttpConnection {}
impl Transport for ArtiHttpConnection {
fn buffers(&mut self) -> &mut dyn Buffers {
&mut self.buffer
}
fn transmit_output(
&mut self,
amount: usize,
timeout: NextTimeout,
) -> Result<(), ureq::Error> {
let mut writer = match self.w.lock() {
Ok(w) => w,
Err(_) => return Err(ureq::Error::Io(std::io::Error::new(std::io::ErrorKind::Other, "Error locking writer"))),
};
let mut buffer = self.buffer.output().to_vec();
println!("WriteA: {:?}", String::from_utf8_lossy(&buffer));
if let Some(pos) = buffer.iter().rposition(|&x| x != 0) {
buffer.truncate(pos + 1); // Keep everything up to the last non-zero byte
} else {
buffer.clear(); // No non-zero bytes found, clear the buffer
}
let result = writer.write_all(&buffer);
println!("WriteB: {:?}", String::from_utf8_lossy(&buffer));
println!("Write: {:?}", result);
Ok(())
}
/*
fn await_input(&mut self, timeout: NextTimeout) -> Result<bool, ureq::Error> {
let mut reader = match self.r.lock() {
Ok(r) => r,
Err(_) => return Err(ureq::Error::Io(std::io::Error::new(std::io::ErrorKind::Other, "Error locking reader"))),
};
println!("lol4");
let buffer = &mut self.buffer.input().to_vec();
let result = tokio::runtime::Handle::current().block_on(async {
reader.read_to_end(buffer).await
});
match result {
Ok(_) => Ok(buffer.len() > 0),
Err(_) => Ok(false),
}
}*/
fn await_input(&mut self, timeout: NextTimeout) -> Result<bool, ureq::Error> {
let fake_response = b"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 13\r\n\r\nHello, World!";
let (tx, rx) = std::sync::mpsc::channel(); // We use channels to send the buffer outside the spawn
let reader = Arc::clone(&self.r); // The reader of our DataStream
let tmp_buffer = Arc::new(Mutex::new(vec![0; 1024]));
std::thread::spawn(move || {
let mut tmp_buffer = tmp_buffer.lock().unwrap();
let mut reader2 = reader.lock().unwrap();
// We need to mess around with Tokio runtimes because Arti is async, but ureq isn't.
let result = tokio::runtime::Runtime::new().unwrap().block_on(async {
// The idea is that the reader outputs the response to tmp_buffer
// For some reason tmp_buffer is ALWAYS \0\0\0\0\0\0\0\0\0...
//match (*reader2).write_all(&mut tmp_buffer).await {
match reader2.read_to_end(&mut tmp_buffer).await {
Ok(_) => {
println!("\n\nRead: {:?}", String::from_utf8_lossy(&tmp_buffer));
},
Err(e) => {
println!("Error reading reader 2: {:?}", e);
}
};
tmp_buffer.clone()
});
println!("Sending buffer outside spawn...");
// Send buffer content which is supposed to contain response to outside spawn.
tx.send(result).unwrap();
});
match rx.recv() {
Ok(mut response_buffer_content) => {
println!("Success: {:?}", String::from_utf8_lossy(&response_buffer_content));
let available_data = response_buffer_content.len();
// buffer.output() should provide a mut handle to write to the buffer
let mut final_buffer = self.buffer.output();
let bytes_to_consume = available_data.min(1024);
final_buffer = &mut response_buffer_content[..bytes_to_consume];
//self.buffer.input_consume(bytes_to_consume); for some reason gives the error assertion failed: self.consumed <= self.filled
Ok(available_data > 0)
},
Err(_) => Ok(false),
}
}
fn is_open(&mut self) -> bool {
let _r = match self.r.lock() {
Ok(r) => r,
Err(_) => {
println!("Error locking reader 2");
return false;
},
};
(*_r).ctrl().is_open()
}
}
/// Connector handed to `ureq::Agent::with_parts`; its `Connector` impl opens a stream
/// through Arti and wraps it in an `ArtiHttpConnection`.
/// The single unit field is private, so the value carries no state.
#[derive(Debug)]
pub struct ArtiHttpConnector(());
impl Connector for ArtiHttpConnector {
fn connect(
&self,
details: &ConnectionDetails,
chained: Option<Box<dyn Transport>>,
) -> Result<Option<Box<dyn Transport>>, ureq::Error> {
if chained.is_some() {
return Ok(chained);
}
// Use a Tokio spawn to offload the async code
let (result_tx, result_rx) = std::sync::mpsc::channel();
tokio::spawn(async move {
let config = TorClientConfig::default();
let tor_client = match TorClient::create_bootstrapped(config).await {
Ok(client) => client,
Err(e) => {
println!("Error creating Tor client: {:?}", e);
return;
}
};
// Create the stream asynchronously
let stream = match tor_client.connect(("example.com", 80)).await {
Ok(stream) => stream,
Err(e) => {
println!("Error creating stream: {:?}", e);
return;
}
};
let (r, w) = stream.split();
result_tx.send((r, w)).unwrap();
});
let (r, w) = result_rx.recv().unwrap();
let buffers = LazyBuffers::new(2048, 2048);
let transport = ArtiHttpConnection {
r: Arc::new(Mutex::new(r)),
w: Arc::new(Mutex::new(w)),
buffer: buffers,
};
Ok(Some(Box::new(transport)))
}
}
fn main() -> Result<(), String> {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let config = ureq::config::Config::default();
let resolver = DefaultResolver::default();
let connector = ArtiHttpConnector(());
let agent = ureq::Agent::with_parts(config, connector, resolver);
let mut x = match agent.get("http://example.com").call() {
Ok(x) => x,
Err(ureq::Error::StatusCode(code)) => {
println!("Error code: {:?}", code);
return Err("Error".to_string());
},
Err(e) => {
println!("Error: {:?}", e);
return Err("Error".to_string());
}
};
let body = x.body_mut().read_to_string().unwrap();
println!("Body: {}", body);
Ok(())
})
}
/*
#[tokio::main]
async fn main() -> Result<(), String> {
let config = TorClientConfig::default();
// Start the Arti client, and let it bootstrap a connection to the Tor network.
// (This takes a while to gather the necessary directory information.
// It uses cached information when possible.)
let tor_client = TorClient::create_bootstrapped(config).await.unwrap();
let mut stream = tor_client.connect(("google.com", 80)).await.unwrap();
// Write out an HTTP request.
stream
.write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n")
.await.unwrap();
// IMPORTANT: Make sure the request was written.
// Arti buffers data, so flushing the buffer is usually required.
stream.flush().await.unwrap();
// Read and print the result.
let mut buf = Vec::new();
stream.read_to_end(&mut buf).await.unwrap();
println!("{}", String::from_utf8_lossy(&buf));
Ok(())
}*/
[package]
name = "arti-ureq-test"
version = "0.1.0"
edition = "2021"
[dependencies]
arti-client = "0.25.0"
tor-proto = { version = "0.25.0", features = ["stream-ctrl", "tokio"] }
tor-rtcompat = "0.25.0"
ureq = { version = "3.0.0-rc1", features = ["charset", "gzip", "rustls", "native-tls", "socks-proxy"]}
tokio = { version = "1.42.0", features = ["full"] }
futures = "0.3.31"
thiserror = "1.0"
tls-api = "0.3"
educe = "=0.4.23"
pin-project = "1.1.8"
http = "1.2.0"