How to safely access mutable and immutable disjoint fields

Greetings, I'm working on a toy library called poolparty as a means of research and fun.

We want to be able to define a worker, a unit of work (i.e., task) and a supervisor pool that manages the workers. There's communication between workers and the supervisor, and the next step is to expose worker results to downstream client code.

#[derive(Debug)]
pub struct Supervisor<W: Workable> {
   /// Internal worker pool, containing the queue of workers that are ready
   /// to receive a task (i.e., checkout).
   pool: BTreeMap<Pid, (Sender<Request<W>>, JoinHandle<()>)>,

   /// An internal pool containing the list of checked out workers. We need
   /// to do this in order to keep channels alive and keep communication with
   /// workers even as they are running.
   checked: BTreeMap<Pid, (Sender<Request<W>>, JoinHandle<()>)>,

   /// Pending queue of tasks to be executed
   tasks: VecDeque<W::Task>,

   /// Queue of Tasks to be sent out.
   pub queue: (Sender<W::Task>, Receiver<W::Task>),

   /// Buffer of worker results
   pub results: RingBuffer<Result<W::Output, W::Error>>,

   /// Receiver end of the channel between all workers and the supervisor. This
   /// allows workers to emit messages back to the supervisor efficiently.
   receiver: Receiver<(Pid, Response<W>)>,
}

The results buffer is just a trivial buffer:

#[derive(Debug)]
pub(crate) struct RingBuffer<T> {
    inner: Mutex<VecDeque<T>>,
    notify: Notify,
}

I have a small example client that performs some URL-fetching tasks, as a way of gauging how the library feels to use. The issue is in the main tokio::select! block.

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut supervisor: Supervisor<UrlFetchWorker> = Supervisor::new(5);
    let queue_tx = supervisor.queue.0.clone();

    tokio::select! {
        _ = input_loop(queue_tx) => {},
        _ = supervisor.run() => {},
        _ = supervisor.results.recv() => {
            todo!()
        }
        _ = tokio::signal::ctrl_c() => {
            supervisor.shutdown().await;
            return Ok(());
        },
    }

    Ok(())
}

The issue is that supervisor.run() takes &mut self, and as a result we cannot also call supervisor.results.recv() while that mutable borrow is live.

I've explored options around pin projection, mutexes, and decoupling the results handler entirely. All of them either made the client code more convoluted or lacked in performance/ergonomics in the library.

Given that self.run() will never touch self.results, are there any other constructs I can use for this use case? Many thanks in advance.

Unfortunately, the current way to tell the compiler this is to move run() to a lower-level struct that doesn't contain results (decoupling the results handler, as you mentioned).
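A minimal sketch of that factoring, using only std and assuming run() really never touches results. The names Core, Results, run_step and drive are hypothetical stand-ins, not part of poolparty:

```rust
use std::collections::VecDeque;

/// Hypothetical lower-level struct: holds only what `run` needs.
struct Core {
    tasks: VecDeque<u32>,
    processed: usize,
}

impl Core {
    /// Mutably borrows `Core` only, so sibling fields of the outer struct stay free.
    fn run_step(&mut self) -> Option<u32> {
        let task = self.tasks.pop_front()?;
        self.processed += 1;
        Some(task * 2)
    }
}

/// Stand-in for the results buffer.
struct Results {
    items: Vec<u32>,
}

struct Supervisor {
    core: Core,
    results: Results,
}

fn drive(supervisor: &mut Supervisor) -> usize {
    // Two borrows of different fields. The borrow checker accepts this because
    // the field paths are visible inside this function body.
    let core = &mut supervisor.core;
    let results = &supervisor.results;

    while core.run_step().is_some() {}

    // `results` is still usable even though `core` was mutably borrowed above.
    results.items.len() + core.processed
}
```

The point is that the disjointness is expressed through field access at the call site (supervisor.core vs. supervisor.results) rather than through a method on the whole Supervisor.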


Manually splitting borrows into view types is another approach that doesn't require factoring.
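As a rough illustration of the idea, one method can hand out a "view" struct borrowing the fields run() mutates, alongside a shared borrow of the rest. RunView, split and run_step are hypothetical names chosen for this sketch, and the field types are simplified:

```rust
use std::collections::VecDeque;

struct Supervisor {
    tasks: VecDeque<String>,
    completed: usize,
    results: Vec<String>,
}

/// Hypothetical view over just the fields that the run logic mutates.
struct RunView<'a> {
    tasks: &'a mut VecDeque<String>,
    completed: &'a mut usize,
}

impl Supervisor {
    /// Split one `&mut self` into a run view and a shared borrow of the results.
    fn split(&mut self) -> (RunView<'_>, &[String]) {
        (
            RunView {
                tasks: &mut self.tasks,
                completed: &mut self.completed,
            },
            self.results.as_slice(),
        )
    }
}

impl RunView<'_> {
    fn run_step(&mut self) -> Option<String> {
        let task = self.tasks.pop_front()?;
        *self.completed += 1;
        Some(task)
    }
}

fn demo() -> (usize, usize) {
    let mut supervisor = Supervisor {
        tasks: VecDeque::from(vec!["a".to_string(), "b".to_string()]),
        completed: 0,
        results: vec!["old".to_string()],
    };

    // Both halves are live at the same time without a borrow conflict.
    let (mut view, results) = supervisor.split();
    while view.run_step().is_some() {}
    let seen = results.len();

    (supervisor.completed, seen)
}
```

The struct layout stays as it is; only the borrowing is split, at the cost of writing the view type by hand.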


Thanks! Only issue with this is that run() does interact with results. It pushes worker results into the buffer. In fact my question title is incorrect as they are not disjoint.

RingBuffer internally has locking of the internal buffer via Mutex, so R/W of that buffer should be fine. The reason I chose this is that I'd like to remove these details from downstream code.

So from the compiler's point of view, judging from the signature alone, there is no way to guarantee safety, and therefore the borrow check fails. Another approach would be to simply change this to an mpsc/broadcast channel, but that feels like a cop-out at this stage. I'd really like to figure out a way to get this to work nicely.
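For comparison, the channel alternative mentioned above might look roughly like this. It is a sketch using std::sync::mpsc rather than tokio's channels, with simplified task and result types (u32), and results_tx, run_step and demo are hypothetical names:

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};

struct Supervisor {
    tasks: VecDeque<u32>,
    /// Sending half stays inside the supervisor.
    results_tx: Sender<u32>,
}

impl Supervisor {
    /// Returns the supervisor plus the receiving half, which the caller owns.
    fn new() -> (Self, Receiver<u32>) {
        let (results_tx, results_rx) = channel();
        (
            Self {
                tasks: VecDeque::new(),
                results_tx,
            },
            results_rx,
        )
    }

    fn run_step(&mut self) {
        if let Some(task) = self.tasks.pop_front() {
            // A send error only means the receiver was dropped; ignore it here.
            let _ = self.results_tx.send(task * 10);
        }
    }
}

fn demo() -> Vec<u32> {
    let (mut supervisor, results_rx) = Supervisor::new();
    supervisor.tasks.extend([1, 2]);
    supervisor.run_step();
    supervisor.run_step();

    // Reading results never borrows `supervisor`.
    results_rx.try_iter().collect()
}
```

Because the receiver is a separate value, the &mut self borrow taken by the run logic never overlaps with reading results.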

The temporary approach I took for the time being is to keep a reference to the buffer outside of the supervisor and parameterise it during initialisation.

#[derive(Debug)]
pub struct Supervisor<'buf, W: Workable> {
    /// Buffer of worker results
    pub results: &'buf RingBuffer<Result<W::Output, W::Error>>,
}

And then passing it through initialisation:

pub fn new(size: usize, buffer: &'buf RingBuffer<Result<W::Output, W::Error>>) -> Self
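A self-contained sketch of how this externally owned buffer can be used, under some assumptions: RingBuffer is reduced to a std Mutex<VecDeque<T>> (no Notify, no capacity limit), the worker types are replaced by u32, and push, try_recv and run_step are hypothetical method names:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Simplified stand-in for the `RingBuffer` shown earlier.
#[derive(Debug, Default)]
struct RingBuffer<T> {
    inner: Mutex<VecDeque<T>>,
}

impl<T> RingBuffer<T> {
    /// Interior mutability: pushing only needs `&self`.
    fn push(&self, item: T) {
        self.inner.lock().unwrap().push_back(item);
    }

    /// Non-blocking pop of the oldest item, if any.
    fn try_recv(&self) -> Option<T> {
        self.inner.lock().unwrap().pop_front()
    }
}

struct Supervisor<'buf> {
    tasks: VecDeque<u32>,
    results: &'buf RingBuffer<u32>,
}

impl<'buf> Supervisor<'buf> {
    fn new(buffer: &'buf RingBuffer<u32>) -> Self {
        Self {
            tasks: VecDeque::new(),
            results: buffer,
        }
    }

    /// Needs `&mut self` for `tasks`, but writes results through a shared reference.
    fn run_step(&mut self) {
        if let Some(task) = self.tasks.pop_front() {
            self.results.push(task + 1);
        }
    }
}

fn demo() -> Vec<u32> {
    let buffer = RingBuffer::default();
    let mut supervisor = Supervisor::new(&buffer);
    supervisor.tasks.extend([1, 2, 3]);

    let mut seen = Vec::new();
    for _ in 0..3 {
        supervisor.run_step();
        // The caller reads from `buffer` directly; `supervisor` is not borrowed here.
        seen.extend(buffer.try_recv());
    }
    seen
}
```

Holding the buffer behind an Arc instead of a &'buf reference would be another option; it would drop the lifetime parameter on Supervisor in exchange for reference counting.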

Not ideal, and something I'd like to tackle better in the future. Thanks for your help either way!
