Spawning async methods – Am I doing it right?

Hello, I'm experimenting with porting some synchronous network server code to Tokio. I have defined a trait Server as follows:

#[async_trait]
pub trait Server: Sync + 'static {
    async fn handler<R, W>(&self, reader: R, writer: W) -> Result<(), Box<dyn std::error::Error>>
    where
        R: AsyncRead + Unpin + Send,
        W: AsyncWrite + Unpin + Send;
    /* more methods follow */
}

I want other methods of the trait spawning a task that calls the handler, such as this:

    async fn other_function(&self) -> Result<(), std::io::Error> {
        /* ... */
        tokio::task::spawn(async move {
            if let Err(err) = self.handler(reader, writer).await {
                eprintln!("Error in handler: {}", err);
            }
        });
        Ok(())
    }

This gives an error[E0759]: `self` has lifetime `'life0` but it needs to satisfy a `'static` lifetime requirement.

I fixed this by passing an Arc<Self> (instead of &self) to that other_function.

Question 1:
Is using Arc<Self> the best way to solve that problem?

My complete code is as follows:

use async_trait::async_trait;
use std::fs;
use std::os::unix::fs::FileTypeExt as _;
use std::path::Path;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};

#[async_trait]
pub trait Server: Sync + 'static {
    async fn handler<R, W>(&self, reader: R, writer: W) -> Result<(), Box<dyn std::error::Error>>
    where
        R: AsyncRead + Unpin + Send,
        W: AsyncWrite + Unpin + Send;
    async fn run_local<P>(self: Arc<Self>, path: P) -> Result<(), std::io::Error>
    where
        P: AsRef<Path> + Send,
    {
        if let Ok(meta) = fs::metadata(&path) {
            if meta.file_type().is_socket() {
                fs::remove_file(&path)?;
            }
        }
        let listener = tokio::net::UnixListener::bind(path)?;
        loop {
            let (conn, _addr) = listener.accept().await?;
            let (reader, writer) = conn.into_split();
            let this = self.clone();
            tokio::task::spawn(async move {
                if let Err(err) = this.handler(reader, writer).await {
                    eprintln!("Error in handler: {}", err);
                }
            });
        }
    }
    async fn run_network<A>(self: Arc<Self>, addrs: A) -> Result<(), std::io::Error>
    where
        A: tokio::net::ToSocketAddrs + Send,
    {
        let listener = tokio::net::TcpListener::bind(addrs).await?;
        loop {
            let (conn, _addr) = listener.accept().await?;
            let (reader, writer) = conn.into_split();
            let this = self.clone();
            tokio::task::spawn(async move {
                if let Err(err) = this.handler(reader, writer).await {
                    eprintln!("Error in handler: {}", err);
                }
            });
        }
    }
}

struct SaySomething {
    greeting: String,
}

#[async_trait]
impl Server for SaySomething {
    async fn handler<R, W>(
        &self,
        _reader: R,
        mut writer: W,
    ) -> Result<(), Box<dyn std::error::Error>>
    where
        R: AsyncRead + Unpin + Send,
        W: AsyncWrite + Unpin + Send,
    {
        writer.write_all(self.greeting.as_ref()).await?;
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let server = Arc::new(SaySomething {
        greeting: String::from("Hi there!\n"),
    });
    let task1 = tokio::task::spawn(server.clone().run_local("socket"));
    let task2 = tokio::task::spawn(server.clone().run_network("[::]:1234"));
    let (result1, result2) = tokio::join!(task1, task2);
    result1??;
    result2??;
    Ok(())
}

Also note the various Sync and Send requirements, particularly R: AsyncRead + Unpin + Send and W: AsyncWrite + Unpin + Send.

Question 2:
Why do I need to make the reader and writer Send in the signature of the handler method? Do futures always have to be Send? Or is this a requirement of async_trait?

I get the following error if I remove the + Send:

error: future cannot be sent between threads safely
  --> src/main.rs:67:5
   |
67 | /     {
68 | |         writer.write_all(self.greeting.as_ref()).await?;
69 | |         Ok(())
70 | |     }
   | |_____^ future created by async block is not `Send`
   |
note: captured value is not `Send`
  --> src/main.rs:61:9
   |
61 |         _reader: R,
   |         ^^^^^^^ has type `R` which is not `Send`
   = note: required for the cast to the object type `dyn Future<Output = Result<(), Box<(dyn std::error::Error + 'static)>>> + Send`
help: consider further restricting type parameter `R`
   |
59 |     async fn handler, R: std::marker::Send
   |                     ~~~~~~~~~~~~~~~~~~~~~~

Learning Rust has been quite a challenge for me. Moving to async Rust seems to make things even more complex, and I wonder why. I think the combination of strict lifetime management plus futures might be inherently complex. Perhaps that's the price to pay when you want your code to be abstract, concurrent, and yet efficient. :exploding_head: Or am I overcomplicating things? :thinking:

AFAIK, usage of Arc is a very common approach for solving this kind of problem.

Those kinds of bounds are normal in generic code that works with multiple threads. An Unpin bound on AsyncRead/AsyncWrite is also not uncommon and not exceedingly restrictive: something like Pin<Box<R>> is always Unpin even when R: !Unpin, and most actual implementations of these traits satisfy Unpin anyway. An Unpin-free approach is possible, too; you'd then need something like pin_mut! (or tokio::pin!) in e.g. the implementation of handler.
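For illustration, here's a minimal sketch of what such an Unpin-free handler body could look like (a hypothetical echo_once helper of my own, not code from your post; it pins the reader with tokio::pin! instead of requiring R: Unpin):

use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

// Hypothetical sketch: no `R: Unpin` bound; the reader is pinned on the stack,
// and `Pin<&mut R>` is always `Unpin`, so the `AsyncReadExt` methods apply.
async fn echo_once<R, W>(reader: R, mut writer: W) -> std::io::Result<()>
where
    R: AsyncRead + Send,
    W: AsyncWrite + Unpin + Send,
{
    tokio::pin!(reader); // `reader` is now `Pin<&mut R>`
    let mut buf = [0u8; 1024];
    let n = reader.read(&mut buf).await?;
    writer.write_all(&buf[..n]).await?;
    Ok(())
}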

It’s related to async_trait; see its documentation. But note that tokio::task::spawn also needs Send, so it’s quite important that handler returns a future that fulfills a Send bound, so that the default implementations of the other methods, such as run_local, can work.
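For reference, here's a thin wrapper that spells out the bounds tokio::task::spawn imposes (a sketch of the shape of the API with a made-up name, not the actual tokio source):

use std::future::Future;
use tokio::task::JoinHandle;

// The spawned future (and its output) must be Send + 'static, because the
// multi-threaded runtime may move the task between worker threads.
fn spawn_send<F>(future: F) -> JoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    tokio::task::spawn(future)
}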

AFAICT you're doing quite well; you aren't even asking how to get something working, only about understanding things better and keeping the code idiomatic :wink:


By the way, I haven’t read all of your code in-depth and I’m not experienced with practically using async Rust myself.


Okay, then I will not feel bad using it :slight_smile:

I do remember your posts here. :woozy_face: For now, simply requiring Unpin works fine, except it's a bit verbose.

Yeah, thanks. The next subsection also explains how to make the methods non-thread-safe by using #[async_trait(?Send)]. I actually had to use this because I need my async method to hold an RwLockReadGuard, and that is !Send. However, I ran into the problems that you already pointed out:

The only way out was to use tokio::task::spawn_local, which in turn required me to create a tokio::task::LocalSet in my program's main function.
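Roughly, the main function then ends up looking like this (just a sketch, with a hypothetical run_server standing in for the actual non-Send work):

use tokio::task::LocalSet;

#[tokio::main]
async fn main() {
    let local = LocalSet::new();
    local
        .run_until(async {
            // spawn_local only works inside a LocalSet (or a local runtime);
            // the spawned future does not have to be Send.
            tokio::task::spawn_local(run_server()).await.unwrap();
        })
        .await;
}

async fn run_server() { /* non-Send work goes here */ }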

So async works, but I lost multithreading support.

Is it somehow possible to hold an RwLockReadGuard or MutexGuard in an async function or method that gets spawned? :thinking: … Never mind, while writing this I just found the answer: yes. Tokio provides several Send-able synchronization locks/guards in its tokio::sync module. :smiley: That should allow me to make all futures Send again despite using locks/guards. I will try that tomorrow, but wanted to share this in case other people run into problems with locks and async functions.

tokio::sync::MutexGuard
A handle to a held Mutex. The guard can be held across any .await point as it is Send.
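Something like this sketch is what I have in mind (a made-up send_greeting example; I haven't tried it yet): with tokio::sync::RwLock the read guard is Send, so the spawned future stays Send even though the guard is held across an .await.

use std::sync::Arc;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::RwLock;

// Made-up example: the guard of tokio::sync::RwLock is Send (for Send + Sync
// contents), so it may be held across an .await inside a spawned task.
fn send_greeting<W>(greeting: Arc<RwLock<String>>, mut writer: W)
where
    W: AsyncWrite + Unpin + Send + 'static,
{
    tokio::task::spawn(async move {
        let guard = greeting.read().await;              // async lock
        writer.write_all(guard.as_bytes()).await.ok();  // guard held across .await
    });
}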

Thanks for helping and encouraging me to keep on!

Please be aware that using the ordinary std mutex is also definitely possible in an async fn that gets spawned! The only reason you can't here is that you are keeping it locked while performing an .await; if you don't need that, then the std mutex works fine.

{
    let lock = mutex.lock().unwrap();
    use_lock_without_async(lock);
    // lock goes out of scope before the .await below
}
do_something_async().await;

Note that Tokio's synchronization primitives are different from the standard library's: they don't block the thread when something is locked; instead they have an API based on async fn, so they interact nicely with the async runtime and don't lock up any threads. However, this support comes with some overhead, so it's a good idea not to use them if you never hold locks for long (i.e. never hold them across an .await point). The fact that one of your futures loses its Send property does indicate that you might be doing just that: holding a guard across an .await point.

See also:

“Which kind of mutex should you use” in the Tokio docs

In general, if there's a need for Send-able guards for a mutex / rw-lock that blocks the thread and doesn't have the overhead of supporting async locking methods, then the parking_lot crate could be an option (if I recall correctly, its guards can be made Send via the send_guard feature).


Yes, right. In my case I will need to hold the RwLockReadGuard while .awaiting I/O. Thanks for pointing out that this isn't always the case, particularly not when I only need to hold a lock guard for some quick in-memory operations that aren't async.

Right, I didn't think of that, but it makes sense to let the current thread perform some other tasks that are ready to be advanced instead of blocking the thread.

That is also good to know. So I will have to decide in each case which lock type is the right one to be used.

That is an interesting insight which I wasn't aware of when starting to get into async Rust:

Some async fns are thread-safe, while others are not. You don't always see that immediately in the function's signature, as the returned Future is hidden by the syntactic sugar, which can be a bit confusing, I guess. Instead, it depends on whether the function body holds non-Send values across .await calls.
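A tiny illustration of what I mean (hypothetical functions, just to show the difference):

use std::rc::Rc;

fn assert_send<T: Send>(_: T) {}

async fn stays_send() {
    let x = 1u32;                    // u32 is Send, so holding it across
    tokio::task::yield_now().await;  // this .await is fine
    println!("{}", x);
}

async fn not_send() {
    let x = Rc::new(1u32);           // Rc is !Send ...
    tokio::task::yield_now().await;  // ... and it is held across this .await
    println!("{}", x);
}

fn main() {
    assert_send(stays_send()); // compiles: the future is Send
    let _not_send_future = not_send();
    // assert_send(_not_send_future); // error: future cannot be sent between threads safely
}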

Maybe this insight could also be kept in mind for tutorials regarding async Rust (maybe this is already covered).

Cool, I just saw that parking_lot::RwLock also avoids "reader and writer starvation", which std::sync::RwLock does not:

Standard RwLock:

The priority policy of the lock is dependent on the underlying operating system’s implementation, and this type does not guarantee that any particular policy will be used. In particular, a writer which is waiting to acquire the lock in write might or might not block concurrent calls to read […]

I assume std::sync::RwLock is often just a wrapper for pthread_rwlock_*? On my operating system (FreeBSD), the man page for pthread_rwlock_wrlock says:

Implementation Notes:
To prevent writer starvation, writers are favored over readers.

But I guess this may be different on other operating systems.

parking_lot's RwLock:

[…] readers trying to acquire the lock will block even if the lock is unlocked when there are writers waiting to acquire the lock […]

Tokio's RwLock also avoids writer starvation:

Fairness is ensured using a first-in, first-out queue for the tasks awaiting the lock; if a task that wishes to acquire the write lock is at the head of the queue, read locks will not be given out until the write lock has been released. This is in contrast to the Rust standard library’s std::sync::RwLock , where the priority policy is dependent on the operating system’s implementation.

Knowing about parking_lot and Tokio's locks will also allow me to get rid of an extra lock that I only needed to avoid writer starvation.

P.S.: What's also astonishing is that some lock guards are not thread-safe (as in being !Send) :rofl: That was pretty unexpected for me, as you intuitively think that thread-safety is inherent to locks.

I think this is due to restrictions in some OS APIs: you have to unlock a mutex from the same thread that locked it. This makes the lock guard of such a mutex unable to be sent between threads.
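A quick way to see this in code (illustrative snippet only):

use std::sync::Mutex;

fn assert_send<T: Send>(_: T) {}

fn main() {
    let m = Mutex::new(0);
    let guard = m.lock().unwrap();
    // assert_send(guard); // error: the guard cannot be sent between threads safely
    drop(guard);
}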


Some of these topics are covered in this chapter of the Tokio tutorial. I am also working on a much more in-depth article on how and when to use which locks.


Threads owning the mutex is a POSIX thing (see pthread_mutex_unlock).

