Do not require user to implement internal thread-safe mutability for Future handler

I have an API that currently requires an implementor to do a lot of legwork in order to implement a response type that handles data asynchronously processed by another thread to resolve to a Future:

use std::future::Future;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::{AtomicBool, AtomicU8};
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;
use std::thread::{spawn, JoinHandle};

use log::error;
use serialport::TTYPort;

use listener::Listener;
use transmitter::Transmitter;

use crate::packet::FrameBuffer;
use crate::protocol::{Command, Response};
use crate::util::NonPoisonedRwLock;
use crate::Error;

mod listener;
mod transmitter;

/// A host controller to communicate with an NCP via the `ASHv2` protocol.
#[derive(Debug)]
pub struct Host {
    running: Arc<AtomicBool>,
    command: Sender<Command>,
    listener_thread: Option<JoinHandle<()>>,
    transmitter_thread: Option<JoinHandle<()>>,
}

impl Host {
    /// Creates and starts the host.
    ///
    /// # Errors
    /// Returns an [`Error`] if the host could not be started.
    pub fn spawn(
        serial_port: TTYPort,
        callback: Option<Sender<FrameBuffer>>,
    ) -> Result<Self, Error> {
        let running = Arc::new(AtomicBool::new(true));
        let (command_sender, command_receiver) = channel();
        let connected = Arc::new(AtomicBool::new(false));
        let handler = Arc::new(NonPoisonedRwLock::new(None));
        let ack_number = Arc::new(AtomicU8::new(0));
        let (listener, ack_receiver, nak_receiver) = Listener::new(
            serial_port.try_clone_native()?,
            running.clone(),
            connected.clone(),
            handler.clone(),
            ack_number.clone(),
            callback,
        );
        let transmitter = Transmitter::new(
            serial_port,
            running.clone(),
            connected,
            command_receiver,
            handler,
            ack_number,
            ack_receiver,
            nak_receiver,
        );

        Ok(Self {
            command: command_sender,
            running,
            listener_thread: Some(spawn(move || listener.run())),
            transmitter_thread: Some(spawn(move || transmitter.run())),
        })
    }

    /// Communicate with the NCP, returning [`T::Result`](Response::Result).
    ///
    /// # Errors
    /// Returns [`T::Error`](Response::Error) if the transactions fails.
    pub async fn communicate<T>(&self, payload: &[u8]) -> <T as Future>::Output
    where
        T: Clone + Default + Response + 'static,
    {
        let response = T::default();
        self.command
            .send(Command::new(Arc::from(payload), Arc::new(response.clone())))
            .map_err(|_| Error::Terminated)?;
        response.await
    }
}

impl Drop for Host {
    fn drop(&mut self) {
        self.running.store(false, SeqCst);

        if let Some(thread) = self.listener_thread.take() {
            thread.join().unwrap_or_else(|_| {
                error!("Failed to join listener thread.");
            });
        }

        if let Some(thread) = self.transmitter_thread.take() {
            thread.join().unwrap_or_else(|_| {
                error!("Failed to join transmitter thread.");
            });
        }
    }
}

with

use std::fmt::Debug;
use std::future::Future;

use crate::Error;

/// Result of handling an event.
///
/// This enum is returned from [`Handler::handle`] to indicate the outcome of handling the respective event.
///
/// * [`HandleResult::Completed`] indicates that the handler successfully processed the data and does not expect any more data.  
/// * [`HandleResult::Continue`] indicates that the handler successfully processed the data and is expecting more data.
/// * [`HandleResult::Reject`] indicates that the handler was unable to process the passed data and is expecting more data.
/// * [`HandleResult::Failed`] indicates that the handler was unable to process the passed data and cannot continue.
#[derive(Debug)]
pub enum HandleResult {
    Completed,
    Continue,
    Reject,
    Failed,
}

/// Events sent to a [`Handler`].
///
/// * [`Event::TransmissionCompleted`] indicates that the requested payload has been transmitted completely.
/// * [`Event::DataReceived`] indicates that the listener has received a potential data packet that it is forwarding to the handler for processing.
#[derive(Debug)]
pub enum Event<'data> {
    TransmissionCompleted,
    DataReceived(&'data [u8]),
}

/// Handle `ASHv2` protocol events.
pub trait Handler: Debug + Send + Sync {
    /// Handle the incoming  [`Event`] and return an appropriate [`HandleResult`].
    fn handle(&self, event: Event) -> HandleResult;

