Tokio select! futures composition

Hi there,

I use the macro tokio::select! in a loop to process incoming messages from channels. In this macro there are two main areas, the cancellation safe future, and the handler, which is not. The select loop, when running has synchronous access to the structure on which it is running, which is very convenient.

loop {
   tokio::select! {
      Ok(msg) = channel.recv() /* cancellation safe future */ => {
         // message handler, not cancellation safe
         // access to self synchronously without lock
      }
     Ok(msg) = channel2.recv() /* cancellation safe future */ => {
         // message handler, not cancellation safe
      }
     Ok(msg) = channel_sub1.recv() /* cancellation safe future */ => {
         // message handler, not cancellation safe
      }
     Ok(msg) = channel_sub2.recv() /* cancellation safe future */ => {
         // message handler, not cancellation safe
      }
   }
}

Basically, my module relying on this select loop is growing and I want to split it into smaller chunks. The question is how. Some subparts can deal with a few messages, and it would make sense that instead of having a big flat select loop at the root, I would have scoped select loops dealing with their messages and their sub state. But I want to keep this code synchronous with the main select loop (not spawning sub tasks, I still want the access to self).

If we take the previous example, I want to do something like

// Extracted select
async fn sub_select() {
   tokio::select! {
     Ok(msg) = channel_sub1.recv() /* cancellation safe future */ => {
         // message handler, not cancellation safe
      }
     Ok(msg) = channel_sub2.recv() /* cancellation safe future */ => {
         // message handler, not cancellation safe
      }
   }
}
// main event loop
loop {
   tokio::select! {
      Ok(msg) = channel.recv() /* cancellation safe future */ => {
         // message handler, not cancellation safe
         // access to self synchronously without lock
      }
     Ok(msg) = channel2.recv() /* cancellation safe future */ => {
         // message handler, not cancellation safe
      }
      _ = sub_select() /* not cancellation safe :'( */ => {
         // 
      }
   }
}

Do you know a clean/simple way to achieve that with Tokio ?

Thanks for your help.


enum BranchRes {
    Branch1(String),
    Branch2(usize),
}

async fn select(&mut self) -> BranchRes {
    tokio::select! {
        Ok(msg) = channel_sub1.recv() => BranchRes::Branch1(msg),
        Ok(msg) = channel_sub2.recv() => BranchRes::Branch2(msg),
    }
}

async fn dispatch(&mut self, res: BranchRes) {
    match res {
        BranchRes::Branch1(msg) => { /* ... */ }
        BranchRes::Branch2(msg) => { /* ... */ }
    }
}

// main event loop
loop {
    tokio::select! {
        ...
        res = self.select() => self.dispatch(res),
    }
}
5 Likes