Nested streams and cancelling old inner streams

I have a stream of streams. The outer streams is an async iterator that returns inner streams asynchronously. The problem is that any running inner stream becomes irrelevant as soon as a new inner stream is delivered by the outer stream. How can I automatically cancel the old inner stream in case it still running and start with the new inner stream?

You can (with some caveats) cancel a future by just dropping it. However, if you have spawned a separate task to run the future, you would need to have some way of signalling the task that it needs to drop the future.

I think what you're going for is something like this:

If so, you can always cancel the inner stream by just dropping it. That's what the exit_early flag does in my example.

Thank you for the example code. In the end I made an extension stream trait. The extension contains a flatten unordered method that drops previous inner streams when a new inner stream arrives.