Designing an async flow of data

I’m interested in implementing a simple signal processing framework in Rust (similar to GNU Radio, for those familiar with it). Each basic building block is one of three kinds:

  • A source block that generates input data (e.g. a continuous flow of random bytes from /dev/urandom)
  • A processing block that takes data and processes it (e.g. a moving average block with N=10 that averages a sliding window of 10 bytes and outputs the average values)
  • A sink block that takes data and throws it somewhere (e.g. a file sink that saves the bytes to a file)

A pseudocode implementation for the example blocks would look something like this:

async fn random_source() {
    for byte in read("/dev/urandom") {
        yield byte
    }
}

async fn moving_average(N, input) {
    buf = []
    for byte in input {
        // keep only the N most recent bytes
        buf.push(byte);
        if buf.len() > N {
            buf.remove_first();
        }
        yield average(buf)
    }
}

async fn file_sink(input, outfile) {
    for byte in input {
        outfile.write(byte)
    }
}

flow = chain(
    random_source(),
    moving_average(10),
    file_sink(outfile)
)

runtime.run(flow)

Given the advances Rust is making in async and futures, what would be a solid design for such a framework? Is it possible to have a clean pattern for each “block” to follow that allows it to be implemented as a single function with a known signature? How would these blocks be later fed into a runtime?

Any input would be much appreciated.

I think the ideal abstraction here would not be Futures, but rather Streams and Sinks, which basically model asynchronous iteration as opposed to asynchronous execution.
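
To make the idea of asynchronous iteration concrete, here is a minimal sketch that uses only the standard library. The `Stream` trait below is a simplified, hypothetical stand-in (it takes `&mut self` rather than a pinned reference), and `IterSource`, `MovingAverage`, `NoopWaker` and `collect_sink` are names made up for illustration, not part of any existing crate. The source reads from a fixed iterator instead of /dev/urandom so the result is reproducible.

```rust
use std::collections::VecDeque;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Simplified stand-in for a stream trait: like `Iterator`, but `poll_next`
/// may answer `Pending` when no item is ready yet.
trait Stream {
    type Item;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Source block: yields bytes from any iterator (always ready in this sketch).
struct IterSource<I>(I);

impl<I: Iterator<Item = u8>> Stream for IterSource<I> {
    type Item = u8;
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u8>> {
        Poll::Ready(self.0.next())
    }
}

/// Processing block: moving average over the last `n` bytes of `input`.
struct MovingAverage<S> {
    input: S,
    n: usize,
    window: VecDeque<u8>,
}

impl<S> MovingAverage<S> {
    fn new(input: S, n: usize) -> Self {
        assert!(n > 0, "window size must be positive");
        MovingAverage { input, n, window: VecDeque::with_capacity(n) }
    }
}

impl<S: Stream<Item = u8>> Stream for MovingAverage<S> {
    type Item = u8;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u8>> {
        match self.input.poll_next(cx) {
            Poll::Ready(Some(byte)) => {
                // Drop the oldest byte once the window is full.
                if self.window.len() == self.n {
                    self.window.pop_front();
                }
                self.window.push_back(byte);
                let sum: u32 = self.window.iter().map(|&b| u32::from(b)).sum();
                Poll::Ready(Some((sum / self.window.len() as u32) as u8))
            }
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Waker that does nothing; enough for a polling loop that never sleeps.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Sink block and toy "runtime" in one: polls `input` until it ends and
/// collects the output. A real executor would sleep on `Pending` until woken
/// instead of spinning.
fn collect_sink<S: Stream<Item = u8>>(mut input: S) -> Vec<u8> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    loop {
        match input.poll_next(&mut cx) {
            Poll::Ready(Some(byte)) => out.push(byte),
            Poll::Ready(None) => return out,
            Poll::Pending => std::thread::yield_now(),
        }
    }
}
```

With this sketch, `collect_sink(MovingAverage::new(IterSource(vec![10u8, 20, 30, 40].into_iter()), 2))` returns `[10, 15, 25, 35]`. Each block is a type wrapping its upstream block, so the chain is built by nesting constructors and driven entirely by whoever polls the last block.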

Streams and Sinks will not be part of the initial stable version of async fn. The current thinking is that the best way to model them is via asynchronous generators, and the generator prototype in nightly needs more work before it can be stabilized, let alone built upon by the async ecosystem.

Without those, you can probably get away with one future per input data packet, but you must be very careful about when you schedule those futures and how you synchronize them with each other. Otherwise it is quite possible to end up outputting packet 2 before packet 1, since future execution is unordered…
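
One way to cope with out-of-order completion, sketched here under the assumption that every packet is tagged with a sequence number when it enters the flow, is a small reorder buffer in front of the sink. `Reorder` is a hypothetical helper written for this illustration, not an existing API.

```rust
use std::collections::BTreeMap;

/// Accepts (sequence number, packet) pairs in any order and releases
/// packets strictly in sequence order, starting from 0.
struct Reorder<T> {
    next_seq: u64,
    pending: BTreeMap<u64, T>,
}

impl<T> Reorder<T> {
    fn new() -> Self {
        Reorder { next_seq: 0, pending: BTreeMap::new() }
    }

    /// Store a finished packet, then return every packet that can now be
    /// delivered without leaving a gap in the sequence.
    fn push(&mut self, seq: u64, packet: T) -> Vec<T> {
        self.pending.insert(seq, packet);
        let mut ready = Vec::new();
        while let Some(p) = self.pending.remove(&self.next_seq) {
            ready.push(p);
            self.next_seq += 1;
        }
        ready
    }
}
```

If the future for packet 1 finishes first, `push(1, ..)` returns nothing; once packet 0 arrives, `push(0, ..)` returns both packets in order. The cost is that one slow packet holds back everything behind it, so the buffer should be bounded in a real design.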

Thanks @HadrienG for your quick response and input! I very much agree with your analysis that async iteration is the right abstraction. Additionally, the flow requires some level of synchronization, not only to avoid out-of-order execution but also to keep the flow properly saturated and prevent buffer underruns and overruns.
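
As an illustration of that synchronization concern, here is a naive sketch that runs on stable Rust by swapping async for OS threads connected with bounded channels (`std::sync::mpsc::sync_channel`). The function name `run_pipeline` and its parameters are made up for this example, and the source is a fixed `Vec<u8>` rather than /dev/urandom so the output is reproducible.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::sync_channel;
use std::thread;

/// Runs source -> moving average -> sink on three threads joined by bounded
/// channels of `capacity` slots, and returns what the sink received.
fn run_pipeline(data: Vec<u8>, n: usize, capacity: usize) -> Vec<u8> {
    assert!(n > 0, "window size must be positive");
    let (raw_tx, raw_rx) = sync_channel::<u8>(capacity);
    let (avg_tx, avg_rx) = sync_channel::<u8>(capacity);

    // Source: `send` blocks while the channel is full, so a fast source
    // cannot overrun a slow consumer.
    let source = thread::spawn(move || {
        for byte in data {
            if raw_tx.send(byte).is_err() {
                break; // downstream hung up
            }
        }
    });

    // Processing block: averages the last `n` bytes.
    let average = thread::spawn(move || {
        let mut window: VecDeque<u8> = VecDeque::with_capacity(n);
        for byte in raw_rx {
            if window.len() == n {
                window.pop_front();
            }
            window.push_back(byte);
            let sum: u32 = window.iter().map(|&b| u32::from(b)).sum();
            if avg_tx.send((sum / window.len() as u32) as u8).is_err() {
                break; // downstream hung up
            }
        }
    });

    // Sink: receiving blocks while the channel is empty, so an underrun
    // just pauses the sink until more data arrives.
    let out: Vec<u8> = avg_rx.into_iter().collect();
    source.join().unwrap();
    average.join().unwrap();
    out
}
```

Calling `run_pipeline(vec![10, 20, 30, 40], 2, 1)` returns `[10, 15, 25, 35]`. Ordering is preserved because each channel has a single producer and a single consumer; the channel capacity is the knob that trades latency against how well each stage stays busy.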

However, at this point I would be happy to get started with even a naive implementation and gradually expand it as new Rust features become available.

Is there any feature set in nightly that can be built upon to provide a working POC?

You may want to have a look at the original macro-based prototype for async-await, which had some support for streams. It was also published on crates.io.

Since it’s a macro and not first-class language syntax, its ergonomics may be worse than those the final async stream syntax in Rust will hopefully have, with issues such as weird compiler diagnostics and less friendly handling of borrows. But if it has not fallen too far out of sync with current nightlies, it could be good enough for API design experiments.

There may also be more recent or ongoing design experiments around async streams that I’m not aware of; you may want to ask around on internals.rust-lang.org or Discord.