Using SQLite asynchronously

Hello! I am trying to use sqlite3 in my application, and I want the code to be asynchronous. However, I found that there aren't any SQLite drivers in Rust that support async. Are there any suggestions for how I can work around this problem?
BTW, I did find a suggestion online:

"move sqlite into its own thread and provide an async interface to it via mpsc"

But I am not sure how to achieve this.

Thanks.

I use tokio::task::block_in_place around sqlite code. It does require that the async code using it is already spawned on a Tokio threaded runtime, which is a bit annoying.

Because sqlite "natively" has no concept of async, you can't do any better than existing suggestions of mpsc, thread pools, etc.

Thanks, @kornel. I took a look at tokio::task::block_in_place at https://docs.rs/tokio/0.2.13/tokio/task/fn.block_in_place.html. There I noticed another function with a similar purpose, tokio::task::spawn_blocking, which I believe doesn't require us to create a threaded runtime explicitly (I may be wrong).

EDIT: Okay, both do require a threaded runtime.

Something like this:

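// Uses std::sync::mpsc::channel and std::thread; error! and trace! are logging macros.
// A job is a boxed closure that runs on the worker thread and gets
// mutable access to the object that thread owns.
type Job<T> = Box<dyn FnOnce(&mut Option<T>) + Send>;

let (sender, receiver) = channel::<Job<T>>();

let join_handle = thread::spawn(move || {
    // Build the object on the worker thread; keep None if that fails.
    let mut obj = match constructor() {
        Ok(x) => Some(x),
        Err(err) => {
            error!("Initialization error: {}", err);
            None
        }
    };
    loop {
        match receiver.recv() {
            Ok(job) => job(&mut obj),
            // recv() fails once every sender has been dropped.
            Err(err) => {
                trace!("thread pool({}) recv error: {}", name, err);
                break;
            }
        }
    }
});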

So you can create the SQLite handle inside the thread, and then send it jobs of type FnOnce(&mut Option<rusqlite::Connection>) via the channel.
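
For readers who want something that compiles on its own, here is a minimal std-only sketch of the same idea. The helper name spawn_worker and the demo function are made up for illustration, and a Vec<String> stands in for the SQLite connection so that no external crate is needed. With rusqlite you would presumably pass a constructor that opens the connection instead.

```rust
use std::fmt::Display;
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// A job runs on the worker thread; it sees `None` if the constructor failed.
type Job<T> = Box<dyn FnOnce(&mut Option<T>) + Send>;

/// Hypothetical helper: starts the worker, returns the job sender and join handle.
fn spawn_worker<T, E, C>(constructor: C) -> (Sender<Job<T>>, JoinHandle<Option<T>>)
where
    T: Send + 'static,
    E: Display,
    C: FnOnce() -> Result<T, E> + Send + 'static,
{
    let (sender, receiver) = channel::<Job<T>>();
    let join_handle = thread::spawn(move || {
        // The object is created on the worker thread and used only there.
        let mut obj = match constructor() {
            Ok(x) => Some(x),
            Err(err) => {
                eprintln!("initialization error: {}", err);
                None
            }
        };
        // recv() fails once every Sender is dropped, which ends the loop.
        while let Ok(job) = receiver.recv() {
            job(&mut obj);
        }
        obj // hand the final state back through the join handle
    });
    (sender, join_handle)
}

/// Demo with a Vec<String> standing in for the SQLite connection (an assumption).
fn demo() -> Vec<String> {
    let (sender, join_handle) = spawn_worker(|| Ok::<Vec<String>, String>(Vec::new()));
    for name in vec!["alice", "bob"] {
        let job: Job<Vec<String>> = Box::new(move |rows: &mut Option<Vec<String>>| {
            if let Some(rows) = rows {
                rows.push(name.to_string());
            }
        });
        sender.send(job).expect("worker thread has exited");
    }
    drop(sender); // no more jobs: lets the worker loop finish
    join_handle
        .join()
        .expect("worker panicked")
        .expect("constructor failed")
}
```

Jobs are executed one at a time in the order they were sent, so the object never needs a mutex.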

Running the queries on Tokio's threads may not work well if you spawn many queries, because the number of Tokio threads is limited.

Thanks @Dushistov.
I have a couple of questions about the code. Is constructor() the constructor for the SQLite connection? And is it possible to get the data back from the other thread? Do I have to create another channel for data flowing in the opposite direction (from the spawned thread to the original thread), or is it somehow possible to use the same channel? (I believe a channel only allows data to flow in one direction.)

I am not worried about the thread limit. I only have a couple of queries every 10-20 ms.

As I understand it, SQLite is a file-based database, and operating systems generally provide no async interface for file IO like they do for network IO, so the various techniques for running blocking code are the best you can do.

Generally, spawn_blocking is recommended over block_in_place because the latter can cause issues with anything else running concurrently in the same task, e.g. in the other half of a join. That said, block_in_place is a bit more efficient as it runs the blocking code on the same thread, and it can help avoid some lifetime challenges.

I don't think I would go for spawning a thread and using message passing. The two Tokio methods for blocking code are good enough.

Tokio has two kinds of threads in the thread-pool: Core threads and blocking threads. By default there is one core thread for each CPU, and up to 500 blocking threads. The block_in_place function makes the current thread change category into the blocking category temporarily, allowing the Tokio runtime to spawn another core thread to handle things while the blocking code runs.

The spawn_blocking function sends the task to an existing thread in the blocking category.

Yes, constructor() creates the SQLite connection.

Sending data back is handled inside the FnOnce job itself.
Something like

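// In the thread_pool module; Job<T> is the boxed-closure alias from the earlier snippet.
pub struct Executor<T> {
    inner: Sender<Job<T>>,
}

impl<T> Executor<T> {
    // Queue a job for the worker thread.
    pub fn spawn<F: FnOnce(&mut Option<T>) + Send + 'static>(&self, job: F) {
        self.inner
            .send(Box::new(job))
            .expect("thread_pool::Executor::spawn failed");
    }
}

struct AsyncSqlite {
    executor: thread_pool::Executor<rusqlite::Connection>,
}

impl AsyncSqlite {
    // Forward the job to the executor that owns the connection.
    pub fn spawn<F: FnOnce(&mut Option<rusqlite::Connection>) + Send + 'static>(&self, job: F) {
        self.executor.spawn(job);
    }
}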

and when you want to run

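// Assumes a channel whose receiver can be awaited, e.g. tokio::sync::oneshot.
let (sender, receiver) = tokio::sync::oneshot::channel();
async_sqlite.spawn(move |sqlite| {
    // `sqlite` is None if opening the connection failed on the worker thread.
    let result = sqlite.as_mut().map(|conn| {
        let tran = conn.transaction()?;
        // ... run queries on `tran` here ...
        tran.commit()
    });
    let _ = sender.send(result);
});
let result = receiver.await;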
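
To make the "second channel" answer concrete, here is a self-contained sketch that uses only the standard library. Each request creates its own reply channel, moves the sending half into the job, and waits on the receiving half. The names Handle, start, query and demo are invented for this example, a plain u32 counter stands in for the connection, and query blocks the calling thread rather than awaiting, so treat it as an illustration of the data flow only.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

type Job<T> = Box<dyn FnOnce(&mut T) + Send>;

/// Cloneable handle to the single worker thread that owns the state.
pub struct Handle<T> {
    jobs: Sender<Job<T>>,
}

impl<T> Clone for Handle<T> {
    fn clone(&self) -> Self {
        Handle { jobs: self.jobs.clone() }
    }
}

impl<T: 'static> Handle<T> {
    /// Starts the worker; the state is built on the worker thread itself.
    pub fn start<C: FnOnce() -> T + Send + 'static>(constructor: C) -> Self {
        let (jobs, receiver) = channel::<Job<T>>();
        thread::spawn(move || {
            let mut state = constructor();
            // Ends when every Handle (and so every Sender) has been dropped.
            for job in receiver {
                job(&mut state);
            }
        });
        Handle { jobs }
    }

    /// Runs `f` on the worker thread and blocks until its result comes back.
    pub fn query<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut T) -> R + Send + 'static,
        R: Send + 'static,
    {
        // One reply channel per request: data flows worker -> caller.
        let (reply_tx, reply_rx) = channel::<R>();
        let job: Job<T> = Box::new(move |state: &mut T| {
            let _ = reply_tx.send(f(state));
        });
        self.jobs.send(job).expect("worker thread has exited");
        reply_rx.recv().expect("worker dropped the reply sender")
    }
}

/// Four client threads share one worker; returns the final counter value.
fn demo() -> u32 {
    let handle = Handle::start(|| 0u32);
    let clients: Vec<_> = (0..4)
        .map(|_| {
            let handle = handle.clone();
            thread::spawn(move || handle.query(|counter| *counter += 1))
        })
        .collect();
    for client in clients {
        client.join().expect("client panicked");
    }
    handle.query(|counter| *counter)
}
```

The job channel stays one-directional. Results come back on the per-request channel, so callers never see each other's replies.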

This is another problem. 500 is too many for something like a desktop application.
SQLite is not really multi-threaded, so these 500 threads will block each other. And if some SQLite query gets stuck for a second or so because fsync takes too long, Tokio will spawn thread after thread and eat the desktop's memory. It is better to have just one dedicated thread for SQLite and a channel of tasks for it;
that saves a lot of memory.

I mean, it can make sense to have a single thread for sqlite operations, especially if you would otherwise need a mutex around the connection. I would probably not go for a design where the user provides the channel, and instead provide a function you can directly await the output of.
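
As a rough sketch of what "a function you can directly await" could look like, the following std-only example has the worker hand results back through a small hand-written future instead of a user-supplied channel. Everything here (Worker, Reply, call, and the tiny block_on) is hypothetical and written for illustration. In a real application you would more likely use a oneshot channel from your async runtime and its own executor, and block_on is included only so the sketch can be run without dependencies.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

type Job<T> = Box<dyn FnOnce(&mut T) + Send>;

/// Shared between the worker (fills `value`) and the waiting future.
struct Slot<R> {
    value: Option<R>,
    waker: Option<Waker>,
}

/// Future that resolves once the worker thread has stored a result.
pub struct Reply<R>(Arc<Mutex<Slot<R>>>);

impl<R> Future for Reply<R> {
    type Output = R;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<R> {
        let mut slot = self.0.lock().unwrap();
        match slot.value.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Remember whom to wake when the result arrives.
                slot.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

pub struct Worker<T> {
    jobs: Sender<Job<T>>,
}

impl<T: 'static> Worker<T> {
    pub fn start<C: FnOnce() -> T + Send + 'static>(constructor: C) -> Self {
        let (jobs, receiver) = channel::<Job<T>>();
        thread::spawn(move || {
            let mut state = constructor();
            for job in receiver {
                job(&mut state);
            }
        });
        Worker { jobs }
    }

    /// Queues `f` for the worker and returns a future for its result.
    pub fn call<F, R>(&self, f: F) -> Reply<R>
    where
        F: FnOnce(&mut T) -> R + Send + 'static,
        R: Send + 'static,
    {
        let slot = Arc::new(Mutex::new(Slot { value: None, waker: None }));
        let worker_slot = Arc::clone(&slot);
        let job: Job<T> = Box::new(move |state: &mut T| {
            let result = f(state);
            let waker = {
                let mut slot = worker_slot.lock().unwrap();
                slot.value = Some(result);
                slot.waker.take()
            };
            if let Some(waker) = waker {
                waker.wake();
            }
        });
        self.jobs.send(job).expect("worker thread has exited");
        Reply(slot)
    }
}

/// Wakes a parked thread; used only by the stand-in executor below.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal stand-in for a runtime: polls one future on the current thread.
pub fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}
```

A caller inside async code would write something like worker.call(|conn| ...).await. Note that if a job panics on the worker thread, the future in this sketch never completes, which a production version would need to handle.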

    let threaded_rt = runtime::Builder::new()
        .threaded_scheduler()
        .build().expect("failed to create multithreaded runtime");

    threaded_rt.spawn(async {
        let x = tokio::task::spawn_blocking(move || {
            "hello"
        }).await.expect("error in spawned task");
        println!("{:?}", x);
    });

When I try to run this, I get

thread 'tokio-runtime-worker' panicked at 'error in spawned task: JoinError::Cancelled', src/libcore/result.rs:1188:5

How can I solve this problem?

Your threaded_rt probably went out of scope right after the spawn, dropping it and thus cancelling all tasks. Keep in mind that you want an enable_all call in that builder.

You might want to use block_on instead of spawn.
