[Solved] Polymorphic IO: file (memmap) or stdin - read overlapping chunks

Can someone help me change the signature of from_iter()?
Instead of an iterator over bytes, I need an iterator over slices (memory pages).
Is there a way to get overlapping slices? I have to read a little bit ahead without consuming.
Thank you in advance.

pub fn from_file(file: &str) -> Result<Count, Box<Error>> {
    let file = try!(Mmap::open_path(file, Protection::Read));
    let bytes = unsafe { file.as_slice() };
    from_iter(bytes.iter().map(|b| Ok(*b)))
}

pub fn from_stdin() -> Result<Count, Box<Error>> {
    let stdin = stdin();
    let stdin = stdin.lock();
    let stdin = BufReader::new(stdin);
    let stdin = stdin.bytes();
    from_iter(stdin)
}

fn from_iter<I>(bytes: I) -> Result<Count, Box<Error>>
    where I: Iterator<Item=io::Result<u8>>
{

    for c in bytes { ...}
}

You're looking for [T]::windows(size: usize) or [T]::chunks(size: usize). However, these methods can only operate on slices (which you appear to have here).
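
A small sketch of the difference, written as an illustration rather than taken from the thread: windows yields every overlapping sub-slice of the given length, advancing one element at a time, while chunks yields non-overlapping pieces and lets the last one be shorter. The helper names overlapping and disjoint are hypothetical.

```rust
/// Collects all overlapping windows of `size` elements (the start advances by 1).
/// Like `windows` itself, this panics if `size` is 0.
fn overlapping(data: &[u8], size: usize) -> Vec<&[u8]> {
    data.windows(size).collect()
}

/// Collects non-overlapping chunks of up to `size` elements.
/// The final chunk may be shorter. Panics if `size` is 0.
fn disjoint(data: &[u8], size: usize) -> Vec<&[u8]> {
    data.chunks(size).collect()
}
```

Neither method takes a step, so a window that advances by more than one element needs something on top of windows.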


Thank you! That helped me write from_file(). The goal is a 7-byte window advancing by 5 bytes.

from_stdin() is trickier: I am trying to build an iterator with fill_buf() and consume(), but I am stuck at the TODO. Any ideas?

pub fn from_file(file: &str) -> Result<Count, Box<Error>> {
    let file = try!(Mmap::open_path(file, Protection::Read));
    let bytes = unsafe { file.as_slice() };
    from_iter(bytes.windows(7).skip(5))
}

pub fn from_stdin() -> Result<Count, Box<Error>> {
    let stdin = stdin();
    let stdin = stdin.lock();
    //TODO
    from_iter(stdin)
}


fn from_iter<'a, I>(chunks: I) -> Result<Count, Box<Error>>
    where I: Iterator<Item=&'a [u8]>
{
    for bytes in chunks {
        for c in bytes {
            //....
        }
    }
}

pub fn from_stdin() -> Result<(), Box<Error>> {
    let mut stdin = stdin().lock();
    let mut bytes = Vec::new();
    try!(stdin.read_to_end(&mut bytes));
    from_iter(bytes.windows(7).skip(5))
}

If you don't want to read all of stdin before you start your computation, you'll probably have to implement some sort of circular buffer yourself.

Thanks a lot!
Yes, I need to compute slice by slice because typical files and streams are bigger than RAM. This is why memmap was chosen for files.
I hope that by replacing read_to_end with read it will be possible to return &mut bytes from next() when implementing an iterator.
The overlapping part can then (hopefully) be implemented using by_ref.

@eefriedman : I tried out your code below, but I have lifetime issues. Can you help?

pub fn from_file(file: &str) -> Result<Self, Box<Error>> {
    let file = try!(Mmap::open_path(file, Protection::Read));
    let bytes = unsafe { file.as_slice() };
    Self::from_iter(bytes.windows(7).skip(5))
}

pub fn from_stdin() -> Result<Self, Box<Error>> {
    let mut stdin = stdin().lock();
    let mut bytes = Vec::new();
    try!(stdin.read_to_end(&mut bytes));
    Self::from_iter(bytes.windows(7).skip(5))
}

fn from_iter<'a, I>(chunks: I) -> Result<Count, Box<Error>>
    where I: Iterator<Item=&'a [u8]>
{
    for bytes in chunks {
        for c_byte in bytes {

I get the following error message:

count.rs:96:25: 96:32 error: borrowed value does not live long enough
count.rs:96         let mut stdin = stdin().lock();
                                    ^~~~~~~
count.rs:95:53: 100:2 note: reference must be valid for the block at 95:52...
count.rs:95     pub fn from_stdin() -> Result<Self, Box<Error>> {
                                                                ^
count.rs:96:9: 96:40 note: ...but borrowed value is only valid for the statement at 96:8
count.rs:96         let mut stdin = stdin().lock();
                    ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
count.rs:96:9: 96:40 help: consider using a `let` binding to increase its lifetime
count.rs:96         let mut stdin = stdin().lock();
                    ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

I do not understand: there is a let binding?

Erk, that's what I get for writing code without actually checking it compiles. Try instead:

let stdin = stdin();
let mut stdin = stdin.lock();
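
Why the two-step version helps: stdin() returns a temporary Stdin value and lock() borrows from it. In the one-line form the temporary is dropped at the end of the statement while the lock still refers to it, so the existing let binding is bound to the wrong value (the lock, not its owner). The sketch below reproduces the pattern with hypothetical stand-in types (Source and Guard are not std types). As far as I know, Rust 1.61 and later return a 'static lock from Stdin::lock, so the one-liner compiles there.

```rust
// Hypothetical stand-ins for `Stdin` and `StdinLock`, for illustration only.
struct Source {
    data: Vec<u8>,
}

struct Guard<'a> {
    data: &'a [u8],
}

impl Source {
    /// Borrows from `self`, so the guard cannot outlive the `Source`.
    fn lock(&self) -> Guard<'_> {
        Guard { data: &self.data }
    }
}

fn make_source() -> Source {
    Source { data: vec![1, 2, 3] }
}

fn count_bytes() -> usize {
    // let guard = make_source().lock();
    // Rejected by the compiler: the temporary `Source` is dropped at the end
    // of the statement while `guard` still borrows from it.

    // Binding the owner first keeps it alive until the end of the function.
    let source = make_source();
    let guard = source.lock();
    guard.data.len()
}
```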

That works! Thank you!

I noticed that windows(7).skip(5) doesn't produce windows that advance by 5 bytes; it only drops the first 5 windows. What it actually needs is a filter:

windows(7).enumerate().filter(|&x|x.0 % 5==0).map(|(_,v)|v)

However, the whole line looks awkward to me! Does it compile into something efficient, or is it worth writing my own windows(7)-like iterator, e.g. windows(7,2)?
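
To make the difference concrete, a minimal sketch (not code from the thread): skip(n) drops the first n windows and then continues one byte at a time, whereas keeping every 5th window gives windows that advance by 5. The sketch uses Iterator::step_by from the standard library, which exists on current Rust but, I believe, was not yet stable when this thread was written. The function names are made up, and each function returns the first byte of every window so the start offsets are visible.

```rust
/// First byte of each window after `skip(5)`: five windows are dropped,
/// then the window still advances by 1.
fn starts_with_skip(data: &[u8]) -> Vec<u8> {
    data.windows(7).skip(5).map(|w| w[0]).collect()
}

/// First byte of each window when only every 5th window is kept.
fn starts_with_step(data: &[u8]) -> Vec<u8> {
    data.windows(7).step_by(5).map(|w| w[0]).collect()
}

/// The enumerate/filter formulation, for comparison.
fn starts_with_filter(data: &[u8]) -> Vec<u8> {
    data.windows(7)
        .enumerate()
        .filter(|&(i, _)| i % 5 == 0)
        .map(|(_, w)| w[0])
        .collect()
}
```

For an input holding the bytes 0 through 16, the skip version starts its windows at 5, 6, 7, 8, 9, 10, while the other two start at 0, 5, 10.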

The remaining issue is that from_stdin reads the whole input into a vector before iterating. Can you give me a hint how I can avoid this? My computation only needs a running (overlapping) window over the stream. I can not afford holding the whole input stream in memory because it is too big.

Here is an extract of the working code.

pub fn from_file(file: &str) -> Result<Self, Box<Error>> {
    let file = try!(Mmap::open_path(file, Protection::Read));
    let bytes = unsafe { file.as_slice() };
    Self::from_iter(bytes.windows(7).enumerate().filter(|&x|x.0%5==0).map(|(_,v)|v))
}

pub fn from_stdin() -> Result<Self, Box<Error>> {
    let stdin = stdin();
    let mut stdin = stdin.lock();
    let mut bytes = Vec::new();
    try!(stdin.read_to_end(&mut bytes));
    Self::from_iter(bytes.windows(7).enumerate().filter(|&x|x.0%5==0).map(|(_,v)|v))
}

fn from_iter<'a, I>(chunks: I) -> Result<Count, Box<Error>>
    where I: Iterator<Item=&'a [u8]>
{
    for bytes in chunks {
        for c_byte in bytes { ...

Looks like that code ends up as a nested loop with a urem (unsigned remainder) in it... so probably not what you want.

In terms of avoiding reading the whole file, you need a buffer. BufReader doesn't really expose the right functionality, so you'll need to implement something yourself. Rough outline (not really tested, maybe not quite what you want):

pub fn from_stdin(handle_chunk: fn(&[u8])) -> Result<(), Box<Error>> {
    let stdin = stdin();
    let mut stdin = stdin.lock();
    let mut x = [0; 4096];
    let mut data_start = 0;
    let mut data_end = 0;
    let mut done = false;
    while !done {
        // Rotate the buffer if there isn't enough space
        if x.len() - data_start < 7 {
            let (a, b) = x.split_at_mut(data_start);
            let len = data_end - data_start;
            a[..len].copy_from_slice(&b[..len]);
            data_start = 0;
            data_end = data_start + len;
        }
        // Read from stdin
        while data_end - data_start < 7 {
            let bytes = try!(stdin.read(&mut x[data_end..]));
            data_end += bytes;
            if bytes == 0 {
                done = true;
                break;
            }
        }
        // Handle data.
        while data_end - data_start >= 7 {
            handle_chunk(&x[data_start..data_start + 7]);
            data_start += 1;
        }
    }
    Ok(())
}

This is very close to what I need, and I am learning a lot from your code. Thank you so much!

I am implementing a tool that searches for Unicode patterns in large files and streams. The chunks are handed to threads that search in parallel and then sync their findings. Because a Unicode code point can span multiple bytes, I need these overlapping chunks. With your code they are easy to get by changing data_start += 1; to data_start += 5;. In my code the actual chunk size will be a few memory pages (instead of 5 bytes in this sample code), and the overlap will be 10 bytes (instead of the 7 bytes - 5 bytes = 2 bytes here).
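
A short sketch of why the overlap matters (an illustration only, not part of the tool): if a chunk boundary falls inside a multi-byte UTF-8 sequence, str::from_utf8 rejects the chunk, and Utf8Error::valid_up_to reports how many leading bytes were still valid. The helper name valid_prefix_len is hypothetical.

```rust
use std::str;

/// Returns how many leading bytes of `chunk` form valid UTF-8.
fn valid_prefix_len(chunk: &[u8]) -> usize {
    match str::from_utf8(chunk) {
        // The whole chunk decoded cleanly.
        Ok(s) => s.len(),
        // Decoding stopped at an invalid or incomplete sequence.
        Err(e) => e.valid_up_to(),
    }
}
```

With the three characters a, é (two bytes in UTF-8) and b, cutting the input after the second byte leaves only one valid byte in the first chunk, so the next chunk has to start early enough to see the whole é.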

I was wondering whether the copy in the line a[..len].copy_from_slice(&b[..len]); can be avoided. Since I need overlapping chunks, I don't see a way around it: at least the overlapping part has to be copied from the end of the buffer to the beginning. As that is only 10 bytes, it is not a problem.
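
For what it's worth, a sketch of the same rotation step using slice::copy_within, which is in the standard library on current Rust (it did not exist when this thread was written) and avoids the split_at_mut step. Only the unread tail, here the overlap, is moved to the front. The function name and signature are hypothetical.

```rust
/// Moves the unread bytes `buf[data_start..data_end]` to the front of `buf`
/// and returns the new `(data_start, data_end)` pair.
fn rotate_to_front(buf: &mut [u8], data_start: usize, data_end: usize) -> (usize, usize) {
    let len = data_end - data_start;
    // `copy_within` is safe to use even if source and destination overlap.
    buf.copy_within(data_start..data_end, 0);
    (0, len)
}
```

The cost is proportional to the number of unread bytes, so with a 10-byte overlap and a buffer of several pages it should be negligible next to the read itself.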

I wonder how smart the iterator laziness actually is. Most of the windows generated by windows(7) are not used in the following line because they are filtered out:
.windows(7).enumerate().filter(|&x|x.0 % 5==0).map(|(_,v)|v)
Are the useless windows generated anyway? How big is the cost?

I will be on a trip for 2 days. As soon as I find the time to test your code I will give you feedback. Thanks again.

.enumerate().filter(|&x|x.0 % 5==0).map(|(_,v)|v) is relatively expensive because it computes x.0 % 5 a bunch of times. Maybe use itertools instead? ( https://crates.io/crates/itertools ).
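
If pulling in a crate is not desired, a hand-written iterator is small. This is only a sketch with hypothetical names (SteppedWindows, stepped_windows); it advances the start offset by step directly, so no intermediate windows are produced and no remainder is computed.

```rust
/// Iterator over windows of `len` bytes whose start advances by `step`.
struct SteppedWindows<'a> {
    data: &'a [u8],
    len: usize,
    step: usize,
    pos: usize,
}

/// Hypothetical constructor; `len` and `step` are assumed to be non-zero.
fn stepped_windows(data: &[u8], len: usize, step: usize) -> SteppedWindows<'_> {
    SteppedWindows { data, len, step, pos: 0 }
}

impl<'a> Iterator for SteppedWindows<'a> {
    type Item = &'a [u8];

    fn next(&mut self) -> Option<&'a [u8]> {
        // Stop as soon as a full window no longer fits.
        let end = self.pos.checked_add(self.len)?;
        if end > self.data.len() {
            return None;
        }
        let window = &self.data[self.pos..end];
        self.pos += self.step;
        Some(window)
    }
}
```

A call such as stepped_windows(bytes, 7, 5) then replaces the whole enumerate/filter/map chain. A shorter final chunk is not produced here and would still need separate handling.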


Here is the working code:

use std::io::prelude::*;
use std::io::{stdin};
use std::error::Error;
use std::str;
extern crate memmap;
use self::memmap::{Mmap, Protection};
extern crate itertools;
use self::itertools::Itertools;

const WIN_STEP: usize = 60;
const WIN_LEN: usize = WIN_STEP + 10;
// must be >= WIN_LEN; a bigger buffer performs better (fewer copy_from_slice calls)
const BUF_LEN: usize = 16 * WIN_STEP;

pub fn from_file(file: &str, handle_chunk: fn(&[u8])) -> Result<(), Box<Error>> {
    let file = try!(Mmap::open_path(file, Protection::Read));
    let bytes = unsafe { file.as_slice() };
    let len = bytes.len();
    for chunk in bytes.windows(WIN_LEN).step(WIN_STEP) {
        handle_chunk(&chunk);
    }
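    // The last chunk is usually shorter; it starts one step after the last full window
    let tail_start = if len >= WIN_LEN {
        ((len - WIN_LEN) / WIN_STEP + 1) * WIN_STEP
    } else {
        0
    };
    if tail_start < len {
        handle_chunk(&bytes[tail_start..]);
    }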
    Ok(())
}

pub fn from_stdin(handle_chunk: fn(&[u8])) -> Result<(), Box<Error>> {
    let stdin = stdin();
    let mut stdin = stdin.lock();
    let mut buf = [0; BUF_LEN];
    let mut data_start: usize = 0;
    let mut data_end: usize = 0;
    let mut done = false;
    while !done {
        // Rotate the buffer if there isn't enough space
        if data_start + WIN_LEN > BUF_LEN {
            let (a, b) = buf.split_at_mut(data_start);
            let len = data_end - data_start;
            a[..len].copy_from_slice(&b[..len]);
            data_start = 0;
            data_end = len;
        }
        // Read from stdin
        while data_end < data_start + WIN_LEN  {
            let bytes = try!(stdin.read(&mut buf[data_end..]));
            if bytes == 0 {
                done = true;
                break;
            }
            data_end += bytes;
        }
        // Handle data.
        while data_start + WIN_LEN <= data_end {
            handle_chunk(&buf[data_start..data_start + WIN_LEN]);
            data_start += WIN_STEP;
        }
    }
    // The last chunk is usually shorter
    if data_start < data_end {
        handle_chunk(&buf[data_start..data_end]);
    }
    Ok(())
}

fn work_with_data(data: &[u8]) {
    let s = str::from_utf8(data).unwrap();
    println!("New chunk {}:\t{}",s.len(),s);
}


fn main() {
    from_stdin(work_with_data).expect("Stdin error!");
    from_file("scanme.txt", work_with_data).unwrap();
}

I think the code by @reu for stdin is useful on its own; it would be nice to have it in a library if it can be reasonably generalized.
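
One possible generalization, sketched under assumptions and not tested against real streams: take any std::io::Read and a closure instead of stdin and a fn pointer. The name for_each_window, the heap-allocated buffer, its size, and the use of ? (current Rust syntax in place of try!) are my choices, not part of the posted code.

```rust
use std::io::{self, Read};

/// Calls `handle` on windows of `win_len` bytes advancing by `win_step`,
/// then on a shorter final chunk if unread bytes remain.
/// Assumes `0 < win_step <= win_len`.
fn for_each_window<R, F>(
    mut reader: R,
    win_len: usize,
    win_step: usize,
    mut handle: F,
) -> io::Result<()>
where
    R: Read,
    F: FnMut(&[u8]),
{
    // Assumed buffer size: room for 16 windows.
    let mut buf = vec![0u8; 16 * win_len];
    let (mut start, mut end) = (0usize, 0usize);
    let mut done = false;
    while !done {
        // Move unread bytes to the front when a full window no longer fits.
        if start + win_len > buf.len() {
            buf.copy_within(start..end, 0);
            end -= start;
            start = 0;
        }
        // Read until at least one full window is buffered or the input ends.
        while end < start + win_len {
            let n = reader.read(&mut buf[end..])?;
            if n == 0 {
                done = true;
                break;
            }
            end += n;
        }
        // Hand out every full window currently in the buffer.
        while start + win_len <= end {
            handle(&buf[start..start + win_len]);
            start += win_step;
        }
    }
    // The last chunk is usually shorter.
    if start < end {
        handle(&buf[start..end]);
    }
    Ok(())
}
```

Passing std::io::stdin().lock() as the reader should give the stdin behaviour, and std::io::Cursor makes the function easy to exercise in tests. Taking a closure rather than a fn pointer lets the caller collect results or forward chunks to worker threads.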

Thank you all for your help. Here is a link to the first release of the tool I was implementing:

GitHub: getreu/stringsext
A Unicode enhancement of the GNU strings tool with additional features.