Greetings, I'm working on a toy library called poolparty as a means of research and fun.
The goal is to define a worker, a unit of work (i.e., a task), and a supervisor pool that manages the workers. Workers and the supervisor already communicate with each other, and the next step is to expose worker results to downstream client code.
#[derive(Debug)]
pub struct Supervisor<W: Workable> {
    /// Internal worker pool, containing the queue of workers that are ready
    /// to receive a task (i.e., checkout).
    pool: BTreeMap<Pid, (Sender<Request<W>>, JoinHandle<()>)>,
    /// An internal pool containing the list of checked out workers. We need
    /// to do this in order to keep channels alive and keep communication with
    /// workers even as they are running.
    checked: BTreeMap<Pid, (Sender<Request<W>>, JoinHandle<()>)>,
    /// Pending queue of tasks to be executed
    tasks: VecDeque<W::Task>,
    /// Queue of Tasks to be sent out.
    pub queue: (Sender<W::Task>, Receiver<W::Task>),
    /// Buffer of worker results
    pub results: RingBuffer<Result<W::Output, W::Error>>,
    /// Receiver end of the channel between all workers and the supervisor. This
    /// allows workers to emit messages back to the supervisor efficiently.
    receiver: Receiver<(Pid, Response<W>)>,
}
The results buffer is just a trivial buffer:
#[derive(Debug)]
pub(crate) struct RingBuffer<T> {
    inner: Mutex<VecDeque<T>>,
    notify: Notify,
}
I have a trivial example client that performs some URL-fetching tasks as a way of gauging how ergonomic the library is to use. The issue is in the main tokio::select! loop.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut supervisor: Supervisor<UrlFetchWorker> = Supervisor::new(5);
    let queue_tx = supervisor.queue.0.clone();
    tokio::select! {
        _ = input_loop(queue_tx) => {},
        _ = supervisor.run() => {},
        _ = supervisor.results.recv() => {
            todo!()
        },
        _ = tokio::signal::ctrl_c() => {
            supervisor.shutdown().await;
            return Ok(());
        },
    }
    Ok(())
}
The issue is that supervisor.run() takes &mut self, and as a result we cannot also poll supervisor.results.recv() in the same select!, because the mutable borrow of supervisor is still held by the run() future.
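To make the constraint concrete, here is a minimal, std-only sketch of the same shape with the async parts removed. Every name in it (Core, Results, split, run_step, demo) is a hypothetical stand-in and not part of poolparty. It assumes the supervisor's state could be grouped into a "core" half and a "results" half. A method taking &mut self borrows the whole struct, whereas borrowing the two fields separately is accepted by the borrow checker:

```rust
// Hypothetical stand-ins for the real types; this is not the poolparty API.
struct Results {
    items: Vec<u32>,
}

impl Results {
    // Read-only access, analogous to `recv(&self)` on the results buffer.
    fn len(&self) -> usize {
        self.items.len()
    }
}

struct Core {
    ticks: u32,
}

impl Core {
    // Only needs the scheduling state, never the results.
    fn step(&mut self) {
        self.ticks += 1;
    }
}

struct Supervisor {
    core: Core,
    results: Results,
}

impl Supervisor {
    // Borrows the whole struct mutably, like `run(&mut self)` above.
    fn run_step(&mut self) {
        self.core.step();
    }

    // Hands out disjoint borrows of the two halves at the same time.
    fn split(&mut self) -> (&mut Core, &Results) {
        (&mut self.core, &self.results)
    }
}

fn demo() -> (u32, usize) {
    let mut sup = Supervisor {
        core: Core { ticks: 0 },
        results: Results { items: vec![1, 2, 3] },
    };

    // This shape is rejected by the borrow checker, because `run_step`
    // needs all of `sup` while `r` still borrows `sup.results`:
    //
    //     let r = &sup.results;
    //     sup.run_step();
    //     r.len();

    // Calling the `&mut self` method on its own is fine.
    sup.run_step();

    // Field-level borrows are disjoint, so both can be live at once.
    let (core, results) = sup.split();
    core.step(); // mutable use of one half...
    let n = results.len(); // ...while a shared borrow of the other is live
    core.step();

    (sup.core.ticks, n)
}
```

Calling demo() steps the core three times in total and reads the results length while the core is still mutably borrowed. Whether a split like this carries over cleanly to the two futures inside select! is exactly what I am unsure about.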
I've explored options around pin projection, mutexes, and decoupling the results handler entirely. Each of them either made the client code more convoluted or cost performance or ergonomics in the library.
Given that self.run() will never touch self.results, are there any other constructs I can use for this use case? Many thanks in advance.