Custom transport for ureq

As noted in this issue comment, ureq 3.x allows the whole transport to be replaced. I am trying to find a way to make ureq communicate over Tor (without using SOCKS).

use ureq;
use arti_client::{TorClient, TorClientConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = TorClientConfig::default();
    let tor_client = TorClient::create_bootstrapped(config).await?;

    // Somehow use the tor_client in a ureq request?

    // ureq 3.x reads the body via `body_mut()`; `into_string()` was the 2.x API
    let body: String = ureq::get("http://127.0.0.1:1234")
        .call()?
        .body_mut()
        .read_to_string()?;
    println!("{}", body);

    Ok(())
}

Here is an example of how it is done for hyper. I guessed I had to implement Transport for a TorClient wrapper, but ureq's Transport seems to exist only for error messages? (See "Transport in ureq - Rust".)

How would I write a custom transport to use the Tor client in ureq? I am NOT asking for a full implementation, just pointers and an explanation of the basics of HTTP transports so that I can implement this myself.

Presumably you'd need to look at Transport in version 3, not 2. I really don't know anything beyond that, sorry.

By the way though, did you know ureq is blocking? Asking since your example looks like it wants async.


Aaaah that clears a lot of things up. So this is ureq's trait:

pub trait Transport:
    Debug
    + Send
    + Sync {
    // Required methods
    fn buffers(&mut self) -> &mut dyn Buffers;
    fn transmit_output(
        &mut self,
        amount: usize,
        timeout: NextTimeout,
    ) -> Result<(), Error>;
    fn await_input(&mut self, timeout: NextTimeout) -> Result<bool, Error>;
    fn is_open(&mut self) -> bool;

    // Provided method
    fn is_tls(&self) -> bool { ... }
}

So what I'd basically do is make a struct like

struct ArtiTransport {
   stream: DataStream
}

with DataStream being TorClient's datastream after connecting.

Then I'd do

impl Transport for ArtiTransport {
   // Implement all methods of ureq's Transport trait like buffers, is_open,...
}

For example, looking at the code for DataStream we see it has a property ctrl (DataStreamCtrl) which has a function is_open.

So one of the implemented functions could look like this?

impl Transport for ArtiTransport {
   fn is_open(&mut self) -> bool {
       self.stream.ctrl().is_open() // is_open function from Arti implemented for this trait to be used in ureq
   }
}

For the other methods, such as is_tls (a provided method with a default body, so overriding it is optional), I'd just check whether the TorClient's stream uses TLS and return the correct value, and so on.

Is this how I'd do it?

It's okay to be blocking and sync by the way. My initial code was just a scribble.
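Since the question asks for the basics of how such a transport works, here is a minimal sketch of the general idea using only the standard library. `MiniTransport` and `Loopback` are hypothetical names made up for this illustration, and the methods are a simplified stand-in, not ureq's actual `Transport` or `Buffers` API (there are no timeouts, and the buffers are plain `Vec`s). The assumption is that the HTTP layer writes the request into an output buffer, asks the transport to send the first `amount` bytes, and then asks it to block until more input has been appended to an input buffer.

```rust
use std::io::{self, Cursor, Read, Write};

/// Hypothetical, simplified stand-in for a transport: it owns the I/O
/// object plus one buffer per direction. This is NOT ureq's real trait.
struct MiniTransport<S> {
    stream: S,
    output: Vec<u8>, // scratch space for bytes the HTTP layer wants sent
    input: Vec<u8>,  // bytes received but not yet consumed
}

impl<S: Read + Write> MiniTransport<S> {
    fn new(stream: S) -> Self {
        Self { stream, output: vec![0; 1024], input: Vec::new() }
    }

    /// Buffer the caller serializes the outgoing request into.
    fn output(&mut self) -> &mut [u8] {
        &mut self.output
    }

    /// Send the first `amount` bytes of the output buffer.
    fn transmit_output(&mut self, amount: usize) -> io::Result<()> {
        self.stream.write_all(&self.output[..amount])?;
        self.stream.flush()
    }

    /// Block until the stream yields data, append it to the input
    /// buffer, and report whether anything was added.
    fn await_input(&mut self) -> io::Result<bool> {
        let mut chunk = [0u8; 512];
        let n = self.stream.read(&mut chunk)?;
        self.input.extend_from_slice(&chunk[..n]);
        Ok(n > 0)
    }

    /// Everything received so far.
    fn input(&self) -> &[u8] {
        &self.input
    }
}

/// Hypothetical in-memory stream: reads come from a canned response,
/// writes are recorded so they can be inspected.
struct Loopback {
    incoming: Cursor<Vec<u8>>,
    written: Vec<u8>,
}

impl Read for Loopback {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.incoming.read(buf)
    }
}

impl Write for Loopback {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.written.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

In a real implementation the `stream` would be something that blocks on the Tor stream, and the two buffers would be exposed through whatever `buffers()` is expected to return. The point of the sketch is only the division of labour: the transport moves bytes, and the HTTP layer decides what those bytes mean.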

You can't use ureq inside async code. It doesn't use .await for its blocking operations. See more here:


I solved it like this

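    fn await_input(&mut self, _timeout: NextTimeout) -> Result<bool, ureq::Error> {
        // NOTE: `_timeout` is ignored, and the bytes read here land in a local
        // buffer that is dropped, not in the buffers returned by `buffers()`.
        let mut buffer = vec![0u8; 1024]; // You can adjust the buffer size
        tokio::task::block_in_place(move || {
            // A no-op waker means this polls exactly once and never waits:
            // `Pending` is reported as "no input" instead of blocking.
            let waker = futures::task::noop_waker_ref();
            let mut cx = Context::from_waker(waker);
            let mut buf = ReadBuf::new(&mut buffer);

            // Poll the underlying Tor stream once
            match ArtiHttpConnection::poll_read(Pin::new(self), &mut cx, &mut buf) {
                std::task::Poll::Ready(Ok(())) => Ok(!buf.filled().is_empty()),
                // Surface the I/O error rather than a fake HTTP status code
                std::task::Poll::Ready(Err(e)) => Err(ureq::Error::Io(e)),
                std::task::Poll::Pending => Ok(false),
            }
        })
    }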

I am actually just trying to do for ureq with Arti what arti-hyper did for Hyper. It is interesting but it is going way over my head. I think I am totally missing the point.
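One piece that may help with the "missing the point" feeling: a blocking method that wraps an async read has to keep polling until the future is ready and sleep in between, whereas a single poll with a no-op waker returns immediately. Below is a minimal sketch of that idea using only the standard library. The names `block_on`, `ThreadWaker` and `PendingOnce` are made up for this illustration; in practice an existing executor (for example a runtime handle's `block_on`) would normally be used instead of a hand-rolled loop.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drive `fut` to completion on the current thread, parking the
/// thread whenever the future reports `Pending`.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Sleep until the future's waker is invoked, then poll again.
            Poll::Pending => thread::park(),
        }
    }
}

/// Demo future: `Pending` on the first poll, ready on the second.
struct PendingOnce {
    polled: bool,
}

impl Future for PendingOnce {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.polled {
            Poll::Ready("data")
        } else {
            self.polled = true;
            // Ask to be polled again; a real stream would do this
            // when bytes arrive from the network.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}
```

With a no-op waker, the first poll of `PendingOnce` would be the only one and the caller would see "no data". With the loop above, `block_on(PendingOnce { polled: false })` keeps going until the future completes. Note that a stream driven by an async runtime still needs that runtime running on some other thread to make progress while this thread is parked.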

Full code:

use arti_client::{TorClient, TorClientConfig};
use tor_proto::stream::DataStream; // DataReader and DataWriter were imported but never used
use std::sync::Arc;
use ureq::unversioned::transport::Transport;
use ureq::unversioned::transport::NextTimeout;
use ureq::unversioned::transport::Buffers;
use tor_rtcompat::Runtime;
use educe::Educe;
use tls_api::TlsConnector as TlsConn;
use std::pin::Pin;
use pin_project::pin_project;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use std::task::{Context, Poll};

#[derive(Educe)]
#[educe(Clone)] // #[derive(Clone)] infers an unwanted bound TC: Clone
pub struct ArtiHttpConnector<R: Runtime, TC: TlsConn> {
    /// The client
    client: TorClient<R>,

    /// TLS for using across Tor.
    tls_conn: Arc<TC>,
}

impl<R: Runtime, TC: TlsConn> ArtiHttpConnector<R, TC> {
    /// Make a new `ArtiHttpConnector` using an Arti `TorClient` object.
    pub fn new(client: TorClient<R>, tls_conn: TC) -> Self {
        let tls_conn = tls_conn.into();
        Self { client, tls_conn }
    }
}

#[pin_project]
#[derive(Debug)]
pub struct ArtiHttpConnection {
    /// The stream
    #[pin]
    inner: Pin<Box<DataStream>>,
}

impl AsyncRead for ArtiHttpConnection {
    fn poll_read(self: Pin<&mut Self>, cx: &mut std::task::Context, buf: &mut ReadBuf) -> std::task::Poll<std::io::Result<()>> {
        self.project().inner.poll_read(cx, buf)
    }
}

impl AsyncWrite for ArtiHttpConnection {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, std::io::Error>> {
        self.project().inner.as_mut().poll_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
        self.project().inner.as_mut().poll_flush(cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
        self.project().inner.as_mut().poll_shutdown(cx)
    }
}

unsafe impl Send for ArtiHttpConnection {}
unsafe impl Sync for ArtiHttpConnection {}


impl Transport for ArtiHttpConnection {
    fn is_open(&mut self) -> bool {
        self.inner.ctrl().is_open()
    }

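    fn await_input(&mut self, _timeout: NextTimeout) -> Result<bool, ureq::Error> {
        // NOTE: `_timeout` is ignored, and the bytes read here land in a local
        // buffer that is dropped, not in the buffers returned by `buffers()`.
        let mut buffer = vec![0u8; 1024]; // You can adjust the buffer size
        tokio::task::block_in_place(move || {
            // A no-op waker means this polls exactly once and never waits:
            // `Pending` is reported as "no input" instead of blocking.
            let waker = futures::task::noop_waker_ref();
            let mut cx = Context::from_waker(waker);
            let mut buf = ReadBuf::new(&mut buffer);

            // Poll the underlying Tor stream once
            match ArtiHttpConnection::poll_read(Pin::new(self), &mut cx, &mut buf) {
                std::task::Poll::Ready(Ok(())) => Ok(!buf.filled().is_empty()),
                // Surface the I/O error rather than a fake HTTP status code
                std::task::Poll::Ready(Err(e)) => Err(ureq::Error::Io(e)),
                std::task::Poll::Pending => Ok(false),
            }
        })
    }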

    fn buffers(&mut self) -> &mut dyn Buffers {
        unimplemented!()
    }

    fn transmit_output(&mut self, _size: usize, _timeout: NextTimeout) -> Result<(), ureq::Error> {
        unimplemented!()
    }
}


#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = TorClientConfig::default();
    let tor_client = Arc::new(TorClient::create_bootstrapped(config).await?);

    Ok(())
}