Help with Mutex on fst

Hi there,

New to Rust, and I'm trying to implement a project using the fst crate. I want to flush at a fixed time interval, so I was trying to use a Mutex to take a lock. Here is the code I have so far:

use chrono::Duration;
use fst::{Map, MapBuilder};
use futures::{SinkExt, StreamExt};
use std::fs::File;
use std::io::{BufWriter, Cursor, Write};
use std::ops::Deref;
use std::sync::{Arc, Mutex};
use std::thread::Thread;
use std::{io, thread};

pub struct TestFST {
    pub _type: String,
    //pub map: Arc<Mutex<Vec<String>>>
    pub map: MapBuilder<BufWriter<File>>,
}

impl TestFST {
    fn new(_type: String) -> Self {
        let mut wtr = io::BufWriter::new(File::create("map.fst").unwrap());
        Self {
            _type,
            map: MapBuilder::new(wtr).unwrap(),
        }
    }

    pub fn init(_type: String, time: u32) -> Self {
        TestFST::new(_type);
    }

}

fn main() {
    let fst = Arc::new(Mutex::new(TestFST::init("test".parse().unwrap(), 5)));

    let mut fst_clone = Arc::clone(&fst);

    // thread that mimics a stream of data to be inserted into the fst
    thread::spawn(move || loop {
        let mut f = fst.lock().unwrap();
        (*f).map.insert("somedata", 1u64);
    });

    // finish the fst after 5 secs
    thread::spawn(move || {
        loop {
            thread::sleep(tokio::time::Duration::from_secs(5 as u64));
            let mut f = fst_clone.lock().unwrap();
            (*f).map.finish();
            // after calling finish, the current fst is immutable; a new fst needs to be created for the insert thread to write to.
            // maybe use an AtomicBool to see if the fst is writeable and create a new fst in the writer thread if it's not.
        }
    });
}

error[E0507]: cannot move out of dereference of `std::sync::MutexGuard<'_, TestFST>`
   --> src/main.rs:161:13
    |
161 |             (*f).map.finish();
    |             ^^^^^^^^ move occurs because value has type `fst::MapBuilder<std::vec::Vec<u8>>`, which does not implement the `Copy` trait

Any pointers? Thank you

Welcome to the users.rust-lang.org forum!

Your code formatting is terrible (use rustfmt please), it contains unbalanced parentheses, misses fn main() and use statements, has inconsistent naming of TestFST vs TestFst and new vs init (with different parameters), and you clearly haven’t read the pinned post about syntax highlighting. What your code should have looked like:

use fst::MapBuilder;
use std::sync::{Arc, Mutex};
use std::fs::File;
use std::io::BufWriter;
use std::thread;

pub struct TestFst {
    pub _type: String,
    pub map: MapBuilder<BufWriter<File>>,
}

impl TestFst {
    pub fn new(_type: String) -> Self {
        let wtr = BufWriter::new(File::create("map.fst").unwrap());
        Self {
            _type,
            map: MapBuilder::new(wtr).unwrap(),
        }
    }
}

fn main() {
    let fst = Arc::new(Mutex::new(TestFst::new("test".parse().unwrap())));

    // Thread to flush every 5 secs
    thread::spawn(move || {
        let fst_clone = Arc::clone(&fst);
        loop {
            thread::sleep(tokio::time::Duration::from_secs(5 as u64));
            let f = fst_clone.lock().unwrap();
            (*f).map.finish();
        }
    });
}

and your error message is not in a code block at all and misses the first line:

error[E0507]: cannot move out of dereference of `std::sync::MutexGuard<'_, TestFst>`
  --> src\main.rs:31:13
   |
31 |             (*f).map.finish();
   |             ^^^^^^^^ move occurs because value has type `fst::MapBuilder<std::io::BufWriter<std::fs::File>>`, which does not implement the `Copy` trait

As for your actual question: MapBuilder::finish is only meant to be called once, to complete the construction of the map. If you look closely at its signature

impl<W: Write> MapBuilder<W> {
    pub fn finish(self) -> Result<()>
}

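the self argument, taken without & or &mut, indicates that it consumes the MapBuilder it is called on. Through a MutexGuard you only ever get a &mut TestFst, so there is no owned MapBuilder you could hand to finish, and that is exactly what error E0507 (cannot move out of dereference) is telling you.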
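To make the consuming-self point concrete, here is a minimal std-only sketch. Builder is a hypothetical stand-in for MapBuilder (it is not the fst API); its finish takes self just like MapBuilder::finish does. It shows that finish is fine on an owned value, and one way to get an owned value out from behind a lock: std::mem::replace, swapping a fresh builder in.

```rust
use std::mem;
use std::sync::Mutex;

/// Hypothetical stand-in for fst::MapBuilder: `finish` takes `self` by value.
#[derive(Default)]
pub struct Builder {
    keys: Vec<String>,
}

impl Builder {
    pub fn insert(&mut self, key: &str) {
        self.keys.push(key.to_string());
    }

    /// Consumes the builder, like `MapBuilder::finish(self)`.
    pub fn finish(self) -> Vec<String> {
        self.keys
    }
}

/// Works: we own `b`, so it can be moved into `finish`.
pub fn finish_owned() -> Vec<String> {
    let mut b = Builder::default();
    b.insert("a");
    b.finish()
    // Using `b` after the call above would not compile: `finish` moved it.
}

/// `m.lock().unwrap().finish()` would fail with E0507, because the guard only
/// lends out `&mut Builder`. Swapping in a fresh builder gives us an owned
/// value to consume and leaves a usable builder behind the lock.
pub fn finish_behind_lock(m: &Mutex<Builder>) -> Vec<String> {
    let mut guard = m.lock().unwrap();
    let old = mem::replace(&mut *guard, Builder::default());
    old.finish()
}
```

For the real MapBuilder, the replacement would have to be constructed with its own writer, since a MapBuilder is always built from a writer rather than having a default value.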

I’m not entirely sure why you want to flush the file every 5 seconds, but it can still be done by accessing the underlying BufWriter. It’s not trivial, though, since MapBuilder’s API unfortunately doesn’t provide mutable access to the underlying writer (get_ref only gives you a shared reference). A wrapper using RefCell works around that:

use delegate::delegate;
use fst::MapBuilder;
use std::cell::RefCell;
use std::fs::File;
use std::io::{self, BufWriter, Write};
use std::sync::{Arc, Mutex};
use std::thread;

pub struct TestFst {
    _type: String,
    map: MapBuilder<RefCellWriter<BufWriter<File>>>,
}

impl TestFst {
    pub fn new(_type: String) -> Self {
        let wtr = RefCellWriter(RefCell::new(BufWriter::new(
            File::create("map.fst").unwrap(),
        )));
        Self {
            _type,
            map: MapBuilder::new(wtr).unwrap(),
        }
    }
}

struct RefCellWriter<W>(RefCell<W>);
impl<W: Write> Write for RefCellWriter<W> {
    delegate! {
        to self.0.get_mut() {
            fn write(&mut self, buf: &[u8]) -> io::Result<usize>;
            fn flush(&mut self) -> io::Result<()>;
        }
    }
}

fn main() {
    let fst = Arc::new(Mutex::new(TestFst::new("test".parse().unwrap())));

    // Thread to flush every 5 secs
    thread::spawn(move || {
        let fst_clone = Arc::clone(&fst);
        loop {
            thread::sleep(tokio::time::Duration::from_secs(5 as u64));
            let f = fst_clone.lock().unwrap();
            f.map.get_ref().0.borrow_mut().flush().unwrap();
        }
    });
}

For the record, my dependencies in Cargo.toml:

[dependencies]
fst = "0.4.4"
tokio = {version = "0.2.22", features = ["time"]}
delegate = "0.4.3"
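If you would rather not add the delegate crate just for two methods, the Write impl can be written by hand. The sketch below is a std-only version of the RefCellWriter above (my own wrapper, not something fst provides); it uses a Vec<u8> as the inner sink so it can be tried out without touching the file system. flush_shared is a hypothetical helper that mirrors what the flush thread does through get_ref.

```rust
use std::cell::RefCell;
use std::io::{self, BufWriter, Write};

/// Same wrapper as above, with the `Write` impl spelled out by hand.
pub struct RefCellWriter<W>(pub RefCell<W>);

impl<W: Write> Write for RefCellWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // `&mut self` guarantees no other borrow exists, so `get_mut` suffices.
        self.0.get_mut().write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.0.get_mut().flush()
    }
}

/// Flushes through a shared reference, which is all `get_ref` hands out.
pub fn flush_shared<W: Write>(w: &RefCellWriter<W>) -> io::Result<()> {
    w.0.borrow_mut().flush()
}
```

Written this way, the only dependencies left are fst itself and whatever you use for the sleep.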

Also note that the way you use Arc doesn’t do what you want. You are currently moving fst into the other thread and then cloning it there (for no reason). If you want to keep a handle to the Arc in the main thread, you need to clone first and then move the clone:

    let fst = Arc::new(Mutex::new(TestFst::new("test".parse().unwrap())));

    let fst_clone = Arc::clone(&fst);
    // Thread to flush every 5 secs
    thread::spawn(move || loop {
        // use fst_clone here
    });
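Here is a complete std-only version of that clone-then-move pattern, with a plain counter standing in for TestFst (a hypothetical simplification). The main thread keeps its Arc and can still lock the data while and after the spawned thread runs; joining the handle replaces the sleep loop so the result is deterministic.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Clone the `Arc` *before* the `move` closure, so the closure owns the
/// clone and the caller keeps the original handle.
pub fn clone_then_move() -> u64 {
    let shared = Arc::new(Mutex::new(0u64));

    let shared_clone = Arc::clone(&shared);
    let handle = thread::spawn(move || {
        for _ in 0..10 {
            *shared_clone.lock().unwrap() += 1;
        }
    });

    // The main thread still owns `shared`, so it can keep using it.
    for _ in 0..5 {
        *shared.lock().unwrap() += 1;
    }

    handle.join().unwrap();
    // Bind the value first so the guard is dropped before `shared` is.
    let total = *shared.lock().unwrap();
    total
}
```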

@steffahn Thank you! I've updated my post with the formatting.
Yes, finish is only meant to be called once, and it makes the fst immutable. I want to build the fst after 5 secs, and my writer thread will then need to figure out that the fst has been built, create a new one, and write to that.
I'm planning on having an AtomicBool in my TestFst that is set to false once the fst is no longer writeable (i.e., once finish has been called on it); the writer thread will use this flag to create a new TestFst and write to it.
I will try the RefCell approach.

I see. In that case, you should use an Option.

use fst::MapBuilder;
use std::fs::File;
use std::io::BufWriter;
use std::sync::{Arc, Mutex};
use std::thread;

pub struct TestFst {
    _type: String,
    map: MapBuilder<BufWriter<File>>,
}

impl TestFst {
    pub fn new(_type: String) -> Self {
        let wtr = BufWriter::new(
            File::create("map.fst").unwrap(),
        );
        Self {
            _type,
            map: MapBuilder::new(wtr).unwrap(),
        }
    }
}

fn main() {
    let fst = Arc::new(Mutex::new(Some(TestFst::new("test".into()))));

    let fst_clone = Arc::clone(&fst);
    // Thread to flush every 5 secs
    thread::spawn(move || loop {
        thread::sleep(tokio::time::Duration::from_secs(5 as u64));
        let mut f = fst_clone.lock().unwrap();
        if let Some(test_fst) = f.take() {
            test_fst.map.finish().unwrap();
        }
    });

    // in writer thread:
    {
        let mut f = fst.lock().unwrap();
        if f.is_none() {
            *f = Some(TestFst::new("foo".into()));
        }
        f.as_mut().unwrap().map.insert([1,2,3,4u8], 42).unwrap();
    }
}

See main above for how that could work.
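Two things to watch out for when rotating like this. Every TestFst::new calls File::create("map.fst"), which truncates the file, so each new generation overwrites the map that was just finished. And fst requires keys to be inserted in strictly increasing lexicographic order (MapBuilder::insert returns an error otherwise), so the loop in the original post that inserts "somedata" over and over would fail after the first insert. The std-only sketch below models both points with hypothetical Generation and Rotator types (stand-ins, not the fst API): each generation gets its own file name, and out-of-order or duplicate keys are rejected.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for one TestFst generation. Like MapBuilder::insert,
/// it rejects keys that are not strictly greater than the previous key.
pub struct Generation {
    pub path: String,
    keys: Vec<Vec<u8>>,
}

impl Generation {
    fn new(n: u64) -> Self {
        // One file name per generation ("map-0.fst", "map-1.fst", ...), so
        // starting a new map never truncates the one that was just finished.
        Generation { path: format!("map-{}.fst", n), keys: Vec::new() }
    }

    fn insert(&mut self, key: &[u8]) -> Result<(), String> {
        if let Some(last) = self.keys.last() {
            if key <= last.as_slice() {
                return Err(format!("key {:?} is out of order", key));
            }
        }
        self.keys.push(key.to_vec());
        Ok(())
    }
}

#[derive(Default)]
struct State {
    // `None` means the last generation was finished; start a new one on the next insert.
    current: Option<Generation>,
    next_gen: u64,
}

#[derive(Default)]
pub struct Rotator {
    state: Mutex<State>,
}

impl Rotator {
    /// Writer side: create a fresh generation if the last one was finished.
    pub fn insert(&self, key: &[u8]) -> Result<(), String> {
        let mut s = self.state.lock().unwrap();
        if s.current.is_none() {
            let n = s.next_gen;
            s.next_gen += 1;
            s.current = Some(Generation::new(n));
        }
        s.current.as_mut().unwrap().insert(key)
    }

    /// Flusher side: take the generation out (leaving `None`) and "finish" it.
    pub fn finish_current(&self) -> Option<(String, usize)> {
        // The guard is a temporary, so the lock is released at the end of this
        // statement, before the (potentially slow) finishing work below.
        let taken = self.state.lock().unwrap().current.take();
        taken.map(|g| (g.path, g.keys.len()))
    }
}
```

Releasing the lock before finishing is a deliberate choice here: with the real MapBuilder, finish writes the rest of the fst to disk, and there is no reason to block the writer thread while that happens.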

Ah, that's great. Quick question though: why does map.finish() work with the Option, whereas without it I get the `cannot move out of dereference` error from above?

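The important detail here is Option::take(). It leaves None behind the lock and hands you the old contents, so the test_fst variable is not a reference to a TestFst but the owned TestFst value itself. There is no need to dereference anything to reach the map field. Instead, test_fst.map.finish() moves the map field out of test_fst (a partial move, which is allowed because you own test_fst), effectively splitting it up into its individual fields. Any fields that remain at the end of the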

if let Some(test_fst) = f.take() {
    test_fst.map.finish().unwrap();
    // test_fst._type implicitly dropped here
}

are dropped (as indicated above).
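A small std-only sketch of that partial move, using hypothetical stand-ins: Map plays the role of MapBuilder, and a Label type replaces the _type String and counts how often it is dropped, so the "implicitly dropped here" comment becomes observable.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for the `_type` field: records when it is dropped.
pub struct Label(pub Rc<Cell<u32>>);

impl Drop for Label {
    fn drop(&mut self) {
        self.0.set(self.0.get() + 1);
    }
}

/// Hypothetical stand-in for MapBuilder: `finish` consumes it.
pub struct Map;

impl Map {
    pub fn finish(self) -> &'static str {
        "finished"
    }
}

pub struct TestFst {
    pub _type: Label,
    pub map: Map,
}

/// Returns (result of finish, drop count right after finish, drop count after the block).
pub fn take_and_finish(drops: &Rc<Cell<u32>>) -> (&'static str, u32, u32) {
    let mut slot = Some(TestFst { _type: Label(Rc::clone(drops)), map: Map });
    let (result, during) = {
        // `take` leaves `None` in `slot` and gives us the owned TestFst.
        let test_fst = slot.take().expect("slot was filled above");
        // Partial move: `map` is moved out of `test_fst` into `finish`.
        let result = test_fst.map.finish();
        (result, drops.get())
        // The remaining field, `test_fst._type`, is dropped here.
    };
    (result, during, drops.get())
}
```

If you used f.as_mut() instead of f.take(), test_fst would be a &mut TestFst, and test_fst.map.finish() would fail with the same E0507, because you can't move a field out from behind a mutable reference.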