How to create iterator with results

Hi everyone,

I'm trying to create a function that returns Result<Vec<Result<Something>>>. The outer Result relates to creating the iterator, while the Vec stands in for the iterator itself (simplified for the question). Each item the iterator yields should be a Result, so some iterations can produce errors that the user is free to ignore.

My current function looks like this:

fn get_segments() -> Result<impl Iterator<Item = Result<Segment>> + '_> {
    let mut some_bool = false;
    call_something()?; 
    Ok(std::iter::from_fn(move || {
        for ... {
            for ... {
                call_something()?; // I can't bubble up errors like this
                if condition {
                    some_bool = true; // state is not preserved across calls
                }
                return Some(Ok(Segment { ... })); // it yields once, but I expect multiple yields
            }
        }
        None // No more items to yield
    }))
}

The problem is that the iterator yields only once: after the closure returns, it never resumes. I'm also finding it difficult to properly bubble up errors.

This is the actual code I'm working on: pyannote-rs/segment.rs#L27

Any help would be appreciated! Thanks!


The ability to resume a function from where it last returned is called coroutines or generators, and they're not yet in the language, at least for iterators.
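What from_fn does give you is a closure that is called once per next(); any state that must survive between yields has to live in captured variables. A minimal sketch (names are illustrative, not from your code):

```rust
// State lives in the captured variables `n` and `limit`;
// each call to the closure advances the state by one step.
fn counter(limit: u32) -> impl Iterator<Item = u32> {
    let mut n = 0;
    std::iter::from_fn(move || {
        if n < limit {
            n += 1;
            Some(n) // yield one item per call
        } else {
            None // iteration ends
        }
    })
}

fn main() {
    let v: Vec<u32> = counter(3).collect();
    assert_eq!(v, vec![1, 2, 3]);
}
```

The key difference from a generator: `return Some(...)` inside the closure ends only that call to next(), so the closure must re-derive "where it was" from its captures on every call, instead of resuming mid-loop.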

The hard part of your code is that try_extract_tensor returns a borrowed view (ArrayViewD). This means you can't easily store it and its owner in the same iterator.

If you absolutely need an iterator, then you can make it work with some unsafe. If you need an iterator eventually, then you could return something that impl IntoIterator for &mut Self, with some effort. But a simpler solution is to take a closure and run it internally on every item.

pub fn get_segments<P: AsRef<Path>>(
    samples: &[i16],
    sample_rate: u32,
    model_path: P,
    mut f: impl FnMut(Segment),
) -> Result<()> {
    // Create session using the provided model path
    let session = session::create_session(model_path.as_ref())?;

    // Define frame parameters
    let frame_size = 270;
    let frame_start = 721;
    let window_size = (sample_rate * 10) as usize; // 10 seconds
    let mut is_speeching = false;
    let mut offset = frame_start;
    let mut start_offset = 0.0;

    // Pad end with silence for full last segment
    let padded_samples = {
        let mut padded = Vec::from(samples);
        padded.extend(vec![0; (window_size - samples.len() % window_size) % window_size]);
        padded
    };

    let start_iter = (0..padded_samples.len()).step_by(window_size);
    for start in start_iter {
        let end = (start + window_size).min(padded_samples.len());
        let window = &padded_samples[start..end];

        // Convert window to ndarray::Array1
        let array = ndarray::Array1::from_iter(window.iter().map(|&x| x as f32));
        let array = array.view().insert_axis(Axis(0)).insert_axis(Axis(1));

        // Handle potential errors during the session and input processing
        let inputs = ort::inputs![array.into_dyn()]
            .map_err(|e| eyre::eyre!("Failed to prepare inputs: {:?}", e))?;

        let ort_outs = session
            .run(inputs)
            .map_err(|e| eyre::eyre!("Failed to run the session: {:?}", e))?;

        let ort_out = ort_outs.get("output").context("Output tensor not found")?;

        let ort_out = ort_out
            .try_extract_tensor::<f32>()
            .context("Failed to extract tensor")?;

        for row in ort_out.outer_iter() {
            for sub_row in row.axis_iter(Axis(0)) {
                let max_index = find_max_index(sub_row)?;

                if max_index != 0 {
                    if !is_speeching {
                        start_offset = offset as f64;
                        is_speeching = true;
                    }
                } else if is_speeching {
                    let start = start_offset / sample_rate as f64;
                    let end = offset as f64 / sample_rate as f64;

                    let start_f64 = start * (sample_rate as f64);
                    let end_f64 = end * (sample_rate as f64);

                    // Ensure indices are within bounds
                    let start_idx = start_f64.min((samples.len() - 1) as f64) as usize;
                    let end_idx = end_f64.min(samples.len() as f64) as usize;

                    let segment_samples = &padded_samples[start_idx..end_idx];

                    is_speeching = false;

                    f(Segment {
                        start,
                        end,
                        samples: segment_samples.to_vec(),
                    });
                }
                offset += frame_size;
            }
        }
    }
    Ok(())
}

This isn't as flexible as the iterator one, but you can loosen it if needed. For example, you could pass find_max_index errors to the closure to keep them separate from the other errors, and/or you could have the closure return a Result and abort early when it's Err.
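That last variant can be sketched in miniature like this (hypothetical names; `process_items` stands in for get_segments and `i32` for Segment): the closure returns a Result, and `?` aborts the driving loop on the first error.

```rust
// Minimal self-contained sketch of the "closure returns Result" variant.
fn process_items(
    items: &[i32],
    mut f: impl FnMut(i32) -> Result<(), String>,
) -> Result<(), String> {
    for &item in items {
        f(item)?; // abort as soon as the closure reports an error
    }
    Ok(())
}

fn main() {
    let mut seen = Vec::new();
    let res = process_items(&[1, 2, 3, 4], |x| {
        if x == 3 {
            Err("stop at 3".to_string())
        } else {
            seen.push(x);
            Ok(())
        }
    });
    assert!(res.is_err());
    assert_eq!(seen, vec![1, 2]); // processing stopped before 3
}
```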


I use an iterator because I want each segment to be available immediately, rather than waiting for the entire process to complete. I chose an iterator because it works out of the box.

Perhaps it's possible with the generator crate?

Also, the following works:

pub fn get_segments<P: AsRef<Path>>(
    samples: &[i16],
    sample_rate: u32,
    model_path: P,
) -> Result<impl Iterator<Item = Result<Segment>> + '_> {
    // Create session using the provided model path
    let session = session::create_session(model_path.as_ref())?;

    // Define frame parameters
    let frame_size = 270;
    let frame_start = 721;
    let window_size = (sample_rate * 10) as usize; // 10 seconds
    let mut is_speeching = false;
    let mut offset = frame_start;
    let mut start_offset = 0.0;

    // Pad end with silence for full last segment
    let padded_samples = {
        let mut padded = Vec::from(samples);
        padded.extend(vec![0; (window_size - samples.len() % window_size) % window_size]);
        padded
    };

    let mut start_iter = (0..padded_samples.len()).step_by(window_size);

    let mut segments_queue = VecDeque::new();
    Ok(std::iter::from_fn(move || {
        if let Some(start) = start_iter.next() {
            let end = (start + window_size).min(padded_samples.len());
            let window = &padded_samples[start..end];

            // Convert window to ndarray::Array1
            let array = ndarray::Array1::from_iter(window.iter().map(|&x| x as f32));
            let array = array.view().insert_axis(Axis(0)).insert_axis(Axis(1));

            // Handle potential errors during the session and input processing
            let inputs = match ort::inputs![array.into_dyn()] {
                Ok(inputs) => inputs,
                Err(e) => return Some(Err(eyre::eyre!("Failed to prepare inputs: {:?}", e))),
            };

            let ort_outs = match session.run(inputs) {
                Ok(outputs) => outputs,
                Err(e) => return Some(Err(eyre::eyre!("Failed to run the session: {:?}", e))),
            };

            let ort_out = match ort_outs.get("output").context("Output tensor not found") {
                Ok(output) => output,
                Err(e) => return Some(Err(eyre::eyre!("Output tensor error: {:?}", e))),
            };

            let ort_out = match ort_out
                .try_extract_tensor::<f32>()
                .context("Failed to extract tensor")
            {
                Ok(tensor) => tensor,
                Err(e) => return Some(Err(eyre::eyre!("Tensor extraction error: {:?}", e))),
            };

            for row in ort_out.outer_iter() {
                for sub_row in row.axis_iter(Axis(0)) {
                    let max_index = match find_max_index(sub_row) {
                        Ok(index) => index,
                        Err(e) => return Some(Err(e)),
                    };

                    if max_index != 0 {
                        if !is_speeching {
                            start_offset = offset as f64;
                            is_speeching = true;
                        }
                    } else if is_speeching {
                        let start = start_offset / sample_rate as f64;
                        let end = offset as f64 / sample_rate as f64;

                        let start_f64 = start * (sample_rate as f64);
                        let end_f64 = end * (sample_rate as f64);

                        // Ensure indices are within bounds
                        let start_idx = start_f64.min((samples.len() - 1) as f64) as usize;
                        let end_idx = end_f64.min(samples.len() as f64) as usize;

                        let segment_samples = &padded_samples[start_idx..end_idx];

                        is_speeching = false;

                        let segment = Segment {
                            start,
                            end,
                            samples: segment_samples.to_vec(),
                        };
                        segments_queue.push_back(segment);
                    }
                    offset += frame_size;
                }
            }
        }
        segments_queue.pop_front().map(Ok)
    }))
}

But I prefer to return each segment as soon as it's available, rather than waiting until after the for loops.


I like your latest approach of using a VecDeque to store segments and pop them off in the iterator. 😊 I noticed that segments_queue is populated inside the for loops that iterate over ort_out. It got me wondering: would it be possible to yield segments even sooner, say immediately after each max_index calculation, without waiting for the loops to fully complete? 🥲


I think the instinct to try to make an iterator here is correct, but unfortunately it didn't work out of the box. The closure method I posted does make segments available immediately. If you are consuming it with a for-loop that doesn't break, you can easily adapt it to work as a closure instead:

for segment in get_segments(samples, sample_rate, model_path)? {
    process(segment?);
}

// becomes

get_segments(samples, sample_rate, model_path, |segment| process(segment))?;

Yep, this is one way of solving it. I looked briefly at how it works, and it creates a stack for the coroutines. You can also make stackless coroutines by bending Rust's async functions a bit, but it's probably not as efficient.

You replied to the wrong post, but yeah, it could at least yield all existing segments before moving onto the next chunk:

Ok(std::iter::from_fn(move || {
    loop {
        if let Some(segment) = segments_queue.pop_front() {
            return Some(Ok(segment));
        }
        if let Some(start) = start_iter.next() {
            // all the same in here.
        } else {
            return None;
        }
        // remove the last line:
        // segments_queue.pop_front().map(Ok)
    }
}))

from_fn() expects you to update your captured state and return each time, so a loop doesn't help, but loops are just calling into_iter() then next() a bunch, so you could try flattening the loops?
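"Flattening the loops" means turning the nested for-loops into chained iterator adapters, so the whole pipeline is one lazy Iterator value with no hand-managed resume state. A toy sketch of the idea (generic data, not the ort pipeline):

```rust
// The nested loops
//     for row in &rows { for x in row { ... } }
// become a single flat_map chain that yields the same items lazily.
fn main() {
    let rows = vec![vec![1, 2], vec![3], vec![4, 5]];

    let flat: Vec<i32> = rows
        .iter()
        .flat_map(|row| row.iter().copied()) // inner loop becomes an inner iterator
        .collect();

    assert_eq!(flat, vec![1, 2, 3, 4, 5]);
}
```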

That's halfway to the traditional way to implement iterators, as a structure with fields and an impl Iterator item.
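For reference, the traditional shape looks like this (an illustrative toy, not pyannote-rs code): the struct's fields are exactly the "state between yields", and next() advances them.

```rust
// Hand-written iterator: fields hold the state that from_fn would capture.
struct Counter {
    n: u32,
    limit: u32,
}

impl Iterator for Counter {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        if self.n < self.limit {
            self.n += 1;
            Some(self.n)
        } else {
            None
        }
    }
}

fn main() {
    let c = Counter { n: 0, limit: 3 };
    assert_eq!(c.collect::<Vec<_>>(), vec![1, 2, 3]);
}
```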


Unfortunately it's not so simple, because the iterators borrow from owned items in the closure. It's basically this:

pub fn iter() -> impl Iterator {
    let items = vec![1, 2, 3];
    let mut iter = None;

    std::iter::from_fn(move || {
        iter.get_or_insert_with(|| {
            items.iter() // imagine there is no into_iter
        })
        .next()
        .copied()
    })
}
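For contrast, when an owning iterator does exist (as into_iter does for Vec, unlike the tensor views in the real code), the self-borrow problem disappears entirely, because the returned iterator owns its data instead of borrowing from the closure:

```rust
// With an owning iterator, no from_fn and no self-referential borrow needed.
fn iter_owned() -> impl Iterator<Item = i32> {
    let items = vec![1, 2, 3];
    items.into_iter() // the iterator takes ownership of `items`
}

fn main() {
    assert_eq!(iter_owned().collect::<Vec<_>>(), vec![1, 2, 3]);
}
```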