Desync 0.2.0: low-effort asynchronous programming

Hi, I'd like to present desync, one of my Rust projects. I released version 0.2.0 last night, which adds some useful features for handling streams. I've been using this library quite a lot in my own projects and I think it provides a useful and at least semi-novel approach to writing asynchronous code.

desync provides a single new type, Desync<T>, which turns a synchronous type into an asynchronous one through two API calls: sync performs an operation and waits for it to complete, and async starts an operation in the background. Operations are always performed in the order that they are queued.

This API might be quite small, but it can replace a lot of other asynchronous constructs: sync alone is an alternative to mutexes, and with async it's possible to avoid manually managing threads and dealing with scheduling and ordering issues.
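To make the model concrete, here is a minimal sketch of the same idea using only the standard library. It is not how desync is implemented: MiniDesync, run_async and demo are hypothetical names (async is a reserved keyword in current Rust editions, hence run_async), and this version dedicates one worker thread to each object rather than sharing a pool.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

// A job is a boxed closure that receives exclusive access to the data.
type Job<T> = Box<dyn FnOnce(&mut T) + Send>;

/// Hypothetical stand-in for the idea behind Desync<T> (not the crate's code).
pub struct MiniDesync<T> {
    jobs: Sender<Job<T>>,
}

impl<T: Send + 'static> MiniDesync<T> {
    pub fn new(mut data: T) -> Self {
        let (jobs, queue) = channel::<Job<T>>();
        // A single worker owns the data and runs jobs in the order they were queued.
        thread::spawn(move || {
            for job in queue {
                job(&mut data);
            }
        });
        MiniDesync { jobs }
    }

    /// Queue a job and return immediately.
    pub fn run_async<F>(&self, job: F)
    where
        F: FnOnce(&mut T) + Send + 'static,
    {
        self.jobs.send(Box::new(job)).expect("worker thread stopped");
    }

    /// Queue a job and block until it has run, returning its result.
    pub fn sync<R, F>(&self, job: F) -> R
    where
        F: FnOnce(&mut T) -> R + Send + 'static,
        R: Send + 'static,
    {
        let (result_tx, result_rx) = channel();
        self.run_async(move |data| {
            let _ = result_tx.send(job(data));
        });
        result_rx.recv().expect("worker thread stopped")
    }
}

/// Two background pushes followed by a blocking read of the whole vector.
pub fn demo() -> Vec<u32> {
    let numbers = MiniDesync::new(Vec::<u32>::new());
    numbers.run_async(|v| v.push(1));
    numbers.run_async(|v| v.push(2));
    // sync is queued behind both pushes, so it sees their effects.
    numbers.sync(|v| {
        v.push(3);
        v.clone()
    })
}
```

Because the queue is first-in, first-out, the sync call doubles as a barrier: by the time it returns, every job queued before it has finished, which is what lets it stand in for a mutex.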

v0.2.0 adds some new features to support the futures library. The pipe and pipe_in functions make it possible to schedule operations on a Desync object when data becomes available on a Stream. This is analogous to UNIX pipes, and provides a way to asynchronously send data to an object or implement a message-passing system.
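The pipe idea can be approximated with standard-library pieces. In the sketch below a std channel Receiver stands in for a futures Stream and an Arc<Mutex<T>> stands in for the Desync object; pipe_in_sketch and demo are hypothetical names, and none of this is the crate's actual signature.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical analogue of pipe_in: run `process` against the target each
/// time an item arrives, in arrival order, until the input is closed.
pub fn pipe_in_sketch<T, I, F>(
    target: Arc<Mutex<T>>,
    input: Receiver<I>,
    mut process: F,
) -> thread::JoinHandle<()>
where
    T: Send + 'static,
    I: Send + 'static,
    F: FnMut(&mut T, I) + Send + 'static,
{
    thread::spawn(move || {
        for item in input {
            let mut data = target.lock().unwrap();
            process(&mut *data, item);
        }
    })
}

/// Sends three numbers down the pipe and returns the accumulated total.
pub fn demo() -> u32 {
    let total = Arc::new(Mutex::new(0u32));
    let (tx, rx) = channel();
    let pipe = pipe_in_sketch(Arc::clone(&total), rx, |sum, n: u32| *sum += n);

    for n in [1, 2, 3] {
        tx.send(n).unwrap();
    }
    drop(tx); // closing the sender ends the pipe
    pipe.join().unwrap();

    let result = *total.lock().unwrap();
    result
}
```

The sender never touches the target directly; it only pushes messages, which is the message-passing arrangement described above.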

Docs are here: https://docs.rs/desync/
Repo here: https://github.com/logicalshift/desync/
Blog post with some more info here: https://logicalshift.io/articles/rust-tools/desync.html


Is there a way to check if the Desync item has completed a task? I see there is an after() method, but what if I wanted to check whether it was done yet, and execute different tasks depending on whether it has finished? Like a status or something? Maybe I'm missing something?

Ah, I think the method you probably want is after()'s opposite - future(). This is just the same as async() except that it returns a future that can be polled to check the status of that task. It can be called with an empty closure to create a future that will complete when all of the currently scheduled tasks are completed, or it can be used as an alternative to sync or async.

It might be worth noting that because of how the tasks are ordered, the future will complete before any tasks scheduled after it. There isn't currently a generic way to get a future that will complete when a Desync is idle, but one way around that would be to schedule everything using future() and only keep track of the status of the most recent one - Desync performs every task in the same order that they're scheduled, so the last task scheduled will also be the last one performed.
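The "keep track of the most recent one" approach can be sketched with the standard library alone. Here a channel Receiver plays the part of the future; Queue, Completion, schedule, poll and wait are hypothetical names for this illustration and are not part of desync.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

type Job = Box<dyn FnOnce() + Send>;

/// Hypothetical handle for one queued task.
pub struct Completion<R> {
    result: Receiver<R>,
}

impl<R> Completion<R> {
    /// Non-blocking check: Some(result) once the task has run, None if it has
    /// not run yet (or if the result was already taken by an earlier poll).
    pub fn poll(&self) -> Option<R> {
        self.result.try_recv().ok()
    }

    /// Block until the task has run.
    pub fn wait(self) -> R {
        self.result.recv().expect("task was dropped without running")
    }
}

/// A FIFO queue served by a single worker thread.
pub struct Queue {
    jobs: Sender<Job>,
}

impl Queue {
    pub fn new() -> Self {
        let (jobs, pending) = channel::<Job>();
        thread::spawn(move || {
            for job in pending {
                job();
            }
        });
        Queue { jobs }
    }

    /// Queue a task and hand back a handle that reports its completion.
    pub fn schedule<R, F>(&self, task: F) -> Completion<R>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = channel();
        self.jobs
            .send(Box::new(move || {
                let _ = tx.send(task());
            }))
            .expect("worker thread stopped");
        Completion { result: rx }
    }
}

/// Returns (was the last task still pending while the first was blocked?, its result).
pub fn demo() -> (bool, u32) {
    let queue = Queue::new();
    let (gate_tx, gate_rx) = channel::<()>();

    // The first task blocks until the gate opens, so everything behind it waits.
    let _first = queue.schedule(move || gate_rx.recv().is_ok());
    let last = queue.schedule(|| 42);

    let pending = last.poll().is_none(); // cannot have run yet
    gate_tx.send(()).unwrap();           // let the first task finish
    (pending, last.wait())
}
```

Since tasks run strictly in order, the handle for the most recently scheduled task completing implies that everything scheduled before it has completed too.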


So, it behaves kind of like an actor, only the encapsulation boundaries are different: it is just data, and anyone with access to it can perform any operation on that data, not just the operations an actor would define for it.

@ahunter: If I had 1000 of these running async operations, would they schedule nicely? (e.g. is the work distributed across 1 thread per core, or 1000 threads, or how does it work?)

It looks like a neat abstraction.

Yes, the comparison to actors is apt. I didn't actually notice it myself until I'd written quite a lot of the crate: I initially set out to solve the scheduling problem, with the data encapsulation idea coming later after I tried to use the scheduler with a real problem.

The scheduler should work well with as many of these as you want. It works by creating a pool of threads. Each Desync object has a queue of jobs and if there's a free thread and a queue receives a new asynchronous job, that queue will be assigned to that thread (where it will execute until it has finished). If there's no free thread, then it waits until one becomes free.

Synchronous jobs work a bit differently: if they're added to a queue that's running then they will block the current thread until they're completed. However, if the job queue is waiting - either due to being empty or because the thread pool is fully utilised - they will use the current thread to run jobs until the synchronous job is completed and then go back to waiting.

This means that only a few threads are needed to run jobs: within the thread pool, sync requests transfer control amongst the various queues, and outside of the pool they effectively add a new thread to prioritise running tasks to get a high-priority result.

It also works with a pool size of 0. In that case, asynchronous tasks are never executed until a synchronous task causes them to get prioritised (this is interesting to me because it's a way to make code written with this library work with WebAssembly).
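The zero-thread case is easy to model in isolation. The sketch below is a deliberately simplified illustration, not the crate's scheduler: ZeroThreadQueue, run_async, pending and demo are hypothetical names, there is no thread pool at all, and calling sync from inside a job would deadlock on the mutex.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

type Job<T> = Box<dyn FnOnce(&mut T) + Send>;

/// Hypothetical queue with no worker threads: queued jobs only run when a
/// synchronous caller lends its own thread.
pub struct ZeroThreadQueue<T> {
    state: Mutex<(T, VecDeque<Job<T>>)>,
}

impl<T> ZeroThreadQueue<T> {
    pub fn new(data: T) -> Self {
        ZeroThreadQueue {
            state: Mutex::new((data, VecDeque::new())),
        }
    }

    /// Record the job; nothing executes yet.
    pub fn run_async<F>(&self, job: F)
    where
        F: FnOnce(&mut T) + Send + 'static,
    {
        self.state.lock().unwrap().1.push_back(Box::new(job));
    }

    /// Number of jobs that have been queued but not yet run.
    pub fn pending(&self) -> usize {
        self.state.lock().unwrap().1.len()
    }

    /// Run every queued job on the calling thread, in order, then run `job`.
    pub fn sync<R, F>(&self, job: F) -> R
    where
        F: FnOnce(&mut T) -> R,
    {
        let mut guard = self.state.lock().unwrap();
        let (data, queued) = &mut *guard;
        while let Some(earlier) = queued.pop_front() {
            earlier(&mut *data);
        }
        job(&mut *data)
    }
}

/// Returns (jobs waiting before the sync call, contents after it).
pub fn demo() -> (usize, Vec<&'static str>) {
    let log = ZeroThreadQueue::new(Vec::<&'static str>::new());
    log.run_async(|entries| entries.push("first"));
    log.run_async(|entries| entries.push("second"));
    let before = log.pending(); // both jobs are still waiting

    let entries = log.sync(|entries| {
        entries.push("third");
        entries.clone()
    });
    (before, entries)
}
```

The ordering guarantee is unchanged: the synchronous job still runs after everything queued before it, it just pays for that work on its own thread.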

The default number of threads for running background jobs is 2 per CPU (as reported by num_cpus). It's possible to customise this if needed by importing the desync::scheduler module and calling scheduler().set_max_threads(x) - more threads might be needed if async operations are doing a lot of blocking, for instance.

The 'future' call provides another possible reason to increase the number of threads right now: it looks a lot like sync but doesn't have the prioritisation effect, so it might not behave well when load is very high. (I do plan on fixing this in the future, because I'd like it to work in the 0 thread scenario too.)
