How to share `HashMap` between blocking and async functions

Hi!

I'd like to know how I could share HashMap between blocking and async functions.

What I am trying to do is have a long-running background thread that deletes items from the hash map. I'm basically creating a Redis clone, so the background thread should remove expired keys. It should run an infinite loop that sleeps, checks for expired keys, and deletes them. So it doesn't necessarily have to be tokio::task::spawn_blocking(); it could be std::thread::spawn() instead.

The rest of the program is meant to run on a tokio multi-threaded runtime.

I can't get it to work because the hash map gets moved in the first closure, be it std::thread or tokio::task::spawn_blocking.

This is just the beginning, as I'd have to add the loop and sleeping in it in one of the functions, but I can't get past this issue for now - past the value being moved.

What's the best approach?

Would using channels help and how would I use them? But, isn't Arc supposed to allow us to share the hash map without using a channel?

Perhaps tokio isn't even necessary, but it could be very useful in some other application that may have the same issue. Additionally, I planned to add framing later, through tokio_util::codec::Framed, and this would require tokio.

Many thanks in advance!

use std::collections::HashMap;
use std::sync::Arc;
//use tokio::sync::RwLock;
use std::sync::RwLock;

type WrappedMap = Arc<RwLock<HashMap<String, String>>>;

async fn afn(map: WrappedMap) {
    //let mut s = map.write().await;
    let mut s = map.write().unwrap();
    s.insert("a".to_string(), "A".to_string());
}

async fn bfn(map: WrappedMap) {
    //let mut s = map.write().await;
    let mut s = map.write().unwrap();
    s.insert("b".to_string(), "B".to_string());
}

#[tokio::main]
async fn main() {
    let map = std::collections::HashMap::<String, String>::new();
    let st = Arc::new(RwLock::new(map));

    println!("000");
    // let handle1 = tokio::spawn(afn(st.clone())); // Works!
    let handle1 = tokio::task::spawn_blocking(move || afn(st.clone())); // PROBLEM! variable moved due to use in closure
    println!("111");
        
    println!("222");
    let handle2 = tokio::spawn(bfn(st.clone())); // PROBLEM! value borrowed here after move
    println!("333");
    
    handle1.await.unwrap();
    handle2.await.unwrap();
    println!("444");
    
    //let map = st.read().await;
    let map = st.read().unwrap();
    println!("{:?}", map);
}


you need to make a clone in the parent thread, then move the clone into the closure of the spawned task/thread. your code moves the Arc itself into the closure first, so the thread only makes a (meaningless) clone once it is scheduled to run, and the parent no longer has an Arc to use.

instead, you should do something like this:

let st2 = st.clone();
let handle1 = tokio::task::spawn_blocking(move || afn(st2));

// alternative method, note the explicit scope
let handle1 = tokio::task::spawn_blocking({
    let st = st.clone();
    move || afn(st)
});
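
For reference, here is a minimal, std-only sketch of the same clone-then-move pattern. It assumes a plain (non-async) worker function; `insert_entry` and `fill_from_two_threads` are hypothetical names made up for this illustration, and `std::thread::spawn` stands in for `spawn_blocking`.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

type WrappedMap = Arc<RwLock<HashMap<String, String>>>;

// Hypothetical helper: a plain (non-async) function that inserts one entry.
fn insert_entry(map: WrappedMap, key: &str, value: &str) {
    map.write().unwrap().insert(key.to_string(), value.to_string());
}

// Spawns two threads, each owning its own clone of the Arc,
// and returns the original handle once both have finished.
fn fill_from_two_threads() -> WrappedMap {
    let st: WrappedMap = Arc::new(RwLock::new(HashMap::new()));

    // Clone in the parent, then move the clone into the closure.
    let st_a = Arc::clone(&st);
    let h1 = thread::spawn(move || insert_entry(st_a, "a", "A"));

    // Same pattern, with a block expression so the clone's name stays local.
    let h2 = thread::spawn({
        let st = Arc::clone(&st);
        move || insert_entry(st, "b", "B")
    });

    h1.join().unwrap();
    h2.join().unwrap();
    st // the original Arc was never moved, so it is still usable here
}

fn main() {
    let st = fill_from_two_threads();
    println!("{} entries", st.read().unwrap().len());
}
```

Each closure owns its own Arc handle, and all handles point at the same RwLock and HashMap.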

@nerditation Thanks for the reply!

I tried both methods, and both give this output:

000
111
222
333
444
{"b": "B"}

So, {"a": "A"} is missing.

oopsie, I overlooked the fact that the function afn is actually async. spawn_blocking() is just like std::thread::spawn(): it runs the closure, but it doesn't spawn the returned Future as a tokio task, nor does it poll that Future.

if you need spawn_blocking(), then make afn() non-async. if you need it to be async, then just use regular tokio::spawn().
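
to see the effect in isolation, here is a small std-only sketch (no tokio involved; `call_without_polling` is a made-up name for this illustration). calling an async fn only builds a Future; the body does not run unless something polls it, so the map stays empty:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

type WrappedMap = Arc<RwLock<HashMap<String, String>>>;

async fn afn(map: WrappedMap) {
    // This body runs only when the returned future is polled.
    let mut s = map.write().unwrap();
    s.insert("a".to_string(), "A".to_string());
}

// Calls `afn` from a plain thread without ever polling the result,
// then reports how many entries the map holds afterwards.
fn call_without_polling() -> usize {
    let st: WrappedMap = Arc::new(RwLock::new(HashMap::new()));
    let st2 = Arc::clone(&st);

    let handle = thread::spawn(move || {
        let fut = afn(st2); // builds the future, runs none of the body
        drop(fut); // dropped without being polled
    });
    handle.join().unwrap();

    let len = st.read().unwrap().len();
    len
}

fn main() {
    println!("entries after un-polled call: {}", call_without_polling());
}
```

the closure passed to spawn_blocking() behaves the same way as the thread closure here: it returns the Future as its result and nobody drives it.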


The issue is that you are calling an async function inside the sync function passed to spawn_blocking. You should either make afn a sync function or change the spawn_blocking to a spawn like the one for bfn. Also, always look at your code warnings, they aren't there for nothing! Your (fixed) code results in this warning which correctly hints at the issue:

warning: unused implementer of `Future` that must be used
  --> src/main.rs:34:5
   |
34 |     handle1.await.unwrap();
   |     ^^^^^^^^^^^^^^^^^^^^^^
   |
   = note: futures do nothing unless you `.await` or poll them
   = note: `#[warn(unused_must_use)]` on by default

Okay, thanks guys! @nerditation @SkiFire13

I didn't know that would affect the execution; async was there initially because I started with tokio::sync::RwLock which was later replaced with std::sync::RwLock.

For future reference, here is the updated code that works:

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;

type WrappedMap = Arc<RwLock<HashMap<String, String>>>;

fn afn(map: WrappedMap) {
    let mut s = map.write().unwrap();
    s.insert("a".to_string(), "A".to_string());
}

async fn bfn(map: WrappedMap) {
    let mut s = map.write().unwrap();
    s.insert("b".to_string(), "B".to_string());
}

#[tokio::main]
async fn main() {
    let map = std::collections::HashMap::<String, String>::new();
    let st = Arc::new(RwLock::new(map));

    let st2 = st.clone();
    let handle1 = tokio::task::spawn_blocking(move || afn(st2));

    let handle2 = tokio::spawn(bfn(st.clone()));
    
    handle1.await.unwrap();
    handle2.await.unwrap();
    println!("444");
    
    let map = st.read().unwrap();
    println!("{:?}", map);
}

Suppose we need tokio::sync::RwLock.

How would we solve that?

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

type WrappedMap = Arc<RwLock<HashMap<String, String>>>;

async fn afn(map: WrappedMap) {
    let mut s = map.write().await;
    s.insert("a".to_string(), "A".to_string());
}

async fn bfn(map: WrappedMap) {
    let mut s = map.write().await;
    s.insert("b".to_string(), "B".to_string());
}

#[tokio::main]
async fn main() {
    let map = std::collections::HashMap::<String, String>::new();
    let st = Arc::new(RwLock::new(map));

    let st2 = st.clone();
    let handle1 = tokio::task::spawn_blocking(move || afn(st2));

    let handle2 = tokio::spawn(bfn(st.clone()));
    
    handle1.await.unwrap();
    handle2.await.unwrap();
    
    let map = st.read().await;
    println!("{:?}", map);
}

This outputs:

{"b": "B"}

You don't want to call an async function inside spawn_blocking; it won't actually be executed (and your code emits a warning about an unused Future for this exact reason).


So, if we want to mix sync and async, we should use std::sync::RwLock and there's no way for us to use tokio::sync::RwLock, correct?

And probably likewise for other async things, right? We can't use them if we want to share them?

most of the time, you can just use the standard synchronization primitives, like Mutex and RwLock.

the async-aware locking types really only make sense if you might have a critical section of extended length (e.g. you use locks in a coarse-grained way), such that you may need to hold the lock across await points, or if you are expecting very high contention, such that the locking step might block for a really long period and you want to yield the CPU to other tasks.

a typical use of a fine-grained Mutex or RwLock should be within a couple of lines of code, and the time spent waiting for the lock is not a concern.
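
as a rough sketch of what such a short critical section could look like for the expired-key sweeper from the original question (std only; the `(String, Instant)` value layout and the names `ExpiringMap`, `purge_expired` and `spawn_sweeper` are assumptions made up for this example, not something from the posted code):

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical layout: each value carries its own expiry deadline.
type ExpiringMap = Arc<RwLock<HashMap<String, (String, Instant)>>>;

// Removes every entry whose deadline is at or before `now` and returns
// how many were removed. The write lock is held only for this call.
fn purge_expired(map: &ExpiringMap, now: Instant) -> usize {
    let mut guard = map.write().unwrap();
    let before = guard.len();
    guard.retain(|_, (_, deadline)| *deadline > now);
    before - guard.len()
}

// Starts a background thread that sweeps forever. It sleeps between
// sweeps, so the lock is never held while the thread is idle.
fn spawn_sweeper(map: ExpiringMap, interval: Duration) -> thread::JoinHandle<()> {
    thread::spawn(move || loop {
        thread::sleep(interval);
        purge_expired(&map, Instant::now());
    })
}

fn main() {
    let map: ExpiringMap = Arc::new(RwLock::new(HashMap::new()));
    let now = Instant::now();
    map.write().unwrap().insert("short".into(), ("x".into(), now + Duration::from_millis(10)));
    map.write().unwrap().insert("long".into(), ("y".into(), now + Duration::from_secs(60)));

    let _sweeper = spawn_sweeper(Arc::clone(&map), Duration::from_millis(20));
    thread::sleep(Duration::from_millis(100));
    println!("remaining keys: {:?}", map.read().unwrap().keys().collect::<Vec<_>>());
}
```

async tasks can take the same std lock for their own short reads and writes, as long as they never hold the guard across an await point.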

in case you do need to use tokio's locking types, tokio locks do support blocking operations if you don't expect acquiring the lock to be blocked for long, but it's better to use channels for the blocking task to communicate with other async tasks. for example, tokio's channel types do support blocking send and blocking receive.


You can use blocking_write to lock it without needing to use async.

That or don't use spawn_blocking as I explained before.


Yes, this works:

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

type WrappedMap = Arc<RwLock<HashMap<String, String>>>;

async fn afn(map: WrappedMap) {
    let mut s = map.write().await;
    s.insert("a".to_string(), "A".to_string());
}

async fn bfn(map: WrappedMap) {
    let mut s = map.write().await;
    s.insert("b".to_string(), "B".to_string());
}

#[tokio::main]
async fn main() {
    let map = std::collections::HashMap::<String, String>::new();
    let st = Arc::new(RwLock::new(map));

    let st2 = st.clone();
    let handle1 = tokio::task::spawn(afn(st2));

    let handle2 = tokio::spawn(bfn(st.clone()));
    
    handle1.await.unwrap();
    handle2.await.unwrap();
    
    let map = st.read().await;
    println!("{:?}", map);
}

It also works with std::sync::RwLock and respective changes to s (let mut s = map.write().unwrap();).

For reference, this is a solution with std::thread::spawn.

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;

type WrappedMap = Arc<RwLock<HashMap<String, String>>>;

fn afn(map: WrappedMap) {
    let mut s = map.write().unwrap();
    s.insert("a".to_string(), "A".to_string());
}

async fn bfn(map: WrappedMap) {
    let mut s = map.write().unwrap();
    s.insert("b".to_string(), "B".to_string());
}

#[tokio::main]
async fn main() {
    let map = std::collections::HashMap::<String, String>::new();
    let st = Arc::new(RwLock::new(map));

    let st2 = st.clone();
    let handle1 = std::thread::spawn(move || afn(st2));

    let handle2 = tokio::spawn(bfn(st.clone()));
    
    handle1.join().unwrap();
    handle2.await.unwrap();
    
    let map = st.read().unwrap();
    println!("{:?}", map);
}