Hi, I have a BigOldDB struct and I want clients (libs) to be able to stream results from it. I want to use a channel that BigOldDB populates and the client reads from, to avoid blocking and/or loading a whole bunch of items into memory (ignore that BigOldDB already has them in memory ;-)).
I naively want to do:
struct BigOldDB {
    items: Vec<String>,
}
impl BigOldDB {
    fn feed_me_refs(&self) -> tokio::sync::mpsc::Receiver<&String> {
        let (tx, rx) = tokio::sync::mpsc::channel(10);
        tokio::spawn(async move {
            for s in &self.items {
                tx.send(s).await.unwrap();
            }
        });
        rx
    }
    fn feed_me_clones(&self) -> tokio::sync::mpsc::Receiver<String> {
        let (tx, rx) = tokio::sync::mpsc::channel(10);
        tokio::spawn(async move {
            for s in &self.items {
                tx.send(s.clone()).await.unwrap();
            }
        });
        rx
    }
}
Obviously this can't work because the spawned task is "detached" from the &self reference.
I really want to say to Rust: "I know the spawned task references &self but I promise &self will live long enough". I can't say it is 'static as it isn't, but the code will only ever do something like:
let mut rx = big_old_db.feed_me();
while let Some(s) = rx.recv().await {
    // ...
}
I've worked around this by having BigOldDB accept a Sender and the client spawns the task.
I could also achieve the same thing by passing in a callback, but that isn't as flexible.
This (passing in the Sender) works, but is this necessary? Is this the only way to achieve this?
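For readers who want to see the shape of that workaround, here is a minimal sketch. It is not the asker's actual code: to stay free of external crates it swaps tokio for std::thread and std::sync::mpsc, and the names feed_into and total_len are made up for illustration. The point it tries to show is that once the caller owns the spawning, the caller's scope can prove the borrow of the DB lasts long enough.

```rust
use std::sync::mpsc;
use std::thread;

struct BigOldDB {
    items: Vec<String>,
}

impl BigOldDB {
    // Hypothetical name. The caller supplies the Sender, so this method
    // never spawns anything and never needs `&self` to be 'static.
    fn feed_into<'a>(&'a self, tx: mpsc::SyncSender<&'a String>) {
        for s in &self.items {
            // Stop early if the receiving side has gone away.
            if tx.send(s).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which closes the channel.
    }
}

// Hypothetical client. The scoped thread is joined before `thread::scope`
// returns, so the references sent through the channel cannot outlive `db`.
fn total_len(db: &BigOldDB) -> usize {
    let (tx, rx) = mpsc::sync_channel(10);
    thread::scope(|scope| {
        scope.spawn(move || db.feed_into(tx));
        // Consume on the current thread while the producer runs.
        rx.iter().map(|s| s.len()).sum::<usize>()
    })
}
```

tokio::spawn has no scoped equivalent of this, which is why the same trick with tokio tends to end up either running producer and consumer in one task or sharing ownership of the data.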
The thing is, it doesn't necessarily live long enough. If the async task containing this code (not the spawned sending task) is cancelled (or if the loop panics) and big_old_db is owned by that task, then big_old_db may be dropped while the spawned sending task is still using the reference to it.
In order for your program to be sound, you have to write code that doesn’t access freed memory even when other things are going wrong.
If the DB is immutable, then you can simply use shared ownership of the data:
use std::sync::Arc;
struct BigOldDB {
    items: Arc<[String]>,
}
...
    fn feed_me_clones(&self) -> tokio::sync::mpsc::Receiver<String> {
        let (tx, rx) = tokio::sync::mpsc::channel(10);
        // Cloning the Arc only bumps a reference count.
        let items = self.items.clone();
        tokio::spawn(async move {
            // Arc<[String]> is not itself iterable, so go through the slice.
            for s in items.iter() {
                ...
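A complete version of the shared-ownership idea might look roughly like the sketch below. It is an approximation rather than the snippet above filled in: it uses std::thread and std::sync::mpsc instead of tokio so that it compiles without external crates, and the channel capacity of 10 is simply carried over from the question. The ownership reasoning is the same for tokio::spawn, since both require the spawned work to be 'static.

```rust
use std::sync::{mpsc, Arc};
use std::thread;

struct BigOldDB {
    items: Arc<[String]>,
}

impl BigOldDB {
    fn feed_me_clones(&self) -> mpsc::Receiver<String> {
        let (tx, rx) = mpsc::sync_channel(10);
        // The spawned thread gets its own handle to the shared data,
        // so it no longer borrows from `&self`.
        let items = Arc::clone(&self.items);
        thread::spawn(move || {
            for s in items.iter() {
                // Stop if the receiver was dropped.
                if tx.send(s.clone()).is_err() {
                    break;
                }
            }
        });
        rx
    }
}
```

Because the producer holds its own Arc, the BigOldDB value can even be dropped while the stream is still being read, and the items stay alive until the producer finishes.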
If the DB’s data can be mutated then you have a much harder problem.
One option is to never modify the data behind the Arc in place, and instead have mutate() build a new Arc<[String]> and swap it in. Then feed_me_clones() simply keeps iterating over the old version of the data, which still exists until it is no longer needed. Of course, this means that mutate() is O(N) because it will have to copy all the existing items (that aren't deleted), but persistent data structures can provide more efficient ways of getting the same result. (Unfortunately, I don't have any recommendations for persistent data structure libraries in Rust.)
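To make the "old version keeps existing" behaviour concrete, here is a small sketch of a mutate() that replaces the shared slice instead of editing it. The signature of mutate (remove one value, append another) and the snapshot helper are assumptions made up for this example, not something from the thread.

```rust
use std::sync::Arc;

struct BigOldDB {
    items: Arc<[String]>,
}

impl BigOldDB {
    // Hypothetical helper: what a streaming reader would clone and iterate.
    fn snapshot(&self) -> Arc<[String]> {
        Arc::clone(&self.items)
    }

    // Hypothetical mutation: drop every item equal to `remove`, append `add`.
    // This is O(N) because all surviving items are copied into a new slice.
    fn mutate(&mut self, remove: &str, add: String) {
        let next: Vec<String> = self
            .items
            .iter()
            .filter(|s| s.as_str() != remove)
            .cloned()
            .chain(std::iter::once(add))
            .collect();
        // Readers holding the old Arc are unaffected by this swap.
        self.items = Arc::from(next);
    }
}
```

A reader that took its snapshot before the call to mutate() keeps seeing the old contents, and the old slice is freed once the last such reader drops its Arc.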
Another approach is to use some form of interior mutability in your database. Depending on where you put it and what kind you use, this will mean that the reader might see the change applied halfway through reading, or the writer might have to wait until the reader has finished.
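As one possible reading of the interior mutability option, the sketch below puts the whole Vec behind an Arc<RwLock<..>> and holds the read lock for the entire iteration, which is the "writer has to wait" variant. It again uses std threads and channels rather than tokio, and push is a hypothetical mutation method. With tokio, a guard held across an .await would need an async-aware lock such as tokio::sync::RwLock instead of the std one.

```rust
use std::sync::{mpsc, Arc, RwLock};
use std::thread;

struct BigOldDB {
    items: Arc<RwLock<Vec<String>>>,
}

impl BigOldDB {
    // Hypothetical mutation through a shared reference.
    // Blocks while any reader still holds the read lock.
    fn push(&self, s: String) {
        self.items.write().unwrap().push(s);
    }

    fn feed_me_clones(&self) -> mpsc::Receiver<String> {
        let (tx, rx) = mpsc::sync_channel(10);
        let items = Arc::clone(&self.items);
        thread::spawn(move || {
            // The read lock is held until the loop ends, so the reader
            // sees one consistent version and writers wait for it.
            let guard = items.read().unwrap();
            for s in guard.iter() {
                if tx.send(s.clone()).is_err() {
                    break;
                }
            }
        });
        rx
    }
}
```

Taking the lock once per item instead would let writers in between items, at the cost of the reader possibly seeing a change applied halfway through.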
+1 for persistent data structures. Moving from Java to Clojure was a revelation. Moving from Clojure to Rust was a different revelation.
The thing I'm finding is that Rust's abstractions are nice, but they tend to be at a lower level than some other languages, which is fair, it's Rust :-). Finding the line between "perfectly sensible designs" that are supported by Rust's abstractions and those that aren't is a long journey. Fun though.