The service
- consumes from Kafka, and
- serves a `/metrics` HTTP endpoint.
Is the following abridged code the 'best' way to run both an (indefinitely running) Kafka consumer and an HTTP service?
- Is creating a second Tokio runtime for the Kafka consumer the 'best' approach? See `Runtime::new()` below.
- `db.write(message)` retries HTTP requests inside a `loop {}`, using `tokio::time::sleep` (with a backoff):
  - is this acceptable, or a recipe for failure?
  - is there a better solution for this?
```rust
use std::net::SocketAddr;
use tokio::runtime::Runtime;
use warp::Filter;

#[tokio::main]
async fn main() {
    std::thread::spawn(|| {
        // Running the Kafka consumer in a second
        // Tokio runtime 👇 is what I'm most unsure of.
        let rt = Runtime::new().unwrap();
        let _ = rt.block_on(async { consume_kafka().await });
    });

    let metrics_route = warp::path!("metrics")
        .and_then(metrics_handler);

    // Serve `/metrics` on all interfaces, port 9000.
    warp::serve(metrics_route)
        .run(SocketAddr::from(([0, 0, 0, 0], 9000)))
        .await
}

// Abridged: `kafka_consumer`, `db`, and the offset/DLQ helpers are defined elsewhere.
async fn consume_kafka() -> Result<()> {
    loop {
        match kafka_consumer.recv().await {
            // Extract `message`, etc.
            match db.write(message) {
                Ok(_) => store_kafka_offset(),
                Err(_) => store_kafka_offset_and_produce_to_dlq(),
            }
        }
    }
}
```
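For reference, a bounded retry with capped exponential backoff, the kind of loop described for `db.write(message)`, might look like the sketch below. It is not the service's actual code: `backoff_delay`, `retry_with_backoff`, and their parameters are hypothetical names, and the sketch is synchronous with an injected `sleep` closure so that it stands alone. In the async version the pause would be `tokio::time::sleep(delay).await`.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): `base * 2^attempt`, capped at `max`.
/// (Hypothetical helper, for illustration only.)
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // `checked_pow` / `checked_mul` guard against overflow for large attempt counts.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).map_or(max, |d| d.min(max))
}

/// Calls `op` until it succeeds or `max_attempts` calls have been made,
/// pausing via `sleep` between attempts. Returns the last error if all fail.
/// (Hypothetical helper, for illustration only.)
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    base: Duration,
    max: Duration,
    mut op: impl FnMut() -> Result<T, E>,
    mut sleep: impl FnMut(Duration),
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Give up once the attempt budget is spent and surface the error.
            Err(err) if attempt + 1 >= max_attempts => return Err(err),
            Err(_) => {
                sleep(backoff_delay(attempt, base, max));
                attempt += 1;
            }
        }
    }
}
```

Capping both the delay and the number of attempts means a single failing write cannot stall the consumer loop indefinitely. Once the budget is spent, the error could be routed to the DLQ path.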
Background
This topic led me to Alice's article on blocking, which, in its third section, describes an operation that "keeps running forever":
Spawn a dedicated thread
If a blocking operation keeps running forever, you should run it on a dedicated thread. For example consider a thread that manages a database connection using a channel to receive database operations to perform. Since this thread is listening on that channel in a loop, it never exits.
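A minimal sketch of the quoted pattern, using only the standard library. `DbOp`, `spawn_db_thread`, and the in-memory `Vec` standing in for a database connection are hypothetical and not taken from the article:

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical database operation sent to the dedicated thread.
enum DbOp {
    Write {
        message: String,
        // Channel on which the thread reports the outcome of this write.
        reply: mpsc::Sender<Result<(), String>>,
    },
}

/// Spawns a thread that owns the (stand-in) connection and serves operations
/// in a loop. The loop only ends once every `Sender<DbOp>` has been dropped;
/// the thread then returns how many messages it wrote.
fn spawn_db_thread() -> (mpsc::Sender<DbOp>, thread::JoinHandle<usize>) {
    let (tx, rx) = mpsc::channel::<DbOp>();
    let handle = thread::spawn(move || {
        let mut written: Vec<String> = Vec::new(); // stand-in for a real connection
        for op in rx {
            match op {
                DbOp::Write { message, reply } => {
                    written.push(message);
                    // The requester may have gone away; ignore a failed reply.
                    let _ = reply.send(Ok(()));
                }
            }
        }
        written.len()
    });
    (tx, handle)
}
```

As long as a sender is alive the thread keeps listening, which is what puts it in the "running forever" category. Dropping all senders is the only way this sketch shuts down.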
I think that the above Kafka consumer falls into this "running forever" category. Is the above implementation also the best possible one, though?