    /// Abort the current transaction, resulting in an erroneous state.
    fn abort(&self, error: Error);

    /// Wake the underlying [`Waker`](std::task::Waker) to complete the respective [`Future`].
    fn wake(&self);
}

/// A response to a request sent to the NCP.
///
/// This is a composite trait consisting of a [`Future`] and a [`Handler`] implementation.
/// The Future must output [`Result<Self::Result, Self::Error>`].
pub trait Response: Future<Output = Result<Self::Result, Self::Error>> + Handler {
    type Result;
    type Error: From<Error>;
}

However, this requires a lot of trait bounds and to have the user implement the trait Handler with internal, thread-safe mutability - which is exactly what I want to avoid.

What I want is something along these lines:

    pub async fn communicate<T>(&self, payload: &[u8]) -> Result<T::Result, T::Error>
    where
        T: Default + Response + 'static,
        Arc<Mutex<T>>: Future<Output = Result<T::Result, T::Error>>,
    {
        let response = Arc::new(Mutex::new(T::default()));
        self.command
            .send(Command::new(Arc::from(payload), response.clone()))
            .map_err(|_| Error::Terminated)?;
        response.await
    }

This, however, is not possible to implement for the user, since they cannot implement Future for a type not owned by the crate Arc<...>. Also I would not know how the await should work on an Arc<Mutex<...>>.

Is there a way to not require the Handler trait to implement thread-safe internal mutability and automagically wrapping it in a thread-safe Future?

Entire repo for more context: GitHub - PaulmannLighting/ashv2: Asynchronous Serial Host protocol, version 2

Even if they could, you probably should not demand that the caller implement Future in any way other than an async {} block or function, which won't be an Arc<_> type anyway.

Usually, when you have a custom trait and futures, the custom trait allows creating futures (i.e. has async fns, or similar but explicit code) rather than being implemented for futures.

I'm not sure what you're trying to achieve. Are you trying to have communicate()

  • send a future to another thread, where it will run, and
  • wait for it to complete on this thread?

If so, then what you should do is have communicate() create a oneshot channel which communicates the completion, instead of trying to share the future value itself. The channel sender can be sent along with the Future, and the channel receiver will be awaited.

If that's not it, then it would help if you share with us what Transmitter is doing, how Handler is actually used, and what the intended control flow of this code is — what does it do in what order, and how is the user's Future and Handler code invoked?

4 Likes

You can always provide a newtype wrapper around Arc<Mutex<T>> that you blanket-impl Future for, given T: Future.

However, …

You typically don't want a lock to be interleaved with awaiting. It is usually a design error if you are trying to hold a mutex across await points. What does your user have to do with a Mutex, and why can't just they use one internally, if they need one? Futures are assumed to be concurrency-safe by design.

3 Likes

That is exactly what is happening. However, I don't really understand what I should send through the oneshot channel. The future evaluates to a generic type, let's call it T. But there is only ever one processing thread. How should the thread know which Future<Output = T> to return? I cannot make the processing thread generic over T, can I?

You could wrap the future that returns T in a new future that awaits it and then sends the result to the oneshot channel. The resulting future would return (), while still sending the correct type T to the oneshot channel.

2 Likes

Thanks, I am probably misunderstanding you, but that future then would be Future<Output = Future<Output = T>>, so I still carry around that T, right?
Also the awaiting of the ("main") future is the waiting for the processor thread to process the request. The future only ever resolves, when the processor thread has finished processing it.

No. Let's assume that you have a future fut that implements Future<Output = T>. You would then do something like this:

// This could be either from `futures`, `tokio` or some other crate
let (tx, rx) = oneshot::channel();
let new_fut = async move {
    let res = fut.await;
    tx.send(res);
};

You're then left with rx, which you can use to get back the result, and new_fut, which implements Future<Output = ()> and you can send them independently where you need them (e.g. new_fut to the executor and rx where you need back the result of type T).

1 Like

Thanks, that makes it more clear. I am still uncertain as to how I'd get tx into the processing thread though. I'm going to dig in the docs this evening.

tx is moved inside new_fut, it cannot be touched again after that. All you would be working with are new_fut and rx, which respectively contain the task to run and the endpoint from which you get the result of that task once it has ended. Supposedly the processing thread will poll new_fut

1 Like