# Docker Compose stack: one ZooKeeper node and one Kafka broker.
version: "3"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      # The broker advertises itself to clients as localhost:9092.
      # NOTE(review): clients are redirected to this advertised address after
      # bootstrapping, so they must be able to reach it (e.g. run on the
      # Docker host and use the published 9092 port) - confirm for your setup.
      KAFKA_ADVERTISED_HOST_NAME: localhost
      # Quoted so the value stays a string, as environment values should be.
      KAFKA_ADVERTISED_PORT: "9092"
      # NOTE(review): presumably "topic:partitions:replicas" as defined by the
      # wurstmeister/kafka image - confirm against the image documentation.
      KAFKA_CREATE_TOPICS: "billionaire:1:1"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
    depends_on:
      - zookeeper
/// Sends one record to the `billionaire` topic, then subscribes to that topic
/// and prints everything the consumer returns, forever.
///
/// The broker address is taken from the `KAFKA_BOOTSTRAP_SERVERS` environment
/// variable and falls back to `localhost:9092` when it is unset or invalid.
///
/// # Panics
///
/// Panics if the producer or consumer cannot be created, or if subscribing to
/// the topic fails.
fn main() {
    // A Compose service name such as `kafka` only resolves inside the Compose
    // network; from the Docker host the published port is reached through
    // `localhost`. Keep the address overridable for other environments.
    let bootstrap_servers = std::env::var("KAFKA_BOOTSTRAP_SERVERS")
        .unwrap_or_else(|_| "localhost:9092".to_string());

    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", bootstrap_servers.as_str())
        .set("acks", "0")
        .create()
        .expect("failed to create Kafka producer");

    let record: BaseRecord<str, str> = BaseRecord::to("billionaire").payload("billionaires");
    let send_result = producer.send(record);

    // A zero timeout makes `flush` return immediately; allow a few seconds so
    // the queued record can actually leave the producer.
    let _ = producer.flush(Duration::from_secs(5));

    match send_result {
        // NOTE(review): `Ok` presumably means the record was queued locally,
        // not that the broker acknowledged it (and `acks` is "0") - confirm.
        Ok(_) => println!("Message sent"),
        Err(err) => println!("{:?}", err),
    }

    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", bootstrap_servers.as_str())
        // NOTE(review): "billionaie_id" looks like a typo for "billionaire_id";
        // left unchanged because renaming it switches to a new consumer group.
        .set("group.id", "billionaie_id")
        .set("auto.offset.reset", "earliest")
        .create()
        .expect("failed to create Kafka consumer");

    consumer
        .subscribe(&["billionaire"])
        .expect("failed to subscribe to topic `billionaire`");

    loop {
        match consumer.poll(Duration::from_secs(1)) {
            // Nothing arrived within the poll timeout; poll again.
            None => {}
            Some(Ok(message)) => {
                println!("{:?}", message);
                // consumer.commit("billionaire", CommitMode::Sync);
            }
            // Report consumer errors separately from messages, on stderr.
            Some(Err(err)) => eprintln!("Kafka consumer error: {:?}", err),
        }
    }
}
Error
Err(KafkaError (Message consumption error: Resolve (Local: Host resolution failure)))
Err(KafkaError (Message consumption error: AllBrokersDown (Local: All broker connections are down)))