Can you cancel a rusoto event stream?

Hello All,

I'm currently writing a service that consumes records from AWS Kinesis with enhanced fan-out, so I'm calling subscribe_to_shard() on an existing consumer. In Amazon's Java example, the reactivestreams.Subscription object has a cancel method that lets them stop the subscription, but I can't find any similar functionality on rusoto's EventStream (which appears to do for Rust what Subscription does for Java). Does anyone know how to "cancel" an event stream? For example, would explicitly dropping it work, or would that just mean I stop receiving the records (as opposed to stopping the consumer from sending them)?

As always thanks in advance,

Rich

It looks like EventStream implements the Stream trait, which lets you asynchronously read events from the stream.

In Rust, streams and futures need to be polled/awaited in order to do anything, so to "cancel" the EventStream all you need to do is drop it.
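To illustrate that point, here is a minimal sketch using only the standard library. CountingSource and NoopWake are hypothetical names made up for this example (they are not rusoto types), and a plain Future stands in for a Stream since the polling model is the same. It shows that work happens only when the value is polled, and that dropping it is all there is to cancelling.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a lazily driven source such as an event stream.
struct CountingSource {
    polls: Rc<Cell<u32>>,
    dropped: Rc<Cell<bool>>,
}

impl Future for CountingSource {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        // Work only happens here, when the caller polls.
        self.polls.set(self.polls.get() + 1);
        Poll::Pending
    }
}

impl Drop for CountingSource {
    fn drop(&mut self) {
        // Runs when the owner lets go; no further polls are possible.
        self.dropped.set(true);
    }
}

/// Waker that does nothing, enough to poll by hand without a runtime.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the source `times` times, drops it, and returns (poll count, dropped?).
fn poll_then_drop(times: u32) -> (u32, bool) {
    let polls = Rc::new(Cell::new(0));
    let dropped = Rc::new(Cell::new(false));
    let mut source = Box::pin(CountingSource {
        polls: Rc::clone(&polls),
        dropped: Rc::clone(&dropped),
    });

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    for _ in 0..times {
        let _ = source.as_mut().poll(&mut cx);
    }

    drop(source); // this is the whole "cancel" operation
    (polls.get(), dropped.get())
}
```

Calling poll_then_drop(0) reports zero polls, so a source that is never polled never does any work, and in every case the Drop impl has run by the time the function returns.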

But when you call subscribe_to_shard, AWS starts "pushing" data to you (as opposed to you having to pull it from Kinesis). If I just drop the stream, that wouldn't notify Kinesis to stop pushing the data unless that functionality has been specifically implemented on it, which leaves me with the problem of data being lost, because Kinesis would just be pushing to a stream that no longer exists?

Also, I just had a look through the actual code for it (rusoto/event_stream.rs at 31cf1506f9f4bd7af7a1f86ced3cec436913d518 · rusoto/rusoto · GitHub) and I can't see any implementation of Drop that would notify AWS to stop pushing data.

The EventStream contains a ByteStream. This is a wrapper around a Stream<Bytes, _> (e.g. a socket), so when the ByteStream is dropped it'll drop the stream of bytes and close any network connections that are open. The other end will receive some sort of disconnect message when the network connection is closed, and AWS will know to stop pushing data because all further writes will result in an error.
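As a rough sketch of that chain of drops, the example below uses a plain TCP socket on loopback from the standard library. ByteStreamLike and EventStreamLike are hypothetical stand-ins for the nesting described above, not rusoto's real types, and a raw TcpStream is only an analogy for whatever connection rusoto's HTTP client actually holds. It shows that no explicit Drop impl is needed on the wrappers for the other end to notice the disconnect.

```rust
use std::io::Read;
use std::net::{TcpListener, TcpStream};

/// Hypothetical stand-ins for the nesting described above; not rusoto's real types.
struct ByteStreamLike {
    _socket: TcpStream,
}

struct EventStreamLike {
    _bytes: ByteStreamLike,
}

/// Connects to a local listener, drops the client-side wrapper, and returns
/// what the server's next read reports (Ok(0) means the peer closed).
fn server_read_after_client_drop() -> std::io::Result<usize> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    let client = EventStreamLike {
        _bytes: ByteStreamLike {
            _socket: TcpStream::connect(addr)?,
        },
    };
    let (mut server_side, _) = listener.accept()?;

    // No Drop impl is written anywhere: dropping the outer struct drops its
    // fields in turn, and TcpStream closes its socket when dropped.
    drop(client);

    let mut buf = [0u8; 16];
    server_side.read(&mut buf)
}
```

Here the "server" side plays the role of the party pushing data: once the client wrapper is dropped, its next read returns zero bytes, which is how a closed connection shows up on a TCP socket.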
