How to traverse two memory maps in parallel, like rayon's par_chunks

Hi there, I've implemented a file copier that attempts to:
- memory map the source file
- memory map the destination file
- FIRST VERSION: SequentialBlockCopier() copies the blocks sequentially from the source map to the destination map. IT WORKS.
- SECOND VERSION: ParallelBlockCopier() copies the blocks in parallel from the source map to the destination map.
  Compilation error: *mut u8 cannot be sent between threads safely.
- copy the quotient-remainder bytes from the source map to the destination map. IT WORKS.

My poor head can't solve this. Could anybody help with this?
Thank you in advance for any advice and suggestions. Cheers.
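For context, the error reproduces with something as small as this (a reduced hypothetical snippet, not code from the project), because spawning a thread, or handing a closure to a thread pool, requires the closure to be Send, and raw pointers are not:

fn wont_compile() {
    let p: *mut u8 = std::ptr::null_mut();
    std::thread::spawn(move || {
        // error[E0277]: `*mut u8` cannot be sent between threads safely
        let _captured = p;
    });
}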

I've attached the source below for the project called dtmfcopy. DTMF = dedicated to my family.

cat Cargo.toml 
[package]
name = "dtmfcopy"
version = "0.1.0"
authors = ["David Marceau <uticdmarceau2007@yahoo.ca>"]
edition = "2018"

[dependencies]
libc = "0.2.77"
flume = "0.9.1"
threadpool = "1.8.1"

[profile.release]
lto = 'thin'
panic = 'abort'
codegen-units = 1

cat src/main.rs
//dtmfcopy
//dtmf stands for "dedicated to my family"
//Oct 2020
// This program is heavily inspired by b3sum for being so good at reading a huge file so quickly.

use std::env;
use libc;
use std::{
    fs::File,
    fs::OpenOptions,
    io::{Seek, SeekFrom, Write},
    os::unix::prelude::AsRawFd,
    ptr,
};
use threadpool::ThreadPool;

#[derive(Debug,Clone)]
pub struct BlockCopyConf {
    pub TakeCPUCores : i32,
    pub Block_size : i64,
    pub Blocks_per_chunk : i64,
    pub Chunk_size : i64,
    pub QuotientChunks : i64,
    pub NumberOfBlocksToCopy : i64,
    pub ModuloRemainderBytes : i64,
    pub SourceFileMap : *mut libc::c_void,
    pub DestinationFileMap : *mut libc::c_void,
    pub Src_offset : *mut u8,
    pub Dst_offset : *mut u8,
}

fn main() {

    let mut srcFilePath : String = "".to_string();
    let mut dstFilePath : String = "".to_string();
    let mut arg_counter = 0;
    for argument in env::args() {
        match arg_counter {
            1 => {
                srcFilePath = argument.clone();
                println!("srcFilePath:<<{}>>", srcFilePath);
            },
            2 => {
                dstFilePath = argument.clone();
                println!("dstFilePath:<<{}>>", dstFilePath);
            },
            _ => {
            }
        }
        arg_counter = arg_counter + 1;
    }

    let mut myBlockCopyConf = BlockCopyConf {
        TakeCPUCores : 4,
        Block_size : 4096,
        Blocks_per_chunk : 1024,
        Chunk_size : 4096 * 1024,
        QuotientChunks : 0,
        NumberOfBlocksToCopy : 0,
        ModuloRemainderBytes : 0,
        SourceFileMap : 0 as *mut libc::c_void,
        DestinationFileMap : 0 as *mut libc::c_void,
        Src_offset : 0 as *mut u8,
        Dst_offset : 0 as *mut u8,        
    };

    let mut srcFileOpenOptions = OpenOptions::new();
    srcFileOpenOptions.read(true).write(true);
    let srcFile: File = match srcFileOpenOptions.open(srcFilePath) {
        Ok(f) => f,
        Err(_) => return,
    };
    let srcMetadata = srcFile.metadata().unwrap();
    if !srcMetadata.is_file() {
        return;
    }
    let srcFile_size: u64 = srcMetadata.len();

    unsafe {
        myBlockCopyConf.SourceFileMap = libc::mmap(
            /* addr: */ ptr::null_mut(),
            /* len: */ srcFile_size as usize,
            /* prot: */ libc::PROT_READ | libc::PROT_WRITE,
            // MAP_SHARED so that writes are carried back to the file
            /* flags: */ libc::MAP_SHARED,
            /* fd: */ srcFile.as_raw_fd(),
            /* offset: */ 0,
        );
    }

    if myBlockCopyConf.SourceFileMap == libc::MAP_FAILED {
        return;
    }

    let mut dstFileOpenOptions = OpenOptions::new();
    dstFileOpenOptions.read(true).write(true).create(true);
    let mut dstFile: File = match dstFileOpenOptions.open(dstFilePath) {
        Ok(f) => f,
        Err(_) => return,
    };

    // Reserve filesize capacity beforehand
    let dstFile_size = srcFile_size;
    dstFile.seek(SeekFrom::Start((dstFile_size - 1) as u64)).unwrap();
    dstFile.write_all(&[0]).unwrap();
    dstFile.seek(SeekFrom::Start(0)).unwrap();
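    // (Note: dstFile.set_len(dstFile_size) would reserve the same capacity in a single call.)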

    unsafe {
        myBlockCopyConf.DestinationFileMap = libc::mmap(
            /* addr: */ ptr::null_mut(),
            /* len: */ dstFile_size as usize,
            /* prot: */ libc::PROT_READ | libc::PROT_WRITE,
            // MAP_SHARED so that writes are carried back to the file
            /* flags: */ libc::MAP_SHARED,
            /* fd: */ dstFile.as_raw_fd(),
            /* offset: */ 0,
        );
    }
    
    if myBlockCopyConf.DestinationFileMap == libc::MAP_FAILED {
        return;
    }

    myBlockCopyConf.QuotientChunks = (srcFile_size as i64)  / (myBlockCopyConf.Chunk_size as i64);
    myBlockCopyConf.ModuloRemainderBytes = (srcFile_size as i64)  % (myBlockCopyConf.Chunk_size as i64);

    myBlockCopyConf.Src_offset = myBlockCopyConf.SourceFileMap as *mut u8; // as *mut u8 as usize;
    myBlockCopyConf.Dst_offset = myBlockCopyConf.DestinationFileMap as *mut u8; // as *mut u8 as usize;
    myBlockCopyConf.NumberOfBlocksToCopy = myBlockCopyConf.Blocks_per_chunk * myBlockCopyConf.QuotientChunks;
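    // e.g. (hypothetical numbers) a 10,000,000-byte source with 4 MiB chunks gives
    // QuotientChunks = 2, NumberOfBlocksToCopy = 2 * 1024 = 2048, and
    // ModuloRemainderBytes = 10,000,000 - 2 * 4,194,304 = 1,611,392.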

    SequentialBlockCopier(&mut myBlockCopyConf);
    //ParallelBlockCopier(&mut myBlockCopyConf);

    if myBlockCopyConf.ModuloRemainderBytes > 0 {
        println!("ModuloRemainderBytes:<<{}>>", myBlockCopyConf.ModuloRemainderBytes);

        // Skip past the blocks already copied, then copy the tail bytes.
        let blocks_bytes = (myBlockCopyConf.NumberOfBlocksToCopy * myBlockCopyConf.Block_size) as usize;
        let myBlock_Src_offset = (myBlockCopyConf.SourceFileMap as usize + blocks_bytes) as *const u8;
        let myBlock_Dst_offset = (myBlockCopyConf.DestinationFileMap as usize + blocks_bytes) as *mut u8;

        unsafe {
            ptr::copy_nonoverlapping(myBlock_Src_offset, myBlock_Dst_offset, myBlockCopyConf.ModuloRemainderBytes as usize);
        }
    }

    println!("\nsrcFile_size:<<{}>>",srcFile_size);
    println!("Done");
}

pub fn SequentialBlockCopier(theBlockCopyConf_ : &mut BlockCopyConf) {
    for blockCounter in 0..theBlockCopyConf_.NumberOfBlocksToCopy {
        unsafe {
            ptr::copy_nonoverlapping(theBlockCopyConf_.Src_offset, theBlockCopyConf_.Dst_offset, theBlockCopyConf_.Block_size as usize);
        }

        theBlockCopyConf_.Src_offset = (theBlockCopyConf_.Src_offset as usize + theBlockCopyConf_.Block_size as usize) as *mut u8;
        theBlockCopyConf_.Dst_offset = (theBlockCopyConf_.Dst_offset as usize + theBlockCopyConf_.Block_size as usize) as *mut u8;

        if blockCounter % theBlockCopyConf_.Blocks_per_chunk == 0 {
            print!("."); // one dot of progress per chunk
            std::io::stdout().flush().unwrap();
        }
    }
}

// pub fn ParallelBlockCopier(theBlockCopyConf_ : &mut BlockCopyConf) {
//     let (tx, rx) = flume::unbounded();

//     let mut mySourceMapStart = theBlockCopyConf_.SourceFileMap as *mut u8; // as *mut u8 as usize;
//     let mut myDestMapStart = theBlockCopyConf_.DestinationFileMap as *mut u8; // as *mut u8 as usize;
//     let mut myBlockSize = theBlockCopyConf_.Block_size;
        
//         let mut pool = ThreadPool::new(theBlockCopyConf_.TakeCPUCores as usize); //pool count to cpu cores you want to take
//         for nBlockCopyLaunchRequestCounter in 0..theBlockCopyConf_.NumberOfBlocksToCopy { //upper bound is exclusive            
//             let tx2 = tx.clone();
//             unsafe {
//                 pool.execute(move || {
//                     //do the block copy here
//                     //println!("nBlockCopyLaunchRequestCounter:<<{}>>", nBlockCopyLaunchRequestCounter);
                    
//                     let mut myBlock_Src_offset = mySourceMapStart;
//                     let mut myBlock_Dst_offset = myDestMapStart;
                    
//                     myBlock_Src_offset = (myBlock_Src_offset as usize + (nBlockCopyLaunchRequestCounter * myBlockSize ) as usize ) as *mut u8;
//                     myBlock_Dst_offset = (myBlock_Dst_offset as usize + (nBlockCopyLaunchRequestCounter * myBlockSize ) as usize ) as *mut u8;
                    
//                     unsafe {
//                         ptr::copy_nonoverlapping(myBlock_Src_offset as *mut u8, myBlock_Dst_offset as *mut u8, myBlockSize as usize);
//                     }
                    
//                     tx2.send(nBlockCopyLaunchRequestCounter);
//                 });            
//             }
//         }
    
//         drop(tx); // We are done with our sending half so we can explicitly drop it here.

//         for theReturnedBlockCopyIndex in rx {                    
//             //println!("theReturnedBlockCopyIndex:<<{}>>",theReturnedBlockCopyIndex);
//         }
// }
// error[E0277]: `*mut u8` cannot be sent between threads safely
//    --> src/main.rs:207:22
//     |
// 207 |                   pool.execute(move || {
//     |  ______________________^^^^^^^_-
//     | |                      |
//     | |                      `*mut u8` cannot be sent between threads safely
// 208 | |                     //do the block copy here
// 209 | |                     //println!("nBlockCopyLaunchRequestCounter:<<{}>>", nBlockCopyLaunchRequestCounter);
// 210 | |                     
// ...   |
// 221 | |                     tx2.send(nBlockCopyLaunchRequestCounter);
// 222 | |                 });            
//     | |_________________- within this `[closure@src/main.rs:207:30: 222:18]`
//     |
//     = help: within `[closure@src/main.rs:207:30: 222:18]`, the trait `Send` is not implemented for `*mut u8`
//     = note: required because it appears within the type `[closure@src/main.rs:207:30: 222:18]`

I would recommend using https://crates.io/crates/memmap to make the memory maps and https://docs.rs/memmap/0.7.0/memmap/struct.MmapMut.html#method.split_at_mut to cut up the &mut [u8] that MmapMut can provide. Then the only unsafe is the initial memory map (it's unsafe because a separate process could modify the file as you're reading it, which is a data race).

As an alternative, you could turn your pointer into a slice with https://doc.rust-lang.org/std/slice/fn.from_raw_parts_mut.html and then use split_at_mut to break it into parts for each thread.

Any type containing a *mut is !Send and !Sync, since it's super easy to write a data race with one. You can tell Rust to trust you that that won't happen with unsafe impl Send for Type and unsafe impl Sync for Type (and it's very easy to write very nasty bugs if you mess up).
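To make the slice alternative concrete, here is a minimal sketch (my names, nothing from your project; it assumes a toolchain with std::thread::scope, or crossbeam's scoped threads on older ones). The raw mmap pointers become slices, split_at_mut produces disjoint halves, and &mut [u8] is Send, so each half can move to its own thread:

use std::slice;
use std::thread;

// SAFETY: the caller must guarantee both pointers are valid for `len`
// bytes and that nothing else touches either mapping meanwhile.
unsafe fn copy_on_two_threads(src: *const u8, dst: *mut u8, len: usize) {
    let src: &[u8] = slice::from_raw_parts(src, len);
    let dst: &mut [u8] = slice::from_raw_parts_mut(dst, len);

    let mid = len / 2;
    let (src_lo, src_hi) = src.split_at(mid);
    let (dst_lo, dst_hi) = dst.split_at_mut(mid);

    thread::scope(|s| {
        s.spawn(move || dst_lo.copy_from_slice(src_lo));
        s.spawn(move || dst_hi.copy_from_slice(src_hi));
    });
}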

Thank you for your recommendation.
I rewrote it with memmap, with the first round doing a sequential copy.
That was clear and straightforward.

However, in my two following rounds of tweaking the parallel copy, I'm finding that what I wrote seemed straightforward and, in my humble opinion, should have compiled. It's a real pain point trying to understand why I have to rework the code when the way I expressed it feels the most intuitive to me.

Here's the question in crystallized form:
How do I traverse two memory maps simultaneously in parallel, like rayon's par_chunks, where the source chunk position and the destination chunk position are the same but in two different memory maps (the source map and the destination map), so that the destination map receives each source chunk at its original source offset, with the work done in parallel and delegated to a pool with as many workers as the CPU has cores?

I have been envisioning a workaround using an array of channels of size cpucorecount, sending each block into its core's respective channel along with its block position. The receiving end would then position and write its respective block into the destination memory map. It's another way, but it would be slower than what the above could do in an elegant fashion, if only the compiler would accept it.
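Roughly, the shape I have in mind is the following (a hypothetical sketch with made-up names; it assumes the sizes divide evenly and a toolchain with std::thread::scope):

use std::thread;

fn channel_copy(src: &[u8], dst: &mut [u8], ncores: usize, block: usize) {
    let region = src.len() / ncores;
    thread::scope(|s| {
        let mut senders = Vec::new();
        // One channel per worker; each worker owns one contiguous
        // destination region, so no two threads share bytes.
        for dst_region in dst.chunks_mut(region) {
            let (tx, rx) = flume::unbounded::<(usize, &[u8])>();
            senders.push(tx);
            s.spawn(move || {
                for (offset_in_region, bytes) in rx {
                    dst_region[offset_in_region..offset_in_region + bytes.len()]
                        .copy_from_slice(bytes);
                }
            });
        }
        // Dispatch each source block to the worker that owns its region.
        for (i, src_block) in src.chunks(block).enumerate() {
            let global = i * block;
            let worker = global / region;
            senders[worker].send((global % region, src_block)).unwrap();
        }
        drop(senders); // closing the channels ends the workers' receive loops
    });
}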

Anyway, I have attached the reworked code. Thanks in advance.

cat Cargo.toml 
[package]
name = "dtmfcopy2"
version = "0.1.0"
authors = ["David Marceau <uticdmarceau2007@yahoo.ca>"]
edition = "2018"

[dependencies]
#libc = "0.2.77"
flume = "0.9.1"
threadpool = "1.8.1"
memmap = "0.7.0"

[profile.release]
lto = 'thin'
panic = 'abort'
codegen-units = 1


cat src/main.rs 
use std::env;
use std::{
    fs::File,
    fs::OpenOptions,
    io::{Seek, SeekFrom, Write},
};
use threadpool::ThreadPool;

use memmap::Mmap;

#[derive(Debug)]
pub struct BlockCopyConf2 {
    pub TakeCPUCores : i32,
    pub Block_size : i64,
    pub Blocks_per_chunk : i64,
    pub Chunk_size : i64,
    pub QuotientChunks : i64,
    pub NumberOfBlocksToCopy : i64,
    pub ModuloRemainderBytes : i64,
    pub SourceFileMap : memmap::MmapMut,
    pub DestinationFileMap : memmap::MmapMut,
}

fn main() {
    let mut srcFilePath : String = "".to_string();
    let mut dstFilePath : String = "".to_string();
    let mut arg_counter = 0;
    for argument in env::args() {
        match arg_counter {
            1 => {
                srcFilePath = argument.clone();
                println!("srcFilePath:<<{}>>", srcFilePath);
            },
            2 => {
                dstFilePath = argument.clone();
                println!("dstFilePath:<<{}>>", dstFilePath);
            },
            _ => {
            }
        }
        arg_counter = arg_counter + 1;
    }
  
    let mut srcFileOpenOptions = OpenOptions::new();
    srcFileOpenOptions.read(true).write(true);
    let srcFile: File = match srcFileOpenOptions.open(srcFilePath) {
        Ok(f) => f,
        Err(_) => return,
    };
    let srcMetadata = srcFile.metadata().unwrap();
    if !srcMetadata.is_file() {
        return;
    }
    let srcFile_size: u64 = srcMetadata.len();

    let src_mut_mmap: memmap::MmapMut = unsafe {
        match Mmap::map(&srcFile) {
            Ok(theGoodMmap) => theGoodMmap.make_mut().unwrap(),
            Err(_e) => return,
        }
    };

    let mut dstFileOpenOptions = OpenOptions::new();
    dstFileOpenOptions.read(true).write(true).create(true);
    let mut dstFile: File = match dstFileOpenOptions.open(dstFilePath) {
        Ok(f) => f,
        Err(_) => return,
    };

    // Reserve filesize capacity beforehand
    let dstFile_size = srcFile_size;
    dstFile.seek(SeekFrom::Start((dstFile_size - 1) as u64)).unwrap();
    dstFile.write_all(&[0]).unwrap();
    dstFile.seek(SeekFrom::Start(0)).unwrap();
    
    let dst_mut_mmap: memmap::MmapMut = unsafe {
        match Mmap::map(&dstFile) {
            Ok(theGoodMmap) => theGoodMmap.make_mut().unwrap(),
            Err(_e) => return,
        }
    };
    
    let myBlock_size : i64 = 4096;
    let myBlocks_per_chunk : i64 = 1024;
    let myChunk_size: i64 = 4096 * 1024;
    let myQuotientChunks : i64 = (srcFile_size as i64)  / (myChunk_size as i64);
    let myNumberOfBlocksToCopy : i64 = myBlocks_per_chunk * myQuotientChunks;
    let myModuloRemainderBytes : i64 = (srcFile_size as i64)  % (myChunk_size as i64);

    let mut myBlockCopyConf2 = BlockCopyConf2 {
        TakeCPUCores : 4,
        Block_size : myBlock_size,
        Blocks_per_chunk : myBlocks_per_chunk,
        Chunk_size : myChunk_size,
        QuotientChunks : myQuotientChunks,
        NumberOfBlocksToCopy : myNumberOfBlocksToCopy,
        ModuloRemainderBytes : myModuloRemainderBytes,
        SourceFileMap : src_mut_mmap,
        DestinationFileMap : dst_mut_mmap,
    };


    SequentialBlockCopier(&mut myBlockCopyConf2);
    //ParallelBlockCopier(&mut myBlockCopyConf2);
    //ParallelBlockCopierV2(&mut myBlockCopyConf2);

    if myBlockCopyConf2.ModuloRemainderBytes > 0 {
        println!("ModuloRemainderBytes:<<{}>>", myBlockCopyConf2.ModuloRemainderBytes);

        let src_range_start = (myBlockCopyConf2.NumberOfBlocksToCopy * myBlockCopyConf2.Block_size) as usize;
        let src_range_end = src_range_start + myBlockCopyConf2.ModuloRemainderBytes as usize;

        (&mut myBlockCopyConf2.DestinationFileMap[src_range_start..src_range_end])
            .write_all(&myBlockCopyConf2.SourceFileMap[src_range_start..src_range_end])
            .unwrap();
    } else {
        println!("No remainder bytes.");
    }

    println!("\nsrcFile_size:<<{}>>",srcFile_size);
    println!("Done");
}

pub fn SequentialBlockCopier(theBlockCopyConf2_ : &mut BlockCopyConf2) {
    //&mmap[..51]
    //&mmap[0..8]
    // assert_eq!((3..5), std::ops::Range { start: 3, end: 5 });
    // A (half-open) range bounded inclusively below and exclusively above (start..end).
    // The Range start..end contains all values with x >= start and x < end. It is empty unless start < end.
    // let arr = [0, 1, 2, 3, 4];
    // assert_eq!(arr[ ..  ], [0,1,2,3,4]);
    // assert_eq!(arr[ .. 3], [0,1,2    ]);
    // assert_eq!(arr[ ..=3], [0,1,2,3  ]); //inclusive end range
    // assert_eq!(arr[1..  ], [  1,2,3,4]);
    // assert_eq!(arr[1.. 3], [  1,2    ]);  // Range
    // assert_eq!(arr[1..=3], [  1,2,3  ]); //inclusive end range
    
    let mut src_range_start : i64 = 0;
    let mut src_range_end : i64 = theBlockCopyConf2_.Block_size;

    for blockCounter in 0..theBlockCopyConf2_.NumberOfBlocksToCopy {
        (&mut theBlockCopyConf2_.DestinationFileMap[src_range_start as usize..src_range_end as usize])
            .write_all(&theBlockCopyConf2_.SourceFileMap[src_range_start as usize..src_range_end as usize])
            .unwrap();
        src_range_start += theBlockCopyConf2_.Block_size;
        src_range_end += theBlockCopyConf2_.Block_size;
        if blockCounter % theBlockCopyConf2_.Blocks_per_chunk == 0 {
            print!("."); // one dot of progress per chunk
            std::io::stdout().flush().unwrap();
        }
    }
}

// pub fn ParallelBlockCopier(theBlockCopyConf2_ : &mut BlockCopyConf2) {

// // User Cocalus answered :)
// // https://docs.rs/memmap/0.7.0/memmap/struct.MmapMut.html#method.split_at_mut
// // to cut up the &mut [u8] from MmapMut can provide.
// // then use split_at_mut to break it up in parts for each thread.
// // for the parallel stuff use this
// // split_at_mut(&mut self, mid: usize) -> (&mut [T], &mut [T])
// // Divides one mutable slice into two at an index.
// // The first will contain all indices from [0, mid)(excluding the index mid itself)
// // and the second will contain all indices from [mid, len) (excluding the index len itself).
// // let (left, right) = v.split_at_mut(2);

//     let (tx, rx) = flume::unbounded();

//     let remainder_range_start = theBlockCopyConf2_.NumberOfBlocksToCopy * theBlockCopyConf2_.Block_size;    
//     let mut pool = ThreadPool::new(theBlockCopyConf2_.TakeCPUCores as usize); //pool count to cpu cores you want to take

//     for nBlockCopyLaunchRequestCounter in 0..theBlockCopyConf2_.NumberOfBlocksToCopy { //upper bound is exclusive            
//         let tx2 = tx.clone();       
        
//         pool.execute(move || {
//             let mut src_range_start : i64 = theBlockCopyConf2_.Block_size * nBlockCopyLaunchRequestCounter ;
//             let mut src_range_end : i64 = src_range_start + theBlockCopyConf2_.Block_size;

//             (&mut theBlockCopyConf2_.DestinationFileMap[std::ops::Range{ start: src_range_start as usize, end: src_range_end as usize }]).write_all(&(theBlockCopyConf2_.SourceFileMap[std::ops::Range{ start: src_range_start as usize, end: src_range_end as usize }]));
//             if (nBlockCopyLaunchRequestCounter % theBlockCopyConf2_.Blocks_per_chunk == 0) {
//                 print!("."); // displays a . progress for a chunk
//                 std::io::stdout().flush();
//             }           
            
//             tx2.send(nBlockCopyLaunchRequestCounter);
//         });
//     }    
    
//     drop(tx); // We are done with our sending half so we can explicitly drop it here.
    
//     for theReturnedBlockCopyIndex in rx {                    
//         //println!("theReturnedBlockCopyIndex:<<{}>>",theReturnedBlockCopyIndex);
//     }
// }

pub fn ParallelBlockCopierV2(theBlockCopyConf2_ : &'static mut BlockCopyConf2) {

// User Cocalus answered :)
// https://docs.rs/memmap/0.7.0/memmap/struct.MmapMut.html#method.split_at_mut
// to cut up the &mut [u8] from MmapMut can provide.
// then use split_at_mut to break it up in parts for each thread.
// for the parallel stuff use this
// split_at_mut(&mut self, mid: usize) -> (&mut [T], &mut [T])
// Divides one mutable slice into two at an index.
// The first will contain all indices from [0, mid)(excluding the index mid itself)
// and the second will contain all indices from [mid, len) (excluding the index len itself).
// let (left, right) = v.split_at_mut(2);

    let (tx, rx) = flume::unbounded();

    let remainder_range_start = theBlockCopyConf2_.NumberOfBlocksToCopy * theBlockCopyConf2_.Block_size;    
    let mut pool = ThreadPool::new(theBlockCopyConf2_.TakeCPUCores as usize); //pool count to cpu cores you want to take

    for nBlockCopyLaunchRequestCounter in 0..theBlockCopyConf2_.NumberOfBlocksToCopy { //upper bound is exclusive            
        let tx2 = tx.clone();       
        let mut src_range_start : i64 = theBlockCopyConf2_.Block_size * nBlockCopyLaunchRequestCounter ;
        let mut src_range_end : i64 = src_range_start + theBlockCopyConf2_.Block_size;
        let (src_left, src_right) = theBlockCopyConf2_.SourceFileMap.split_at_mut(src_range_end as usize);
        let (dst_left, dst_right) = theBlockCopyConf2_.DestinationFileMap.split_at_mut(src_range_end as usize);

        let theBlocksPerChunk = theBlockCopyConf2_.Blocks_per_chunk;
        
        pool.execute(move || {
             
            (&mut dst_left[std::ops::Range{ start: src_range_start as usize, end: src_range_end as usize }]).write_all(&(src_left[std::ops::Range{ start: src_range_start as usize, end: src_range_end as usize }]));

            //this one works for sequential...
            //(&mut theBlockCopyConf2_.DestinationFileMap[std::ops::Range{ start: src_range_start as usize, end: src_range_end as usize }]).write_all(&(theBlockCopyConf2_.SourceFileMap[std::ops::Range{ start: src_range_start as usize, end: src_range_end as usize }]));
            if (nBlockCopyLaunchRequestCounter % theBlocksPerChunk == 0) {
                print!("."); // displays a . progress for a chunk
                std::io::stdout().flush();
            }           
            
            tx2.send(nBlockCopyLaunchRequestCounter);
        });
    }    
    
    drop(tx); // We are done with our sending half so we can explicitly drop it here.
    
    for theReturnedBlockCopyIndex in rx {                    
        //println!("theReturnedBlockCopyIndex:<<{}>>",theReturnedBlockCopyIndex);
    }
}

You could unsafe impl Send / Sync; it's just that it's really easy to mess up (especially multithreaded).
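For the record, the opt-in looks like this (a hypothetical wrapper, not something I'd pick over the slice approach); the entire burden of keeping every thread on disjoint bytes falls on you:

// Wrapper that asserts "trust me" to the compiler.
#[derive(Clone, Copy)]
struct SendPtr(*mut u8);

// SAFETY: only sound if no two threads ever read/write
// the same bytes behind the pointer at the same time.
unsafe impl Send for SendPtr {}
unsafe impl Sync for SendPtr {}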

I can't tell if you're avoiding Rayon for learning purposes, but this is fairly easy to do in Rayon:

use std::{fs::File, fs::OpenOptions, path::PathBuf};
use structopt::StructOpt;
use anyhow::Result;
use rayon::prelude::*;

#[derive(Debug, StructOpt)]
struct Opt {
    #[structopt(parse(from_os_str))]
    source: PathBuf,

    #[structopt(parse(from_os_str))]
    dest: PathBuf,

    #[structopt(default_value = "1048576")]
    chunk_size: usize,
}

fn main() -> Result<()> {
    let args = Opt::from_args();

    let source = File::open(&args.source)?;
    let size = source.metadata()?.len();
    let dest = OpenOptions::new().read(true).write(true).create(true).open(&args.dest)?;
    dest.set_len(size)?;
    let source = unsafe{memmap::Mmap::map(&source)?};
    let mut dest = unsafe {memmap::MmapMut::map_mut(&dest)?};
    
    dest.par_chunks_mut(args.chunk_size)
        .zip(source.par_chunks(args.chunk_size))
        .for_each(|(dest_chunk, source_chunk)| dest_chunk.copy_from_slice(source_chunk));
    
    Ok(())
}
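For what it's worth, invocation looks something like this (paths and chunk size are made-up examples; the positional arguments come from the Opt struct above):

cargo run --release -- /path/to/source /path/to/dest 4194304

Since both maps have the same length, zip pairs each destination chunk with the source chunk at the same offset, and the final pair of chunks (possibly shorter than chunk_size) still match in size, so the remainder bytes are handled for free.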


I learned many things here, but the most precious is Rayon's Iter::Zip.

Thank you so much for sharing such an elegant solution for the blocks. I'm grateful.
Your true genius is reflected in the above code :slight_smile:


I modified my code to reflect your rayon .zip() recommendation.

myBlockCopyConf2.DestinationFileMap.par_chunks_mut(myBlockCopyConf2.Block_size as usize)
    .zip(myBlockCopyConf2.SourceFileMap.par_chunks(myBlockCopyConf2.Block_size as usize))
    .for_each(|(dest_chunk, source_chunk)| dest_chunk.copy_from_slice(source_chunk));