Streams / spawn / rdkafka

Edit: Vastly simplifying code example.

After about a week of messing with this, I feel like I have some fundamental misunderstanding of either the rdkafka library, streams, async, or some combination of them all.

My goal is to pull from a kafka topic, but to spawn a different thread of execution for each partition, using rdkafka's split_partition_queue function. Both StreamConsumer and StreamPartitionQueue have stream() functions which stream BorrowedMessage<'_> messages from them, and are interchangeable for purposes of this example as they generate basically the same error.

This basically errors whenever my stream involves the buffered family of StreamExt functions and I try to explicitly spawn. I am led to believe that everything in this library is Sync and Send, and that as long as I wrap everything in an Arc, everything is "owned", yet I cannot spawn a separate thread, so I don't understand exactly how I'm meant to process messages separately.
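For owned data, Arc really does satisfy spawn's Send + 'static requirement; here is a std-only sketch of that pattern (names are illustrative, and std::thread stands in for tokio::spawn). The catch in the kafka case is that BorrowedMessage is not owned data: its lifetime is tied to the consumer it borrows from, so no amount of Arc-wrapping makes it 'static.

```rust
use std::sync::Arc;
use std::thread;

// Sum the values on a separate thread. The thread owns its own Arc clone,
// which is what makes the closure `'static + Send` for `thread::spawn`.
fn sum_on_thread(data: Arc<Vec<i32>>) -> i32 {
    let handle = thread::spawn(move || data.iter().sum::<i32>());
    handle.join().unwrap()
}

fn main() {
    let total = sum_on_thread(Arc::new(vec![1, 2, 3]));
    println!("total = {total}"); // prints "total = 6"
}
```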

The reason I'm using buffered is that once in a while during my stream I need to make HTTP calls that take half a second to resolve, so I must buffer a few thousand messages to keep everything moving. That works fine, as long as I don't spawn it onto another thread.

use futures::stream::TryStreamExt;
use rdkafka::consumer::DefaultConsumerContext;
use rdkafka::consumer::StreamConsumer;

use rdkafka::error::KafkaError;

#[tokio::main]
async fn main() {
    // mainrun().await; // works
    tokio::spawn(mainrun()); // this doesn't work, and I have no idea why.
}

async fn mainrun() -> Result<(), KafkaError> {
    let config = rdkafka::config::ClientConfig::new();
    let x: StreamConsumer = config
        .create::<StreamConsumer<DefaultConsumerContext>>()
        .unwrap();

    x.stream()
        .map_ok(|_msg| async { Ok::<_, KafkaError>(()) })
        .try_buffered(2)
        .try_collect()
        .await
}

The error is:

error: implementation of `FnOnce` is not general enough
  --> foo.rs:11:5
   |
11 |     tokio::spawn(mainrun()); // this doesn't work, and I have no idea why.
   |     ^^^^^^^^^^^^^^^^^^^^^^^ implementation of `FnOnce` is not general enough
   |
   = note: closure with signature `fn(BorrowedMessage<'0>) -> {async block@davidm/src/bin/david1.rs:22:28: 22:61}` must implement `FnOnce<(BorrowedMessage<'1>,)>`, for any two lifetimes `'0` and `'1`...
   = note: ...but it actually implements `FnOnce<(BorrowedMessage<'_>,)>`

error: implementation of `Stream` is not general enough
  --> foo.rs:11:5
   |
11 |     tokio::spawn(mainrun()); // this doesn't work, and I have no idea why.
   |     ^^^^^^^^^^^^^^^^^^^^^^^ implementation of `Stream` is not general enough
   |
   = note: `Stream` would have to be implemented for the type `MessageStream<'0, DefaultConsumerContext>`, for any lifetime `'0`...
   = note: ...but `Stream` is actually implemented for the type `MessageStream<'1, DefaultConsumerContext>`, for some specific lifetime `'1`

I figured this out, finally. It was a lifetime issue, and it is fixed by opting out of the async keyword sugar and introducing a named lifetime that ties the messages you pass to the async blocks to the borrow of the stream they come from. I now know more about lifetimes in Rust than I thought I would ever need to know.
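The "not general enough" wording in the error refers to higher-ranked trait bounds. A std-only sketch of the distinction (names are illustrative): a bound like `for<'a> Fn(&'a str)` demands a closure that works for every possible lifetime, which is exactly what a closure returning an async block that captures its borrowed argument fails to provide — that closure only implements Fn for one specific lifetime.

```rust
// A higher-ranked bound: `f` must accept a &str with ANY lifetime,
// including one shorter than the caller's.
fn needs_any_lifetime<F>(f: F) -> usize
where
    F: for<'a> Fn(&'a str) -> usize,
{
    let local = String::from("hello"); // lives only inside this function
    f(&local)
}

fn main() {
    // A plain closure is general over the lifetime, so this compiles:
    let n = needs_any_lifetime(|s| s.len());
    println!("{n}"); // prints 5
    // A closure whose body moved `s` into an async block would only
    // implement Fn for one specific lifetime: "not general enough".
}
```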

Here is an example that does what I intended to do in my original code, which is separate threads of execution pulling messages from each partition. The original code could be made to work with just a couple of these changes, which you can see applied below.

use futures::stream::TryStreamExt;
use futures::Future;
use rdkafka::consumer::DefaultConsumerContext;
use rdkafka::consumer::StreamConsumer;
use rdkafka::consumer::stream_consumer::StreamPartitionQueue;
use rdkafka::message::BorrowedMessage;
use rdkafka::error::KafkaError;
use std::sync::Arc;

#[derive(Clone)]
struct App {
    consumer: Arc<StreamConsumer<DefaultConsumerContext>>,
}

struct PartitionApp {
    queue: StreamPartitionQueue<DefaultConsumerContext>,
}

impl<'a> PartitionApp {
    fn new(queue: StreamPartitionQueue<DefaultConsumerContext>) -> PartitionApp {
        PartitionApp { queue }
    }

    fn run(&'a self) -> impl Future<Output = Result<(), KafkaError>> + 'a {
        self.queue
            .stream()
            .map_ok(|msg: BorrowedMessage<'a>| async move {
                println!("msg: {:?}", msg);
                Ok(())
            })
            .try_buffered(20)
            .try_collect()
    }
}

impl App {
    fn new() -> App {
        let config = rdkafka::config::ClientConfig::new();
        let consumer: StreamConsumer = config
            .create::<StreamConsumer<DefaultConsumerContext>>()
            .unwrap();
        let consumer = Arc::new(consumer);
        App { consumer }
    }

    fn split(&self, topic: &str, partition: i32) -> PartitionApp {
        let queue = self.consumer.split_partition_queue(topic, partition).unwrap();
        PartitionApp::new(queue)
    }
}

#[tokio::main]
async fn main() {
    let app = App::new();

    let f = |partition: i32| {
        let queue = app.split("topic", partition);
        // The async block takes ownership of `queue`, so the future handed
        // to tokio::spawn is 'static and does not borrow a closure local.
        async move { queue.run().await }
    };

    let x = tokio::spawn(f(0));
    let y = tokio::spawn(f(1));
    let z = tokio::spawn(f(2));

    let _res = futures::join!(x, y, z);
}