I have an API that currently requires an implementor to do a lot of legwork in order to implement a response type that handles data asynchronously processed by another thread to resolve to a Future
:
use std::future::Future;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::{AtomicBool, AtomicU8};
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;
use std::thread::{spawn, JoinHandle};
use log::error;
use serialport::TTYPort;
use listener::Listener;
use transmitter::Transmitter;
use crate::packet::FrameBuffer;
use crate::protocol::{Command, Response};
use crate::util::NonPoisonedRwLock;
use crate::Error;
mod listener;
mod transmitter;
/// A host controller to communicate with an NCP via the `ASHv2` protocol.
#[derive(Debug)]
pub struct Host {
running: Arc<AtomicBool>,
command: Sender<Command>,
listener_thread: Option<JoinHandle<()>>,
transmitter_thread: Option<JoinHandle<()>>,
}
impl Host {
/// Creates and starts the host.
///
/// # Errors
/// Returns an [`Error`] if the host could not be started.
pub fn spawn(
serial_port: TTYPort,
callback: Option<Sender<FrameBuffer>>,
) -> Result<Self, Error> {
let running = Arc::new(AtomicBool::new(true));
let (command_sender, command_receiver) = channel();
let connected = Arc::new(AtomicBool::new(false));
let handler = Arc::new(NonPoisonedRwLock::new(None));
let ack_number = Arc::new(AtomicU8::new(0));
let (listener, ack_receiver, nak_receiver) = Listener::new(
serial_port.try_clone_native()?,
running.clone(),
connected.clone(),
handler.clone(),
ack_number.clone(),
callback,
);
let transmitter = Transmitter::new(
serial_port,
running.clone(),
connected,
command_receiver,
handler,
ack_number,
ack_receiver,
nak_receiver,
);
Ok(Self {
command: command_sender,
running,
listener_thread: Some(spawn(move || listener.run())),
transmitter_thread: Some(spawn(move || transmitter.run())),
})
}
/// Communicate with the NCP, returning [`T::Result`](Response::Result).
///
/// # Errors
/// Returns [`T::Error`](Response::Error) if the transactions fails.
pub async fn communicate<T>(&self, payload: &[u8]) -> <T as Future>::Output
where
T: Clone + Default + Response + 'static,
{
let response = T::default();
self.command
.send(Command::new(Arc::from(payload), Arc::new(response.clone())))
.map_err(|_| Error::Terminated)?;
response.await
}
}
impl Drop for Host {
fn drop(&mut self) {
self.running.store(false, SeqCst);
if let Some(thread) = self.listener_thread.take() {
thread.join().unwrap_or_else(|_| {
error!("Failed to join listener thread.");
});
}
if let Some(thread) = self.transmitter_thread.take() {
thread.join().unwrap_or_else(|_| {
error!("Failed to join transmitter thread.");
});
}
}
}
with
use std::fmt::Debug;
use std::future::Future;
use crate::Error;
/// Result of handling an event.
///
/// This enum is returned from [`Handler::handle`] to indicate the outcome of handling the respective event.
///
/// * [`HandleResult::Completed`] indicates that the handler successfully processed the data and does not expect any more data.
/// * [`HandleResult::Continue`] indicates that the handler successfully processed the data and is expecting more data.
/// * [`HandleResult::Reject`] indicates that the handler was unable to process the passed data and is expecting more data.
/// * [`HandleResult::Failed`] indicates that the handler was unable to process the passed data and cannot continue.
#[derive(Debug)]
pub enum HandleResult {
Completed,
Continue,
Reject,
Failed,
}
/// Events sent to a [`Handler`].
///
/// * [`Event::TransmissionCompleted`] indicates that the requested payload has been transmitted completely.
/// * [`Event::DataReceived`] indicates that the listener has received a potential data packet that it is forwarding to the handler for processing.
#[derive(Debug)]
pub enum Event<'data> {
TransmissionCompleted,
DataReceived(&'data [u8]),
}
/// Handle `ASHv2` protocol events.
pub trait Handler: Debug + Send + Sync {
/// Handle the incoming [`Event`] and return an appropriate [`HandleResult`].
fn handle(&self, event: Event) -> HandleResult;
/// Abort the current transaction, resulting in an erroneous state.
fn abort(&self, error: Error);
/// Wake the underlying [`Waker`](std::task::Waker) to complete the respective [`Future`].
fn wake(&self);
}
/// A response to a request sent to the NCP.
///
/// This is a composite trait consisting of a [`Future`] and a [`Handler`] implementation.
/// The Future must output [`Result<Self::Result, Self::Error>`].
pub trait Response: Future<Output = Result<Self::Result, Self::Error>> + Handler {
type Result;
type Error: From<Error>;
}
However, this requires a lot of trait bounds and to have the user implement the trait Handler
with internal, thread-safe mutability - which is exactly what I want to avoid.
What I want is something along these lines:
pub async fn communicate<T>(&self, payload: &[u8]) -> Result<T::Result, T::Error>
where
T: Default + Response + 'static,
Arc<Mutex<T>>: Future<Output = Result<T::Result, T::Error>>,
{
let response = Arc::new(Mutex::new(T::default()));
self.command
.send(Command::new(Arc::from(payload), response.clone()))
.map_err(|_| Error::Terminated)?;
response.await
}
This, however, is not possible to implement for the user, since they cannot implement Future
for a type not owned by the crate Arc<...>
. Also I would not know how the await should work on an Arc<Mutex<...>>
.
Is there a way to not require the Handler
trait to implement thread-safe internal mutability and automagically wrapping it in a thread-safe Future
?
Entire repo for more context: GitHub - PaulmannLighting/ashv2: Asynchronous Serial Host protocol, version 2