Does Rust need Rx implementation (and/or more)?

The map method is for lifting a single Signal.

If you want to lift multiple Signals (which is equivalent to CombineLatest in Rx), you can use the map_ref! macro:

map_ref! {
    let a = foo.some_method(),
    let b = bar.some_other_method(),
    let c = qux.last_method() => {
        *a + *b + *c
    }
}
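For readers who haven't met CombineLatest before, here is a rough std-only sketch of the semantics: every update to any input recomputes the output from the latest value of each input. `Latest3`, `Update` and `push` are made-up names for this illustration and are not part of futures-signals. Real Signals always have a current value, so the initial "not all inputs seen yet" phase modelled with `None` here is an assumption of the sketch.

```rust
/// Holds the most recent value seen from each of three inputs.
#[derive(Default)]
struct Latest3 {
    a: Option<i32>,
    b: Option<i32>,
    c: Option<i32>,
}

/// An update arriving from one of the three inputs.
enum Update {
    A(i32),
    B(i32),
    C(i32),
}

impl Latest3 {
    /// Records the update, then recomputes the combined value.
    /// Returns `None` until every input has produced at least one value.
    fn push(&mut self, update: Update) -> Option<i32> {
        match update {
            Update::A(v) => self.a = Some(v),
            Update::B(v) => self.b = Some(v),
            Update::C(v) => self.c = Some(v),
        }
        Some(self.a? + self.b? + self.c?)
    }
}
```

Pushing A(1), B(2), C(3) yields nothing, nothing, then 6; a later A(10) yields 15 because b and c keep their latest values.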

You don't need to keep quoting Rx. I know it.

There are already dozens of operators for Futures/Streams/Signals, and more are being added all the time: StreamExt, TryStreamExt, FutureExt, TryFutureExt, SignalExt, SignalVecExt.

If your favorite combinator isn't available yet, you're welcome to implement it yourself and send a pull request. Everything is open source, and created by volunteers.

For debounce specifically, you can use debounce with Streams and Signals. Rx Observable is like a mishmash of Stream + Signal, so everything you can do with Rx you can do with Streams / Signals.

This misunderstands how the system works. You can have parallel Futures / Streams / Signals which all run on the same Executor. In fact, you'll usually have only one Executor per program.

You can think of the Executor as being like a thread pool which randomly assigns Futures / Streams / Signals to different threads.

There is no copying, because op2() will consume op1(), so it has exclusive access to it. And so Rust's compiler is happy, the Rust memory model is happy, and everything has 0 cost.
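To make the "op2() consumes op1()" point concrete, here is a minimal std-only sketch. `MiniStream`, `Counter`, `Map` and `collect_all` are hypothetical stand-ins written for this post (the real trait is `futures::Stream`, which is asynchronous); only the ownership shape is the point.

```rust
/// Minimal synchronous pull-based stream, standing in for `futures::Stream`.
trait MiniStream {
    type Item;
    fn next_item(&mut self) -> Option<Self::Item>;
}

/// Root stream that counts from `current` up to (but excluding) `end`.
struct Counter {
    current: u32,
    end: u32,
}

impl MiniStream for Counter {
    type Item = u32;
    fn next_item(&mut self) -> Option<u32> {
        if self.current < self.end {
            let value = self.current;
            self.current += 1;
            Some(value)
        } else {
            None
        }
    }
}

/// Combinator that owns its upstream by value: no Rc, no Arc, no lock.
struct Map<S, F> {
    upstream: S,
    f: F,
}

impl<S: MiniStream, B, F: FnMut(S::Item) -> B> MiniStream for Map<S, F> {
    type Item = B;
    fn next_item(&mut self) -> Option<B> {
        self.upstream.next_item().map(&mut self.f)
    }
}

fn op1() -> Counter {
    Counter { current: 0, end: 3 }
}

/// Takes the upstream by value, so it has exclusive access to it.
fn op2<S: MiniStream<Item = u32>>(upstream: S) -> Map<S, impl FnMut(u32) -> u32> {
    Map { upstream, f: |x: u32| x * 10 }
}

/// Drains a stream into a Vec.
fn collect_all<S: MiniStream>(mut stream: S) -> Vec<S::Item> {
    let mut out = Vec::new();
    while let Some(value) = stream.next_item() {
        out.push(value);
    }
    out
}
```

`collect_all(op2(op1()))` gives 0, 10, 20. Since the closure captures nothing, the whole `op2(op1())` value is just the `Counter` plus a zero-sized function, which is what "no copying" looks like in practice.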

If you want to split a Stream/Signal so it has multiple consumers, you can of course do that, but now you have to use something like a Broadcaster (which internally uses an Arc<Mutex<Vec<Weak<BroadcasterStatus>>>>).
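A rough sketch of what such a splitter can look like, using only std. This is not the actual Broadcaster source: `Inbox`, `Receiver`, `subscribe`, `send` and `try_recv` are names invented here, and a per-consumer queue stands in for the real `BroadcasterStatus`. It only shows why the `Arc<Mutex<Vec<Weak<...>>>>` shape appears.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, Weak};

/// Per-consumer queue (a stand-in for the real per-subscriber state).
type Inbox<T> = Mutex<VecDeque<T>>;

/// Root that fans each value out to every live consumer.
struct Broadcaster<T> {
    subscribers: Arc<Mutex<Vec<Weak<Inbox<T>>>>>,
}

/// Consumer handle. Dropping it is the "unsubscribe".
struct Receiver<T> {
    inbox: Arc<Inbox<T>>,
}

impl<T> Clone for Broadcaster<T> {
    /// Clones share the same subscriber list, hence the outer Arc.
    fn clone(&self) -> Self {
        Broadcaster { subscribers: Arc::clone(&self.subscribers) }
    }
}

impl<T: Clone> Broadcaster<T> {
    fn new() -> Self {
        Broadcaster { subscribers: Arc::new(Mutex::new(Vec::new())) }
    }

    fn subscribe(&self) -> Receiver<T> {
        let inbox = Arc::new(Mutex::new(VecDeque::new()));
        self.subscribers.lock().unwrap().push(Arc::downgrade(&inbox));
        Receiver { inbox }
    }

    /// Clones `value` into every live inbox and prunes dropped receivers.
    /// Returns how many receivers are still alive.
    fn send(&self, value: T) -> usize {
        let mut subscribers = self.subscribers.lock().unwrap();
        subscribers.retain(|weak| match weak.upgrade() {
            Some(inbox) => {
                inbox.lock().unwrap().push_back(value.clone());
                true
            }
            None => false,
        });
        subscribers.len()
    }
}

impl<T> Receiver<T> {
    fn try_recv(&self) -> Option<T> {
        self.inbox.lock().unwrap().pop_front()
    }
}
```

The lock, the clone per consumer and the Weak upgrade are the cost you pay for splitting, which is why you only want it at the roots.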

The key thing is that most of the time you only need to split the roots, not intermediate Streams / Signals, so you only pay the performance cost for splitting when you need it.

That means you only have 1 allocation for each root, which is far more efficient than multiple allocations per node (which is what you would get with Rx's model).

It's out of date, but this general principle is explained well here.

I'm not sure why you're doubting whether it exists or not: Futures / Streams / Signals have existed for years, and have been used in real programs for years.

The Future / Stream code is here, and the Signals code is here.

Though if you really didn't want to reinvent the wheel, the best thing would be to contribute to the already large ecosystem, rather than inventing a new system.

There seem to be multiple misunderstandings here. First, stack allocated things can be passed between threads.
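A small std-only illustration of that first point. The function names and the `Point` type are made up for the example; `std::thread::scope` and `std::thread::spawn` are the standard library APIs.

```rust
use std::thread;

/// Borrows a stack-allocated array from two scoped threads at once.
fn parallel_sum() -> u32 {
    let data = [1u32, 2, 3, 4, 5, 6]; // lives on this function's stack
    let (left, right) = data.split_at(3);
    thread::scope(|scope| {
        let a = scope.spawn(|| left.iter().sum::<u32>());
        let b = scope.spawn(|| right.iter().sum::<u32>());
        a.join().unwrap() + b.join().unwrap()
    })
}

struct Point {
    x: i32,
    y: i32,
}

/// Moves a stack-allocated struct into another thread by value.
fn move_to_thread() -> i32 {
    let point = Point { x: 3, y: 4 }; // also on the stack
    thread::spawn(move || point.x + point.y).join().unwrap()
}
```

In the first function the threads only borrow the data, and the scope guarantees they finish before `data` goes away. In the second the value is moved, so no borrow is involved at all.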

Second, any sort of Stream/Signals/Observable/whatever system isn't going to be stack allocated. Instead, all the data will live inside structs, and those structs will ideally be allocated exactly once (on the heap). That's exactly what happens with Futures/Streams/Signals.

Third, Futures/Streams/Signals can run on different threads simultaneously and communicate with each other (using Rust's standard multi-threading tools). If they couldn't, then they would be a lot less useful.


@Pauan actually streams look nice.
I think there is a PR issue. Why didn't I hear about pin/unpin? This is a hidden little treasure.
I see. It is hidden in futures :slight_smile:


It's hidden because it's an implementation detail which users don't need to worry about.

The reason for Pin is to support self-referential structs, which is needed for async/await.

The Future / Stream system itself has been heavily advertised for years.

@jflorte

I've been looking at the implementation of zip in stream ( zip.rs.html -- source ). There are no vectors (arrays) like in RxJS ( RxJS/zip.js at master · Reactive-Extensions/RxJS · GitHub ). Rust is doing polling! It is not push-based!

As to your question of why there is no push-based thingy: types, and a need for those pinned things, as @Pauan pointed out, to keep the promise of a speedy implementation. Maybe now the time is ripe.

@Pauan
Of course stream is a different thing. For example, if no one polls, there are no events to debounce!
Stream's zip can't possibly have two events from one stream waiting for an event on the other to zip with.
Wow!
Rx's zip can easily get a pile-up, because events are pushed from upstream.
Zip is an example of how the execution models differ.
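Here is how I picture that difference, as a std-only sketch. Neither half is the RxJS or futures source: `PushZip`, `CountingSource` and `pull_zip` are invented for this post, and a plain Iterator stands in for the pull side (a real Stream would register a waker instead of returning immediately).

```rust
use std::collections::VecDeque;

/// Push-style zip: upstream calls `push_*` whenever it likes,
/// so unmatched values have to be queued somewhere.
#[derive(Default)]
struct PushZip {
    left: VecDeque<u32>,
    right: VecDeque<char>,
}

impl PushZip {
    fn push_left(&mut self, value: u32) -> Option<(u32, char)> {
        self.left.push_back(value);
        self.try_emit()
    }

    fn push_right(&mut self, value: char) -> Option<(u32, char)> {
        self.right.push_back(value);
        self.try_emit()
    }

    /// Emits a pair only when both sides have something queued.
    fn try_emit(&mut self) -> Option<(u32, char)> {
        if self.left.is_empty() || self.right.is_empty() {
            return None;
        }
        Some((self.left.pop_front()?, self.right.pop_front()?))
    }

    /// Number of values waiting for a partner.
    fn backlog(&self) -> usize {
        self.left.len() + self.right.len()
    }
}

/// Endless source that records how many values it was asked to produce.
struct CountingSource {
    last: u32,
    produced: u32,
}

impl Iterator for CountingSource {
    type Item = u32;
    fn next(&mut self) -> Option<u32> {
        self.produced += 1;
        self.last += 1;
        Some(self.last)
    }
}

/// Pull-style zip: each side is asked for exactly one value per pair.
fn pull_zip(
    left: &mut impl Iterator<Item = u32>,
    right: &mut impl Iterator<Item = char>,
) -> Option<(u32, char)> {
    Some((left.next()?, right.next()?))
}
```

Push five values into the left side of `PushZip` before the right side emits anything and the backlog is five. With `pull_zip`, the fast `CountingSource` has produced exactly as many values as pairs were requested.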

In Rx, unsubscription on error is important, because it stops pushes. And with the poll model, there is probably no need for unsubscription, because there is no subscription. Got an error instead of a value? -> Decide to poll next or do something else.

Can we compare push vs pull? Maybe there is no difference at the finish line. Was anything like the app in C++Now 2017: Kirk Shoop "No raw std::thread! - Live Tweet Analysis in C++" - YouTube written with streams?

Here is another question. Which is easier to reason about? Zip in push or in pull paradigms?
In push, zipped things, like things in real life, have their own speeds, own time.
In pull, zipped stream is like a quantum mechanics miracle: you get anything only by looking, cough, polling it.

Looks like it became quite a passionate debate :grin:
Sorry for that (_ _)'

@mikalai
I don't expect Tokio to be my Rx, it's not what it has been designed for. Nor do I expect the futures crate to become my Rx either.

Here is a little quote from the RxJava book (written by one of the authors of the RxJava library):

Hence this is where the tagline for Reactive Extensions (Rx) in general and RxJava
specifically comes from, “a library for composing asynchronous and event-based
programs.” RxJava is a concrete implementation of reactive programming principles
influenced by functional and data-flow programming. There are different approaches
to being “reactive,” and RxJava is but one of them.

Rx, or maybe I should say reactive streams, is quite a "standard" that has implementations in many languages. But I would not be shocked if the Rust community chose to answer this use case another way.

Maybe that's my bad, in the way I expressed my thoughts. Does Rust need composition of asynchronous, event-based processes? I think so, if you want it to become a mainstream language in the upcoming landscape of cloud, serverless, micro services... quote the buzzword you prefer ^^
And considering Rust's performance (and especially memory consumption), I really think it can be a first class citizen in this area.

Is an Rx implementation the only way to do it? That's what I'm trying to figure out. "No" is, to me, a valid answer if you have another proper (and, if possible, easy) way to achieve the expected result. And "yes" is a valid answer too if you don't have any alternative.


In jargon terms: purity in FRP requires using referentially transparent functions so that all the denotational semantics hold true (and that there should be denotational semantics - even if it's not written out as a formal proof, the system needs to be sound and predictable so that there could be).

In words that are more my speed: if I have a whatchamakalit which is defined as the combination of a thingiemabobber and a doodad at every specific moment in time - I should be able to say with confidence the following:

If thingiemabobber is 10 from the beginning of time until time A, and then 20 at time B
And doodad is 1 from the beginning of time until time A, and then 2 at time B
And combination is defined as taking the ten's place from thingiemabobber and one's place from doodad

Then the value of whatchamakalit is 11 until A and 22 at B. In fact it's going to always, no matter how precisely we narrow down our time slices, be observed as one of those.

There should never be an observable reality where whatchamakalit is 12 because that just doesn't make any sense.

Right? Like in a timeline we should see:

T: ----10-----20----
D: -----1------2----
W: ----11-----22----

Wouldn't you consider either of the following broken?

T: ----10-----20----
D: -----1------2----
W: -----12----22----

T: ----10-----20----
D: -----1------2----
W: ----11--12-22----

Yet RxJS had (has?) glitches exactly like that. I think there were workarounds or whatever - but the bottom line is that purity is important to prevent bugs.
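To show where a value like 12 can come from, here is a toy std-only model of the thingiemabobber / doodad example. It is not how RxJS (or any particular library) propagates changes; `Tick`, `combine`, `observe_glitchy` and `observe_glitch_free` are names I made up. The only difference between the two observers is whether the combination is recomputed after each individual input change or once per moment in time.

```rust
/// Tens place from the thingiemabobber, ones place from the doodad.
fn combine(t: u32, d: u32) -> u32 {
    (t / 10) * 10 + d % 10
}

/// One moment in time: optional new values for T and D.
struct Tick {
    t: Option<u32>,
    d: Option<u32>,
}

/// Naive propagation: recompute after every single input change,
/// even when both inputs change at the same moment.
fn observe_glitchy(ticks: &[Tick], mut t: u32, mut d: u32) -> Vec<u32> {
    let mut seen = vec![combine(t, d)];
    for tick in ticks {
        if let Some(new_d) = tick.d {
            d = new_d;
            seen.push(combine(t, d)); // T is still stale here
        }
        if let Some(new_t) = tick.t {
            t = new_t;
            seen.push(combine(t, d));
        }
    }
    seen
}

/// Glitch-free propagation: apply all changes of a moment, then recompute once.
fn observe_glitch_free(ticks: &[Tick], mut t: u32, mut d: u32) -> Vec<u32> {
    let mut seen = vec![combine(t, d)];
    for tick in ticks {
        if let Some(new_t) = tick.t {
            t = new_t;
        }
        if let Some(new_d) = tick.d {
            d = new_d;
        }
        seen.push(combine(t, d));
    }
    seen
}
```

Starting from T = 10 and D = 1 with a single tick that sets T = 20 and D = 2, the naive observer sees 11, 12, 22 while the glitch-free one sees 11, 22.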

Having lots of combinators might be cool - but that's icing on the cake... there's nothing preventing you from taking any pure (F)RP library and going crazy with higher level functions. A strong, predictable foundation is actually what allows building up those abstractions without worrying that things will break!

My point isn't to put it down - I'm sure the developers are skilled and there's a lot of value... but I find holding it up as an example of what Rust (F)RP library authors should aspire to, literally, backwards...

To my eye, Rust actually seems obsessed with purity and yet also practicality. You can't get accidental side effects of segmentation faults due to calling the same function with the same inputs, or getting a property of a null value (which wasn't null a minute ago), etc. etc.

The fact that it's also a "get 'er done" and high-performance language is pretty fascinating.


Please stop spreading misinformation about things you don't understand. If you don't understand something, you should ask questions rather than make statements.

As I explained earlier, it is a hybrid push + pull system. When a Future/Stream/Signal updates, it will call waker.wake_by_ref(), which will then cause the Task to pull the new value. So it only pulls when an update occurs. This makes it just as efficient as a push based system, and in many cases it's more efficient.

The Task will always automatically poll whenever there is an update, there is no manual polling.
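If it helps, here is the push + pull handshake written out by hand with nothing but std. `Slot`, `ValueFuture`, `produce`, `CountingTask` and `run_once` are invented for the illustration. A real Executor would re-poll from inside `wake`; this toy task only counts wake-ups and the test code polls manually so that the order of events stays visible.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// State shared between the producer and the future.
#[derive(Default)]
struct Slot {
    value: Option<u32>,
    waker: Option<Waker>,
}

/// Future that resolves once the producer has stored a value.
struct ValueFuture {
    slot: Arc<Mutex<Slot>>,
}

impl Future for ValueFuture {
    type Output = u32;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let mut slot = self.slot.lock().unwrap();
        match slot.value.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Pull found nothing: subscribe by leaving our Waker behind.
                slot.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Producer side: store the value, then push a notification.
fn produce(slot: &Arc<Mutex<Slot>>, value: u32) {
    let waker = {
        let mut guard = slot.lock().unwrap();
        guard.value = Some(value);
        guard.waker.take()
    };
    if let Some(waker) = waker {
        waker.wake();
    }
}

/// Toy task that only counts how often it was woken.
struct CountingTask {
    wakes: AtomicUsize,
}

impl Wake for CountingTask {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Returns (wake-ups before the update, wake-ups after it, pulled value).
fn run_once() -> (usize, usize, u32) {
    let slot = Arc::new(Mutex::new(Slot::default()));
    let task = Arc::new(CountingTask { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(task.clone());
    let mut cx = Context::from_waker(&waker);
    let mut future = ValueFuture { slot: slot.clone() };

    // First pull: nothing there yet, so the future registers the waker.
    assert!(Pin::new(&mut future).poll(&mut cx).is_pending());
    let wakes_before = task.wakes.load(Ordering::SeqCst);

    produce(&slot, 42); // the update pushes a wake-up to the task
    let wakes_after = task.wakes.load(Ordering::SeqCst);

    // Second pull, which an Executor would do in response to the wake-up.
    let value = match Pin::new(&mut future).poll(&mut cx) {
        Poll::Ready(value) => value,
        Poll::Pending => panic!("value should be ready after the wake-up"),
    };
    (wakes_before, wakes_after, value)
}
```

The sequence is: one poll that finds nothing, zero wake-ups while nothing changes, exactly one wake-up when the value arrives, then one poll that gets it.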

There is a subscription, because of the Waker (which is used for pushing). Rust handles cancellation by using the standard Drop trait, which will then unsubscribe from the Waker.
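The Drop part is ordinary RAII. As a simplified std-only analogy (not the futures internals; `Source`, `Subscription` and `subscribe` are hypothetical names), a handle can remove its own registration when it goes out of scope:

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical event source that tracks the ids of registered listeners.
#[derive(Default)]
struct Source {
    listeners: Mutex<Vec<u64>>,
}

impl Source {
    fn listener_count(&self) -> usize {
        self.listeners.lock().unwrap().len()
    }
}

/// Handle returned to the consumer. Holding it keeps the registration alive.
struct Subscription {
    id: u64,
    source: Arc<Source>,
}

fn subscribe(source: &Arc<Source>, id: u64) -> Subscription {
    source.listeners.lock().unwrap().push(id);
    Subscription { id, source: Arc::clone(source) }
}

impl Drop for Subscription {
    /// Dropping the handle is the unsubscribe: no explicit call needed.
    fn drop(&mut self) {
        let id = self.id;
        self.source.listeners.lock().unwrap().retain(|other| *other != id);
    }
}
```

Because the cleanup lives in `drop`, it also runs on early returns and panics that unwind, so a cancelled consumer cannot forget to unsubscribe.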


@mikalai I really suggest that you should sit down and try writing a simple program with Streams. It will clarify a lot of things for you.

Don't be sorry, cause it is a good passionate debate. Some people are learning here. :blush:

Hold your horses.

This is talking business.

It seems that we only need to write docs for http://reactivex.io/ on how standard Rx things are done in Rust.
This will help folks like me a lot :slight_smile:


Even if we explain how to do it in Rust (considering the hypothesis that we don't need a "real" Rx implementation), I don't think we could claim it to be an Rx implementation. Rx is a specification with Observable, Subscriber, Publisher, a set of Operators to implement and so on.

The goal and its implementation are not the same thing. Even if you can do the same thing with the futures crate (and again, maybe I summarize way too much), you cannot name it "Rust Rx implementation".

Maybe it would be possible to enrich the futures crate documentation to explain what the differences are from a full Rx implementation. Not at an implementation detail level, but just clarifying some key stuff.
The futures crate's documentation doesn't seem to mention Rx or FRP, but sometimes you need to define something by what it is AND what it is not so that people do not get confused.

@Pauan

I perfectly understand your opinion about the poll stuff, but the documentation is sometimes quite misleading, like @parasyte pointed out. From the Tokio documentation (mentioned in the futures crate as a place to look for examples and extensive documentation):

As hinted at earlier, Rust futures are poll based. This means that instead of a Future being responsible for pushing the data somewhere once it is complete, it relies on being asked whether it is complete or not.

And this is a statement you can easily find in many places over the web (other crates, forums on rust-lang, and so on). Maybe it is a wish not to overwhelm newcomers with low level implementation details, but it's only natural that people take statements from reliable sources for what they are.

Anyway you pointed out the most efficient way to get things right (more or less), play with it ^^


Man, oh, man! I am so with you on this!

So, can we make Rx in Rust?

  1. We already have very efficient stream mechanics. Invocations that happen on the first order level are zero cost, and potentially more efficient than any other listener registration based implementations.
  2. With streams we don't have the same contract on the second order level. But we effectively have subscriptions and unsubscriptions via drop (thanks @Pauan for pointing it out). Streams are very close to Rx.

What if we write Observables that internally use StreamExt? When an error comes, the stream is dropped. Yep.
Whatever extra thin layer we add over streams, it should only be at the second order level, allowing actual computation on the first level to run zero cost.
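A sketch of what such a thin layer could look like, under loud assumptions: `TerminateOnError` is a name I made up, and a plain Iterator of `Result` stands in for a Stream so that the example needs only std. It enforces the Rx-style contract that nothing is emitted after the first error, and it drops the upstream at that point.

```rust
/// Wraps a source of `Result`s. After the first `Err` (or normal completion)
/// the upstream is dropped and nothing more is ever emitted.
struct TerminateOnError<I> {
    upstream: Option<I>,
}

impl<I> TerminateOnError<I> {
    fn new(upstream: I) -> Self {
        TerminateOnError { upstream: Some(upstream) }
    }

    /// True once the upstream has been dropped.
    fn is_terminated(&self) -> bool {
        self.upstream.is_none()
    }
}

impl<I, T, E> Iterator for TerminateOnError<I>
where
    I: Iterator<Item = Result<T, E>>,
{
    type Item = Result<T, E>;

    fn next(&mut self) -> Option<Self::Item> {
        // Already terminated: `?` returns None without touching anything.
        let item = self.upstream.as_mut()?.next();
        match item {
            Some(Ok(_)) => {}
            // Error or completion: dropping the upstream is the "unsubscribe".
            Some(Err(_)) | None => self.upstream = None,
        }
        item
    }
}
```

The values themselves pass straight through, so the wrapper only adds one `Option` check per item on top of whatever the upstream costs.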

Polling provides implicit backpressure.

  • If I don't poll for clicks for a second, do I get a fresh click from that external thing, or do I get a click from a second before? In other words, with pressure, buffers may exist on the producer's side. And we are not supposed to care what is on the producer's side.
  • Or, maybe I want that producer to run as fast as its executor can, for some desired side effects. And I expect to drop a few events on the consumer side anyway.

For this, we may simply make an operator that is always polling strong, and has a buffer, making pile up of events explicit. We may have a strategy for dropping events. We may log drop of every N events, etc.
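Roughly what I have in mind for the buffer part, as a std-only sketch. `DropOldestBuffer` and its methods are hypothetical names, the drop-oldest policy is just one possible strategy, and the piece that would eagerly poll the upstream and call `push` is left out.

```rust
use std::collections::VecDeque;

/// Bounded buffer between an eagerly drained producer and a slow consumer.
/// When full, the oldest event is evicted and counted.
struct DropOldestBuffer<T> {
    items: VecDeque<T>,
    capacity: usize,
    dropped: u64,
}

impl<T> DropOldestBuffer<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be at least 1");
        DropOldestBuffer {
            items: VecDeque::with_capacity(capacity),
            capacity,
            dropped: 0,
        }
    }

    /// Producer side. Returns the evicted event, if any, so the caller
    /// can log it (for example every N drops).
    fn push(&mut self, item: T) -> Option<T> {
        let evicted = if self.items.len() == self.capacity {
            self.dropped += 1;
            self.items.pop_front()
        } else {
            None
        };
        self.items.push_back(item);
        evicted
    }

    /// Consumer side: oldest surviving event first.
    fn pull(&mut self) -> Option<T> {
        self.items.pop_front()
    }

    /// Total number of events dropped so far.
    fn dropped(&self) -> u64 {
        self.dropped
    }
}
```

With capacity 2, pushing 1, 2, 3 evicts 1, and the consumer then pulls 2 and 3. The pile-up is explicit: it is bounded by `capacity` and visible through `dropped()`.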

What do you think about such an experiment :nerd_face: ?


I agree, the documentation could use some improvements.

What makes it even more confusing is that there are two versions of Futures: 0.1 and 0.3. And they are very different from each other. So there is a lot of documentation talking about 0.1, even though it should be talking about 0.3 instead.

Quite frankly, my response was mostly because I really don't like mikalai's earlier attitude. He's made a lot of arrogant and unfounded assumptions and smears. It's taken a lot of my willpower to stay calm.

He seems to have calmed down now though, so it's okay.

Yes, it depends on whether the input is buffered or not. So some Streams might be buffered and others might not be buffered.

That already exists: buffered and buffer_unordered.

It's also easy to go the other way around: taking a buffered Stream and converting it into a non-buffered Stream.

I don't think you need to create a new Observable type, you can just add new methods to Stream, for example with an ObservableExt trait.
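The extension trait pattern looks like this. To keep the example std-only, Iterator stands in for Stream here (with the futures crate the supertrait would be Stream, and the adapter would implement `poll_next`). `ObservableExt` is the hypothetical trait name from above, and `distinct_until_changed` is borrowed from Rx's distinctUntilChanged as an example operator.

```rust
/// Extension trait: adds Rx-flavoured methods to every Iterator.
trait ObservableExt: Iterator + Sized {
    /// Skips items equal to the previously emitted item.
    fn distinct_until_changed(self) -> DistinctUntilChanged<Self>
    where
        Self::Item: PartialEq + Clone,
    {
        DistinctUntilChanged { upstream: self, last: None }
    }
}

/// Blanket impl: no new Observable type, existing sources just gain methods.
impl<I: Iterator> ObservableExt for I {}

/// Adapter returned by `distinct_until_changed`.
struct DistinctUntilChanged<I: Iterator> {
    upstream: I,
    last: Option<I::Item>,
}

impl<I> Iterator for DistinctUntilChanged<I>
where
    I: Iterator,
    I::Item: PartialEq + Clone,
{
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        loop {
            let item = self.upstream.next()?;
            if self.last.as_ref() != Some(&item) {
                self.last = Some(item.clone());
                return Some(item);
            }
        }
    }
}
```

With the trait in scope, `vec![1, 1, 2, 2, 2, 3, 1].into_iter().distinct_until_changed()` yields 1, 2, 3, 1.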

Over the last two days I've looked at these docs and even source a few times. Note how nowhere does it say why this buffer exists in the first place, that its raison d'etre is an important caveat about implicit backpressure.

I want to say that you expended a bit more effort here to enlighten the blind, and as a result we have an interesting option on the table. I wish someone had done that for me a year and a half ago.

By the way, I have been looking through other reactive attempts in Rust. There are fresh reactive repos where people write callback-based executors. The awesomeness of the non-callback implementation is totally hidden. ... Humans? :roll_eyes:


Well, considering my current skills with Rust and reactive mechanisms, I would rather call it a (tough) challenge :sweat_smile:

@Pauan is quite right (again) to point out that version 0.3 is different, and I think it's worth waiting for it to be released. Not that I'm lazy, but I really don't like doing things many times if I can avoid it :yum:

I may misunderstand you, but even if it is just semantics, I think it's important to stay close to the existing vocabulary. If you claim to be an Rx implementation and change the terms, people will just run away. For people who already know an implementation from another language, the gap will be smaller if the semantics remain (because there will already be a gap for Rust newcomers).

One last thing we have not mentioned yet, but which may be worth considering: 3 features of the Reactor implementation. There is a slight difference between RxJava and Reactor, though. The former is a reactive extensions implementation rewritten on top of reactive streams. Reactor is a reactive streams implementation, and thus does not implement all of the Rx requirements.

  • the Mono and Flux types, maybe the most visible difference between the 2 libraries. Rx2 has Observable, Flowable, Single and Maybe. Reactor only exposes these two (even if you can create them from a Publisher implementation, thus from an Rx one).
  • the ability to get a consistent context through the whole reactive execution flow: Reactor 3 Reference Guide. I know the explanation sounds tied to the Java way of handling threads, but the idea is worth considering I think.
  • the reactor addons and its StepVerifier utility, which really eases unit tests: Reactor 3 Reference Guide

Or maybe it could be possible to have an Rx implementation with some of Reactor's features if one thinks it's valuable.

It's time for me to sleep a little :sleeping:

Thanks for all these valuable answers, debates, ideas and so on. I really didn't expect this much, and I appreciate the time and effort you spent!

It may be worth expending the effort to write a crate that papers over details like vocabulary differences and lacking combinators. Then again, I could be speaking out of turn on this point, since I'm not an Rx practitioner, and personally I don't have any vested interest in using any language other than Rust these days. :slight_smile:


The contract for what happens when an error comes is the big deal here. Without the same base contract, many marble diagrams ( RxMarbles: Interactive diagrams of Rx Observables ) would be wrong.
Note ( ReactiveX - Catch operator ) how different languages can have different names, and even differently nuanced versions, for the same operator. But it's the overall contract that allows you to go from language to language with ease.

By the way, docs for stream operators should somehow fit marble diagrams, maybe in ASCII art. We humans seem to have a harder time describing time sequencing in words, while we grasp it immediately in graphical form.

I wonder how buffer's diagram should look. Buffer in Rx is this ( RxMarbles: Interactive diagrams of Rx Observables ). Something different?

We do too. That's why we are here talking about Rx. Anything in Rx can be ported into other language that has Rx library. Would you port from node.js to python? No, no benefit. Would you port to Rust? Yes, yes and yes.

I did a few basic things with Rx in Rust (GitHub - dtb11288/rxrust: Reactive Extensions in Rust), but I'm not sure it's good enough or needed in the Rust world.

All I'm saying is ... if you want a crate that follows some spec, it just has to be written. I don't see any technical reason that the contracts can't be upheld in Rust.

And my followup comment was to say that I would not be contributing to it, so take it with a grain of salt.
