I'm trying to implement a custom partitioner for a Kafka producer using `rust-rdkafka`, specifically with the `FutureProducer`. However, despite implementing a custom partitioner and returning it from my `ProducerContext`, the custom partitioning logic is not being used. Instead, it seems to fall back to the default partitioning behavior.
use std::time::Duration;

use crc32fast::Hasher;
use rdkafka::config::FromClientConfigAndContext;
use rdkafka::producer::Partitioner;
use rdkafka::producer::{BaseRecord, ThreadedProducer};
use rdkafka::producer::{DeliveryResult, FutureProducer, FutureRecord, Producer, ProducerContext};
use rdkafka::util::{DefaultRuntime, Timeout};
use rdkafka::{ClientConfig, ClientContext};
#[tokio::main]
pub async fn main() -> anyhow::Result<()> {
let producer_context = FutureProducerContext::default();
let mut config = ClientConfig::new();
let config = config
.set("bootstrap.servers", "localhost:9092")
.set("message.timeout.ms", "5000")
.set("batch.num.messages", "1");
let producer: FutureProducer<FutureProducerContext, DefaultRuntime> =
FutureProducer::from_config_and_context(&config, producer_context)?;
let topic = "commit_test".to_string();
let payload = serde_json::to_vec("hello")?;
let record = FutureRecord::to(&topic).payload(&payload).key("aa");
producer
.send(record, Duration::from_secs(1))
.await
.map_err(|(e, m)| anyhow::anyhow!("Kafka produce error {e} for msg {:?}", m))?;
producer.flush(Timeout::from(Duration::from_secs(10)))?;
println!("done");
Ok(())
}
/// Stateless partitioner that derives the target partition from a checksum of the record key.
#[derive(Clone)]
pub struct Crc32Partitioner {}

impl Partitioner for Crc32Partitioner {
    /// Picks the partition for a record.
    ///
    /// Records without a key always go to partition 0. For keyed records the key bytes are
    /// fed through the `crc32fast` hasher, the top bit of the result is cleared, and the
    /// value is reduced modulo `partition_cnt`. The topic name and the availability probe
    /// are not consulted. A trace line is printed on entry and again with the chosen
    /// partition.
    fn partition(
        &self,
        _topic_name: &str,
        key: Option<&[u8]>,
        partition_cnt: i32,
        _is_partition_available: impl Fn(i32) -> bool,
    ) -> i32 {
        println!("inside custom partition");

        let chosen = key.map_or(0, |bytes| {
            let mut crc = Hasher::new();
            crc.update(bytes);
            // Clearing the top bit keeps the value non-negative once it is viewed as an i32.
            let non_negative = (crc.finalize() & 0x7FFF_FFFF) as i32;
            non_negative % partition_cnt
        });

        println!("Partition: {chosen}");
        chosen
    }
}
/// Producer context that owns the partitioner instance handed out to the producer.
pub struct FutureProducerContext {
    /// Partitioner returned by `get_custom_partitioner`.
    partitioner: Crc32Partitioner,
}

impl Default for FutureProducerContext {
    /// Builds a context holding a freshly constructed `Crc32Partitioner`.
    fn default() -> Self {
        let partitioner = Crc32Partitioner {};
        FutureProducerContext { partitioner }
    }
}
// No client-level callbacks are overridden; the trait's provided methods are used as-is.
impl ClientContext for FutureProducerContext {}

impl ProducerContext<Crc32Partitioner> for FutureProducerContext {
    /// No per-message state is carried through to the delivery callback.
    type DeliveryOpaque = ();

    /// Prints one line reporting whether the message was delivered or failed.
    fn delivery(&self, delivery_result: &DeliveryResult<'_>, _delivery_opaque: Self::DeliveryOpaque) {
        match delivery_result {
            Ok(_) => println!("Kafka Msg Delivery success"),
            Err(e) => println!("Kafka Msg Delivery error: {:?}", e),
        }
    }

    /// Exposes the partitioner stored in this context.
    fn get_custom_partitioner(&self) -> Option<&Crc32Partitioner> {
        let Self { partitioner } = self;
        Some(partitioner)
    }
}
Cargo.toml
rdkafka = { version = "0.36.2", features = ["cmake-build"]}
tokio = { version = "1.36.0", features = ["full"] }
anyhow = "1.0.80"
futures = "0.3.30"
log4rs = "1.3.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.117"
log = "0.4.21"
async-trait = "0.1.82"
lazy_static = "1.5.0"
crc32fast = "1.4.2"