Kafka forwarder to either A) a DB or B) back to Kafka

The service

  • consumes from Kafka, and
  • serves a /metrics HTTP endpoint.

Is the following abridged code the 'best' way to run both an (indefinitely running) Kafka consumer and an HTTP service?

  1. Is creating a second Tokio runtime for the Kafka consumer the 'best' approach? See Runtime::new() below. 👇
  2. db.write(message) retries HTTP requests inside a loop {}, using tokio::time::sleep (with a backoff):
    1. is this acceptable, or a recipe for failure?
    2. is there a better solution for this?
#[tokio::main]
async fn main() {
  std::thread::spawn(|| {
    // Running the Kafka consumer in a second 
    // Tokio runtime 👇 is what I'm most unsure of. 
    let rt = Runtime::new().unwrap();
    let _ = rt.block_on(async { consume_kafka().await });
  });
  let metrics_route = warp::path!("metrics")
    .and_then(metrics_handler);
  warp::serve(metrics_route)
    .run(SocketAddr::from(([0, 0, 0, 0], 9000)))
    .await
}

async fn consume_kafka() -> Result<()> {
  loop {
    match kafka_consumer.recv().await {
      // Extract `message`, etc.
      match db.write(message) {
        Ok  => store_kafka_offset()
        Err => store_kafka_offset_and_produce_to_dlq()
      }
    }
  }
}
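
For point 2 in the list above, here is a minimal sketch of a bounded retry loop with capped exponential backoff. It is not the poster's db.write; the names backoff_delay and write_with_retry, the 100 ms base, the 5 s cap, and the injected sleep parameter are all assumptions made for illustration. Awaiting tokio::time::sleep yields to the runtime rather than blocking a worker thread, so the pattern itself is generally fine; the main risk is an unbounded loop that never gives up.

```rust
use std::future::Future;
use std::time::Duration;

/// Capped exponential backoff: `base * 2^attempt`, but never above `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 2u32.saturating_pow(attempt);
    base.saturating_mul(factor).min(max)
}

/// Calls `write` until it succeeds or `max_attempts` calls have failed.
/// `sleep` is injected (hypothetical design) so a caller can pass
/// `tokio::time::sleep` without this sketch depending on Tokio.
async fn write_with_retry<W, WFut, S, SFut, E>(
    mut write: W,
    mut sleep: S,
    max_attempts: u32,
) -> Result<(), E>
where
    W: FnMut() -> WFut,
    WFut: Future<Output = Result<(), E>>,
    S: FnMut(Duration) -> SFut,
    SFut: Future<Output = ()>,
{
    // Assumed values; tune them for the real database.
    let base = Duration::from_millis(100);
    let max = Duration::from_secs(5);
    let mut attempt = 0;
    loop {
        match write().await {
            Ok(()) => return Ok(()),
            // Give up so the caller can route the message to the DLQ.
            Err(e) if attempt + 1 >= max_attempts => return Err(e),
            Err(_) => {
                // Awaiting the sleep yields to the runtime between attempts.
                sleep(backoff_delay(attempt, base, max)).await;
                attempt += 1;
            }
        }
    }
}
```

With Tokio, a call might look like write_with_retry(|| db.write(&message), tokio::time::sleep, 5).await, assuming db.write returns a future that resolves to a Result. While the loop is retrying, the consumer does not fetch the next message, so the attempt limit also bounds how long one bad message can stall the partition.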

Background

This topic led me to Alice's article on blocking, which, in its third section, describes an operation that "keeps running forever":

Spawn a dedicated thread

If a blocking operation keeps running forever, you should run it on a dedicated thread. For example consider a thread that manages a database connection using a channel to receive database operations to perform. Since this thread is listening on that channel in a loop, it never exits.

I think that the above Kafka consumer falls into this "running forever" category. But is the above implementation also the best possible one?

If you're using rdkafka, its async consumer already uses a dedicated thread under the hood.


Yes, I'm using rdkafka: how can main() then be simplified, knowing that rdkafka already starts its thread? My (beginner) attempt below fails – consume_kafka isn't doing its work, nor does it output any logs.

async fn main() {
  std::thread::spawn(|| async {
    let _ = consume_kafka().await;
  });
  // omitted
}

And apologies, I should have listed the main [dependencies]:

  • rust 1.80.0
  • tokio 1.37.0
  • rdkafka 0.36.2
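
The attempt with std::thread::spawn(|| async { ... }) does nothing because the closure only builds a future and returns it; the new thread exits immediately and nothing ever polls that future. The std-only sketch below illustrates this. The names NoopWake and spawn_unpolled_future are made up for the demonstration, and the hand-rolled poll stands in for what a runtime such as Tokio would normally do.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Wake, Waker};
use std::thread;

/// A waker that does nothing; enough to poll a future that never waits.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Mirrors the failing attempt: the thread's closure returns an async block.
/// Returns (body ran before any poll, body ran after one poll).
fn spawn_unpolled_future() -> (bool, bool) {
    let ran = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&ran);

    // The closure's return value is the future itself, not its result.
    let handle = thread::spawn(move || async move {
        flag.store(true, Ordering::SeqCst);
    });

    // The thread has already finished; joining hands back the unpolled future.
    let fut = handle.join().expect("thread panicked");
    let before = ran.load(Ordering::SeqCst);

    // Only polling (normally a runtime's job) executes the async body.
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    let polled = fut.as_mut().poll(&mut cx);
    assert!(polled.is_ready());

    (before, ran.load(Ordering::SeqCst))
}
```

Calling spawn_unpolled_future() returns (false, true): the body runs only once something polls the future. In the original attempt nothing does, which matches the missing work and missing logs.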

You can run consume_kafka on the same runtime:

#[tokio::main]
async fn main() {
  let consume_handle = tokio::spawn(consume_kafka());
  let metrics_route = warp::path!("metrics")
    .and_then(metrics_handler);
  let warp_res = warp::serve(metrics_route)
    .run(SocketAddr::from(([0, 0, 0, 0], 9000)))
    .await;
  match consume_handle.await {
    Ok(Ok(())) => (),
    Ok(Err(_e)) => todo!("Handle `consume_kafka` error"),
    Err(panic) => panic!("`consume_kafka` panicked: {panic:?}"),
  }
  warp_res
}

async fn consume_kafka() -> Result<()> {
  loop {
    match kafka_consumer.recv().await {
      // Extract `message`, etc.
      match db.write(message) {
        Ok  => store_kafka_offset()
        Err => store_kafka_offset_and_produce_to_dlq()
      }
    }
  }
}

Note that I've done a bad job of handling the errors from consume_kafka, and in real code, you'd probably use select! to wait for either warp::serve or consume_kafka to return first.
