Handling sequential operations in futures

#1

Background

For a little while I’ve been working on a nice interface/configuration GUI for one of my hobby projects. It is a USB HID device that performs actions in a command-response format using HID reports: I send it a command, and some time later it sends back a response. This is fairly easy to do, since on both Linux and Windows you just open the appropriate file and read from and write to it. I decided to try futures in my utility because I had some experience with this kind of thing (dealing with HID devices via raw reports) in C# and found the device much easier to manage when I made it non-blocking and used async/await.

Problem

I managed to write an abstraction around the raw file descriptor for the device that performs a single write-read operation sequentially (get_parameter in my example), returning a future that resolves when the response is received. The resulting future owns the Device temporarily and, when it completes, passes it back out as a tuple along with the response (you’ll see me using .0 and .1 in my example; 0 is the device and 1 is the response). The code behind that method is fairly easy for me to follow and I’m satisfied with it. However, I need to do things like read back the entire state of the device, which means doing a bunch of these write-read operations in sequence. I’ve found that this quickly becomes cumbersome and verbose. For example, here is how I read back a bunch of device settings (hopefully the naming makes the code self-documenting):

/// Settings for a fader on the device
pub struct Fader {
    index: u32,
    channel: FdrMidiChannel,
    mode: FdrMode,
    control: FdrControl,
    control_min: FdrControlMin,
    control_max: FdrControlMax,
    pitch_min: FdrPitchMin,
    pitch_max: FdrPitchMax,
}

impl Fader {
    /// Builds a fader configuration using the passed device and index
    ///
    /// This returns a future for the device and the associated fader configuration
    fn get_fader_configuration<T: AsyncHidDevice<MidiFader>>(device: T, index: u32) -> impl Future<Item=(T, Self), Error=Error> {
        device.get_parameter(FdrMidiChannel::index_parameter(index))
            .and_then(move |res| {
                let ch = res.1;
                res.0.get_parameter(FdrMode::index_parameter(index))
                    .join(Ok(ch))
            })
            .and_then(move |(res, ch)| {
                let mode = res.1;
                res.0.get_parameter(FdrControl::index_parameter(index))
                    .join(Ok((ch, mode)))
            })
            .and_then(move |(res, (ch, mode))| {
                let control = res.1;
                res.0.get_parameter(FdrControlMin::index_parameter(index))
                    .join(Ok((ch, mode, control)))
            })
            .and_then(move |(res, (ch, mode, control))| {
                let control_min = res.1;
                res.0.get_parameter(FdrControlMax::index_parameter(index))
                    .join(Ok((ch, mode, control, control_min)))
            })
            .and_then(move |(res, (ch, mode, control, control_min))| {
                let control_max = res.1;
                res.0.get_parameter(FdrPitchMin::index_parameter(index))
                    .join(Ok((ch, mode, control, control_min, control_max)))
            })
            .and_then(move |(res, (ch, mode, control, control_min, control_max))| {
                let pitch_min = res.1;
                res.0.get_parameter(FdrPitchMax::index_parameter(index))
                    .join(Ok((ch, mode, control, control_min, control_max, pitch_min)))
            })
            .and_then(move |(res, (ch, mode, control, control_min, control_max, pitch_min))| {
                let pitch_max = res.1;
                let fader = Fader {
                    index,
                    channel: FdrMidiChannel::new(index, ch.value().into()),
                    mode: FdrMode::new(index, mode.value().into()),
                    control: FdrControl::new(index, control.value().into()),
                    control_min: FdrControlMin::new(index, control_min.value().into()),
                    control_max: FdrControlMax::new(index, control_max.value().into()),
                    pitch_min: FdrPitchMin::new(index, pitch_min.value().into()),
                    pitch_max: FdrPitchMax::new(index, pitch_max.value().into()),
                };
                Ok((res.0, fader))
            })
    }
}

Although this works, I have a couple of problems with it:

  1. When I’m adding new things and trying to interpret errors from the compiler, the gigantic and_then chain results in huge type names. This isn’t the worst, but it certainly makes problems harder to read:
error[E0271]: type mismatch resolving `<futures::AndThen<futures::AndThen<futures::AndThen<futures::AndThen<futures::AndThen<futures::AndThen<futures::AndThen<device::GetParameter<T>, futures::Join<device::GetParameter<T>, futures::FutureResult<device::ParameterValue, device::Error>>, [closure@src/config.rs:322:23: 326:14 index:_]>, futures::Join<device::GetParameter<T>, futures::FutureResult<(device::ParameterValue, device::ParameterValue), device::Error>>, [closure@src/config.rs:327:23: 331:14 index:_]>, futures::Join<device::GetParameter<T>, futures::FutureResult<(device::ParameterValue, device::ParameterValue, device::ParameterValue), device::Error>>, [closure@src/config.rs:332:23: 336:14 index:_]>, futures::Join<device::GetParameter<T>, futures::FutureResult<(device::ParameterValue, device::ParameterValue, device::ParameterValue, device::ParameterValue), device::Error>>, [closure@src/config.rs:337:23: 341:14 index:_]>, futures::Join<device::GetParameter<T>, futures::FutureResult<(device::ParameterValue, device::ParameterValue, device::ParameterValue, device::ParameterValue, device::ParameterValue), device::Error>>, [closure@src/config.rs:342:23: 346:14 index:_]>, futures::Join<device::GetParameter<T>, futures::FutureResult<(device::ParameterValue, device::ParameterValue, device::ParameterValue, device::ParameterValue, device::ParameterValue, device::ParameterValue), device::Error>>, [closure@src/config.rs:347:23: 351:14 index:_]>, std::result::Result<(T, config::Fader), device::Error>, [closure@src/config.rs:352:23: 365:14 index:_]> as futures::Future>::Error == config::Error
  2. This is incredibly verbose, since I’m accumulating all the members of the object as I go along the and_then chain before finally constructing the final product. As I do more things in a row, the tuple that I pass into the next and_then becomes longer and longer.

Question

Is there a better way to do this?

If this were in C#, I could just await each member and not have to pass along the values of all previous operations until I finally finish (it’s like I’m manually capturing the scope/stack frame). In Rust, I haven’t yet thought of a better way that satisfies lifetime requirements. I’m using stable Rust right now, but I’m not opposed to switching to nightly. I’ve also not looked extensively into the actual await macro that I’ve seen used sometimes.
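For comparison, here is a minimal sketch of what the "await each member" style could look like with the async/await syntax that was later stabilized in Rust 1.39. It keeps the same ownership-passing shape (the device goes in, and comes back out with the response), but each value stays in an ordinary local instead of a growing tuple. Everything here is a hypothetical stand-in rather than the real code: MockDevice, its made-up responses, the param numbering, the shortened Fader, and the tiny busy-polling block_on are all assumptions for illustration only.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the real HID device; it only counts queries.
#[derive(Debug, Default)]
pub struct MockDevice {
    pub queries: u32,
}

impl MockDevice {
    /// Consumes the device and hands it back together with the response,
    /// mirroring the ownership-passing shape of `get_parameter` above.
    pub async fn get_parameter(mut self, parameter: u16) -> Result<(Self, u32), String> {
        self.queries += 1;
        // Made-up response so the sketch has something to return.
        Ok((self, u32::from(parameter) * 10))
    }
}

/// Shortened, hypothetical version of the fader settings.
#[derive(Debug, PartialEq)]
pub struct Fader {
    pub index: u32,
    pub channel: u32,
    pub mode: u32,
    pub control: u32,
}

/// Hypothetical parameter numbering: three parameters per fader.
fn param(index: u32, offset: u16) -> u16 {
    index as u16 * 3 + offset
}

/// Each response lives in a local variable; no tuple is threaded through.
pub async fn read_fader(device: MockDevice, index: u32) -> Result<(MockDevice, Fader), String> {
    let (device, channel) = device.get_parameter(param(index, 0)).await?;
    let (device, mode) = device.get_parameter(param(index, 1)).await?;
    let (device, control) = device.get_parameter(param(index, 2)).await?;
    Ok((device, Fader { index, channel, mode, control }))
}

/// Waker that does nothing; good enough for a busy-polling loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Tiny busy-polling executor, just enough to drive the sketch.
/// A real program would use a proper runtime instead.
pub fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
    }
}
```

Driving it with block_on(read_fader(MockDevice::default(), 2)) returns the device and the assembled Fader. The compiler-generated future plays the role of the "manually captured stack frame" from the and_then chain.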

#2

Just curious why you wouldn’t be able to turn all those .and_thens into one big .and_then, seeing as it’s basically a .map but with futures. (I think; I haven’t worked with futures because I’ve never fully understood async well enough to use it, and I rarely read from files in my projects.)

#3

Sorry, I just realized that there’s some more information that would be helpful.

Here’s the function signature for get_parameter (and others):

/// Extensions for talking to the midi fader device
///
/// These are implemented for all types that implement AsyncHidDevice<MidiFader>
pub trait MidiFaderExtensions<T: AsyncHidDevice<MidiFader>> {
    /// Gets the device status
    fn device_status(self) -> Status<T>;
    /// Gets a device parameter
    fn get_parameter(self, parameter: u16) -> GetParameter<T>;
    /// Sets a device parameter
    fn set_parameter(self, parameter: u16, value: ParameterValue) -> SetParameter<T>;
}

impl<T: AsyncHidDevice<MidiFader>> MidiFaderExtensions<T> for T {
    fn device_status(self) -> Status<T> {
        Status::new(self)
    }
    fn get_parameter(self, parameter: u16) -> GetParameter<T> {
        GetParameter::new(self, parameter)
    }
    fn set_parameter(self, parameter: u16, value: ParameterValue) -> SetParameter<T> {
        SetParameter::new(self, parameter, value)
    }
}

...

impl<T: AsyncHidDevice<MidiFader>> Future for GetParameter<T> {
    type Item = (T, ParameterValue);
    type Error = Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
...
    }
}

As you can see, self (i.e. “device” in my get_fader_configuration function earlier) is consumed and doesn’t come back until the GetParameter<T>'s poll completes (since T is the device and is part of GetParameter's Item). This serves two purposes:

  • The hardware can’t handle interleaved/multiple queries at once (hopefully no one opens the program twice). So either a mutable borrow or a transfer of ownership is the easiest way to ensure that only one query is in flight at a time. I can detect when this has happened and return an error, though, so it’s not terrible.
  • I can’t borrow past a yield as far as I know. So, if I were to allow multiple commands at once and also immutably borrow &self I could then put everything in one big and_then, but then I would be borrowing &self past a yield (Unless this rule has changed? I know of the Pin interface, but I don’t know how to use it). So instead, GetParameter gets to own the device for the duration of the future.
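On the second point, a hedged sketch of how this looks with the later async/await syntax (stable since Rust 1.39): an async fn may hold a &mut borrow across an .await, and the future it returns is pinned while it is polled, which is the problem Pin addresses. The device can then be borrowed mutably for each query instead of being moved in and out, and the borrow checker still rejects two queries in flight at once. MockDevice, YieldOnce, read_pair, and block_on are hypothetical names invented for this illustration, not part of the real code.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Future that returns Pending once, simulating a wait for the response.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Hypothetical stand-in for the HID device; it logs each query it sees.
#[derive(Debug, Default)]
pub struct MockDevice {
    pub log: Vec<u16>,
}

impl MockDevice {
    /// Borrows the device mutably for the whole write-then-read exchange.
    pub async fn get_parameter(&mut self, parameter: u16) -> Result<u32, String> {
        self.log.push(parameter); // "write" the command
        YieldOnce(false).await; // suspend while `&mut self` is still held
        Ok(u32::from(parameter) + 100) // made-up response
    }
}

pub async fn read_pair(device: &mut MockDevice) -> Result<(u32, u32), String> {
    // The `&mut` borrow is held across each `.await`.
    let first = device.get_parameter(1).await?;
    let second = device.get_parameter(2).await?;
    // Starting both queries before awaiting either would need two live
    // `&mut` borrows and is rejected at compile time (E0499):
    //     let a = device.get_parameter(1);
    //     let b = device.get_parameter(2);
    //     let _ = (a.await, b.await);
    Ok((first, second))
}

/// Waker that does nothing; good enough for a busy-polling loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Tiny busy-polling executor, just enough to drive the sketch.
pub fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
    }
}
```

With this shape the exclusivity guarantee comes from the &mut borrow rather than from moving the device, so callers keep their device value and do not need to unpack it from every result.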