How to spawn an async job using raw pointers

For some reason I must use a raw pointer to a Tokio TcpStream.

I wrote something like this:

use tokio::io::AsyncWriteExt;
#[tokio::main]
async fn main() {
    let addr: std::net::SocketAddr = "127.0.0.1:7911".parse().unwrap();
    let socket = tokio::net::TcpStream::connect(addr).await.unwrap();
    let pointer: *mut tokio::net::TcpStream = &mut socket;
    tokio::spawn(async move {
        pointer.as_mut().unwrap().write(&[3,4,5]).await;
    });
}

Then rustc reports:

error: future cannot be sent between threads safely
   --> src/main.rs:25:5
    |
25  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    | 
   ::: /Users/iami/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/tokio-1.11.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::spawn`
    |
    = help: within `impl Future`, the trait `Send` is not implemented for `*mut tokio::net::TcpStream`
note: captured value is not `Send`
   --> src/main.rs:26:9
    |
26  |         pointer.as_mut().unwrap().write(&[3,4,5]).await;
    |         ^^^^^^^ has type `*mut tokio::net::TcpStream` which is not `Send`

error: aborting due to previous error

After searching, I found that I need a wrapper type that implements Send and Sync. Fine:

use tokio::io::AsyncWriteExt;

#[derive(Copy, Clone)]
pub struct Socket {
    pub pointer: *mut tokio::net::TcpStream,
}

unsafe impl Send for Socket {}
unsafe impl Sync for Socket {}

impl Socket {
    pub async fn write(&mut self, content: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
        unsafe { 
            self.pointer.as_mut().unwrap().write_all(content).await?; // Some check skipped.
        };
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let addr: std::net::SocketAddr = "127.0.0.1:7911".parse().unwrap();
    let socket = tokio::net::TcpStream::connect(addr).await.unwrap();
    let mut socket = Socket { pointer: &mut socket };
    tokio::spawn(async move {
        socket.write(&[3,4,5]).await;
    });
}

That code doesn't compile either; rustc says:

note: future is not `Send` as this value is used across an await
   --> src/main.rs:14:13
    |
14  |             self.pointer.as_mut().unwrap().write_all(content).await?;
    |             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ first, await occurs here, with `self.pointer` maybe used later...

How should I fix this?

socket gets dropped before the spawned task runs, so even if rustc accepted the code, it would still be undefined behavior (UB).

Yes. This is just a simplified example; I can promise the socket is still alive at that point.

Anyway, it's already unsafe, right?

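Try taking the reference out of the raw pointer before the await, so the pointer itself is not held across it:

use std::future::Future;

impl Socket {
    pub fn write<'a>(
        &'a mut self,
        content: &'a [u8],
    ) -> impl Future<Output = Result<(), Box<dyn std::error::Error>>> + 'a {
        // Dereference up front; only the `&mut TcpStream` moves into the future.
        let socket = unsafe { self.pointer.as_mut().unwrap() };
        async move {
            socket.write_all(content).await?; // Some check skipped.
            Ok(())
        }
    }
}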
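To see why moving the dereference into its own statement matters, here is a minimal sketch that needs only the standard library. `RawBuf`, `assert_send`, `NoopWaker` and `run_send_check` are hypothetical names made up for this illustration, and a `Vec<u8>` stands in for the TcpStream. The idea is that the `*mut` value is done with before the await, so only a `&mut` reference (which is `Send`) is held across it.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

// Hypothetical stand-in for the thread's `Socket`; a Vec<u8> plays the stream.
struct RawBuf {
    pointer: *mut Vec<u8>,
}

// SAFETY (assumed, not checked by the compiler): the caller keeps the Vec
// alive and unaliased for as long as the wrapper is used.
unsafe impl Send for RawBuf {}
unsafe impl Sync for RawBuf {}

impl RawBuf {
    async fn write(&mut self, content: &[u8]) {
        // Finish with the raw pointer in its own statement...
        let buf: &mut Vec<u8> = unsafe { self.pointer.as_mut().unwrap() };
        // ...so only `&mut Vec<u8>` is live across this await.
        std::future::ready(()).await;
        buf.extend_from_slice(content);
    }
}

// Compiles only if the referenced value is `Send`.
fn assert_send<T: Send>(_: &T) {}

// A waker that does nothing, enough to poll a future by hand.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn run_send_check() -> Vec<u8> {
    let mut data = Vec::new();
    let mut raw = RawBuf { pointer: &mut data };
    {
        let mut fut = Box::pin(raw.write(&[3, 4, 5]));
        assert_send(&fut); // the same bound `tokio::spawn` asks for
        let waker = Waker::from(Arc::new(NoopWaker));
        let mut cx = Context::from_waker(&waker);
        // `ready(())` completes immediately, so one poll is enough here.
        assert!(fut.as_mut().poll(&mut cx).is_ready());
    }
    data
}
```

Calling `run_send_check()` returns the bytes that were written through the pointer. Note that this only addresses the `Send` error; it does nothing to guarantee the pointee is still alive.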

When it comes to async tasks, people who make this kind of promise rather often turn out to be wrong. The reason is that async tasks can be cancelled. For example, web servers will typically cancel the task handling a request if the connection is closed in the meantime. Cancellation also happens when you return from the main function and use #[tokio::main].
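A minimal sketch of what cancellation does to the owner of the value, using only the standard library. Cancelling a task amounts to dropping its future at an await point, and that drops every local the future holds. `FakeSocket`, `owner`, `NoopWaker` and `owner_cancel_demo` are hypothetical names for this illustration; `FakeSocket` stands in for the TcpStream that a raw pointer elsewhere would point to.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

// Hypothetical stand-in for the socket; it records when it is dropped.
struct FakeSocket(Arc<AtomicBool>);
impl Drop for FakeSocket {
    fn drop(&mut self) {
        self.0.store(true, Ordering::SeqCst);
    }
}

// A waker that does nothing, enough to poll a future by hand.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// The owning task: it intends to keep the socket alive "forever".
async fn owner(dropped: Arc<AtomicBool>) {
    let _socket = FakeSocket(dropped);
    std::future::pending::<()>().await; // parked here when cancelled
}

// Returns (dropped before cancellation, dropped after cancellation).
fn owner_cancel_demo() -> (bool, bool) {
    let dropped = Arc::new(AtomicBool::new(false));
    let mut fut = Box::pin(owner(dropped.clone()));
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    // One poll runs the task up to its await point.
    assert!(fut.as_mut().poll(&mut cx).is_pending());
    let before = dropped.load(Ordering::SeqCst);
    // Dropping the future is the cancellation.
    drop(fut);
    let after = dropped.load(Ordering::SeqCst);
    (before, after)
}
```

Here the socket is alive while the owner is parked and gone as soon as the owner is cancelled, without the owner's code getting any chance to react. Any raw pointer to it held by another task would be dangling from that moment.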


I might be wrong (those of you who know much more about this than I do, please correct me, as I prefer to steer clear of unsafe features whenever I can), but given your use of a raw pointer, you might want to consider adding some PhantomData to make sure the reference behind the pointer is tracked properly.

Not sure why you'd want to create an *mut field instead of a regular &mut one in the first place, though. Are you sure you want to complicate things for yourself here? Because there's an easier way.
