Optimize lemerre's scan_count algorithm

Hi there,

I was going through this blog on optimizing a simple algorithm - scan count, by Daniel Lemire.

Being new to Rust, I wanted to see how close I can get to this heavily optimized code, by writing as natural rust code as I can (no low-level stuff, etc.).

I was pleasantly surprised to see that my first few attempts lead to performance that appears very close to that in above blog: I could not get exact CPU counters for the hot code (after excluding init and data creation step); but from a criterion benchmark, this could get a throughput of about 2.2 nano-seconds/element on my 2.7 GHz Intel Core i7 Macbook.

I am pasting the code snippet below (as a single main file), hoping that others can review and point further optimizations that could be done to this code snippet.

use rand::RngCore;
use std::time::Instant;

/// data: input arrays. each array is sorted.
/// threshold: an element needs to be repeat at least these many times for it to be selected
/// n: upper bound on magnitude of input elements (exclusive)
/// cache_bytes: how many cache bytes to assume - we try to bulk-modify only those many counters that
/// fit in this size
#[inline]
pub fn scan_count(data: &Vec<Vec<u32>>, threshold: u8, n: u32, cache_bytes: usize) -> Vec<u32> {
    let mut marks = vec![0; data.len()];  // record starting point for every input array
    let mut counts = vec![0u8; n as usize];  // frequency counts per element
    let mut out = Vec::with_capacity(data[0].len());  // elements that pass the threshold

    let stride = cache_bytes - 128 ; // leave some space in cache for other things.
    for lo in (0..n).step_by(stride) {
        let hi = n.min(lo + stride as u32);
        for (ai, arr) in data.iter().enumerate() {
            let mut pos = marks[ai];
            for &a in arr.iter().skip(pos) {  // start after previously marked position
                if a >= hi {  // magnitude is outside the range being processed. 
                    break;
                }
                let mut c = counts[a as usize];
                if c < threshold {  // do work only if the threshold is not already hit for this element
                    c += 1;
                    counts[a as usize] = c;
                    if c == threshold {
                        out.push(a);
                    }
                }
                pos += 1;
            }
            marks[ai] = pos;  // mark the processed position
        }
    }

    out
}

fn main() {
    let array_count = 100;
    let length = 100_000;
    let n = 20_000_000;
    let threshold = 10;

    let mut arrays:Vec<Vec<u32>> = vec![vec![]; array_count];
    let mut rng = rand::thread_rng();
    for a in arrays.iter_mut() {
        for _ in 0..length {
            a.push(rng.next_u32() % n);
        }
        a.sort_unstable();
    }

    for _ in 0..100 {
            let now = Instant::now();
            scan_count(&arrays, threshold, n, 1 << 15);
            let elapsed = now.elapsed().as_nanos() as f64;
            println!("time_taken: {:?} ns/element: {:?}", elapsed, elapsed / (array_count * length) as f64)
        }
}