What is the idiomatic way to insert into a shared set?

I have this piece of code which is shared among multiple tasks:

    use std::{collections::HashSet, sync::Arc};
    use anyhow::Result; // assuming an anyhow-style Result alias
    use tokio::fs::File;
    use tokio::io::{AsyncBufReadExt, BufReader};
    use tokio::sync::Mutex;
    use tracing::trace; // or `log::trace`

    async fn _push_to_set(set: Arc<Mutex<HashSet<String>>>, file: String) -> Result<()> {
        trace!("Loading file: {}", file);

        let file = File::open(file).await?;
        let mut reader = BufReader::new(file).lines();

        // `next_line` yields `Some(line)` until EOF, so loop with `while let`;
        // a `for` over a single `next_line().await?` would only see one line.
        while let Some(line) = reader.next_line().await? {
            // The lock is taken and released once per line.
            let mut set = set.lock().await;
            set.insert(line.trim().to_string());
        }

        Ok(())
    }

The files have more than 1 million lines each. I feel that acquiring the lock for every insert comes with a great overhead. Is there a better way to insert into a shared set?

There are two main approaches:

  1. Each task produces its own HashSet and sends it to a task that merges them (see the sketch after this list).
  2. Cache several items and insert them in bulk.
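
For approach 1, here is a rough sketch assuming tokio's mpsc channel (the `read_file_into_set` helper is a placeholder): each task builds its own local set with no locking and sends it once, and a single receiver merges everything.

    use std::collections::HashSet;
    use tokio::sync::mpsc;

    // Placeholder: build a local set from one file without any shared lock.
    async fn read_file_into_set(_path: String) -> HashSet<String> {
        HashSet::new()
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel::<HashSet<String>>(16);

        for path in ["a.txt", "b.txt", "c.txt"] {
            let tx = tx.clone();
            tokio::spawn(async move {
                let local = read_file_into_set(path.to_string()).await;
                // Each task sends its whole set once instead of locking per line.
                let _ = tx.send(local).await;
            });
        }
        drop(tx); // drop the original sender so the channel closes when the tasks finish

        let mut merged = HashSet::new();
        while let Some(local) = rx.recv().await {
            merged.extend(local);
        }
        println!("{} unique lines", merged.len());
    }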

How can I merge them? And does merging require going through the set one by one?

You can use extend().

    let mut merged = HashSet::new();

    merged.extend(set1);
    merged.extend(set2);
    merged.extend(set3);

@alice would it be faster if I create a RAM disk, put the files in it, and go through them synchronously?

You could also consider a concurrent set, like dashmap::DashSet or flurry::HashSet.
(edit: removed im/im_rc as those still need &mut self for insert.)
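
A small sketch with dashmap's DashSet (assuming the dashmap crate): insert takes &self, so the Arc can be shared without a Mutex.

    use dashmap::DashSet;
    use std::sync::Arc;

    fn main() {
        // `DashSet::insert` takes `&self`, so the set can be shared directly.
        let set = Arc::new(DashSet::new());
        let set2 = Arc::clone(&set);

        let handle = std::thread::spawn(move || {
            set2.insert("from another thread".to_string());
        });
        set.insert("from the main thread".to_string());
        handle.join().unwrap();

        assert_eq!(set.len(), 2);
    }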


File IO is not really possible without blocking IO in any case, so async does not gain you much. For this reason, reading the files synchronously in a spawn_blocking call is a fine way to do it. As for the RAM disk, it will speed up either approach simply because RAM disks allow faster access to the files.
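
A minimal sketch of the spawn_blocking variant (the function name is illustrative): the synchronous read runs on tokio's blocking thread pool and returns a plain HashSet.

    use std::collections::HashSet;
    use std::fs::File;
    use std::io::{BufRead, BufReader};

    async fn load_file(path: String) -> std::io::Result<HashSet<String>> {
        // Move the synchronous read onto tokio's blocking thread pool.
        tokio::task::spawn_blocking(move || -> std::io::Result<HashSet<String>> {
            let mut set = HashSet::new();
            let reader = BufReader::new(File::open(path)?);
            for line in reader.lines() {
                set.insert(line?.trim().to_string());
            }
            Ok(set)
        })
        .await
        .expect("blocking task panicked")
    }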

@cuviper As I understood the purpose, the set is created once and then used, in which case I would not recommend a concurrent set. To be fair, this assessment was aided by having also answered this thread.


For extend, it's not clear to me from its source how it works. Does it call insert internally (e.g. looping through the items of a set)?

Yeah, it's kinda obfuscated by a bunch of generics, but it's basically a call to reserve followed by a loop of insert.
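
In other words, roughly this (a simplified sketch, not the actual standard-library code):

    use std::collections::HashSet;

    fn extend_sketch(dst: &mut HashSet<String>, src: HashSet<String>) {
        dst.reserve(src.len()); // grow capacity once up front
        for item in src {
            dst.insert(item); // then insert item by item
        }
    }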


@alice what do you think about this solution?

  • create a RAM disk
  • put the new files in it
  • go through them one by one
  • delete each file when its processing is finished

For now I am using a Python script to process the files, which is really, really slow (~30 minutes). I am trying to replicate the code in Rust so I can reduce the time and process the files efficiently.

Is the procedure above better, or should I go with extend?

I don't know enough about your use case to answer that.

Basically, all this server does is send all the actions that occur during a period of time to another server. Every file contains a list of actions that happened during a period of time (one action per line). My goal is to get a unique list of actions and send them to another server.

The files are sent to my server using scp (which I don't have control over) and are placed in a directory (which I can change).

So the difference would be whether the Rust executable is executed once per file or not?

What I have now is a cron job that runs every hour and loads the Python script that processes the files sent to me. The script iterates through the files inside the directory, processes them one by one, creates a set of unique actions, saves them to a file, and sends them to another server.

This process takes a lot of time to finish, and sometimes, when the files are too big, it takes more than 1 hour, which causes two scripts to run at once (and this causes issues). The file sizes vary from 10 MB to 1 GB and the number of files is not constant.

In my current executable (Rust) I want to remove the cron job and let Rust wait for files to come in and process them.

Sure, that makes sense. It sounds like the number of files is reasonably small, in which case I'd probably go for just spawning a thread, since the bulk of your work appears to be either file IO or CPU bound. If you need to process at most one file at a time, you can even avoid the whole thread business.
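
A rough sketch of the "just spawn a thread" option (file paths are illustrative): one worker does all the synchronous reading and hands back the merged set.

    use std::collections::HashSet;
    use std::fs::File;
    use std::io::{BufRead, BufReader};
    use std::thread;

    fn main() -> std::io::Result<()> {
        let paths = vec!["a.txt".to_string(), "b.txt".to_string()];

        // One worker thread handles all the blocking file IO.
        let worker = thread::spawn(move || -> std::io::Result<HashSet<String>> {
            let mut merged = HashSet::new();
            for path in paths {
                let reader = BufReader::new(File::open(path)?);
                for line in reader.lines() {
                    merged.insert(line?.trim().to_string());
                }
            }
            Ok(merged)
        });

        let merged = worker.join().expect("worker panicked")?;
        println!("{} unique actions", merged.len());
        Ok(())
    }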


@alice thank you :smiling_face_with_three_hearts:

What I suggest you do is to use something like inotify to spawn a process on each file created; that process extracts the events and sends them to a UNIX_DATAGRAM socket.

In parallel, you have some form of daemon / background process that listens on that Unix socket to collect the events and then outputs them wherever you need them to go.

But the key idea is that one of the most efficient ways to have a form of mpsc with multiple processes in Unix is to use a UNIX_DATAGRAM.
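
A minimal sketch of that idea with std's UnixDatagram (the socket path is just an example): many sender processes can write datagrams to one bound receiver, which gives an mpsc-like channel across processes.

    use std::os::unix::net::UnixDatagram;

    fn main() -> std::io::Result<()> {
        let path = "/tmp/actions.sock";
        let _ = std::fs::remove_file(path); // ignore the error if it doesn't exist yet

        // Receiver side (the daemon): bind once; a real daemon would read in a loop.
        let receiver = UnixDatagram::bind(path)?;

        // Sender side (one per spawned process): an unbound socket is enough.
        let sender = UnixDatagram::unbound()?;
        sender.send_to(b"some-action\n", path)?;

        let mut buf = [0u8; 4096];
        let n = receiver.recv(&mut buf)?;
        println!("got {} bytes", n);
        Ok(())
    }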


@Yandros, if I understand you correctly:

server sending the files -> inotify -> spawn a process that will send bytes to a UNIX_DGRAM -> app listening on UNIX_DGRAM (process and send to next server)

Sorry @Yandros, I may have misunderstood your idea, but how can adding a DGRAM socket between inotify and the application improve performance?

What I ended up doing is (roughly sketched in code after this list):

  • a loop that has a delay_for and runs in a tmux window
  • when new files come, iterate through them and spawn a task to handle each file
  • when the tasks are finished, collect the results and send them to the next recipient
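
A minimal sketch of that loop, assuming current tokio (where `delay_for` has since been renamed to `tokio::time::sleep`); `process_file` and the `/incoming` directory are placeholders.

    use std::collections::HashSet;
    use std::time::Duration;

    // Placeholder: read one file and return its unique actions.
    async fn process_file(_path: std::path::PathBuf) -> HashSet<String> {
        HashSet::new()
    }

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        loop {
            // Spawn one task per file currently in the directory.
            let mut handles = Vec::new();
            for entry in std::fs::read_dir("/incoming")? {
                handles.push(tokio::spawn(process_file(entry?.path())));
            }

            // Collect the results once all tasks are finished.
            let mut merged = HashSet::new();
            for handle in handles {
                merged.extend(handle.await.expect("task panicked"));
            }
            // ... send `merged` to the next recipient, delete the processed files ...

            tokio::time::sleep(Duration::from_secs(60)).await;
        }
    }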

If DGRAM is better, can you please explain to me why?

Because, if I have understood correctly, you need to do some "merge" over the data spread across the multiple files. If it isn't the case / if you can handle each file separately and send the right data accordingly, then the inotify layer will indeed suffice :slightly_smiling_face:
