RTSP streaming API sanity check

Anyone interested in sanity checking an API sketch for me?

I'm trying to create a convenient, idiomatic async API for a high-level RTSP streaming client crate. (It lives in my junk repo for now. I need to pick a crate name and move it to its own repo.) It handles messages from layered protocols, at least:

  1. bytes of data on RTSP channels (or maybe later, RTP UDP streams).
  2. RTP/RTCP for each RTSP stream. This layer checks for the correct ssrc, ensures packets are in order (interpreting seq), and ensures monotonicity of the timestamp, including extending it to 64 bits so higher layers don't have to deal with wraparound (see the sketch after this list), and maybe a "quirks" mode to accommodate cameras that violate monotonicity on SNTP time steps.
  3. demuxers for known codecs: eg the H.264 one handles fragmentation/aggregation of NALs, grouping into "access units", separating parameters (SPS/PPS) from picture data (VCL), and converting pictures into the desired format (Annex B, or length-prefixed data suitable for dropping into a .mp4). (Maybe also support a layer 2.5 for multiplexing proxies: buffer all the packets since the first reference image still in use, without creating newly defragmented and refragmented copies in memory. This would let newly connected clients catch up quickly and efficiently.)
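
To make the layer-2 timestamp extension idea concrete, here's a rough sketch (not actual crate code; the names are hypothetical) of extending the 32-bit RTP timestamp to 64 bits so higher layers never see wraparound:

struct TimestampExtender {
    /// Most recent extended timestamp, if any packet has been seen yet.
    latest: Option<u64>,
}

impl TimestampExtender {
    /// Extends a 32-bit RTP timestamp to 64 bits, assuming it's within half
    /// the 32-bit range of the previous one.
    fn extend(&mut self, ts: u32) -> u64 {
        let extended = match self.latest {
            None => u64::from(ts),
            Some(latest) => {
                // Pick the 64-bit value with the given low 32 bits that's
                // closest to the previous timestamp; this handles forward
                // wraparound and small backward jumps alike.
                let high = latest & !0xffff_ffff;
                [
                    high.wrapping_sub(1 << 32) | u64::from(ts),
                    high | u64::from(ts),
                    high.wrapping_add(1 << 32) | u64::from(ts),
                ]
                .into_iter()
                .min_by_key(|&c| c.abs_diff(latest))
                .unwrap()
            }
        };
        self.latest = Some(extended);
        extended
    }
}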

I'm thinking of defining, for each layer, an enum that covers everything visible at that layer across all streams and all supported codecs. Eg, for layer 3 above I think I'd define a futures::Stream<Item = Result<Message, Error>> whose Message has nested enums:

enum Message {
    RtcpSenderReport {
        stream_id: usize,
        rtp_timestamp: Timestamp,
        ntp_timestamp: NtpTimestamp,
    },
    NewParameters {
        stream_id: usize,
        rtp_timestamp: Timestamp,
        parameters: Parameters
    },
    Data {
        stream_id: usize,
        rtp_timestamp: Timestamp,
        data: Data,
    },
}

/// Stream parameters.
/// These may or may not be available out-of-band before starting the stream.
/// Some stream types support updating the parameters in-band during the stream.
enum Parameters {
    Video(video::Parameters),
    Audio(audio::Parameters),
}

impl Parameters {
    /// Returns a concrete ISO/IEC 14496-12 section 8.5.2.2 `SampleEntry`
/// suitable for dropping into an ISO BMFF (`.mp4`) file. Returns `None` for
    /// codecs that have no defined `.mp4` box for this.
    fn mp4_sample_entry(&self) -> Option<Bytes> {
        match self {
            Self::Video(p) => p.sample_entry(),
            Self::Audio(p) => p.sample_entry(),
        }
    }
}

/// Timestamped data, one enum variant per supported SDP media type (as in the IANA media type registry).
enum Data {
    Video(video::Picture),
    Audio(audio::Frame),
    Application(bytes::Bytes), // eg ONVIF metadata streams.
}

impl AsRef<Bytes> for Data { /* ... */ }

mod video {

/// Video parameters, one enum variant per supported encoding.
enum Parameters {
    H264(h264::Parameters),
    H265(h265::Parameters),
}

// This is a bad name, but the enum and trait conceptually represent the same
// thing. The trait is implemented by the struct backing each enum value, and
// some structs have extra methods. What do other folks do in this case?
trait ParametersTrait {
    fn pixel_dimensions(&self) -> (u32, u32);
    // ...
}

impl ParametersTrait for Parameters { /* match clauses everywhere */ }
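
// For illustration (a hypothetical expansion of the line above), each trait
// method just delegates to whichever variant is active:
//
//     impl ParametersTrait for Parameters {
//         fn pixel_dimensions(&self) -> (u32, u32) {
//             match self {
//                 Parameters::H264(p) => p.pixel_dimensions(),
//                 Parameters::H265(p) => p.pixel_dimensions(),
//             }
//         }
//         // ...one such match per method.
//     }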

// similar idea for Picture:
enum Picture { /* ... */ }
trait PictureTrait { /* ... */ }
impl PictureTrait for Picture { /* ... */ }

}

mod audio { /* analogous to video */ }
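
For concreteness, here's roughly how I'd expect a caller to drive the layer-3 stream above (just a sketch; the match arm bodies are placeholders):

use futures::StreamExt;

async fn run(
    mut msgs: impl futures::Stream<Item = Result<Message, Error>> + Unpin,
) -> Result<(), Error> {
    while let Some(msg) = msgs.next().await {
        match msg? {
            Message::RtcpSenderReport { .. } => {
                // Correlate RTP time with NTP wall-clock time.
            }
            Message::NewParameters { .. } => {
                // Eg, write a fresh SampleEntry / start a new .mp4 fragment.
            }
            Message::Data { .. } => {
                // Hand the picture/frame/metadata to a muxer. When the caller
                // is done, it just stops polling and drops the stream.
            }
        }
    }
    Ok(())
}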

Am I on a good path? Changes I should consider?

Alternatives:

  1. a push API. This feels more convenient for me to write (it's what I have now) but it doesn't feel convenient to use:
    a. You end up with a driver loop over RTSP messages, even if that's not something that means anything to you. If you want to decide when to stop, your driving code needs to check a condition left by your consumer code after every packet, or you need to return an Error up the stack for something that isn't really an error.
    b. If you want your calling code to talk to your consumer, you need a bunch of inner().inner() crap, or into_inner().into_inner() at the end. (Maybe I could avoid this by having each layer's handler take an inner: Pin<&mut InnerHandler> instead of an inner: I where I: InnerHandler.)
    c. As a minor efficiency point, I use #[async_trait], which means extra allocations as I cross every layer boundary.
  2. a futures::stream::Stream per mycrate::client::Stream. Possible benefits:
    a. less enum stuff to deal with, less match { Foo => unreachable!() } for callers that only care about one stream type, less stream_id checking.
    b. ability to mix and match layers per stream. In particular, the ability to add a codec demuxer outside the crate without giving up the codec demuxers inside the crate.
    Problems:
    a. basically, I'm not sure how the ownership would work or how complicated setup would be.
    b. you have to do a select! if you want multiple streams, which might be more annoying/less efficient, and it might throw away the ordering of messages between streams. I'm not sure yet if that ordering is ever useful. (A sketch of what that merging might look like follows below.)
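
And for comparison, a rough sketch of what alternative 2 might look like for a caller that wants both a video and an audio stream (hypothetical types; futures::stream::select interleaves items as they become ready, so any cross-stream ordering is lost):

use futures::stream::{self, Stream, StreamExt};

// Common item type so the two per-stream Streams can be merged.
enum MergedItem {
    Video(video::Picture),
    Audio(audio::Frame),
}

async fn consume(
    video_stream: impl Stream<Item = Result<video::Picture, Error>> + Unpin,
    audio_stream: impl Stream<Item = Result<audio::Frame, Error>> + Unpin,
) -> Result<(), Error> {
    let video_stream = video_stream.map(|r| r.map(MergedItem::Video));
    let audio_stream = audio_stream.map(|r| r.map(MergedItem::Audio));
    let mut merged = stream::select(video_stream, audio_stream);
    while let Some(item) = merged.next().await {
        match item? {
            MergedItem::Video(_picture) => { /* feed the video track */ }
            MergedItem::Audio(_frame) => { /* feed the audio track */ }
        }
    }
    Ok(())
}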

Didn't get any feedback here, but I went with roughly this plan anyway, and I think it's working out pretty well. Moved to this repository.
