Help refactoring code

Playground

Description

I'm trying to refactor my code to abstract away duplicated logic, but I can't quite figure out how to make it work. In the playground I linked, you can see how similar send_directory and send_bench are: they each initialize some variables outside of the loop and then do slightly different logic inside the loop, potentially using those variables. I've commented one of the functions below to highlight the duplicated code I am trying to abstract away.

async fn send_directory(rx: Receiver<Frame>, rx_events: Receiver<()>, path: PathBuf) -> Result<()> {
    // branch specific variable declaration
    let canon_path = path.canonicalize()?;
    
    let mut should_exit = false;                                              ///////////////
    loop {                                                                    //
        // check for any events                                               //
        if let Ok(event) = rx_events.try_recv() {                             // repeated code
            should_exit = true;                                               //
        }                                                                     //
                                                                              //
        match timeout(Duration::from_millis(100), rx.recv_async()).await? {   //
            Ok(frame) => {                                                    ///////////////
                // branch specific stuff
                use_thing_dir(frame, &canon_path).await?;
            }                                                                 ///////////////
            _ => {                                                            //
                // do stuff                                                   //
                if should_exit {                                              //
                    break;                                                    // repeated code
                }                                                             //
            }                                                                 //
        }                                                                     //
    }                                                                         //
                                                                              //
    Ok(())                                                                    ///////////////
}

What I've Tried

At first I thought it wouldn't be too bad: create a closure that takes in the frame parameter but still has access to the variables initialized above, then break the loop logic out into its own function, pass in my closure, and execute it on every loop iteration. But I've not quite been able to get it to work, because the action I am trying to pass in ends up not being FnMut.

async fn send_directory(rx: Receiver<Frame>, rx_events: Receiver<()>, path: PathBuf) -> Result<()> {
    let canon_path = path.canonicalize()?;

    let mut action = |frame: Frame| async move {
       use_thing_dir(frame, &canon_path).await?;
       Ok::<(), anyhow::Error>(())
    };

    recv_loop(&config, &rx, &mut action).await
}

async fn recv_loop<A, Fut>(config: &Config, rx: &flume::Receiver<Frame>, action: &mut A) -> Result<()>
where
A: FnMut(Frame) -> Fut,
Fut: futures::Future<Output = Result<()>> {
    let mut event_reader = config::bus_reader(&config.event_bus)?;
    let recv_duration = Duration::from_millis(100);
    let mut should_exit = false;

    loop {
        match event_reader.try_recv() {
            Ok(Event::EXIT) => {
                log::info!("exiting recv_directory");
                break;
            }
            Ok(Event::CLEANUP(Component::Receiver)) => {
                should_exit = true;
            }
            _ => {}
        }

        match rx.recv_timeout(recv_duration) {
            Ok(frame) => {
                action(frame).await?;
            }
            _ => {
                if should_exit {
                    break;
                }
            }
        }
    }
    Ok(())
}

Well, you are close. FnMut means the closure can be called multiple times, but your closure returns an async move {} block. I don't know the exact capture rules here, but it seems the async move {} block (although inside a non-move closure) captures the outer canon_path variable by value, which means the async block can only be constructed once, making the closure FnOnce instead of FnMut.

In this example, the move is unnecessary; you can just remove it and it should be fine. (BTW, you can use anyhow::Ok() when the Err variant cannot be inferred.)

    let canon_path = path.canonicalize()?;

    let mut action = |frame: Frame| async {
       use_thing_dir(frame, &canon_path).await?;
       anyhow::Ok(())
    };

    recv_loop(&config, &rx, &mut action).await

I think some explicit capture control could also work:

    let canon_path = path.canonicalize()?;

    let mut action = |frame: Frame| {
        // be explicit about what gets moved when constructing the Future from the async block;
        // you can also do the same for the outer closure, but it's not necessary
        let canon_path = &canon_path;
        async move {
            use_thing_dir(frame, canon_path).await?;
            anyhow::Ok(())
        }
    };

    recv_loop(&config, &rx, &mut action).await
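
For anyone who wants to try the explicit-capture pattern without the rest of the project, here is a minimal std-only sketch. Everything in it is a stand-in I made up for illustration: `drive` plays the role of recv_loop, plain `u32` values play the role of Frame, and `block_on` is a tiny hand-rolled executor that only works for futures that never actually wait (a real program would use its async runtime instead).

```rust
use std::cell::Cell;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical helper: a waker that does nothing, enough for futures that never pend.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical helper: polls a future to completion on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Stand-in for recv_loop: calls the action once per "frame".
async fn drive<A, Fut>(frames: &[u32], action: &mut A) -> Result<(), String>
where
    A: FnMut(u32) -> Fut,
    Fut: Future<Output = Result<(), String>>,
{
    for &frame in frames {
        action(frame).await?;
    }
    Ok(())
}

fn sum_with_prefix_len(frames: &[u32]) -> Result<u32, String> {
    let prefix = String::from("dir"); // plays the role of canon_path
    let total = Cell::new(0u32);

    let mut action = |frame: u32| {
        // Take references first. `async move` then moves only these
        // references (which are Copy), so the closure can be called again.
        let prefix = &prefix;
        let total = &total;
        async move {
            total.set(total.get() + frame + prefix.len() as u32);
            Ok::<(), String>(())
        }
    };

    block_on(drive(frames, &mut action))?;
    Ok(total.get())
}
```

Calling `sum_with_prefix_len(&[1, 2, 3])` runs the action three times, which is exactly what the `async move` version without the re-borrow would refuse to allow.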

The idea of providing the action as a closure is a nice way to deduplicate the code. I do have a feeling that if we knew what behavior you were trying to implement, we could probably refactor the whole thing to be much simpler.

Here's another (similar) formulation.

I mostly moved the awaits around. Async isn't my forte, so I couldn't say what the pros and cons are.

Thanks so much for the replies! I think I may have simplified my original example a little too much. I've put a more fleshed out example below. I appreciate the time y'all have put in to understand my specific problem.

I'm a bit amazed that I've worked on so many iterations of this project, both sync and async, written thousands of lines of code, am now interfacing synchronous and asynchronous code in a few spots in the same project, and have refactored so often throughout all of that, and it isn't until a relatively minor refactor to clean up some duplicated code that I get completely blocked.

Background
Basically this is just redirecting an inflow of packets to be handled by another task. A Frame represents a packet from a multiplexed data flow, and this code is meant to split the combined flow of Frames out into their own separate channels so the demultiplexed data can be used. I'm receiving packets at relatively high rates (at least several hundred thousand per second), so I've been hesitant to throw a Mutex into the mix to help solve issues (although I recognize that at that rate it is probably perfectly fine).

Updated Action Example

Here is a more realistic example of one of the actions I'm trying to create. It is actually a little more complicated than I originally showed, because I'm borrowing some of those variables as mutable. That right there might be enough of a deal breaker that this can't be broken out into a closure without using a mutex. Of course, since this code deals with one Frame at a time, maybe there is a way to assure the compiler that destination_id_map can be mutably borrowed?

The block of code below won't compile.

struct Frame {
  header: FrameHeader,
  bytes: BytesMut
}

struct FrameHeader {
  stream_id: u16,
  length: u16,
  command: FrameCommand
}

enum FrameCommand {
    Sync,
    Send,
    Finish,
}

async fn recv_directory(config: &Config, rx: Receiver<Frame>, rx_events: Receiver<()>, path: PathBuf) -> Result<()> {
    let mut destination_id_map: IntMap<u16, flume::Sender<Frame>> = IntMap::default();
    let canon_path: PathBuf = path.canonicalize()?.into();

    let mut action = |frame: Frame| async {
        match frame.header.command {                 // <-- complains about borrowing without moving
            FrameCommand::Sync => {
                let (tx, rx) = flume::bounded(config.channel_capacity);
                destination_id_map.insert(frame.header.stream_id, tx);     // <-- borrowing mutable copy not FnMut friendly
                spawn_recv_file_task(canon_path.clone(), rx.clone()).await?;
            }
            FrameCommand::Send => {
                if let Some(tx) = destination_id_map.get(&frame.header.stream_id) {
                    tx.send_async(frame).await?;
                }
            }
            FrameCommand::Finish => {
                destination_id_map.remove(&frame.header.stream_id);
            }
            _ => {}
        };

        anyhow::Ok(())
    };

    recv_loop(&config, &rx, &mut action).await
}

Can you provide a playground?

I'm not sure what the objective is anymore and indeed a playground link would be helpful.

If you're going to access a map from multiple tasks, there needs to be synchronisation. If you can have the map be owned by a single task, you don't need synchronisation. There are specialized concurrent data structures like dashmap (although I heard that it's not quite production ready) or you can put an Arc<Mutex<_>> around your collection to make it usable in a concurrent setting.
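
To make the Arc<Mutex<_>> option concrete, here is a small sketch using std threads as a stand-in for async tasks (the locking pattern is the same idea, though in async code you would also want to avoid holding the guard across an await). The function name `count_per_stream` and the use of `u16` stream ids are my own assumptions for illustration.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Each batch is processed on its own thread; all threads update one shared map.
fn count_per_stream(batches: Vec<Vec<u16>>) -> HashMap<u16, u32> {
    let counts: Arc<Mutex<HashMap<u16, u32>>> = Arc::new(Mutex::new(HashMap::new()));

    let handles: Vec<_> = batches
        .into_iter()
        .map(|batch| {
            // Every worker gets its own reference-counted handle to the map.
            let counts = Arc::clone(&counts);
            thread::spawn(move || {
                for stream_id in batch {
                    // The lock is held only for the duration of one update.
                    let mut map = counts.lock().unwrap();
                    *map.entry(stream_id).or_insert(0) += 1;
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Copy the final state out while holding the lock briefly.
    let result = counts.lock().unwrap().clone();
    result
}
```

If only one task ever touches the map, none of this is needed and a plain HashMap owned by that task is simpler and faster.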


Sorry, I should have provided a playground last night. Here is my Updated Playground, which as of this morning is almost working with y'all's help! I've constrained the async closures to just where they're needed. Now my biggest problem is that the match arms produce futures that should all be Future<Output = Result<()>> but are not considered the same type. Well that and the little grossness of having async { anyhow::Ok(()) } everywhere but that isn't as big of a problem.

In general in Rust, “I want to return all of these as the same type” is solved by creating some kind of wrapper that has the same type regardless of its contents. The two general purpose sorts of wrappers are enums and boxed trait objects.

  1. In the specific case of two arms, you can often use either::Either which implements common traits like Iterator and Future based on its two inner types. For more arms, you have to define your own enum.

  2. Boxed trait objects are also very common for Futures — Box<dyn Future>. They're so common that the futures crate provides a type alias, BoxFuture for them, and an extension method, .boxed() to create them. So, just import it and add .boxed() after each future.

  3. A third way, which only works for Futures but generalizes to any number of arms, is to let an async block handle the problem (because async blocks' control flow states have to do just that all the time), by putting your entire match in one async block.
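
Here is a rough std-only sketch of options 2 and 3 side by side. The names are hypothetical: `BoxedFuture` is a local alias standing in for the futures crate's BoxFuture (without its Send bound), `Box::pin` does by hand roughly what `.boxed()` does for you, and `Command`, `on_sync`, `on_send`, and `block_on` are made up for the demo.

```rust
use std::future::{ready, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in for futures::future::BoxFuture (assumption: no Send bound needed here).
type BoxedFuture<T> = Pin<Box<dyn Future<Output = T>>>;

enum Command {
    Sync,
    Send(u32),
    Finish,
}

// Two async fns: each one has its own distinct, unnameable future type.
async fn on_sync() -> Result<u32, String> {
    Ok(1)
}
async fn on_send(len: u32) -> Result<u32, String> {
    Ok(len)
}

// Option 2: box every arm so that all arms share one trait-object type.
fn dispatch_boxed(cmd: Command) -> BoxedFuture<Result<u32, String>> {
    match cmd {
        Command::Sync => Box::pin(on_sync()),
        Command::Send(len) => Box::pin(on_send(len)),
        Command::Finish => Box::pin(ready(Ok::<u32, String>(0))),
    }
}

// Option 3: put the whole match inside one async block; the block is a single type.
fn dispatch_one_block(cmd: Command) -> impl Future<Output = Result<u32, String>> {
    async move {
        match cmd {
            Command::Sync => on_sync().await,
            Command::Send(len) => on_send(len).await,
            Command::Finish => Ok(0),
        }
    }
}

// Hypothetical mini executor, only suitable for futures that never actually wait.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Option 2 costs one heap allocation per call; option 3 avoids that but only helps when the arms can live inside a single async block.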

However, even once you've solved the Future mismatch one way or the other, you'll get this error:

error: captured variable cannot escape `FnMut` closure body
  --> src/main.rs:64:9
   |
60 |       let mut destination_id_map: HashMap<u16, flume::Sender<Frame>> = HashMap::default();
   |           ---------------------- variable defined here
...
63 |       let mut action = move |frame: Frame| {
   |                                          - inferred to be a `FnMut` closure
64 | /         match frame.header.command {
65 | |             FrameCommand::Sync => {
66 | |                 let (tx, rx) = flume::bounded(4);
67 | |                 destination_id_map.insert(frame.header.stream_id, tx);
   | |                 ------------------ variable captured here
...  |
83 | |             _ => async { Ok(()) }.boxed()
84 | |         }
   | |_________^ returns a reference to a captured variable which escapes the closure body
   |
   = note: `FnMut` closures only have access to their captured variables while they are executing...
   = note: ...therefore, they cannot allow references to captured variables to escape

This is because this part:

            FrameCommand::Send => {
                match destination_id_map.get(&frame.header.stream_id) {
                    Some(tx) => async {
                        tx.send_async(frame).await?;

has the async block borrow tx from destination_id_map, which is captured by the closure. This kind of pattern, a FnMut closure handing out borrows of its captures, cannot be supported due to the definition of the FnMut trait: a function can only return things which don't depend on the function's continued existence. This is most obvious for FnOnce, and every FnMut must also be usable as a FnOnce.

In order to solve this problem, you will need to give the two things independent lives in some way; the easiest is perhaps to use reference-counting, so that the flume::Sender<Frame> can be jointly owned instead of borrowed. That is easy, and now the program compiles:

                destination_id_map.insert(frame.header.stream_id, Arc::new(tx));
...
            FrameCommand::Send => {
                match destination_id_map.get(&frame.header.stream_id) {
                    Some(tx) => {
                        let tx: Arc<_> = tx.clone(); // don't borrow from the map
                        async move {
                            tx.send_async(frame).await?;
                            Ok(())
                        }
                        .boxed()

Complete program on Playground
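
In case the playground link goes stale, here is a self-contained sketch of the same idea using only std. It is not the program from the playground: `Frame` is simplified to a small enum, std's mpsc channel stands in for flume, `route` and `BoxedFuture` are names I invented, and `block_on` is a toy executor that only handles futures that never wait.

```rust
use std::collections::HashMap;
use std::future::{ready, Future};
use std::pin::Pin;
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type BoxedFuture<T> = Pin<Box<dyn Future<Output = T>>>;

// Simplified stand-in for the thread's Frame: (stream id) or (stream id, payload).
enum Frame {
    Sync(u16),
    Send(u16, u32),
    Finish(u16),
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Routes payloads of registered streams to one output channel and returns them.
fn route(frames: Vec<Frame>) -> Result<Vec<u32>, String> {
    let (out_tx, out_rx) = channel::<u32>();
    let mut map: HashMap<u16, Arc<Sender<u32>>> = HashMap::new();

    // The closure owns and mutates `map`, so it is FnMut.
    let mut action = move |frame: Frame| -> BoxedFuture<Result<(), String>> {
        match frame {
            Frame::Sync(id) => {
                map.insert(id, Arc::new(out_tx.clone()));
                Box::pin(ready(Ok::<(), String>(())))
            }
            Frame::Send(id, payload) => match map.get(&id) {
                Some(tx) => {
                    // Clone the Arc so the future owns its sender
                    // instead of borrowing from `map`.
                    let tx = Arc::clone(tx);
                    Box::pin(async move { tx.send(payload).map_err(|e| e.to_string()) })
                }
                None => Box::pin(ready(Ok::<(), String>(()))),
            },
            Frame::Finish(id) => {
                map.remove(&id);
                Box::pin(ready(Ok::<(), String>(())))
            }
        }
    };

    for frame in frames {
        block_on(action(frame))?;
    }
    // Collect whatever was delivered, without blocking.
    Ok(out_rx.try_iter().collect())
}
```

Because every returned future owns its data, none of them keeps a borrow into the closure's captures, which is the property the FnMut error was complaining about.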

Well that and the little grossness of having async { anyhow::Ok(()) } everywhere but that isn't as big of a problem.

Note that you don't need to specify anyhow:: as long as there's at least one place that specifies the error type of the entire function (the -> Result<()> that you already have will do); you can just write Ok(()). You can also use std::future::ready instead of a trivial async block; this may or may not seem nicer to you.
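
A tiny sketch of both points, with made-up names (`trivial_async`, `trivial_ready`, `block_on`) and String as a placeholder error type rather than anyhow::Error:

```rust
use std::future::{ready, Future, Ready};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// The declared return type fixes the error type, so a plain Ok(()) is enough.
async fn trivial_async() -> Result<(), String> {
    Ok(())
}

// Same result without an async block: `ready` wraps a value that is already available.
fn trivial_ready() -> Ready<Result<(), String>> {
    ready(Ok(()))
}

// Hypothetical mini executor, only suitable for futures that never actually wait.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Both futures resolve on the first poll; the difference is only in how they read and in which concrete type you end up with.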


Thanks for your help, it really helped me get much closer to the solution!
