Unable to Use Custom Partitioner with FutureProducer in rust-rdkafka

I'm trying to implement a custom partitioner for a Kafka producer using rust-rdkafka, specifically with the FutureProducer. However, despite implementing a custom partitioner and specifying it in the ProducerContext, the custom partitioning logic is not being used. Instead, it seems to fall back to the default partitioning behavior.

use rdkafka::producer::{DeliveryResult, FutureProducer, FutureRecord, Producer, ProducerContext};
use rdkafka::util::{DefaultRuntime, Timeout};
use std::time::Duration;
use crc32fast::Hasher;
use rdkafka::{ClientConfig, ClientContext};
use rdkafka::config::FromClientConfigAndContext;
use rdkafka::producer::Partitioner;

#[tokio::main]
pub async fn main() -> anyhow::Result<()> {
    let producer_context = FutureProducerContext::default();
    let mut config = ClientConfig::new();
    let config = config
        .set("bootstrap.servers", "localhost:9092")
        .set("message.timeout.ms", "5000")
        .set("batch.num.messages", "1");

    let producer: FutureProducer<FutureProducerContext, DefaultRuntime> =
        FutureProducer::from_config_and_context(&config, producer_context)?;

    let topic = "commit_test".to_string();
    let payload = serde_json::to_vec("hello")?;

    let record = FutureRecord::to(&topic).payload(&payload).key("aa");
    producer
        .send(record, Duration::from_secs(1))
        .await
        .map_err(|(e, m)| anyhow::anyhow!("Kafka produce error {e} for msg {:?}", m))?;

    producer.flush(Timeout::from(Duration::from_secs(10)))?;

    println!("done");

    Ok(())
}

#[derive(Clone)]
pub struct Crc32Partitioner {}

impl Partitioner for Crc32Partitioner {
    fn partition(
        &self,
        _topic_name: &str,
        key: Option<&[u8]>,
        partition_cnt: i32,
        _is_partition_available: impl Fn(i32) -> bool,
    ) -> i32 {
        println!("inside custom partition");
        let partition = match key {
            None => 0,
            Some(key_bytes) => {
                let mut hasher = Hasher::new();
                hasher.update(key_bytes);
                let hash = hasher.finalize();
                (hash as i32 & 0x7FFFFFFF) % partition_cnt
            }
        };
        println!("Partition: {partition}");
        partition
    }
}

pub struct FutureProducerContext {
    partitioner: Crc32Partitioner,
}
impl Default for FutureProducerContext {
    fn default() -> Self {
        Self {
            partitioner: Crc32Partitioner {},
        }
    }
}

impl ClientContext for FutureProducerContext {}

impl ProducerContext<Crc32Partitioner> for FutureProducerContext {
    type DeliveryOpaque = ();

    fn delivery(&self, delivery_result: &DeliveryResult<'_>, _delivery_opaque: Self::DeliveryOpaque) {
        if let Err(e) = delivery_result {
            println!("Kafka Msg Delivery error: {:?}", e);
        } else {
            println!("Kafka Msg Delivery success");
        }
    }

    fn get_custom_partitioner(&self) -> Option<&Crc32Partitioner> {
        Some(&self.partitioner)
    }
}

Cargo.toml

rdkafka = { version = "0.36.2", features = ["cmake-build"]}
tokio = { version = "1.36.0", features = ["full"] }
anyhow = "1.0.80"
futures = "0.3.30"
log4rs = "1.3.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.117"
log = "0.4.21"
async-trait = "0.1.82"
lazy_static = "1.5.0"
crc32fast = "1.4.2"
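As a side note, the partition computation itself can be sanity-checked without a broker. Below is a standalone sketch of the same logic, using a hand-rolled bitwise CRC32 over the IEEE polynomial (the same checksum crc32fast computes) so it runs on plain std; `partition_for` mirrors the body of `Crc32Partitioner::partition` from the snippet above.

```rust
// Bitwise CRC32 (reflected IEEE polynomial 0xEDB88320) -- the same
// checksum crc32fast produces, implemented here so no crate is needed.
fn crc32(data: &[u8]) -> u32 {
    let mut crc = 0xFFFF_FFFFu32;
    for &byte in data {
        crc ^= byte as u32;
        for _ in 0..8 {
            let mask = (crc & 1).wrapping_neg(); // all-ones if LSB set
            crc = (crc >> 1) ^ (0xEDB8_8320 & mask);
        }
    }
    !crc
}

// Mirrors the partition() body from the question.
fn partition_for(key: Option<&[u8]>, partition_cnt: i32) -> i32 {
    match key {
        None => 0,
        Some(key_bytes) => (crc32(key_bytes) as i32 & 0x7FFF_FFFF) % partition_cnt,
    }
}

fn main() {
    // Well-known CRC32 check value for "123456789".
    assert_eq!(crc32(b"123456789"), 0xCBF4_3926);

    let p = partition_for(Some(b"aa"), 12);
    assert!((0..12).contains(&p));
    // The same key must always map to the same partition.
    assert_eq!(p, partition_for(Some(b"aa"), 12));
    println!("key \"aa\" -> partition {p} of 12");
}
```

This only verifies the hashing math is deterministic and in range; it says nothing about whether rdkafka actually calls the partitioner.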

Your code seems to be right for what you're trying to do (at least at first glance).

So this is one of those things that require debugging against your data and your specific setup. The most I can do is ask some questions.

  • I see you have a debug print in your partitioner — is it being printed?
  • Are the keys in your real data actually None, or are they always defined? I'm asking because if they are None, this note in the docs may be relevant:

sticky.partitioning.linger.ms must be 0 to run custom partitioner for messages with null key

If your partitioner is not being called at all, even for messages with non-null keys, then there may be a bug in the rust-rdkafka library, and in that case you should file a bug report with them, including the most minimal example you can.
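One more thing that might be worth checking while narrowing this down: as far as I can tell from the rdkafka source, FutureProducer wraps the user-supplied context in the library's own internal FutureProducerContext, which only requires ClientContext — so a ProducerContext impl (and its get_custom_partitioner) on your own type may never be consulted by FutureProducer at all. A sketch of wiring the same types through BaseProducer instead, where the ProducerContext is used directly (this assumes the Crc32Partitioner and FutureProducerContext types from your snippet, plus a broker on localhost:9092, so it is a hypothesis to test rather than a confirmed fix):

```rust
use std::time::Duration;

use rdkafka::config::FromClientConfigAndContext;
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use rdkafka::util::Timeout;
use rdkafka::ClientConfig;

fn main() -> anyhow::Result<()> {
    // Same context/partitioner types as in the question.
    let context = FutureProducerContext::default();
    let mut config = ClientConfig::new();
    config
        .set("bootstrap.servers", "localhost:9092")
        .set("message.timeout.ms", "5000");

    // BaseProducer is parameterized over the partitioner type and takes
    // the ProducerContext directly, so get_custom_partitioner() should
    // be consulted here (if it isn't, that points at a library bug).
    let producer: BaseProducer<FutureProducerContext, Crc32Partitioner> =
        BaseProducer::from_config_and_context(&config, context)?;

    producer
        .send(BaseRecord::to("commit_test").payload("hello").key("aa"))
        .map_err(|(e, _)| anyhow::anyhow!("enqueue error: {e}"))?;

    // flush() polls the producer, which also fires the delivery callback.
    producer.flush(Timeout::from(Duration::from_secs(10)))?;
    Ok(())
}
```

If the "inside custom partition" print shows up with BaseProducer but not with FutureProducer, that would be strong evidence for a FutureProducer-specific limitation worth reporting.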


Hi @eliduvid

  • I see you have a debug print in your partitioner, is it being printed? -- No.
  • Are the keys in your real data actually None, or are they always defined? -- They are always defined. And even with sticky.partitioning.linger.ms = 0 it is still not working.

So, this is one of those things that require debugging against your data and your specific setup:

I already shared the main function in the code above; the only extra requirement is a local Kafka setup (if you're willing to try it out):

pub async fn main() -> anyhow::Result<()> {
    let producer_context = FutureProducerContext::default();
    let mut config = ClientConfig::new();
    let config = config
        .set("bootstrap.servers", "localhost:9092")
        .set("message.timeout.ms", "5000")
        .set("batch.num.messages", "1")
        .set("sticky.partitioning.linger.ms", "0");

    let producer: FutureProducer<FutureProducerContext, DefaultRuntime> =
        FutureProducer::from_config_and_context(&config, producer_context)?;

    let topic = "commit_test".to_string();
    let payload = serde_json::to_vec("hello")?;

    let record = FutureRecord::to(&topic).payload(&payload).key("aa");
    producer
        .send(record, Duration::from_secs(1))
        .await
        .map_err(|(e, m)| anyhow::anyhow!("Kafka produce error {e} for msg {:?}", m))?;

    producer.flush(Timeout::from(Duration::from_secs(10)))?;

    println!("done");

    Ok(())
}