Getting data to and from a lock-free thread

My goal is to send data to and receive data from a thread.
Both directions should be lock-free for that thread.
And if some values change faster than the lock-free thread can deal with them, they should be dropped.
My idea is to send a HashMap through a channel of capacity 1 to the thread.
The full channel blocks further sends, so the map keeps accumulating values (or overwriting older states) and is only sent once the previous map has been processed.
I chose a HashMap because the number of values that can change (the length of the data vector) is much bigger than the number of actual changes.
To get values out of the thread I use a Vec of atomics, because I don't know who wants to access them and when, but in general I'd like the cheapest possible lock-free solution.

Does this make sense, or am I overestimating the cost of atomics?
How expensive is storing a value in an atomic compared to sending things through a channel?
Is this wait-free?

use std::{
    collections::HashMap,
    sync::{
        atomic::{AtomicI32, Ordering},
        mpsc::{sync_channel, TrySendError},
        Arc,
    },
    thread::{sleep, spawn},
    time::Duration,
};

use rand::{seq::SliceRandom, thread_rng};

fn main() {
    let out_messages: Arc<Vec<AtomicI32>> = vec![AtomicI32::new(0), AtomicI32::new(0)].into();
    let out_messages2 = out_messages.clone();
    let (is, ir) = sync_channel::<HashMap<usize, i32>>(1);
    // slow thread
    let h1 = spawn(move || {
        let mut in_messages: HashMap<usize, i32> = HashMap::new();
        for i in 0..10 {
            let val = out_messages[0].load(Ordering::Relaxed);
            dbg!(val);

            // Try to send the current batch; if the channel is still full,
            // keep the returned map and continue accumulating into it.
            in_messages = match is.try_send(in_messages) {
                Err(TrySendError::Full(re)) => re,
                _ => HashMap::new(),
            };
            in_messages.insert(1, i);
            sleep(Duration::from_millis(10));
        }
    });
    //lockfree thread
    let h2 = spawn(move || {
        let mut dspdata = vec![0, 0]; // very long vector
        for _ in 0..100 {
            if let Ok(map) = ir.try_recv() {
                println!("new map {map:?}");
                for (key, val) in map.iter() {
                    dspdata[*key] = *val;
                }
            }
            // dsp section
            // simulate busyness
            sleep(Duration::from_millis(
                *[1, 20].choose(&mut thread_rng()).unwrap(),
            ));
            dspdata[0] = dspdata[1];
            //
            out_messages2[0].store(dspdata[0], Ordering::Relaxed)
        }
    });
    h1.join().unwrap();
    h2.join().unwrap();
}

I am not an expert in the detailed performance considerations you're asking about, but here are some things I noticed about your code's management of data structures:

  1. Currently, your DSP thread is responsible for dropping the HashMaps it receives, i.e. calling the memory allocator to deallocate them. You should probably instead use a second channel to transmit the HashMaps back to other threads, which can then be responsible for dropping or reusing them.
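For example, here is a minimal sketch of that recycling scheme (the function and channel names, and the capacity of 1, are just illustrative choices):

```rust
use std::collections::HashMap;
use std::sync::mpsc::sync_channel;
use std::thread::spawn;

// One round trip: the DSP thread drains a map and sends the empty
// allocation back to the producer instead of dropping it.
fn round_trip() -> usize {
    let (to_dsp, dsp_rx) = sync_channel::<HashMap<usize, i32>>(1);
    let (to_producer, producer_rx) = sync_channel::<HashMap<usize, i32>>(1);

    let dsp = spawn(move || {
        let mut map = dsp_rx.recv().unwrap();
        for (_key, _val) in map.iter() {
            // apply updates to the DSP state here ...
        }
        map.clear();
        // Hand the allocation back instead of freeing it on this thread;
        // if the return channel is full, dropping here is unavoidable.
        let _ = to_producer.try_send(map);
    });

    let mut batch = HashMap::new();
    batch.insert(1, 42);
    to_dsp.send(batch).unwrap();
    dsp.join().unwrap();

    // The producer reuses the returned (empty) map for the next batch.
    let recycled = producer_rx.recv().unwrap();
    recycled.len()
}

fn main() {
    assert_eq!(round_trip(), 0);
}
```

The point is that `HashMap::drop` (and the allocator call behind it) now always runs on the producer side, never on the DSP thread.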

  2. HashMap isn't a good data structure for this job, because the only thing you do on receipt is iterate over it. A better data structure would be Vec<(usize, i32)> — key-value pairs without supporting lookups. That will be more efficient to iterate over, and cheaper to create too if you don't need key de-duplication.

  3. But you can do even better and avoid allocations entirely: instead of a Vec, either make the channel elements (usize, i32)s themselves, or use tinyvec::ArrayVec or a similar fixed-capacity data structure. This way the DSP thread is never handed any heap allocations at all. The fixed capacity is also a virtue: you don't want your DSP thread to hiccup if it is somehow given a large batch of changes at once, and making everything fixed-capacity ensures that the DSP thread does at most a bounded amount of channel processing per iteration.
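A sketch of what that could look like with plain `(usize, i32)` channel elements (the `MAX_PER_TICK` budget and the names are my own assumptions):

```rust
use std::sync::mpsc::{sync_channel, Receiver};

// Sending plain (index, value) pairs keeps the DSP thread free of heap
// allocations. The channel capacity bounds how many updates can be
// pending, and draining at most MAX_PER_TICK per iteration bounds the
// work done in any single DSP callback.
fn drain_updates(dspdata: &mut [i32], rx: &Receiver<(usize, i32)>) {
    const MAX_PER_TICK: usize = 8; // assumed per-iteration budget
    for _ in 0..MAX_PER_TICK {
        match rx.try_recv() {
            Ok((key, val)) => dspdata[key] = val,
            Err(_) => break, // channel empty (or disconnected): stop early
        }
    }
}

fn main() {
    let (tx, rx) = sync_channel::<(usize, i32)>(16);
    tx.send((1, 7)).unwrap();
    tx.send((0, 3)).unwrap();
    let mut dspdata = vec![0, 0];
    drain_updates(&mut dspdata, &rx);
    assert_eq!(dspdata, vec![3, 7]);
}
```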


I ended up with the HashMap idea because of the de-duplication. (To ignore duplicated "set" messages for the same key if they are sent faster than they can be processed; the result should behave similarly to reading a vector of atomics.)
...
And thanks for the idea that I should be able to find a data structure that is cheaper for the lock-free thread to iterate.
...
And for the reminder to use an allocation pool to avoid deallocation.

By the "cost of atomics" do you mean the cost to communicate a large number of values between threads using a large vec of AtomicI32? That can be expensive because the memory bus can become saturated due to synchronization between CPU caches.

Optimized channels use atomics, so they are still synchronized between CPUs, but there are only two atomics for the two ends of the queue. SPSC queues can get by with even fewer atomic operations.

Nothing is truly wait-free, since there can be waits at the hardware level.

But I'm not sure which two approaches you're trying to compare. The approach you came up with vs using a large vec of atomics, or vs sending one item at a time over a channel?

Edit: Also, the cost of using atomics depends on the ordering. Relaxed ordering is much less expensive since the hardware is free to synchronize more gradually.
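As a tiny illustration of Relaxed being sufficient for independent values (the names here are made up):

```rust
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::thread;

// Relaxed is enough when each slot stands alone: the value does not
// need to order any other memory accesses, and join() already provides
// the happens-before edge for reading the final value.
fn publish() -> i32 {
    let slot = Arc::new(AtomicI32::new(0));
    let slot2 = Arc::clone(&slot);
    thread::spawn(move || slot2.store(5, Ordering::Relaxed))
        .join()
        .unwrap();
    slot.load(Ordering::Relaxed)
}

fn main() {
    assert_eq!(publish(), 5);
}
```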


You can collect the messages in a HashMap, then convert it to something else for sending to the DSP thread. (This would also be a convenient way to keep reusing one HashMap rather than allocating a new one for each batch.)
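For example (the function and variable names are my own), using `drain` so the same HashMap and Vec are reused across batches:

```rust
use std::collections::HashMap;

// De-duplicate on the control thread with a HashMap, then drain it into
// a Vec of (key, value) pairs for the channel. Both containers keep
// their capacity, so neither side allocates in steady state.
fn flush(pending: &mut HashMap<usize, i32>, batch: &mut Vec<(usize, i32)>) {
    batch.clear();
    batch.extend(pending.drain()); // empties the map but keeps its capacity
}

fn main() {
    let mut pending = HashMap::new();
    pending.insert(1, 10);
    pending.insert(1, 20); // the later write wins: de-duplication
    let mut batch = Vec::new();
    flush(&mut pending, &mut batch);
    assert_eq!(batch, vec![(1, 20)]);
    assert!(pending.is_empty());
}
```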


> By the "cost of atomics" do you mean the cost to communicate a large number of values between threads using a large vec of AtomicI32? That can be expensive because the memory bus can become saturated due to synchronization between CPU caches.

Yes, I mean that problem. How likely it is that "the memory bus can become saturated" is difficult to estimate.

> But I'm not sure which two approaches you're trying to compare. The approach you came up with vs using a large vec of atomics, or vs sending one item at a time over a channel?

I think I understand my problem better now. I forgot one more thing in the equation: whether or not I need the output values within the lock-free DSP thread. When I need to read the values multiple times, a vector of atomics wouldn't be good; I would need to copy the values anyway to get them out of the thread, so a vector through a channel is clearly better.
It is less clear when the output values are only written and never reused. Then a vector of atomics could be convenient and shouldn't be too bad (I hope). I am talking about a maximum of [AtomicI32; 1000].
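A sketch of that write-only variant (the names are illustrative, and the 1000 slots are the upper bound mentioned above):

```rust
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::thread;

// Write-only output slots: the DSP thread stores with Relaxed ordering
// and never reads the slots back, so it performs no read-modify-write
// operations on shared memory.
fn run_dsp() -> i32 {
    let out: Arc<Vec<AtomicI32>> =
        Arc::new((0..1000).map(|_| AtomicI32::new(0)).collect());
    let out2 = Arc::clone(&out);
    let dsp = thread::spawn(move || {
        // publish a result; this thread never loads from `out2`
        out2[0].store(99, Ordering::Relaxed);
    });
    dsp.join().unwrap();
    // any other thread can read the slots whenever it likes
    out[0].load(Ordering::Relaxed)
}

fn main() {
    assert_eq!(run_dsp(), 99);
}
```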

I have no experience with using such a large number of atomic variables, but I would be wary of it. I prefer to use queues or mutexes/rwlocks. My impression is that the memory bus is already overworked with normal buffer copying, etc., at least in network and database apps. But maybe I am too cautious, because whether the memory bus becomes the bottleneck depends on many factors. A good approach is to pick a solution, but expect that you will have to change it based on performance testing.
