Tokio Implementation VERY DIFFICULT

I'm encountering a significant issue implementing Tokio concurrency for this method:

pub fn find_key(&self, db: &str, table_name: &str, key: &str) -> Result<Option<Key>,
  FmError> {
    if let Ok(Some(entry)) = self.memory.find_key(&FsApi::format_path(&[db, table_name]).to_string_lossy().to_string(), key) {
      return Ok(Some(Key {
        status: KeyStatus::Found, data: (key.to_string(), entry), table_name: table_name.to_string(), database: db.to_string(), file: String::new()}));
    }
    let mut path = FsApi::format_path(&[db, table_name, "level0"]);
    let files = Self::read_level0(db, table_name)?;
    for file_path in &files {
      path.push(file_path);
      let offset = self.read_sstable_offset(&path)?;
      let bloomfilter = self.read_sstable_bloom_filter(&path, offset.clone())?;
      if bloomfilter.check(&key.to_string()) == true {
        let prim_idx = self.read_prim_idx(&path, offset.clone())?;
        if let Some(index) = prim_idx.iter().position(|(string, _)| string == key) {
          let mut tombstone_path = path.parent().unwrap().join("tombstones");
          tombstone_path.push(path.file_name().unwrap());
          tombstone_path.set_extension("db.tombstone");
          if let Some(action) = Tombstone::has_key(&Tombstone::parse(FsApi::read(&tombstone_path)?)?, &String::from("b06d6a5566454a463676a001e117bf5716cd3bb9cf1c3af8f9b59d7b97ec8836")) {
            if action.action == Actions::DEL {
              return Ok(None);
            } else if action.action == Actions::UPD {
              return Ok(Some(Key {
                status: KeyStatus::Updated,
                data: (String::from(key), action.payload.unwrap()), table_name: table_name.to_string(), database: db.to_string(), file: file_path.clone()}
              ));
            }
          }
          let (_, (s, e)) = prim_idx[index];
          let entry = self.read_entry(&path, offset, s, e)?;
          return Ok(Some(Key {
            status: KeyStatus::Found,
            data: entry, table_name: table_name.to_string(), database: db.to_string(), file: file_path.clone()}
          ));
        } else {
          path.pop();
          continue;
        }
      } else {
        path.pop();
        continue;
      }
    }
    return Ok(None);
  }

Despite consulting numerous resources, including AI assistance, and thoroughly studying the Tokio documentation, I'm still struggling to get it right. I'm unsure of the root cause of the problem.

I would greatly appreciate any assistance you can offer.

You can find the entire file here: The full codespace.

I intended to create a thread for each file to handle them concurrently. Once a key is found, all threads should be closed, and the key returned.

Thank you for helping,
John

You haven't described the problem with the code that you need to solve -- what is it doing incorrectly, what errors are you getting, etc? Please post the details including full error messages (if any).


Use a JoinSet to spawn a task for each file, then return set.join_next().await.


That is the problem: I do not know where to start implementing Tokio for this method.

This looks like pure computation to me (no I/O). Tokio is meant primarily for I/O. For parallel computation I suggest you look at rayon.

Some general advice:

  • Make sure you understand what the sequential code does first. Writing concurrent code is in general much more difficult, and quite impossible if you don't fully grasp the sequential version.
  • The function seems to be doing too many things at once. Split it up into many small functions and name them clearly. This makes the code much easier to understand. It will also make opportunities for parallelisation more obvious.
  • Use clippy and work on the code until it is happy. It should give you advice on how to improve expressions like if boolean_value == true.
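
As a small illustration of the last two points, here is one way a piece of find_key could be pulled out into a named helper. This is only a sketch: tombstone_path_for is a hypothetical name, and BloomFilter is a stand-in for the real type in the post. The helper mirrors the three tombstone path lines but returns None instead of calling unwrap, and the condition uses the bool directly, which is what clippy's bool_comparison lint asks for.

```rust
use std::path::{Path, PathBuf};

// Derive the tombstone path for an SSTable file, mirroring the three
// path-manipulation lines in `find_key`. Returns `None` instead of
// panicking when the path has no parent or no file name.
fn tombstone_path_for(sstable: &Path) -> Option<PathBuf> {
    let mut path = sstable.parent()?.join("tombstones");
    path.push(sstable.file_name()?);
    path.set_extension("db.tombstone");
    Some(path)
}

// Hypothetical stand-in for the Bloom filter type used in the post.
struct BloomFilter {
    keys: Vec<String>,
}

impl BloomFilter {
    fn check(&self, key: &str) -> bool {
        self.keys.iter().any(|k| k == key)
    }
}

fn main() {
    let filter = BloomFilter { keys: vec!["a".to_string()] };
    // Write `if filter.check("a")`, not `if filter.check("a") == true`.
    if filter.check("a") {
        println!("{:?}", tombstone_path_for(Path::new("db/table/level0/sst1.db")));
    }
}
```

With helpers like this, the body of the for loop shrinks to a few calls, and the per-file work becomes a single function that is easy to hand to a task or a parallel iterator.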
