One client per thread

Hi, I have a program that uses tokio and its tasks. Each of these tasks creates "a client" from scratch, to be precise a BinaryCopyInWriter. I'd like to design the app so that the BinaryCopyInWriter is created only if it hasn't yet been created for the thread on which a particular task is executing. Is that possible at all?
Thank you

I guess a combination of thread_local! and OnceCell should do the trick.
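
A minimal sketch of that idea using only the standard library, assuming the value can be constructed synchronously. `Client`, `CONSTRUCTED` and `with_client` are hypothetical names for illustration and are not part of tokio or tokio-postgres:

```rust
use std::cell::{OnceCell, RefCell};
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical stand-in for a client that is expensive to construct.
pub struct Client {
    pub id: usize,
    pub uses: usize,
}

// Counts constructions across all threads, only to make the laziness observable.
pub static CONSTRUCTED: AtomicUsize = AtomicUsize::new(0);

thread_local! {
    // One lazily initialized client per OS thread.
    static CLIENT: OnceCell<RefCell<Client>> = OnceCell::new();
}

// Runs `f` with the current thread's client, constructing it on first use.
pub fn with_client<R>(f: impl FnOnce(&mut Client) -> R) -> R {
    CLIENT.with(|cell| {
        let client = cell.get_or_init(|| {
            let id = CONSTRUCTED.fetch_add(1, Ordering::SeqCst);
            RefCell::new(Client { id, uses: 0 })
        });
        // The mutable borrow only lives for the duration of this closure.
        let mut guard = client.borrow_mut();
        f(&mut guard)
    })
}
```

Calling `with_client` twice on the same thread reuses the same `Client`, while a call from another thread constructs a second one. The value can only be reached inside the closure, which matters once async methods are involved.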

Unfortunately I don't think so. I've tried that approach, but in order to create a BinaryCopyInWriter one has to execute some async code, and this is a show stopper for thread_local! AFAICT.

You can create a new runtime in the callback you pass to OnceCell::get_or_init and block on it until it has finished initializing the BinaryCopyInWriter (I assume it is this one?)


Edit: There is also an asynchronous version of OnceCell on crates.io.

I am not much of an async programmer but I tried this out. Please correct me if I've made mistakes.

I tried two different approaches, one using the async OnceCell and one just using an Option. See the comments for details.

In any case the computed value has to be used inside a callback, since a thread local can only be borrowed for the duration of the closure passed to `with`. And I assumed that the callback needs a mut ref, since otherwise an Arc could be shared and all this wouldn't be needed.

// Assume we have a struct that is expensive to construct.
struct Expensive {}

// This uses `async_once_cell::OnceCell` and `block_on`.
//
// A `RefCell` wraps the value to allow mutable access.
//
// Note that `block_in_place` and `block_on` are called whether or not
// the value has already been computed.
//
fn with_expensive_blocking<R>(
    mut f: impl FnMut(&mut Expensive) -> R,
) -> R {
    use async_once_cell::OnceCell;
    use core::cell::RefCell;
    use tokio::runtime::Handle;

    thread_local! {
        static VAL: OnceCell<RefCell<Expensive>> = OnceCell::new();
    }
    VAL.with(|cell| {
        tokio::task::block_in_place(|| {
            let val = Handle::current().block_on(
                cell.get_or_init(async { RefCell::new(Expensive {}) }),
            );
            f(&mut *val.borrow_mut())
        })
    })
}

// This does not use a `OnceCell` or call `block_on` and is therefore
// what I recommend using.
//
// A `RefCell<Option<...>>` wraps the value to allow mutable access and
// to implement lazy computation here.
//
// This uses a fast path when the value has already been computed.
//
// Because this does not call `block_on`, it assumes that the thread
// may change during the async computation, or another task might
// update the value in the original thread. If a value is present in
// the current thread after the computation, the newly computed value
// is simply dropped; this should be rare.
//
async fn with_expensive_not_blocking<R>(
    mut f: impl FnMut(&mut Expensive) -> R,
) -> R {
    use core::cell::RefCell;

    thread_local! {
        static VAL: RefCell<Option<Expensive>> = RefCell::new(None);
    }
    if let Some(r) = VAL.with_borrow_mut(|val| match val {
        Some(r) => Some(f(r)),
        None => None,
    }) {
        return r;
    }

    let r = async { Expensive {} }.await;

    VAL.with_borrow_mut(|val| match val {
        Some(r) => f(r),
        opt @ None => {
            *opt = Some(r);
            f(opt.as_mut().unwrap())
        }
    })
}

Edit: fixed comment for first approach.

Edit #2: added block_in_place call in the first approach, thanks to help from @jofas. Updated comments.

Thanks for reminding me why I avoid using thread locals. Forgot about that. Well, this makes the async OnceCell superfluous, as you could just block inside the synchronous OnceCell's get_or_init call. However, I definitely prefer your second approach with Option.


You probably want to wrap this in a call to block_in_place or spawn_blocking, to keep Handle::block_on from panicking when it is called from within the executor itself:

fn panics() {
    tokio::runtime::Runtime::new().unwrap().block_on(async {
        tokio::task::spawn(async {
            tokio::runtime::Handle::current().block_on(async {
                println!("this will panic before it can print this message.");
            })
        })
        .await
        .unwrap();
    });
}

fn works() {
    tokio::runtime::Runtime::new().unwrap().block_on(async {
        tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(async {
                println!("works.");
            })
        })
    });
}

fn main() {
    works();
    panics();
}

Thank you! I'm obviously not familiar with blocking like this, and I don't want to leave an example that will mislead someone.

I tried wrapping with spawn_blocking because the doc indicates it works better with both single and multi threaded runtimes. But this required a Send bound on the final result R, which is definitely undesirable, so I scrapped that and used block_in_place. But I also see in the doc that block_in_place doesn't work well with the single threaded runtime, although it did work with the single threaded runtime in my simple test.

So now I'm thinking I should just remove the blocking example, since the other approach with Option is less problematic. Do you agree?


I went ahead and added the call to block_in_place for the first approach, and updated the comments to direct readers toward the second approach with Option.

I think that, given that OP wants to have an instance of BinaryCopyInWriter in every thread, we can safely deduce they are using the multithreaded runtime. In that case I don't believe an example with block_in_place would be out of place, even though it should panic when called from a single threaded runtime.

However, I don't believe either the OnceCell or the Option solution is actually helpful to OP, as the callback is not asynchronous, and OP needs to call asynchronous methods on BinaryCopyInWriter. We can't spawn tasks that aren't Send onto the runtime, so the thread local approach itself no longer seems suitable to me, now that OP has disclosed that they need to interact asynchronously with the thread local variable and you have reminded me that we can't just reference the thread local variable.

Oh, you're right, I didn't notice that the methods on BinaryCopyInWriter are async. I'm going to try out the thread_local crate for this. I'd like to try it out anyway since the std thread_local is so inconvenient. I'll post back here.


I'm afraid it doesn't make sense to store an object that 1) must be Send and 2) is accessed mutably in a thread local of any kind. BinaryCopyInWriter is Send because calling its async methods may move it to a different thread, and its methods take self: Pin<&mut Self>. The ThreadLocal::get, etc., methods return a shared ref, and more importantly the thread is the owner of the object.

This is not what the OP needs, but note that for an object accessed immutably it's easy to use ThreadLocal from the thread_local crate. You can borrow objects (immutably), and if they're Send you can call their async methods. ThreadLocal itself, representing a pool of objects of a particular type, is Send/Sync so you can easily share it among threads/tasks.

Thank you guys for all the input. I'm a bit confused after reading the answers, though. Is the scenario I've described doable or not?

I don't think one BinaryCopyInWriter per thread is semantically what you want in the first place. The moment you call .await in one of your tasks, it can get rescheduled to a different thread, with a different BinaryCopyInWriter. I don't believe that will result in very sensible output.

Hi,
I certainly get what you're saying, but from the "architectural" point of view it (my OP) does make sense.
Each thread always uses the same BinaryCopyInWriter without the need to create it more than once. We store that object in thread_local.
If the .await did in fact reschedule the task to a different thread, we would use the BinaryCopyInWriter "from that thread"/"belonging to that thread".

From which thread? The current thread or the thread that the task ran on before being rescheduled? The whole point of thread locals is that they can't be accessed by a different thread, so using the local BinaryCopyInWriter from the previous thread wouldn't make sense.

And I believe using the BinaryCopyInWriter from the current thread, after the task previously wrote some data on another thread's BinaryCopyInWriter, would result in broken output.
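
A small standard-library sketch of why that would go wrong. Here two plain OS threads stand in for a task that is rescheduled between two writes, and `BUFFERED`, `write_row` and `write_two_rows_across_threads` are hypothetical names for illustration:

```rust
use std::cell::RefCell;
use std::thread;

thread_local! {
    // Hypothetical per-thread writer state: the rows buffered so far.
    static BUFFERED: RefCell<Vec<u32>> = RefCell::new(Vec::new());
}

// Appends a row to the current thread's buffer and returns its new length.
pub fn write_row(row: u32) -> usize {
    BUFFERED.with(|b| {
        let mut rows = b.borrow_mut();
        rows.push(row);
        rows.len()
    })
}

// Simulates a task that writes one row, "migrates" to another thread,
// and writes a second row there.
pub fn write_two_rows_across_threads() -> (usize, usize) {
    let first = thread::spawn(|| write_row(1)).join().unwrap();
    let second = thread::spawn(|| write_row(2)).join().unwrap();
    (first, second)
}
```

Both writes report a buffer length of 1: each thread only ever sees its own buffer, so no single writer ends up holding both rows of what was meant to be one logical stream.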

The current thread. Using the BinaryCopyInWriter local to that thread.

But that's not what will happen when you call an async method on BinaryCopyInWriter. There will be only one BinaryCopyInWriter for that call, the one from the original thread. Which doesn't work when the thread can change, since the original thread owns the object.

If you want to avoid recreating the BinaryCopyInWriter every time you need one, you can create a pool of them and take one from the pool when needed. If the pool is empty, you create one as usual and then add it to the pool when you're finished using it.
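
A minimal sketch of such a pool using only the standard library. `Writer`, `Pool`, `take` and `put_back` are hypothetical names, and a real BinaryCopyInWriter would take the place of `Writer`:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for an expensive-to-build writer.
pub struct Writer {
    pub id: usize,
}

// A minimal pool: idle writers sit in a Vec behind a Mutex.
// Cloning the pool is cheap and every clone shares the same writers.
#[derive(Clone, Default)]
pub struct Pool {
    idle: Arc<Mutex<Vec<Writer>>>,
    created: Arc<AtomicUsize>,
}

impl Pool {
    // Takes an idle writer, or builds a new one if the pool is empty.
    pub fn take(&self) -> Writer {
        if let Some(writer) = self.idle.lock().unwrap().pop() {
            return writer;
        }
        let id = self.created.fetch_add(1, Ordering::SeqCst) + 1;
        Writer { id }
    }

    // Returns a writer so that a later `take` can reuse it.
    pub fn put_back(&self, writer: Writer) {
        self.idle.lock().unwrap().push(writer);
    }

    // Number of writers built so far.
    pub fn created(&self) -> usize {
        self.created.load(Ordering::SeqCst)
    }
}
```

Because `take` hands out an owned value, the task that took it can keep it across `.await` points regardless of which thread it runs on, and the lock is only held for the short push or pop.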

Step too far. I want to call async when I already have that BinaryCopyInWriter, not when I'm creating it.
let writer = BinaryCopyInWriter::new(sink, ...); // no async here
use_writer(writer, &batch).await; // async here, but this is where I wanted to take that "TLS writer"

If you want to avoid recreating the BinaryCopyInWriter every time you need one, you can create a pool of them and take one from the pool when needed

That genuinely seems like the most straightforward idea. Thank you