Composing tokio_util codecs

Hi everyone! I'm wresting with what I suspect is a simple thing to solve, but I haven't managed it yet. I have a file that contains a binary format which is framed twice; individual MPEG packets of fixed length are framed with their MPEG header, but this succession of MPEG packets is itself broken up at arbitrary intervals with a DVB-S2 header and CRC. This DVB-S2 header might be inserted at any point in the stream, so for example it might appear in the middle of an MPEG header.

My approach to decoding the input file into a stream of MPEG packet payloads was going to be writing a DvbS2 Decoder that reads the file (from stdin) and yields successive DVB-S2 payloads, and then to feed the output of that Decoder into an MPEG Decoder that yields successive MPEG packet payloads. However, I'm having trouble figuring out how to pipe the output of the DvbS2 Decoder stream into the input of the MPEG Decoder. I thought it might be something like:

let mut dvb_reader = FramedRead::new(tokio::io::stdin(), DvbDecoder {});
let (receiver, mut sender) = simplex(BUFFER_SIZE);
let mut mpeg_reader = FramedRead::new(receiver, MpegDecoder {});
while let Some(Ok(dvb)) = dvb_reader.next().await {
    sender.write_all(dvb.payload).await?
}
while let Some(Ok(mpeg)) = mpeg_reader.next().await {
    eprintln!("got an mpeg-ts packet with continuity counter {}", mpeg.continuity_counter);
}

This code compiles but doesn't do anything as far as I can tell. It also seems not quite right semantically in that it doesn't feel like I should need to call next on both streams, and even if I do, I shouldn't be doing it consecutively.

Am I missing a simple way to get the Stream of output from the DvbS2 Decoder and "pipe" it into the input stream of the Mpeg Decoder? Maybe I'm missing a DvbS2 Encoder that just concatenates the payloads of each frame?

Thanks so much!
Ben

I'm not familiar with exactly how to use tokio codecs, but I'm pretty sure the first while loop will write data into sender until the buffer is full, and then hang because the buffer will never be emptied by anything.

You need your two decoding stages to run concurrently, not sequentially, so you need a join operation:

tokio::join!(
    async {
        while let Some(Ok(dvb)) = dvb_reader.next().await {
            sender.write_all(dvb.payload).await?
        }
    }
    async {
        while let Some(Ok(mpeg)) = mpeg_reader.next().await {
            eprintln!("got an mpeg-ts packet with continuity counter {}", mpeg.continuity_counter);
        }
    }
);
1 Like

Logically a codec for T turns a byte stream (AsyncRead) into an async iterable (Stream) of T. The only time it makes sense to "pipe" them is if the first is a packetization of reasonably arbitrary bytes, to reparse as an embedded stream. If you can get that mapping out, that is get a Stream<Item=Bytes>, you can use StreamReader to turn that back into an AsyncRead to feed into the next codec.

1 Like

That was the key I was missing! Or at least, I got it working by transforming the Stream<Item = Result<DvbS2Data, _>> into a Stream<Item = Result<Bytes, _>> and then handing that to a StreamReader to go into the next FramedRead:

let mut stdout = tokio::io:stdout();
let dvb_reader = FramedRead::new(tokio::io::stdin(), DvbS2Decoder {});
let dvb_bytes = dvb_reader.filter_map(|frame| frame.ok())
    .map(|line| -> Result<Bytes, Error> { Ok(Bytes::from(line)) });
let dvb_bytes_stream = StreamReader::new(dvb_bytes);
let mut mpeg_reader = FramedRead::new(dvb_bytes_stream, MpegDecoder {});

while let Some(Ok(line)) = mpeg_reader.next().await {
    stdout.write_all(line.as_bytes()).await?
}

Thanks very much.

Ben

1 Like