Semi-async function

I have a trait (let's call it BinaryRepr) that allows serializing a data structure (&Self) into a stream of bytes by writing it to an asynchronous writer (tokio::io::AsyncWrite). I want this to work asynchronously because the data structures might be big and the receiver might be a remote network peer (and I don't want to buffer the whole binary representation in memory). Let's call the method that performs the serialization "dump".

I also have the need to calculate hashes based on this binary representation. Let's call the method that performs the hashing "hash".

As I dislike redundancy here, I don't want to implement code twice. But I would also like to be able to calculate a hash value from non-async functions. That should not be a problem as calculating the hash will not require any I/O and is fast in my use case.

However, I cannot define a function (dump) that sometimes is async and sometimes isn't.

I solved this by convention:

use async_trait::async_trait;
use futures::future::FutureExt;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{self, AsyncWrite, AsyncWriteExt};
use tokio::task::unconstrained;

#[async_trait]
trait BinaryRepr {
    // CONVENTION:
    // `dump` must only yield due to calling methods on `writer`
    async fn dump<W: AsyncWrite + Unpin + Send>(
        &self,
        writer: &mut W,
    ) -> io::Result<()>;
    fn hash(&self) -> u8 {
        let mut hasher = Hasher::new();
        unconstrained(self.dump(&mut hasher))
            .now_or_never()
            .unwrap()
            .unwrap();
        hasher.finalize()
    }
}

Notice the now_or_never method (from futures::future::FutureExt) followed by two unwrap() calls: this panics at run time unless dump returns immediately and successfully (which, by convention, it always will when Hasher is used as the writer).
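For readers who have not used now_or_never: as far as I understand it, it polls the future exactly once and yields Some(output) if the future completed, or None if it returned Poll::Pending. The following std-only sketch mimics that behaviour. The names poll_once and NoopWake are made up for illustration and are not part of futures or Tokio.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical waker that does nothing when woken.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Poll `fut` exactly once; `None` means it returned `Poll::Pending`.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    // Pin the future on the stack so it can be polled.
    let fut = pin!(fut);
    match fut.poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}
```

A future that completes without waiting gives Some(..), while one that has to wait (for example std::future::pending()) gives None, and that None is what the first unwrap() in hash would turn into a panic.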

I used tokio::task::unconstrained to avoid unnecessary yields (e.g. if dump internally uses Tokio message queues or similar). (See also Async write into Vec, but note that by convention, this might be the responsibility of the implementation of dump anyway, so could be left out here.)

I can then implement Hasher, for example, as follows:

struct Hasher {
    state: u8,
}

impl Hasher {
    fn new() -> Self {
        Hasher { state: 0u8 }
    }
    fn digest<T: AsRef<[u8]>>(&mut self, bytes: T) {
        for byte in bytes.as_ref() {
            self.state = self.state.wrapping_add(*byte);
        }
    }
    fn finalize(self) -> u8 {
        self.state
    }
}

Now I must make Hasher implement AsyncWrite in such a way that writes will never be pending:

impl AsyncWrite for Hasher {
    fn poll_write(
        self: Pin<&mut Self>,
        _: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        Pin::into_inner(self).digest(buf);
        Poll::Ready(Ok(buf.len()))
    }
    fn poll_flush(
        self: Pin<&mut Self>,
        _: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
    fn poll_shutdown(
        self: Pin<&mut Self>,
        _: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}

I can then use everything as follows:

struct Packet {
    content: String,
}

#[async_trait]
impl BinaryRepr for Packet {
    async fn dump<W>(
        &self,
        writer: &mut W,
    ) -> io::Result<()>
    where
        W: AsyncWrite + Unpin + Send,
    {
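        // `write_all` loops until the whole buffer is written; a bare
        // `write` may write only part of it and drop the rest.
        writer
            .write_all(
                format!(
                    "Packet len={}\n",
                    self.content.len()
                )
                .as_bytes(),
            )
            .await?;
        writer.write_all(self.content.as_bytes()).await?;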
        Ok(())
    }
}

// NOTE: `main` isn't async here!
fn main() {
    println!("Hash: {}", (Packet { content: "Hello".to_string() }).hash());
}

I would consider the dump method to be semi-asynchronous, because it will only yield if method calls on its argument (writer) yield. Unfortunately, this isn't reflected in any way by its type: any implementation that violates this convention will lead to a run-time panic.

While I dislike the panics, I wasn't able to come up with any better solution for my problem. I would appreciate feedback on my idea. Is it a reasonable way to solve this? Am I overcomplicating things?


It's a bit unwieldy, but here's one approach:

use async_trait::async_trait;
use std::marker::Unpin;
use tokio::io::{self, AsyncWrite, AsyncWriteExt};

#[async_trait]
trait BinaryRepr {
    async fn dump<W>(&self, writer: &mut W) -> io::Result<()>
    where
        W: AsyncWrite + Unpin + Send;

    fn hash(&self) -> u8;
}

#[async_trait]
impl<T> BinaryRepr for T
where
    T: ?Sized + Sync,
    T: for<'a> BinaryReprHelper<'a>,
{
    async fn dump<W>(&self, writer: &mut W) -> io::Result<()>
    where
        W: AsyncWrite + Unpin + Send,
    {
        let mut dump = self.start_dump();
        while let Some(slice) = dump.next_slice() {
            writer.write_all(slice).await?;
        }
        Ok(())
    }

    fn hash(&self) -> u8 {
        let mut state = 0u8;
        let mut writer = self.start_dump();
        while let Some(slice) = writer.next_slice() {
            for byte in slice {
                state = state.wrapping_add(*byte);
            }
        }
        state
    }
}

trait BinaryReprHelper<'a> {
    type ReprWriter: BinaryReprWriter + Send + 'a;

    fn start_dump(&'a self) -> Self::ReprWriter;
}

trait BinaryReprWriter {
    fn next_slice(&mut self) -> Option<&[u8]>;
}

Example usage:

struct Packet {
    content: String,
}

enum State {
    WriteFmtStr,
    WritePacket,
    Done,
}

struct PacketWriter<'a> {
    packet: &'a Packet,
    fmt_str: String,
    state: State,
}

impl<'a> BinaryReprHelper<'a> for Packet {
    type ReprWriter = PacketWriter<'a>;

    fn start_dump(&'a self) -> PacketWriter<'a> {
        PacketWriter {
            packet: self,
            fmt_str: String::new(),
            state: State::WriteFmtStr,
        }
    }
}

impl<'a> BinaryReprWriter for PacketWriter<'a> {
    fn next_slice(&mut self) -> Option<&[u8]> {
        match &self.state {
            State::WriteFmtStr => {
                self.state = State::WritePacket;
                self.fmt_str = format!("Packet len={}\n", self.packet.content.len());
                Some(self.fmt_str.as_bytes())
            }
            State::WritePacket => {
                self.state = State::Done;
                Some(self.packet.content.as_bytes())
            }
            State::Done => None,
        }
    }
}

Thank you very much for taking the time to "wield" such an example. It still feels too verbose for me to actually use in my code, but I might end up using it or something similar.

I was able to get rid of the extra helper trait by using generic associated types (GATs):

#![feature(generic_associated_types)]

use async_trait::async_trait;
use std::marker::Unpin;
use tokio::io::{self, AsyncWrite, AsyncWriteExt};

trait BinaryReprWriter {
    fn next_slice(&mut self) -> Option<&[u8]>;
}

#[async_trait]
trait BinaryRepr {
    type ReprWriter<'a>: BinaryReprWriter + Send + 'a;
    fn start_dump(&self) -> Self::ReprWriter<'_>;
    async fn dump<W>(&self, writer: &mut W) -> io::Result<()>
    where
        W: AsyncWrite + Unpin + Send,
    {
        let mut dump = self.start_dump();
        while let Some(slice) = dump.next_slice() {
            writer.write_all(slice).await?;
        }
        Ok(())
    }
    fn hash(&self) -> u8 {
        let mut state = 0u8;
        let mut writer = self.start_dump();
        while let Some(slice) = writer.next_slice() {
            for byte in slice {
                state = state.wrapping_add(*byte);
            }
        }
        state
    }
}

struct Packet {
    content: String,
}

enum State {
    WriteFmtStr,
    WritePacket,
    Done,
}

struct PacketWriter<'a> {
    packet: &'a Packet,
    fmt_str: String,
    state: State,
}

impl BinaryRepr for Packet {
    type ReprWriter<'a> = PacketWriter<'a>;
    fn start_dump(&self) -> Self::ReprWriter<'_> {
        PacketWriter {
            packet: self,
            fmt_str: String::new(),
            state: State::WriteFmtStr,
        }
    }
}

impl<'a> BinaryReprWriter for PacketWriter<'a> {
    fn next_slice(&mut self) -> Option<&[u8]> {
        match &self.state {
            State::WriteFmtStr => {
                self.state = State::WritePacket;
                self.fmt_str = format!("Packet len={}\n", self.packet.content.len());
                Some(self.fmt_str.as_bytes())
            }
            State::WritePacket => {
                self.state = State::Done;
                Some(self.packet.content.as_bytes())
            }
            State::Done => None,
        }
    }
}

fn main() {
    println!("Hash: {}", (Packet { content: "Hello".to_string() }).hash());
}

Now thinking about it, in most cases the ReprWriter might be trivial, except in those cases where larger structures are written (i.e. in the given example, a single slice would suffice).

Either way: Your code helps me to understand the nature of my problem better. It looks like, to implement this within Rust's type system (and thus avoid tricks like conventions, now_or_never, and unwrap), I have to refrain from using the async keyword for my "semi-async" function. Instead, I must store the State manually, which makes asynchronous programming considerably less fun.
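To illustrate the trivial case, here is a minimal sketch of a writer that hands out exactly one slice and is then exhausted. SingleSlice and checksum are hypothetical names. The trait is repeated only to keep the snippet self-contained.

```rust
/// Same shape as the `BinaryReprWriter` trait above.
trait BinaryReprWriter {
    fn next_slice(&mut self) -> Option<&[u8]>;
}

/// Hypothetical writer that yields one slice and is then exhausted.
struct SingleSlice<'a> {
    slice: Option<&'a [u8]>,
}

impl<'a> SingleSlice<'a> {
    fn new(slice: &'a [u8]) -> Self {
        SingleSlice { slice: Some(slice) }
    }
}

impl<'a> BinaryReprWriter for SingleSlice<'a> {
    fn next_slice(&mut self) -> Option<&[u8]> {
        // `take` leaves `None` behind, so the second call ends the stream.
        self.slice.take()
    }
}

/// The additive checksum used in this thread, driven by any writer.
fn checksum<W: BinaryReprWriter>(mut writer: W) -> u8 {
    let mut state = 0u8;
    while let Some(slice) = writer.next_slice() {
        for byte in slice {
            state = state.wrapping_add(*byte);
        }
    }
    state
}
```

With such a helper, a type whose representation is already available as one contiguous byte slice would not need its own state enum.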

Perhaps a better solution for this dilemma would be a language extension where async can have a context (at compile-time), e.g. async<TokioRealm> vs async<BinaryRealm>. Using await could then be limited to async functions with the same context (type). Does that make sense?

Thinking this through, the best terminology for my dump function perhaps isn't "semi-asynchronous", but "asynchronous with context". What if we turned std::task::Context into a trait rather than a struct and let async specify the concrete type in angle brackets? Has anyone ever thought of this? (I always wondered if there are any plans to extend Context at a later stage, as it currently only holds the Waker.)

Yeah, I did indeed introduce the helper trait due to the lack of GAT.

As for the async context stuff you're talking about, I think what would actually be needed here to make it pleasant is generators.


Okay, that helps me to better understand the advantages and use cases of GATs. I think Serde also keeps these extra lifetimes around because it was developed without GATs (but I have never really used Serde, I just had a short look and found it pretty complex).

I do believe that my proposal is a superset of generators. BinaryRealm would actually be a task::Context implementation that doesn't even have a waker. I think that's the equivalent to generators.

I don't think serde can change the Deserialize trait even with GAT because the type that varies over the lifetime is Self, not an associated type.

Okay, thanks for clarifying. I have no experience with Serde really.

Thinking about this further, I think my proposal isn't a superset of generators. It's not just a missing waker, but ReprWriter::next_slice in your example also should never be able to return something like Poll::Pending, so ReprWriter doesn't really have the semantics of a Future.

However, I do see a similarity between asynchronous functions and generators, and I wonder if both could be covered by a single (more generalized) language construct. Considering that Rust offers async and await, there is a temptation to implement my version of dump as an async function, as otherwise syntax gets ugly. But using async introduces unnecessary runtime overhead (and runtime panics on error).

P.S.: I think when I tried to learn Rust for the first time (many years ago), it had some sort of coroutines (or was it generators?) which were later dropped from the language. I remember I was pretty mad about it (at that time :grin:).

Yeah, not being able to return Pending is why you want generators (which is another word for coroutines).

As for a single more generalized language construct that encompasses both async/await and generators, well that is indeed possible. For example, in Haskell that language construct is called a monad. The question mark operator is a third example of a monad.


This made me wonder if ReprWriter actually is an Iterator. I tried to write that down, but ran into lifetime problems. I think that's because Iterator::next has the following signature:

fn next(&mut self) -> Option<Self::Item>

While I need something like:

fn next(&mut self) -> Option<Self::Item<'_>>
// the same as:
fn next<'a>(&'a mut self) -> Option<Self::Item<'a>>

So I think that's the reason to use a dedicated trait for it. Maybe something already exists for this sort of scenario? I also found this older thread in the internals forum, which explains exactly that problem. So I guess what you implemented here is some sort of "streaming iterator"?
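The signature above can be written down with a generic associated type. This is only a sketch: LendingIterator, DecimalChunks and collect_all are hypothetical names, and it assumes a compiler where GATs are available without a feature gate.

```rust
/// Hypothetical "streaming" (lending) iterator: each item may borrow from
/// the iterator itself, which `Iterator::next` cannot express.
trait LendingIterator {
    type Item<'a>
    where
        Self: 'a;

    fn next(&mut self) -> Option<Self::Item<'_>>;
}

/// Yields the decimal text of 0..limit, reusing one internal buffer.
struct DecimalChunks {
    buf: String,
    current: u32,
    limit: u32,
}

impl DecimalChunks {
    fn new(limit: u32) -> Self {
        DecimalChunks { buf: String::new(), current: 0, limit }
    }
}

impl LendingIterator for DecimalChunks {
    type Item<'a> = &'a [u8] where Self: 'a;

    fn next(&mut self) -> Option<Self::Item<'_>> {
        if self.current >= self.limit {
            return None;
        }
        // Overwrite the buffer; the borrow checker guarantees that the
        // previously returned slice is no longer in use at this point.
        self.buf.clear();
        self.buf.push_str(&self.current.to_string());
        self.current += 1;
        Some(self.buf.as_bytes())
    }
}

/// Drain the iterator, separating the chunks with newlines.
fn collect_all(mut it: DecimalChunks) -> Vec<u8> {
    let mut out = Vec::new();
    while let Some(slice) = it.next() {
        out.extend_from_slice(slice);
        out.push(b'\n');
    }
    out
}
```

A `for` loop does not work with such a trait, but `while let Some(x) = it.next()` does, which is the loop shape used by dump and hash earlier in the thread.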

Yes, my trait implements a streaming Iterator specialized for byte arrays.

Interestingly, the bytes crate already has more or less that trait with its Buf trait.


Thank you very much for all your help and explanations. I learned a lot.

I think what I did was to "simulate" a streaming generator using async, disallowing Pending by convention, and using a dummy executor (provided by the now_or_never method from futures::future::FutureExt) that will panic if the future (i.e. the "task") returns pending when polled.

Not sure which way I'll go:

  • simulating the streaming generator using a dummy executor,
  • implementing my own "streaming iterator specialized for byte arrays",
  • or using a crate such as bytes.

The first variant is my original proposal with a tiny bit of unnecessary runtime overhead, while the second and third variant seem to be "semantically" more correct (but with a lot of extra typing work when implementing the streaming iterator without having "yield").

I feel like my original idea isn't sooo bad after all. In either case, I understand the background much better now.

I have been thinking about this.

Let's put it together more concisely.

First, we define a macro to iterate over Streams (under the assumption that this stream's poll_next method never returns Poll::Pending):

pub use futures;
macro_rules! generator_for {
    ( $a:ident in $b:expr, $c:block ) => {
        {
            let mut stream_ = $b;
            while let Some($a) =
                ::std::option::Option::unwrap(
                    $crate::futures::future::FutureExt::now_or_never(
                        $crate::futures::stream::StreamExt::next(&mut stream_)
                    )
                )
                $c
        }
    }
}

Now we can do:

use async_stream::stream;
use futures::pin_mut;
use futures::stream::Stream;

fn generator() -> impl Stream<Item = i32> {
    stream! {
        for i in 0..3 {
            yield i;
        }
    }
}

fn main() {
    let g = generator();
    pin_mut!(g);
    generator_for!(value in g, {
        println!("Got: {}", value);
    })
}

Note that main is not async, and note that I did not use any unstable features here. :grin:

It prints:

Got: 0
Got: 1
Got: 2

Of course, the above example could also be achieved with passing a closure. But we can also define:

use std::pin::Pin;
use futures::future::FutureExt;
use futures::stream::StreamExt;
fn generator_next<I, S>(mut g: Pin<&mut S>) -> Option<I>
where
    S: Stream<Item = I>,
{
    g.next().now_or_never().unwrap()
}

And then this works:

fn main() {
    let g1 = generator();
    let g2 = generator();
    pin_mut!(g1);
    pin_mut!(g2);
    assert_eq!(generator_next(g1.as_mut()), Some(0));
    assert_eq!(generator_next(g2.as_mut()), Some(0));
    assert_eq!(generator_next(g1.as_mut()), Some(1));
    assert_eq!(generator_next(g1.as_mut()), Some(2));
    assert_eq!(generator_next(g1.as_mut()), None);
}

The syntax is still a bit clunky, and it's not type-safe: we could pass a real async stream (one that can return Pending when polled) to generator_for! or generator_next. Possibly that could be fixed by providing a new macro (let's call it generator!, replacing stream!) which wraps our stream in a newtype indicating that it's safe to call .next().now_or_never().unwrap().

You might like the genawaiter crate.


I played around with various options and took a short look at the genawaiter crate too.

The thing I usually don't like about generators is that they cannot yield through multiple functions (same as you cannot await an async function through a non-async function call). Consider this Lua code, for example:

#!/usr/bin/env lua

function count(from, to, step)
  for i = from, to, step do
    coroutine.yield(i)
  end
end

function multicount()
  count(1, 3, 1)
  count(10, 30, 10)
end

for i in coroutine.wrap(multicount) do
  print(i)
end

This prints:

1
2
3
10
20
30

But most other languages usually do not support this, because the yield is in a different function (count instead of multicount).

That said, I would like my implementations of BinaryRepr::dump to be able to call other functions which do the actual dumping/writing.
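For comparison, the Lua example can be mirrored in plain Rust without generators by passing a sink closure down through the nested calls. This is a sketch of that alternative only, not of a coroutine: the caller cannot pause the producer between values. The emit parameter and collect_multicount are names chosen for illustration.

```rust
/// Emit `from, from + step, ...` up to and including `to`.
fn count(from: i32, to: i32, step: i32, emit: &mut impl FnMut(i32)) {
    let mut i = from;
    while i <= to {
        emit(i);
        i += step;
    }
}

/// Calls `count` twice; the values pass through this function
/// without it having to know what the sink does with them.
fn multicount(emit: &mut impl FnMut(i32)) {
    count(1, 3, 1, emit);
    count(10, 30, 10, emit);
}

/// Collect everything that `multicount` emits.
fn collect_multicount() -> Vec<i32> {
    let mut seen = Vec::new();
    multicount(&mut |value| seen.push(value));
    seen
}
```

The values arrive in the same order as in the Lua output (1, 2, 3, 10, 20, 30), and the same idea, a closure that receives the data, is what the closure-based writer later in this post builds on.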

I experimented with multiple ideas that came up in this thread, and I never was quite satisfied. Often, the syntax became annoyingly complex, and the levels of abstraction needed almost caused my head to explode.

:woozy_face:

Maybe that's the curse of languages with a strong type system, when certain abstract data types aren't part of the standard libraries yet and it's difficult to resort to well-known design patterns without getting lost in dozens of different approaches.

However, I would like to share the (current) result of my thoughts in regard to how I want to solve the original problem.

The core trait is BinaryRepr, which allows dumping data to Tokio's AsyncWrite streams and restoring it from AsyncRead streams:

#![feature(generic_associated_types)]
#![feature(type_alias_impl_trait)]

use futures::future::FutureExt;
use futures::pin_mut;
use std::future::Future;
use std::marker::Send;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{self, AsyncRead, AsyncWrite};

/// Types that can be serialized and deserialized
pub trait BinaryRepr: Sized {
    /// Return type of [`dump`](BinaryRepr::dump)
    type Dump<'a, 'b, W>: Future<Output = io::Result<()>> + Send;

    /// Serialize
    fn dump<'a, 'b, W>(&'a self, writer: &'b mut W) -> Self::Dump<'a, 'b, W>
    where
        W: AsyncWrite + Unpin + Send;

    /// Return type of [`restore`](BinaryRepr::restore)
    type Restore<'a, R>: Future<Output = io::Result<Self>> + Send;

    /// Deserialize
    fn restore<'a, R>(reader: &'a mut R) -> Self::Restore<'a, R>
    where
        R: AsyncRead + Unpin + Send;
}

Note that I do not use async-trait here, due to performance issues (extra allocation on the heap). Instead, I employ some methods discussed in this other thread:

Avoiding heap allocations in (some) async trait methods

That makes the syntax a bit awkward, but it's bearable. Implementing BinaryRepr looks like this:

impl BinaryRepr for SomeType {
    type Dump<'a, 'b, W> = impl Future<Output = io::Result<()>> + Send;
    fn dump<'a, 'b, W>(&'a self, writer: &'b mut W) -> Self::Dump<'a, 'b, W>
    where
        W: AsyncWrite + Unpin + Send,
    {
        async move {
            /* … */
        }
    }

    type Restore<'a, R> = impl Future<Output = io::Result<Self>> + Send;
    fn restore<'a, R>(reader: &'a mut R) -> Self::Restore<'a, R>
    where
        R: AsyncRead + Unpin + Send,
    {
        async move {
            /* … */
        }
    }
}

Actually I only want Dump and Restore to be Send if W or R is Send (and guarantee this through the trait), but that seems difficult to do, so I demand everything (Dump, Restore, W, and R) always being Send (and hope this doesn't cause me any trouble in future).

Now let's take a look at how to implement a non-async interface to extract binary data from those data types that don't really depend on any async operations for serialization (other than writing to an AsyncWrite).

First, we need something that makes AsyncWrite::poll_write invoke a (non-async) closure:

struct NowWriter<W> {
    write_func: W,
}

impl<W> AsyncWrite for NowWriter<W>
where
    W: FnMut(&[u8]) + Unpin + Send,
{
    fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
        (Pin::into_inner(self).write_func)(buf);
        Poll::Ready(Ok(buf.len()))
    }
    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}

I had to restrict W to Send types here. Again, I hope that doesn't get me into trouble in future. :fearful:

Now we can define a trait DumpNow, which doesn't have any required methods, i.e. it can be implemented by simply writing impl DumpNow for SomeType {}. Here is how the trait is defined:

/// Implement this when polling result of `BinaryRepr::dump` never
/// returns [pending](std::task::Poll#variant.Pending) except if
/// caused by writing to `writer`.
pub trait DumpNow: BinaryRepr {
    /// Non-async version of `BinaryRepr::dump`
    fn dump_now<W>(&self, write_func: W)
    where
        W: FnMut(&[u8]) + Unpin + Send,
    {
        let writer = NowWriter { write_func };
        pin_mut!(writer);
        self.dump(&mut writer)
            .now_or_never()
            .expect("Unexpectedly encountered `Poll::Pending` during dump")
            .expect("Unexpectedly encountered error during dump");
    }
    fn dump_to_vec(&self) -> Vec<u8> {
        let mut vec = Vec::new();
        self.dump_now(|slice| vec.extend_from_slice(slice));
        vec
    }
}

Note that I also added a dump_to_vec method for convenience here (which I also attempted to implement differently in the past, see AsyncWrite into Vec). The dump_now method lets me feed the binary representation of a data value into a hasher from non-async code (i.e. without "infecting" every involved function by forcing it to be declared async).

The convention or "contract" that an implementation of BinaryRepr::dump isn't truly async (but only async insofar as writes to the writer are involved) is now optional, and it is expressed by implementing DumpNow on such a data type.
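To show what calling such an interface looks like, here is a std-only sketch. It is an assumption-laden stand-in: the DumpNow trait below is a simplified synchronous version that only models the closure-based dump_now signature (it does not sit on top of the async dump), and SumHasher and hash_with are hypothetical names.

```rust
use std::hash::Hasher;

/// Simplified synchronous stand-in for `DumpNow` (illustration only).
trait DumpNow {
    fn dump_now<W: FnMut(&[u8])>(&self, write_func: W);

    fn dump_to_vec(&self) -> Vec<u8> {
        let mut vec = Vec::new();
        self.dump_now(|slice| vec.extend_from_slice(slice));
        vec
    }
}

struct Packet {
    content: String,
}

impl DumpNow for Packet {
    fn dump_now<W: FnMut(&[u8])>(&self, mut write_func: W) {
        write_func(format!("Packet len={}\n", self.content.len()).as_bytes());
        write_func(self.content.as_bytes());
    }
}

/// The additive checksum from the start of the thread as a `std` hasher.
struct SumHasher(u8);

impl Hasher for SumHasher {
    fn write(&mut self, bytes: &[u8]) {
        for byte in bytes {
            self.0 = self.0.wrapping_add(*byte);
        }
    }
    fn finish(&self) -> u64 {
        u64::from(self.0)
    }
}

/// Feed the binary representation into any `std::hash::Hasher`.
fn hash_with<T: DumpNow, H: Hasher>(value: &T, mut hasher: H) -> u64 {
    value.dump_now(|slice| hasher.write(slice));
    hasher.finish()
}
```

For Packet { content: "Hello".to_string() }, hash_with(&packet, SumHasher(0)) evaluates to 39, the wrapping byte sum of "Packet len=5\nHello", and none of the functions involved is async.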

My solution still has an overhead because of the unnecessary unwraps (through the .expect calls in dump_now), but I avoid Boxing or trait objects. I also can write my serialization functions by simply operating on Tokio's AsyncRead and AsyncWrite interfaces (which provide handy functions such as AsyncWriteExt::write_u64 and others) such that I don't need to implement these methods or functions on my own or resort to more dependencies.

I hope that what I did here isn't too weird, but considering many (language) constraints I encountered during my attempts, it seems to be a reasonable compromise… :thinking:
