How to abstract over different channel implementations

I'm writing a library with the goal of being agnostic to various channel implementations (std, tokio, async-std, crossbeam...), and so I'm looking to wrap an underlying implementation. I'm wondering if the approach taken with the following code within my library is idiomatic:

trait ReceiverImpl<T> {
    fn try_recv(&self) -> Result<T, TryRecvError>;
    // ... and more
}

/// The receiving side of a channel.
pub struct Receiver<T> {
    pub underlying: dyn ReceiverImpl<T>,
}

impl<T> Receiver<T> {
    #[inline(always)]
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        (self.underlying.try_recv)()
    }
    // ... and more
}

// ...and more

I'm using #[inline(always)] above so as to avoid a double dispatch. I'd really like to avoid dispatching entirely and had started to dream about a macro... I might be getting ahead of myself though.

The ReceiverImpl will then be implemented by other crates targeting tokio etc.

Thanks for the guidance.

You won't be able to store a dyn ReceiverImpl directly, as it's unsized. You'll have to wrap it in a type like a Box. The only way to avoid the boxing is to use a generic instead of dyn Trait.
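
As a rough sketch of those two options, assuming a hypothetical StdReceiver adapter over std::sync::mpsc and a cut-down TryRecvError (neither comes from the code above), the boxed and generic wrappers could look like this:

```rust
use std::sync::mpsc;

/// Cut-down stand-in for the library's error type (assumption for this sketch).
#[derive(Debug, PartialEq, Eq)]
pub enum TryRecvError {
    Empty,
    Disconnected,
}

pub trait ReceiverImpl<T> {
    fn try_recv(&self) -> Result<T, TryRecvError>;
}

/// Option 1: dynamic dispatch. The unsized trait object sits behind a Box,
/// so the struct itself has a known size.
pub struct BoxedReceiver<T> {
    pub underlying: Box<dyn ReceiverImpl<T>>,
}

impl<T> BoxedReceiver<T> {
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        self.underlying.try_recv()
    }
}

/// Option 2: static dispatch. The concrete implementation is a type
/// parameter, so no Box and no vtable call are involved.
pub struct GenericReceiver<R> {
    pub underlying: R,
}

impl<R> GenericReceiver<R> {
    pub fn try_recv<T>(&self) -> Result<T, TryRecvError>
    where
        R: ReceiverImpl<T>,
    {
        self.underlying.try_recv()
    }
}

/// Hypothetical adapter a provider crate might write for std's channel.
pub struct StdReceiver<T>(pub mpsc::Receiver<T>);

impl<T> ReceiverImpl<T> for StdReceiver<T> {
    fn try_recv(&self) -> Result<T, TryRecvError> {
        // Translate std's error into the library's own error type.
        self.0.try_recv().map_err(|e| match e {
            mpsc::TryRecvError::Empty => TryRecvError::Empty,
            mpsc::TryRecvError::Disconnected => TryRecvError::Disconnected,
        })
    }
}
```

The generic version has to put T on the method because nothing in GenericReceiver<R> pins it down, which is one symptom of T being a trait parameter rather than an associated type.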

Also, T should be an associated type, not a generic, because you can't receive multiple types of item from one receiver.

trait ReceiverImpl {
    type Item;
    fn try_recv(&self) -> Result<Self::Item, TryRecvError>;
}

If you need a blocking recv, you will likely have to choose between sync (crossbeam, std) and async (tokio, async-channel); it would be hard to abstract over both.
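
For the non-blocking subset, here is a minimal sketch of how the associated-type trait might be implemented for std's receiver and used through a generic wrapper with no dynamic dispatch. The wrapper and the cut-down TryRecvError are assumptions for illustration, not part of any existing crate:

```rust
use std::sync::mpsc;

/// Cut-down stand-in for the library's error type (assumption for this sketch).
#[derive(Debug, PartialEq, Eq)]
pub enum TryRecvError {
    Empty,
    Disconnected,
}

pub trait ReceiverImpl {
    type Item;
    fn try_recv(&self) -> Result<Self::Item, TryRecvError>;
}

/// The trait is local, so it can be implemented directly on std's receiver.
/// Each receiver type yields exactly one Item type.
impl<T> ReceiverImpl for mpsc::Receiver<T> {
    type Item = T;

    fn try_recv(&self) -> Result<T, TryRecvError> {
        // Call std's inherent method explicitly, then map its error type.
        mpsc::Receiver::try_recv(self).map_err(|e| match e {
            mpsc::TryRecvError::Empty => TryRecvError::Empty,
            mpsc::TryRecvError::Disconnected => TryRecvError::Disconnected,
        })
    }
}

/// Generic wrapper: the item type falls out of R, so no extra type
/// parameter is needed and the call is statically dispatched.
pub struct Receiver<R: ReceiverImpl> {
    pub underlying: R,
}

impl<R: ReceiverImpl> Receiver<R> {
    pub fn try_recv(&self) -> Result<R::Item, TryRecvError> {
        self.underlying.try_recv()
    }
}
```

The cost of the generic wrapper is that the concrete channel type shows up in every signature that mentions Receiver<R>.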

Thanks. I think I can use a ref instead of a Box, right? So long as I then also specify the lifetime, i.e.:

pub trait ReceiverImpl {
    type Item;
    fn try_recv(&self) -> Result<Self::Item, TryRecvError>;
}

/// The receiving side of a channel.
pub struct Receiver<'a, T> {
    pub underlying: &'a dyn ReceiverImpl<Item = T>,
}

impl<'a, T> Receiver<'a, T> {
    #[inline(always)]
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        self.underlying.try_recv()
    }
}

So my ReceiverImpl instance must live at least as long as my Receiver struct, right?

Thanks for the tip on the associated type.

If you use a reference then you must answer the question "who owns the receiver" and make sure that the owner lives for longer than the Receiver struct.
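
To make the ownership question concrete, here is a small sketch. QueueReceiver is a hypothetical in-memory implementation invented for this example; the point is only that the owner is declared before, and dropped after, the borrowing Receiver:

```rust
use std::cell::RefCell;
use std::collections::VecDeque;

/// Cut-down stand-in for the library's error type (assumption for this sketch).
#[derive(Debug, PartialEq, Eq)]
pub enum TryRecvError {
    Empty,
    Disconnected,
}

pub trait ReceiverImpl {
    type Item;
    fn try_recv(&self) -> Result<Self::Item, TryRecvError>;
}

/// The receiving side of a channel, borrowing its implementation.
pub struct Receiver<'a, T> {
    pub underlying: &'a dyn ReceiverImpl<Item = T>,
}

impl<'a, T> Receiver<'a, T> {
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        self.underlying.try_recv()
    }
}

/// Hypothetical implementation backed by a plain queue.
pub struct QueueReceiver<T>(pub RefCell<VecDeque<T>>);

impl<T> ReceiverImpl for QueueReceiver<T> {
    type Item = T;

    fn try_recv(&self) -> Result<T, TryRecvError> {
        self.0.borrow_mut().pop_front().ok_or(TryRecvError::Empty)
    }
}

pub fn drain_two() -> (Result<u8, TryRecvError>, Result<u8, TryRecvError>) {
    // `owner` owns the implementation and outlives `receiver`.
    let owner = QueueReceiver(RefCell::new(VecDeque::from(vec![1u8])));
    let receiver: Receiver<'_, u8> = Receiver { underlying: &owner };
    (receiver.try_recv(), receiver.try_recv())
}

// By contrast, a function like the following is rejected by the borrow
// checker, because `owner` would be dropped while still borrowed:
//
// fn dangling<'a>() -> Receiver<'a, u8> {
//     let owner = QueueReceiver(RefCell::new(VecDeque::new()));
//     Receiver { underlying: &owner }
// }
```

In practice this means a borrowed Receiver cannot easily be returned from the function that creates the channel, which is often the reason to prefer a Box or a generic.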

As a follow-up, I box trait objects that provide the actual implementations of my receivers and senders. My Receiver and Sender types only implement the functions I need.

Here's the full source of what I ended up doing; happy to take any further recommendations!

//! Provides an abstraction over any type of channel.
//!
//! The primary declarations and their doc were copied from Crossbeam but appear
//! to apply across popular runtimes including Tokio and async-std.

use alloc::boxed::Box;
use core::any::Any;
use core::fmt;

/// The receiving side of a channel.
pub struct Receiver<T> {
    pub receiver_impl: Box<dyn ReceiverImpl<Item = T>>,
}

unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}

impl<T> fmt::Debug for Receiver<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Receiver { .. }")
    }
}

/// Required to be implemented by the provider of a channel
pub trait ReceiverImpl {
    type Item;
    /// Return self as an Any so that it can be downcast
    fn as_any(&self) -> &dyn Any;
}

/// An error returned from the [`recv`] method.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub struct RecvError;

impl fmt::Display for RecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        "receiving on an empty and disconnected channel".fmt(f)
    }
}

/// An error returned from the [`recv_timeout`] method.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum RecvTimeoutError {
    /// A message could not be received because the channel is empty and the operation timed out.
    ///
    /// If this is a zero-capacity channel, then the error indicates that there was no sender
    /// available to send a message and the operation timed out.
    Timeout,

    /// The message could not be received because the channel is empty and disconnected.
    Disconnected,
}

impl fmt::Display for RecvTimeoutError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            RecvTimeoutError::Timeout => "timed out waiting on receive operation".fmt(f),
            RecvTimeoutError::Disconnected => "channel is empty and disconnected".fmt(f),
        }
    }
}

/// The sending side of a channel.
pub struct Sender<T> {
    pub sender_impl: Box<dyn SenderImpl<Item = T>>,
}

unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}

impl<T> Sender<T> {
    /// Attempts to send a message into the channel without blocking.
    ///
    /// This method will either send a message into the channel immediately or return an error if
    /// the channel is full or disconnected. The returned error contains the original message.
    ///
    /// If called on a zero-capacity channel, this method will send the message only if there
    /// happens to be a receive operation on the other side of the channel at the same time.
    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        self.sender_impl.try_send(msg)
    }
}

impl<T> Clone for Sender<T> {
    fn clone(&self) -> Self {
        Sender {
            sender_impl: self.sender_impl.clone(),
        }
    }
}

impl<T> fmt::Debug for Sender<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Sender { .. }")
    }
}

/// Required to be implemented by the provider of a channel
pub trait SenderImpl {
    type Item;
    // Provide a cloning function
    fn clone(&self) -> Box<dyn SenderImpl<Item = Self::Item>>;
    // Provide a try_send function
    fn try_send(&self, msg: Self::Item) -> Result<(), TrySendError<Self::Item>>;
}

/// An error returned from the [`try_send`] method.
///
/// The error contains the message being sent so it can be recovered.
///
/// [`try_send`]: super::Sender::try_send
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
    /// The message could not be sent because the channel is full.
    ///
    /// If this is a zero-capacity channel, then the error indicates that there was no receiver
    /// available to receive the message at the time.
    Full(T),

    /// The message could not be sent because the channel is disconnected.
    Disconnected(T),
}

impl<T> fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            TrySendError::Full(..) => "sending on a full channel".fmt(f),
            TrySendError::Disconnected(..) => "sending on a disconnected channel".fmt(f),
        }
    }
}

/// An error returned from the [`try_recv`] method.
///
/// [`try_recv`]: super::Receiver::try_recv
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryRecvError {
    /// A message could not be received because the channel is empty.
    ///
    /// If this is a zero-capacity channel, then the error indicates that there was no sender
    /// available to send a message at the time.
    Empty,

    /// The message could not be received because the channel is empty and disconnected.
    Disconnected,
}

impl fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            TryRecvError::Empty => "receiving on an empty channel".fmt(f),
            TryRecvError::Disconnected => "receiving on an empty and disconnected channel".fmt(f),
        }
    }
}

I hope that this is useful to someone else, and thanks to everyone who helped!

This is unsound: the unsafe impls make Receiver and Sender Send and Sync even when the boxed ReceiverImpl or SenderImpl is !Send or !Sync. You should instead store a Box<dyn ReceiverImpl<Item = T> + Send + Sync> (and remove the unsafe impls), and the same applies on the sender side.
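
A minimal sketch of the safe version on the receiver side follows. LockedQueue and the try_recv method on the trait are assumptions added so the example can run; only the field type reflects the suggestion itself:

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Cut-down stand-in for the library's error type (assumption for this sketch).
#[derive(Debug, PartialEq, Eq)]
pub enum TryRecvError {
    Empty,
    Disconnected,
}

pub trait ReceiverImpl {
    type Item;
    fn try_recv(&self) -> Result<Self::Item, TryRecvError>;
}

/// No `unsafe impl` needed: the auto traits follow from the field type,
/// and the compiler rejects any implementation that is not Send + Sync
/// (for example one that holds an Rc).
pub struct Receiver<T> {
    pub receiver_impl: Box<dyn ReceiverImpl<Item = T> + Send + Sync>,
}

impl<T> Receiver<T> {
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        self.receiver_impl.try_recv()
    }
}

/// Hypothetical thread-safe implementation backed by a mutex-guarded queue.
pub struct LockedQueue<T>(pub Mutex<VecDeque<T>>);

impl<T> ReceiverImpl for LockedQueue<T> {
    type Item = T;

    fn try_recv(&self) -> Result<T, TryRecvError> {
        self.0.lock().unwrap().pop_front().ok_or(TryRecvError::Empty)
    }
}

/// Compile-time check that a value is Send + Sync.
fn assert_send_sync<X: Send + Sync>(_: &X) {}

pub fn recv_on_other_thread() -> Result<u32, TryRecvError> {
    let queue = LockedQueue(Mutex::new(VecDeque::from(vec![42u32])));
    let receiver: Receiver<u32> = Receiver { receiver_impl: Box::new(queue) };
    assert_send_sync(&receiver);

    // Share the receiver with another thread and receive there.
    let shared = Arc::new(receiver);
    let worker = Arc::clone(&shared);
    thread::spawn(move || worker.try_recv()).join().unwrap()
}
```

If only Send is needed, dropping the `+ Sync` bound widens the set of implementations the box will accept.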

Thanks. TBH - I copied those bits of code from Crossbeam.