I solved it like this
fn await_input(&mut self, timeout: NextTimeout) -> Result<bool, ureq::Error> {
let mut buffer = vec![0u8; 1024]; // You can adjust the buffer size
tokio::task::block_in_place(move || {
let waker = futures::task::noop_waker_ref();
let mut cx = Context::from_waker(waker);
let mut buf = ReadBuf::new(&mut buffer);
// Polling the DataReader
let mut reader = ArtiHttpConnection::poll_read(Pin::new(self), &mut cx, &mut buf);
match reader {
std::task::Poll::Ready(Ok(_)) => {
Ok(buf.filled().len() > 0)
},
std::task::Poll::Ready(Err(_)) => Err(ureq::Error::StatusCode(1)),
_ => Ok(false),
}
})
}
I am actually just trying to do for ureq with Arti what arti-hyper
did for Hyper. It is interesting but it is going way over my head. I think I am totally missing the point.
Full code:
use arti_client::{TorClient, TorClientConfig};
use tor_proto::stream::{DataReader, DataStream, DataWriter};
use std::sync::Arc;
use ureq::unversioned::transport::Transport;
use ureq::unversioned::transport::NextTimeout;
use ureq::unversioned::transport::Buffers;
use tor_rtcompat::Runtime;
use educe::Educe;
use tls_api::TlsConnector as TlsConn;
use std::pin::Pin;
use pin_project::pin_project;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use std::task::{Context, Poll};
#[derive(Educe)]
#[educe(Clone)] // #[derive(Debug)] infers an unwanted bound TC: Clone
pub struct ArtiHttpConnector<R: Runtime, TC: TlsConn> {
/// The client
client: TorClient<R>,
/// TLS for using across Tor.
tls_conn: Arc<TC>,
}
impl<R: Runtime, TC: TlsConn> ArtiHttpConnector<R, TC> {
/// Make a new `ArtiHttpConnector` using an Arti `TorClient` object.
pub fn new(client: TorClient<R>, tls_conn: TC) -> Self {
let tls_conn = tls_conn.into();
Self { client, tls_conn }
}
}
#[pin_project]
#[derive(Debug)]
pub struct ArtiHttpConnection {
/// The stream
#[pin]
inner: Pin<Box<DataStream>>,
}
impl AsyncRead for ArtiHttpConnection {
fn poll_read(self: Pin<&mut Self>, cx: &mut std::task::Context, buf: &mut ReadBuf) -> std::task::Poll<std::io::Result<()>> {
self.project().inner.poll_read(cx, buf)
}
}
impl AsyncWrite for ArtiHttpConnection {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
self.project().inner.as_mut().poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
self.project().inner.as_mut().poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
self.project().inner.as_mut().poll_shutdown(cx)
}
}
unsafe impl Send for ArtiHttpConnection {}
unsafe impl Sync for ArtiHttpConnection {}
impl Transport for ArtiHttpConnection {
fn is_open(&mut self) -> bool {
self.inner.ctrl().is_open()
}
fn await_input(&mut self, timeout: NextTimeout) -> Result<bool, ureq::Error> {
let mut buffer = vec![0u8; 1024]; // You can adjust the buffer size
tokio::task::block_in_place(move || {
let waker = futures::task::noop_waker_ref();
let mut cx = Context::from_waker(waker);
let mut buf = ReadBuf::new(&mut buffer);
// Polling the DataReader
let mut reader = ArtiHttpConnection::poll_read(Pin::new(self), &mut cx, &mut buf);
match reader {
std::task::Poll::Ready(Ok(_)) => {
Ok(buf.filled().len() > 0)
},
std::task::Poll::Ready(Err(_)) => Err(ureq::Error::StatusCode(1)),
_ => Ok(false),
}
})
}
fn buffers(&mut self) -> &mut dyn Buffers {
unimplemented!()
}
fn transmit_output(&mut self, _size: usize, _timeout: NextTimeout) -> Result<(), ureq::Error> {
unimplemented!()
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = TorClientConfig::default();
let tor_client = Arc::new(TorClient::create_bootstrapped(config).await?);
Ok(())
}