Sharing a buffer between two threads

I want to split a compression function into two threads. One function calculates LZ77 matches in the input, and stores them in this structure:

pub struct Match
{
  pub position: usize,
  pub length: usize,
  pub distance: usize
}

In my current implementation, I simply calculate and store all these matches before starting to process them to generate the final output, but it should be possible to start the second phase of the compression before all the matching has been done.

So I want to have some kind of shared buffer, and when the match function has progressed sufficiently far through the input, the second phase of the compression can start and run in parallel.

The thread doing the matching will be concurrently writing (appending) new matches to the same buffer that the other thread is reading ( although not the same locations within the buffer ).

I have done some reading on concurrency ( e.g. Shared-State Concurrency - The Rust Programming Language ) but in spite of this I am a little uncertain how to proceed.

Any hints?

You could use a concurrent queue, like those provided by the crossbeam crate.

1 Like

If the original buffer isn't going to be mutated then it's easy enough to share. You could put it in an Arc<[u8]> (think of it as a reference-counted, immutable Vec<u8>) where each thread gets its own copy of the reference-counted pointer.

From there I'd use a channel (see std::sync::mpsc or crossbeam::channel) so the matcher thread can send each Match to the compression thread, allowing the compressing thread to compress everything up to that point.

1 Like

What I am thinking about is to have a small number of messages exchanged, which will look roughly like this. I will call the two threads P ( the thread that produces the matches ) and C ( the thread that consumes them ).

C -> P "Please let me know when you have processed all the input up to i bytes".
P -> C "Ok, I have processed j bytes ( j >= i ), and there are m matches up to that point."

C can then safely read the m matches from the shared buffer ( as P will not be over-writing them ). When C has processed some matches ( and is done with them ), it sends a new message:

C->P : "I have processed k matches."

This continues ( at some point P will have processed all the bytes ).

The question is: can I actually have an "unsafe" shared buffer/array? The buffer can have a fixed size ( and it is circular). P only writes into the shared buffer, only C reads the shared buffer I would need a primitive which P can use which ensures the values it has written into the shared buffer really have been written to memory ( and are not in some queue ).

Well (not without a bit of a struggle, e.g. finding the scoped thread pool ) I got something to work! Without a thread pool it is drastically slower, but with a thread pool it's slightly faster. I think it would go faster if instead of sending a single Match at a time I sent say 100 matches, or maybe un-encoded blocks ( I will work on that ).

Anyway, here it is ( compress_par is the multi-threaded version, compress is the single-threaded version ). The other modules ( matcher, bit, block ) can be found here:
https://github.com/georgebarwood/Rust


use crate::matcher;
use crate::matcher::Match;
use crate::bit::BitStream;
use crate::block::Block;

use scoped_threadpool::Pool;
use crossbeam::{channel,Receiver};

pub fn compress_par( inp: &[u8], p: &mut Pool ) -> Vec<u8>
{
  let mut out = BitStream::new();
  let (tx, rx) = channel::unbounded();
  p.scoped( |s| 
  {
    s.execute( || { matcher::find_par( inp, tx ); } );
    s.execute( || { do_blocks( inp, rx, &mut out ); } );
  } );
  out.bytes
}

pub fn do_blocks( inp: &[u8], mrx: Receiver<Match>, out: &mut BitStream )
{
  out.write( 16, 0x9c78 );

  let mut mlist : Vec<Match> = Vec::new();

  let len = inp.len();
  let mut ii = 0; // input index
  let mut mi = 0; // match index
  loop
  {
    let mut block_size = len - ii;
    if block_size > 0x4000 { block_size = 0x4000; }
    let mut b = Block::new( ii, block_size, mi );

    loop // Get matches for the block.
    {
      let brk;
      match mrx.recv()
      {
        Ok( m ) => 
        {
          brk = m.position >= b.input_end;
          mlist.push( m );          
        },
        Err( _err ) => brk = true
      }
      if brk { break; }
    }

    b.init( &inp, &mlist );
    ii = b.input_end;
    mi = b.match_end;
    b.write( &inp, &mlist, out, ii == len );
    if ii == len { break; }
  }   
  out.pad(8);
  out.write( 32, adler32( &inp ) as u64 );
  out.flush();

  // println!( "Total matches={}", mlist.len() );
}

pub fn compress( inp: &[u8] ) -> Vec<u8>
{
  let mut out = BitStream::new();
  out.write( 16, 0x9c78 );

  let mut mlist : Vec<Match> = Vec::new();
  matcher::find( inp, &mut mlist );

  // for mat in &mlist { println!( "Match at {} length={} distance={}", mat.position, mat.length, mat.distance ); } }
  // println!( "Total matches={}", mlist.len() );

  let len = inp.len();
  let mut ii = 0; // input index
  let mut mi = 0; // match index
  loop
  {
    let mut block_size = len - ii;
    if block_size > 0x4000 { block_size = 0x4000; }
    let mut b = Block::new( ii, block_size, mi );
    b.init( &inp, &mlist );
    ii = b.input_end;
    mi = b.match_end;
    b.write( &inp, &mlist, &mut out, ii == len );
    if ii == len { break; }
  }   
  out.pad(8);
  out.write( 32, adler32( &inp ) as u64 );
  out.flush();

  out.bytes
}

/// Checksum function per RFC 1950.
pub fn adler32( input: &[u8] ) -> u32
{
  let mut s1 = 1;
  let mut s2 = 0;
  for b in input
  {
    s1 = ( s1 + *b as u32 ) % 65521;
    s2 = ( s2 + s1 ) % 65521;
  }
  s2 * 65536 + s1   
}

Later, I realised that instead of using 3 threads, as above, I could use 2 threads like this:

  p.scoped( |s| 
  {
    s.execute( || { matcher::find( inp, tx ); } );
    do_blocks( inp, rx, &mut out );
  } );

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.