Hi! This is my first message on this forum so sorry for barging in with a question.
I am new to Rust. I started learning by reading The Book, but after a while I got a little tired of reading theory without coding, so I decided to challenge myself. As I come from a webdev background, namely Angular, I thought I could implement something akin to the rxjs Observable. I have done that before in TypeScript and found it to be a fun project, especially when you start diving into details like transform functions. Anyway, I managed to implement the bare bones of the Observable pattern and tried to test it:
and the compiler complained: "borrowed data escapes outside of closure". I expected that, because the reference to the observable in the create method may point to something that perishes before the asynchronous task in thread::spawn is executed, and therefore I might end up with a dangling pointer. But this left me clueless as to how I could execute the "trigger_next" method after I subscribe to the created observable.
I deliberately did not post the whole implementation of Observable and Observer to keep the code snippets short. If needed for diagnosis, I can definitely provide that.
Especially with incomplete code, please make sure to at least include complete error messages. Execute cargo check in your terminal and get a complete error message from there.
error[E0521]: borrowed data escapes outside of closure
--> src/main.rs:50:9
|
49 | let obs = Observable::<usize>::create(|observable: &Observable::<usize>| {
| ---------- - let's call the lifetime of this reference `'1`
| |
| `observable` is a reference that is only valid in the closure body
50 | / thread::spawn(|| {
51 | | observable.trigger_next(1);
52 | | });
| | ^
| | |
| |__________`observable` escapes the closure body here
| argument requires that `'1` must outlive `'static`
Thank you. Another thing to learn, the actual error message is still missing, the line starting with error at the beginning. Just general advice for understanding Rust errors and asking questions.
So, regarding the problem in question… I believe there’s the general advice to be had that many OOP-patterns simply won’t work all that well in Rust; for example, as far as I’m aware, an observer pattern intrinsically requires shared access to some mutable state. Unless I’m misremembering… I’m not actually all that good at remembering those patterns after all, they always seemed simultaneously trivial and convoluted to me…
Anyways, for further discussion if and how this could be possible in Rust, I believe, it would actually be helpful for you to post the whole code, while in the meantime, I’ll quickly refresh my knowledge on observer patterns
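To make the "shared access to some mutable state" point concrete, here is a minimal sketch of one way around E0521. It is not the original poster's Observable (that code is not shown here); `Subject`, `subscribe`, and `run_demo` are hypothetical names, and only `trigger_next` is borrowed from the question. The idea is that the handle owns an `Arc<Mutex<...>>`, so an owned clone can be moved into `thread::spawn` instead of a borrowed reference.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical subject: a cloneable handle to a shared list of callbacks.
struct Subject<T> {
    observers: Arc<Mutex<Vec<Box<dyn FnMut(T) + Send>>>>,
}

impl<T> Clone for Subject<T> {
    fn clone(&self) -> Self {
        // Cloning only bumps the reference count; the callback list is shared.
        Subject { observers: Arc::clone(&self.observers) }
    }
}

impl<T: Clone> Subject<T> {
    fn new() -> Self {
        Subject { observers: Arc::new(Mutex::new(Vec::new())) }
    }

    fn subscribe(&self, f: impl FnMut(T) + Send + 'static) {
        self.observers.lock().unwrap().push(Box::new(f));
    }

    fn trigger_next(&self, value: T) {
        // Every registered observer gets its own clone of the value.
        for obs in self.observers.lock().unwrap().iter_mut() {
            obs(value.clone());
        }
    }
}

/// Subscribes first, then emits from another thread; returns what was seen.
fn run_demo() -> Vec<usize> {
    let subject = Subject::<usize>::new();
    let seen = Arc::new(Mutex::new(Vec::new()));

    let sink = Arc::clone(&seen);
    subject.subscribe(move |v| sink.lock().unwrap().push(v));

    // An owned clone is moved into the thread, so nothing borrowed escapes.
    let handle = subject.clone();
    thread::spawn(move || {
        handle.trigger_next(1);
        handle.trigger_next(2);
    })
    .join()
    .unwrap();

    let result = seen.lock().unwrap().clone();
    result
}

fn main() {
    println!("{:?}", run_demo());
}
```

Note that this sketch subscribes before spawning the thread and joins it afterwards, so it sidesteps the ordering question rather than answering it. Calling `subscribe` from inside a callback would also deadlock on the mutex in this simple form.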
Thanks for your prompt reply! I edited the post that includes the error message, because I realized, after you pointed it out, that I failed to include the whole error message. Here's the rest of the code that comes before main function.
Could you explain how subscribers that were added at a later time should behave? Ignoring the ownership problems and shared mutation that make this hard (though definitely not impossible) to do in Rust in the first place, it appears as if there’s a race condition between whether the spawned observable.trigger_next(1) or the obs.subscribe(observer) would be executed first.
Phew, I'm glad I'm not the only one who feels that way about the so called "Gang of Four" design patterns. It seemed to me that most of it was advice on how to overcome the problems of Object Oriented Programming or maybe C++ itself. Thanks for saying that, I feel better now.
Scrolling through the rxjs docs on Observable, it seems that the function passed to create is only executed once .subscribe is called, and possibly multiple times for multiple subscribers? I cannot say I have gained a deep understanding of the type though… I’m used to Rust docs where I can just look at the source code in case something isn’t clear, and where type signatures in documentation are a thing.
Or maybe it’s that this thing I was reading isn’t actually the documentation but more of a tutorial. https://rxjs.dev/guide/observable
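If that reading of the guide is right (the function passed to create runs once per subscribe call), a single-threaded Rust sketch of such a "cold" observable could look like the following. This is only my interpretation, not rxjs's actual implementation; the `Observable` type here and the `collect_twice` helper are made up for illustration.

```rust
/// Hypothetical "cold" observable: it only stores a producer function.
struct Observable<T> {
    producer: Box<dyn Fn(&mut dyn FnMut(T))>,
}

impl<T> Observable<T> {
    fn create(producer: impl Fn(&mut dyn FnMut(T)) + 'static) -> Self {
        Observable { producer: Box::new(producer) }
    }

    /// Runs the producer from scratch for this one observer.
    fn subscribe(&self, mut observer: impl FnMut(T)) {
        (self.producer)(&mut observer);
    }
}

/// Subscribes twice; each subscriber sees the full sequence independently.
fn collect_twice() -> (Vec<i32>, Vec<i32>) {
    let obs = Observable::<i32>::create(|next| {
        next(1);
        next(2);
        next(3);
    });

    let mut first = Vec::new();
    obs.subscribe(|v| first.push(v));

    let mut second = Vec::new();
    obs.subscribe(|v| second.push(v * 10));

    (first, second)
}

fn main() {
    let (first, second) = collect_twice();
    println!("{:?} {:?}", first, second);
}
```

Nothing is shared between the two subscribers in this version, which would match the guide's "like functions with zero arguments" description and avoids the ownership problem entirely, at the cost of not being multicast.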
Observable seems to be meant as a multi-valued analogue of Promise, which in turn (as far as I am aware, and in the JavaScript model of asynchronous programming using callbacks) is analogous to Future in Rust. So the role of Observable as a multi-valued Promise might be served by Stream, which is somewhat of a multi-valued Future. Of course, neither the Stream trait’s API nor its implementation is actually all that analogous to Observable, as far as I can tell.
If the goal is not some kind of “async” programming, then I would first need to learn of the applications of these so-called “push-based” alternatives in a synchronous / traditional-threading style environment. Perhaps something like a worker thread + a channel that sends back the results is what is wanted?
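For reference, the "worker thread + a channel that sends back the results" idea could be sketched with the standard library alone, roughly like this. The function name `squares_from_worker` and the squaring workload are just placeholders for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns a worker that pushes n results over a channel, then collects them.
fn squares_from_worker(n: u32) -> Vec<u32> {
    let (tx, rx) = mpsc::channel();

    let worker = thread::spawn(move || {
        for i in 1..=n {
            // send only fails if the receiving side has been dropped.
            if tx.send(i * i).is_err() {
                break;
            }
        }
        // tx is dropped when the closure ends, which closes the channel.
    });

    // The iterator blocks for each value and ends once the channel is closed.
    let received: Vec<u32> = rx.iter().collect();
    worker.join().unwrap();
    received
}

fn main() {
    println!("{:?}", squares_from_worker(4));
}
```

The consumer here pulls values by iterating the receiver, rather than having a callback invoked for it, which is the main stylistic difference from the rxjs API.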
Streams in Rust can't have multiple consumers though, can they? To me this observer pattern looks a lot like single producer, multiple consumers message passing. Maybe using tokio's broadcast channel would be a good (idiomatic Rust) idea?
This is exactly what I am trying to do. From scratch, because I am trying to learn something. If what I wrote so far is not Rust-feasible, could you maybe point me in a direction of something that would be, so I could learn from it?
Right. On the other hand, I haven’t quite figured out how much, if anything, is actually shared between multiple consumers of an Observable in rxjs. Quoting from the above-linked guide:
Observables as generalizations of functions
Contrary to popular claims, Observables are not like EventEmitters nor are they like Promises for multiple values. Observables may act like EventEmitters in some cases, namely when they are multicasted using RxJS Subjects, but usually they don't act like EventEmitters.
Observables are like functions with zero arguments, but generalize those to allow multiple values.
Consider the following:
function foo() {
  console.log('Hello');
  return 42;
}
const x = foo.call(); // same as foo()
console.log(x);
const y = foo.call(); // same as foo()
console.log(y);
We expect to see as output:
"Hello"
42
"Hello"
42
You can write the same behavior above, but with Observables:
Okay, my interpretation and intended implementation is somewhat different here. You can only consume values emitted after you subscribed to an observable. I could modify my code, of course, to reemit last value whenever subscribe is called.
I don't quite follow this. This looks like when an observer subscribes, it triggers the observable to emit the next value (looks like a workaround for the lack of coroutines/generators in the JS world) and dies after it has consumed the next value. I'm wondering what happens when the observable is finished (i.e. is there an alternative to next that says 'this observable has finished')? Never mind, the tutorial answered my question.
Sounds like a good job for a channel; those usually have less callback-ey APIs in Rust. Feel free to look into something like the API of std::sync::mpsc first. Even though it does not do the multiple-consumers broadcasting that you are after, it is perhaps more useful to start with something straightforward from the standard library, and to avoid the trouble of learning async Rust at the same time. Otherwise, the above-mentioned “broadcast” channel from the tokio crate is possibly closer to what you are after, but with all the caveats: a more complex API, not in the standard library, and it involves async Rust.
Maybe try using the mpsc channel API first, to familiarize yourself with it, and only then consider writing a toy re-implementation of it. The real deal of course involves quite low-level and unsafe code, but a functioning, straightforward “toy” re-implementation should be possible using Arc, Mutex, Condvar, and a VecDeque of messages (all these types can be found in the standard library); so perhaps an interesting challenge could be to do just that. (Which would of course also require first learning how to use Arc, Mutex and Condvar.)
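To give an idea of the shape such a toy could take (and feel free to skip this if you would rather attempt the challenge unspoiled), here is a minimal sketch built from Arc, Mutex, Condvar and VecDeque. The names `Shared`, `channel`, and `toy_channel_demo` are my own, and it deliberately leaves out things the real std::sync::mpsc handles, such as detecting that the other side has been dropped.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// State shared by both ends: the message queue plus a wake-up signal.
struct Shared<T> {
    queue: Mutex<VecDeque<T>>,
    ready: Condvar,
}

struct Sender<T> {
    shared: Arc<Shared<T>>,
}

struct Receiver<T> {
    shared: Arc<Shared<T>>,
}

fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(Shared {
        queue: Mutex::new(VecDeque::new()),
        ready: Condvar::new(),
    });
    (Sender { shared: Arc::clone(&shared) }, Receiver { shared })
}

impl<T> Sender<T> {
    fn send(&self, value: T) {
        // The lock guard is released at the end of this statement.
        self.shared.queue.lock().unwrap().push_back(value);
        self.shared.ready.notify_one();
    }
}

impl<T> Receiver<T> {
    /// Blocks until a message is available. No disconnect detection:
    /// this waits forever if nothing is ever sent.
    fn recv(&self) -> T {
        let mut queue = self.shared.queue.lock().unwrap();
        loop {
            if let Some(value) = queue.pop_front() {
                return value;
            }
            // wait releases the lock while sleeping and re-acquires it after.
            queue = self.shared.ready.wait(queue).unwrap();
        }
    }
}

fn toy_channel_demo() -> Vec<i32> {
    let (tx, rx) = channel::<i32>();
    let producer = thread::spawn(move || {
        for i in 0..3 {
            tx.send(i);
        }
    });
    let got: Vec<i32> = (0..3).map(|_| rx.recv()).collect();
    producer.join().unwrap();
    got
}

fn main() {
    println!("{:?}", toy_channel_demo());
}
```

The loop around `wait` matters because a Condvar may wake up spuriously, so the queue has to be re-checked each time.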
Thanks a lot for the help! I will definitely look through the suggested material. So far Rust is both fascinating and a little frustrating to me. On one hand, it is a lot of fun to learn something that shows me how much I took as a given when working with other languages; on the other, I feel like sometimes Rust is full of workarounds around Rust, if you get me. Both feelings are closely interlinked.
Anyway, to be honest I didn't expect anyone to go through a js library documentation just to help me so I am very positively surprised by the community here.
Regarding broadcast-like channels… I just found bus, which apparently is a somewhat frequently downloaded crate that offers non-async broadcast channels. The implementation is probably complex and unsafe (judging by them calling out lock-free-ness), but the API design is something that could be copied in a toy implementation, I suppose. (Though mpsc is probably easier as a first step.)
Also, using the #broadcast keyword on crates.io, I came across pharos, a crate that uses async for a broadcast-like, channel-like API, and that explicitly calls out that it aims to implement an “observer pattern”. This further confirms my suspicion that any nice “observer pattern” implementation in Rust should indeed look somewhat different from what you might be used to. It also interacts with the Stream trait I’ve mentioned before, so … well … its API is probably highly non-trivial to understand when you’re unfamiliar with the basics of async Rust, so this crate is probably not a good learning resource early on.
Note that mpmc is different from a broadcast in that each message will only reach one of the multiple receivers. (Presumably the one who waited for new messages first, though I'm not certain of the exact logic at play.)
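To illustrate the difference, a very naive broadcast can be sketched on top of the standard library by giving every subscriber its own mpsc channel and cloning each message into all of them. `Broadcaster` and `broadcast_demo` are hypothetical names, and this is not how bus or tokio's broadcast are implemented; it only shows the "every receiver gets every message" behaviour, including that a late subscriber only sees values sent after it subscribed.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical broadcaster: one std mpsc channel per subscriber.
struct Broadcaster<T> {
    subscribers: Vec<Sender<T>>,
}

impl<T: Clone> Broadcaster<T> {
    fn new() -> Self {
        Broadcaster { subscribers: Vec::new() }
    }

    /// Registers a new subscriber and hands back its receiving end.
    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = mpsc::channel();
        self.subscribers.push(tx);
        rx
    }

    /// Sends a clone of the value to every subscriber and forgets
    /// subscribers whose receiver has been dropped.
    fn send(&mut self, value: T) {
        self.subscribers.retain(|tx| tx.send(value.clone()).is_ok());
    }
}

fn broadcast_demo() -> (Vec<&'static str>, Vec<&'static str>) {
    let mut bus = Broadcaster::<&'static str>::new();

    let early = bus.subscribe();
    bus.send("a");

    // This subscriber joins late and therefore misses "a".
    let late = bus.subscribe();
    bus.send("b");

    // Dropping the broadcaster drops all senders, which ends the iterators.
    drop(bus);
    (early.iter().collect(), late.iter().collect())
}

fn main() {
    let (early, late) = broadcast_demo();
    println!("early: {:?}, late: {:?}", early, late);
}
```

With an mpmc channel, by contrast, "a" and "b" would each be delivered to only one of the receivers.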