Does Rust need Rx implementation (and/or more)?

Hi all, I'm a newcomer to the Rust language (easy to guess, I think). First of all, congratulations on all the work achieved!

I come from a Java background, where there's a lot of activity around Reactive implementations (http://reactivex.io/), reactive projects and so on.
I also took a look at some initiatives like rsocket (http://rsocket.io/) or netifi and saw there was no Rust implementation.

Considering projects like Tokio and the way async has been implemented in the Rust language (via the futures crate), the poll vs push/pull model and so on... I'm a bit confused and just wondering if Rust (and its community) needs this kind of thing or if it is simply irrelevant given the nature of the futures implementation.

The Tokio documentation states that back pressure is irrelevant for Tokio itself because of the poll model chosen for asynchronous programming. Reactive programming is often defined as asynchronous programming with back pressure handling. So is reactive useless in the Rust context? And is it the same for rsocket, which is about the reactive model at the network level?

I read a lot of forums, blogs, articles and so on about related topics, but I couldn't get a clear picture of the whole thing.

If someone has a few minutes to enlighten me a little about this, I'd appreciate it ^^

Thanks in advance.

PS: French guy here, sorry if your eyes bled because of faulty language.

4 Likes

Here's a couple projects to look at, and I'm sure there's more!

Dominator / Signals (both by @Pauan)

Sodium (a Rust port of the reference implementation discussed in the book Functional Reactive Programming)

I used the TypeScript implementation of Sodium quite a bit, but not the Rust version yet (beyond a bit of dabbling), and don't know how the internals work... but from the discussions I've observed casually, there's definitely no inherent obstruction in Rust that stops push/pull/push-pull from being implemented. It's simply hard work to build a proper FRP (or even "reactive") library, probably both harder and easier in Rust compared to other languages for lots of reasons, and along with the two projects mentioned above I'd be surprised if there aren't more to come :slight_smile:

4 Likes

Btw your English is fantastic! MUCH better than my French :wink:

1 Like

Thanks for the (very) quick answer !

I saw the Dominator crate a few days ago, but it seems to be DOM/JS-related stuff (even though the signal management is relevant).

I didn't know about the sodium crate, so thanks for the pointer :grinning:

I was wondering how I would start an rsocket protocol implementation (if I have time to, let's be honest). Should I build it upon a low-level framework like Tokio or mio, or am I missing a step with an Rx-compliant framework?

I read some reactive-related discussions and topics started 2 years ago, but they just stopped and all the related crates seem to be in a more or less abandoned state.

It's more a matter of curiosity at the moment. I'm trying to get the picture right before getting involved in something endless/useless :sweat_smile:

PS: French people MUST improve their English to be able to understand the literature and documentation related to computer science, languages and so on. English guys just don't need to :rofl:

1 Like

At a very rough glance, it seems to me like Signals is a bit closer to the idea of "Reactive Programming" in a way that feels (and performs) like idiomatic Rust - whereas Sodium aims to be an accurate implementation of Conal Elliott's FRP, at the expense of ergonomics (but also capitalizing on lots of Rust's performance benefits).

So maybe that'll help give you a leg up if you start digging through the source code to see what's going on. I can't really help on that level yet, I'm still a relative newbie here and get stuck on simple things every time I open the IDE :wink:

Good luck!

Most of the FRP implementations I've seen are push/pull based. The subscriber receives events from the publisher, and thus backpressure is required to ensure it won't be overwhelmed with more events than it can handle. But the Rust futures crate is poll based, and that's what troubles me a bit.

I played a bit with Actix web, for example, which relies on Tokio. I got a fully asynchronous app with no back pressure issues, and thus started to wonder if reactive stuff was even useful.

I think I need to dig deeper into FRP, signal management and how asynchronicity is handled and used in those implementations. I may just be wrong in thinking I should start with Tokio or any existing futures-based crate for that.

Thanks again for your replies !

1 Like

Dominator is indeed a DOM framework (like React, Elm, Angular, etc.)

However, futures-signals is not tied to the DOM. It is a general purpose FRP library which can be used in any Rust program, for any compilation target.

It's designed to be zero-cost, so its performance is very good, much better than the other Rust FRP libraries I've looked at. It's stable and generally feature-complete, though I wouldn't mind adding a few more combinators.

You shouldn't be concerned: Rust Futures are using a hybrid push + pull system, where the producer sends notifications and the consumers then poll in order to receive the new value.

Rust Streams (which are built on top of Rust Futures) fully support backpressure and automatic cancellation.
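To make "backpressure" concrete, here is a minimal sketch that uses a bounded channel from the standard library as a stand-in. It is only an analogy and not the futures crate's Stream API: the function name `fill_without_consumer` is made up for illustration, and the point is just that a producer facing a full, unread queue gets told to stop instead of piling up items.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical helper: tries to push `total` items into a bounded queue of
/// size `capacity` while nobody is reading. Returns (accepted, rejected).
fn fill_without_consumer(capacity: usize, total: u32) -> (u32, u32) {
    // `_rx` is kept alive on purpose so the channel stays connected.
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    let mut rejected = 0;
    for item in 0..total {
        match tx.try_send(item) {
            Ok(()) => accepted += 1,
            // The queue is full: this is the backpressure signal.
            Err(TrySendError::Full(_)) => rejected += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (accepted, rejected)
}
```

With a capacity of 3 and 5 items, the first 3 are accepted and the remaining 2 are rejected until a consumer makes room. A blocking `send` on the same channel would instead make the producer wait.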

I think the right way to think about it is: a Stream is an asynchronous Vec, and a Signal is an asynchronous RwLock. Both are useful, but they're useful in different ways.

So I think programs should mix-and-match them as needed, depending on the needs of the program. There isn't a one-size-fits-all data type!

So if Futures / Streams suit your program, then that's what you should use.
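A rough, synchronous sketch of the Vec vs RwLock analogy, using only the standard library. The names `stream_like` and `signal_like` are hypothetical, and nothing here is the futures or futures-signals API; it only shows the difference in what a consumer ends up seeing.

```rust
use std::collections::VecDeque;
use std::sync::RwLock;

/// "Stream-like": every value is kept and delivered in order.
fn stream_like(values: &[i32]) -> Vec<i32> {
    let mut queue = VecDeque::new();
    for &v in values {
        queue.push_back(v);
    }
    queue.into_iter().collect()
}

/// "Signal-like": only the current value exists; older ones are overwritten.
fn signal_like(initial: i32, values: &[i32]) -> i32 {
    let cell = RwLock::new(initial);
    for &v in values {
        *cell.write().unwrap() = v;
    }
    // Bind the value first so the read guard is released before returning.
    let current = *cell.read().unwrap();
    current
}
```

Feeding `[1, 2, 3]` to both gives `[1, 2, 3]` for the stream-like version and just `3` for the signal-like one.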

6 Likes

Another implementation you might be interested in is flo_binding, which I wrote to support FlowBetween. I grew it around the needs of the larger application so it has a 'minimalistic with broad applications' design, with just four main functions.

Of relevance to your original question: bind_stream() turns a stream of events (say, input from tokio) into a state, and follow() goes the other way, producing a stream from state. That stream responds to back pressure by dropping outdated items, so if your state is being updated 100 times a second but your UI is only able to read those updates for display 20 times a second, it'll only get updates 20 times a second (and thus not lag behind).
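The "drop outdated items" strategy can be sketched with a single-slot mailbox. This is not flo_binding's implementation or API: `Latest`, `publish`, `take` and `simulate` are hypothetical names, and the loop is a single-threaded simulation of a fast producer and a slow reader.

```rust
use std::sync::Mutex;

/// Hypothetical single-slot mailbox: a new value overwrites an unread one.
struct Latest<T> {
    slot: Mutex<Option<T>>,
}

impl<T> Latest<T> {
    fn new() -> Self {
        Latest { slot: Mutex::new(None) }
    }

    /// Producer side: replace whatever is in the slot.
    fn publish(&self, value: T) {
        *self.slot.lock().unwrap() = Some(value);
    }

    /// Consumer side: take the newest value, if there is one.
    fn take(&self) -> Option<T> {
        self.slot.lock().unwrap().take()
    }
}

/// The producer publishes `updates` values; the consumer only gets a chance
/// to read after every `read_every`-th update.
fn simulate(updates: u32, read_every: u32) -> Vec<u32> {
    let state = Latest::new();
    let mut seen = Vec::new();
    for tick in 1..=updates {
        state.publish(tick);
        if tick % read_every == 0 {
            if let Some(v) = state.take() {
                seen.push(v);
            }
        }
    }
    seen
}
```

With 100 updates and a reader that keeps up with only every fifth one, `simulate(100, 5)` yields 20 values, each being the newest at the time of the read, so the reader never falls behind.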

I think computed() is quite interesting as I miss it in every framework that doesn't have an equivalent. It automatically subscribes - and unsubscribes - to any binding used in its function, which saves a lot of plumbing and makes for quite straightforward code in complicated cases where a lot of state is involved or where some state is only used conditionally.

1 Like

FWIW, futures being "poll-based" is kind of a misnomer. They are both poll-based and event-based. Have a look at the documentation for the poll method. Specifically this quote:

The poll function is not called repeatedly in a tight loop -- instead, it should only be called when the future indicates that it is ready to make progress (by calling wake() ).

In other words, poll is called only in response to being woken by some event or signal.
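A minimal sketch of that wake-then-poll cycle, using only the standard library rather than Tokio or the futures crate. `CountingWaker`, `Countdown` and `run_countdown` are hypothetical names made up for illustration, and the toy loop stands in for a real executor, which would park the thread between wake-ups instead of checking a counter.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical waker that only counts how often it was woken.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical future that needs `remaining` wake-ups before it is ready.
struct Countdown {
    remaining: u32,
}

impl Future for Countdown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("done")
        } else {
            self.remaining -= 1;
            // Push half: notify the executor that polling again is worthwhile.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Toy executor: polls once, then polls again only if a wake was recorded.
/// Returns the output and the number of polls performed.
fn run_countdown(start: u32) -> (&'static str, usize) {
    let counter = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(Countdown { remaining: start });
    let mut polls = 0;
    let mut seen_wakes = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
        // Pull half: the executor polls again only because it was woken.
        let wakes = counter.wakes.load(Ordering::SeqCst);
        assert!(wakes > seen_wakes, "pending future never called wake()");
        seen_wakes = wakes;
    }
}
```

`run_countdown(2)` polls three times: two polls return `Pending` after requesting a wake-up, and the third returns the result.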

5 Likes

Thanks all for your answers and explanations. I'm aware now I misunderstood a part of the poll-based theory so back to the documentation ^^

I know that the asynchronous story in Rust is quite fresh and that many people are involved in it.
That's why I first thought: "let's figure out things in the right order":

  • having finished proper async construct and tools in the language (I'm not saying things are not done properly, just that it's under active development)
  • having a valid Rx implementation (by valid I mean kind of official and ReactiveX compliant for example)
  • then you can start to think about other stuff (rsocket or whatever)

But before trying to get involved in something I may not have the time (or capacity, let's be honest) to do properly, I just wonder: do I (or the Rust community) need it? A great job has been done with Tokio or Actix (and others), and you can already build some really nice stuff with low to very low resource consumption. The futures crate has quite useful operators and you can already compose things quite easily (more or less, sometimes).

And I was wondering: if an official Rx implementation has not emerged, is it because of the young age of Rust, because the community has other priorities (quite a valid argument considering the 2019 roadmap), because it's neither that useful nor required, or just because you can already achieve the same result in another way?

But I'm only at the beginning of my journey with Rust so I'll start with a look at what you advised me.

3 Likes

Just to clarify something: for years Futures existed as a crate, it wasn't official. And even now, only the bare minimum is put into Rust, most of Futures still lives in a third-party crate.

Rust generally takes the philosophy of not making things official, instead it prefers to have a large ecosystem of third-party crates.

So if you're hoping for an "official" implementation, you'll be waiting a very long time. But there's nothing stopping people from making crates, which is what I and other people have done.

Also, a lot of the things that Rx was designed to solve are instead solved by Streams (which are provided by the futures crate). That's why web servers built on Tokio don't use (or need) Rx: they use Streams instead.

That's why I said that it's best to think of a Signal as an asynchronous RwLock and a Stream as an asynchronous Vec: Rx combines those two concepts into one data type, but with Rust we split it into two separate data types.

5 Likes

Sorry, I didn't make myself clear. By official, I meant a widely adopted and/or referenced one, more or less like Tokio is for asynchronous applications. I like the "pick (or do) what you need" philosophy, even if sometimes "standards" make it easier and faster to get started.

In fact, you may have answered the initial question here ^^ Does Rust need an Rx implementation? Maybe not, because the futures crate does the job, but in another way (although operators may sometimes look similar).

And maybe the same can be said about an rsocket implementation, for example. Once the protocol is written, client and server may be written with just a thin layer over Tokio using the futures crates. I may be summarizing a bit too much here, but I think you'll get the idea.

1 Like

@jflorte let me give my opinion on Rx and Rust.

Facts:
Tokio will never be your Rx. The reason is how exceptions are handled (this detail is important).
Judging from the discussion on GitHub, the Tokio folks won't move tokio and/or parts of it closer to Rx, because they are legitimately focused on tokio's goals, which are narrower than Rx's.

Opinion as in rant:
I personally run when someone brings up FRP when talking about Rx. This is architecture astronaut speak. The problem is with the word "pure" -- humans never write perfect code.
An FRP framework will not provide the practicality of Rx operators, which you can use in different languages ( ReactiveX - Languages ).

Opinion:
Rx is a tool for glueing and manipulating processes. It is the whole monad topic: Rx operators are second order objects that can be combined together. Glueing and shaping of execution flow is what makes us love Rx in practice.
If you have a challenge of formulating some complex process in code, use Rx. Threads are too primitive ( C++Now 2017: Kirk Shoop "No raw std::thread! - Live Tweet Analysis in C++" - YouTube ). Channels are too primitive. Signals and other low level concepts are too primitive for a practical everyday use.

Rust:
Have you looked at the length of the Operator class in RxJava? There are more lines in that class than GitHub can display at once! It is not Rx's fault. It is Java's.
Rust is perfectly suited for writing Rx thanks to traits.

Moving on:
At StarCon this January ( https://starcon.io/ ) some of us doodled a nice way of using rich typing for an Rx API. But it is a slow-moving process. If people vote here for Rx in Rust, we'll do a workshop at the Rust-KW meetup, dedicated to hacking this Rx into a working v0.1.

3 Likes

Could you explain more about that?

Purity has nothing to do with perfection.

Purity means managing state in a declarative way. The whole point of FRP systems (yes, including Rx) is to manage state in a more declarative manner.

Yes, that's great, and that's also provided by Iterators, Streams, Signals, Futures, etc.

The problem with Rx is that it tries to combine everything into a single "observable" concept. That causes performance issues, state issues, and also adds a lot of unnecessary complexity (such as "hot" vs "cold" observables).

Thankfully, it's possible to fix those issues (which is what other FRP systems have done).

Why do you claim that Signals are low level? Signals are equivalent to Rx's Observable.

Within the FRP world, the words "behavior", "observable", "signal", etc. all mean the same thing.

1 Like

@Pauan replying inline:

In Rx, operators are points/sections of process chains, or pipes (whichever analogy looks better).
An exception is something that breaks a section of a process. It must also break the bigger pipe of which this section is a part. In Rx terms, that means an on-error unsubscription. Tokio is not doing this.

Bubbling breakage has something to do with composition. And since Rx is all about composition, that makes exception handling a big deal.

Notice also that tokio has precisely one execution mode. But in a tool that glues processes we want to see a few execution modes: (a) parallel execution, but FIFO triggering of the downstream section, (b) parallel execution with racing out to downstream, (c) single-thread execution, where the following item waits for the leading one to complete its processing, (d) other modes?
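For readers who want modes (a) to (c) spelled out, here is a minimal sketch using plain OS threads from the standard library. It says nothing about how Tokio or any Rx library schedules work; the function names are hypothetical and squaring a number stands in for a real processing step.

```rust
use std::sync::mpsc;
use std::thread;

/// Mode (a): every item is processed on its own thread, but results are
/// handed downstream in the original order by joining the handles in order.
fn parallel_fifo(items: Vec<u64>) -> Vec<u64> {
    let handles: Vec<_> = items
        .into_iter()
        .map(|n| thread::spawn(move || n * n))
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().expect("worker panicked"))
        .collect()
}

/// Mode (b): results race downstream through a channel, so the output
/// order depends on which worker finishes first.
fn parallel_racing(items: Vec<u64>) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    for n in items {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(n * n).expect("receiver is still alive");
        });
    }
    // Drop the original sender so the receiver stops once all workers finish.
    drop(tx);
    rx.into_iter().collect()
}

/// Mode (c): one thread, each item waits for the previous one to complete.
fn serial(items: Vec<u64>) -> Vec<u64> {
    items.into_iter().map(|n| n * n).collect()
}
```

`parallel_fifo` and `serial` always preserve input order; `parallel_racing` returns the same values in whatever order the workers complete.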

  1. No. Rx looks like a declarative and explicit description of a process. Consider this: x.debounce(...).merge(y).map(...).filter(...).last();
    These words talk about specific sections of a process.

  2. Purity in FRP requires perfection. Read up on Cycle.js folks complaining a lot about how other programmers (humans) don't write pure code. Humans are not perfect machines that stick to the only right way.
    Let's hear it for the necessity of having different ways to program (anathema in purity headquarters): https://www.youtube.com/watch?v=sTSQlYX5DU0

  1. Let's note that Future is one example of Observable.

  2. If somebody teaches Rx by saying "turn every variable into an observable", then it's a problem with the teaching.
    When I first read Rx's promo on the site, it was "sequence of values in time". Totally non-exciting message.
    Only after using it do you realize that Rx is about gluing processes together.
    By the way, the current motto on the site, "observer pattern done right", is also sort of empty.

  3. When you say that operators are execution sections, everything becomes easier. Do I want to precisely manipulate the starting point of a process? Yes. Then I do want to have hot/cold knobs available to me.

  4. I agree that the word observable is not intuitive. Event-source is probably more precise and honest. Because when you implement it, it's about events: when processing starts, when it ends, how results are ordered and combined. When you use it, you may have some void's coming through, because no data needs to pass, but the presence of events is used.
    But naming is a historical artefact, and we may be stuck with this.

Can I write an overall process that glues together other processes of my program with Signals in a couple of lines, like in x.debounce(...).merge(y).map(...).filter(...).subscribe();? You know, high level in terms of business process flow.
I honestly don't care if Signals check all the boxes on the FRP list. I am looking for a simple way to write complex and intricate processes on stupendously parallel hardware, with all of Rust's guarantees against making bad errors in a parallel world.

@Pauan Iterators vs Observables

Yes, mathematically, on an abstract level both are collections of values. One collection is in space, another is in time. There is a symmetry. Hooray!

But when it comes to programming, we add humans into the mix. We humans are much better with spatial things than with timing. We are asymmetric in our cognitive abilities!

That is why filter and forEach on an array are cute, but it's just another way of writing code. No excitement here.

But use of observables may allow you to write some process logic that you simply couldn't before. And once you have such experience, you are sold on observables.
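For reference, the "collection in space" side of that symmetry looks like this with Rust's standard iterators. It is only the synchronous counterpart of a chain such as x.merge(y).map(...).filter(...).last(): `chain` concatenates the two inputs rather than interleaving them in time, there is no debounce equivalent, and the function name is hypothetical.

```rust
/// Hypothetical example: square every value from both inputs, keep the even
/// squares, and return the last one (if any).
fn last_even_square(x: &[i32], y: &[i32]) -> Option<i32> {
    x.iter()
        .chain(y.iter()) // all of `x`, then all of `y`
        .map(|v| v * v)
        .filter(|v| v % 2 == 0)
        .last()
}
```

The time-based version has the same shape; what changes is that each stage has to decide when values arrive, which is the part the post argues is hard for humans.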

1 Like

@jflorte

There was one quite early attempt: someone tried to mimic Rx from C#. Questions about ownership in types were raised.

The lesson from this attempt is to try an implementation with Box and Arc pointers for passing "observed" values, and later see if some execution modes can pass plain T's.

Echoing another thread, useful but big things like Rx should not be pulled into std.

I'm sorry, but I didn't understand any of that.

What I do know is that with Streams, you can choose to either propagate errors or not (as desired), and manipulate errors in various ways, all using the standard Rust Result tools.

Propagating errors is as easy as using the TryStreamExt methods.
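As a synchronous analogy only (plain iterators from the standard library, not the TryStreamExt API itself), the same Result tools give both behaviours: stop at the first error, or skip failures and carry on. The function names are hypothetical.

```rust
use std::num::ParseIntError;

/// Propagate: collecting into a `Result` stops at the first error.
fn parse_all(inputs: &[&str]) -> Result<Vec<i32>, ParseIntError> {
    inputs.iter().map(|s| s.parse::<i32>()).collect()
}

/// Don't propagate: drop the failures and keep the values that parsed.
fn parse_valid(inputs: &[&str]) -> Vec<i32> {
    inputs.iter().filter_map(|s| s.parse::<i32>().ok()).collect()
}
```

The choice is made per pipeline by the caller, rather than being fixed by the library.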

That's not true, you can absolutely have other "execution modes" (including various types of parallelism).

You can also have different Executors/Reactors, such as a multi-thread pool, or a blocking single-thread executor, etc.

The Future/Stream/Signal system was specifically designed to allow for different execution strategies.

Streams can accomplish that with the select combinator:

// Merges foo and bar together, pulling in FIFO order
select(foo, bar)

I'm not sure how that differs from (a), but I'll just point out that Streams supports other parallel combinators, such as for_each_concurrent which can be used to loop over all the values in a Stream, but in a parallel way.

And of course it's possible to create your own combinators (parallel or serial).

A process is simply a declarative description of how state changes over time. That's the basis of FRP: extending declarative programming so it can reason about state over time.

A process that doesn't have state is completely useless, because the entire point of programming is to transform state.

So... because one person wants 100% of things to be pure, that means that all FRP programmers want that?

And therefore "purity" is now a "bad word" which should be avoided, rather than being a useful description of how programs operate?

I'm not sure why you're bringing that up... you seem to think I'm some sort of crazy purist zealot, but I'm not. I have no intention of making everything 100% pure.

You're the one who falsely equated FRP with "100% purity", I never did that.

And everything is an example of a Lambda, because of the Lambda calculus. So let's just get rid of all distinctions and program using exclusively Lambdas!

I disagree with that, there are real differences between things. Many years ago I tried to combine everything into an uber type, but it didn't end well. I've learned since then. Carefully splitting things into multiple types makes programs better in many, many ways.

I have created and used practical FRP systems in real programs for several years.

I am well aware of the benefits of FRP. I just disagree with Rx specifically, because of the issues that it has (which have been solved by other FRP systems).

I'm not an academic, the only reason I use FRP is to get shit done.

Yes. You can do that with Futures, Streams, and Signals. That's exactly why those types were created.

Great, me too. Except I already created Signals, and they're very fast, and robust, and easy to use, and I'm using them in real programs. And Streams also already exist, and are great. So my mission has been accomplished.

That's very inefficient. And there are much better implementation strategies (such as Streams and Signals) which work perfectly with Rust's memory model.

1 Like

I am trying to find a lift operator in the Signals repo. It's not there.

How can I write, for example, a merge operator? Please show the code here.
Rx is practical because a gazillion operators exist: ReactiveX - Operators

Debounce with a future? Nay.
I also checked my comments. I haven't said that Streams are too primitive. In fact, you are the one bringing them up. Thank you. I kinda like the code.

Cool. Let's talk concrete example.

Let's have operations opX.
op1().op2().

Let's now have op1 in one parallel executor, and op2 in another, also parallel, executor (combining different execution modes).
How can we pass a big object from op1 to op2 without copying too much?
And if it is something other than a bare T, how does all the extra typing disappear to leave only, for example, Stream<T, Err>?
And what about the lifetime story?

@Pauan I'd appreciate links to implementation code. Because if it exists, we don't want to reinvent the wheel here. Thanks.

Talking about the Rust memory model.
If any T on a stack is passed from op1 to op2, it means that both operations are going to run on the same thread. Interesting constraint in the name of efficiency, isn't it? A thread can't pick up a new event in op1 while leaving the event it just completed in op1 for another thread to run op2.
Am I wrong here?
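One data point for the question above, as a minimal sketch with standard-library threads and channels (not Tokio, Streams or Signals): a value whose type is `Send` can be moved from one thread to another, and for a heap-backed type like `Vec` only the small handle is moved while the buffer stays where it is. The `pipeline` function and its stage names are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

/// op1 builds a large buffer on one thread, op2 consumes it on another.
/// Returns whether op2 saw the same heap allocation, plus op2's result.
fn pipeline(len: usize) -> (bool, u64) {
    let (tx, rx) = mpsc::channel::<(Vec<u8>, usize)>();

    let op1 = thread::spawn(move || {
        let big = vec![1u8; len];
        // Record the buffer's address so op2 can compare it later.
        let addr = big.as_ptr() as usize;
        // Sending moves ownership of the Vec; the bytes are not copied.
        tx.send((big, addr)).expect("op2 hung up");
    });

    let op2 = thread::spawn(move || {
        let (big, addr_in_op1) = rx.recv().expect("op1 hung up");
        let same_allocation = big.as_ptr() as usize == addr_in_op1;
        let sum: u64 = big.iter().map(|&b| b as u64).sum();
        (same_allocation, sum)
    });

    op1.join().expect("op1 panicked");
    op2.join().expect("op2 panicked")
}
```

For sharing one value between several stages at once, wrapping it in `Arc` is the usual alternative to moving it. Whether a given executor actually hands items to a different thread is a separate question that this sketch does not answer.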

If op1(f1).op2(f2) passes a huge T that can't be lifted from the stack, then the only mode is effectively f2(f1) pipeline jobs scheduled to run on possibly many threads. Yep, old Netty pipelines.
And I had a feeling that tokio is ultimately this. But there can be more options here.