Error on consuming kafka messages

Hi, I am a newbie Rustacean!
I am trying to consume messages from Kafka (actually from a local Redpanda instance running in Docker) with version 0.9.0 of the kafka crate, but I get an error that I am not able to solve.

When I run the app, it panics with the following message:

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Io(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" })', src/main.rs:25:35

I can see that the failing line is the poll().unwrap() call, but I don't know what the error message really means.

The script:

use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};

let mut consumer = Consumer::from_hosts(vec!["localhost:19092".to_owned()])
    .with_topic_partitions("product".to_owned(), &[0])
    .with_fallback_offset(FetchOffset::Earliest)
    .with_group("rust-consumer-group".to_owned())
    .with_offset_storage(GroupOffsetStorage::Kafka)
    .create()
    .unwrap();

loop {
    for ms in consumer.poll().unwrap().iter() {  // line 25
        for m in ms.messages() {
            println!("{:?}", m);
        }
        consumer.consume_messageset(ms);
    }
    consumer.commit_consumed().unwrap();
}

The topic product contains 2 messages, both in JSON format.

Any help is welcome, thanks!

JuanjoA

An UnexpectedEof error usually happens when you try to read a certain number of bytes but hit the end of the stream before that many bytes are available. Maybe your Consumer is not correctly configured for the Redpanda version you are running? For example, have a look at the methods on kafka::consumer::Builder; setting some of them differently may help you get rid of the error.
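To make that concrete, here is a minimal std-only sketch (it does not use the kafka crate) that triggers the same kind of error. The helper read_frame is a made-up name for illustration; it asks for 8 bytes from an in-memory stream that only holds 4, which is roughly what happens when a broker closes the connection or sends a shorter response than the client expects.

```rust
use std::io::{self, Cursor, Read};

/// Hypothetical helper: reads exactly `len` bytes from `source`, the way a
/// wire-protocol client reads a response body of a known length.
fn read_frame<R: Read>(source: &mut R, len: usize) -> io::Result<Vec<u8>> {
    let mut buf = vec![0u8; len];
    // `read_exact` returns an error if the stream ends before `buf` is full.
    source.read_exact(&mut buf)?;
    Ok(buf)
}

fn main() {
    // The "peer" only provides 4 bytes, but we ask for 8.
    let mut short_stream = Cursor::new(vec![1u8, 2, 3, 4]);
    match read_frame(&mut short_stream, 8) {
        Ok(bytes) => println!("read {} bytes", bytes.len()),
        Err(e) => println!("kind = {:?}, message = {}", e.kind(), e),
    }
}
```

The error kind reported here should be UnexpectedEof, matching the one in your panic message, which suggests the client was expecting more bytes from the broker than it actually received.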

Consider using match instead of the unwrap() on line 25.
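A rough sketch of what that could look like, assuming a stand-in function fake_poll in place of consumer.poll() (both fake_poll and poll_with_match are hypothetical names, not part of the kafka crate) so the example compiles with std only. The point is just the shape: the Err arm logs the error and lets the loop continue instead of panicking.

```rust
use std::io::{self, ErrorKind};

/// Hypothetical stand-in for `consumer.poll()`: fails on the first attempt,
/// then returns two messages.
fn fake_poll(attempt: u32) -> Result<Vec<String>, io::Error> {
    if attempt == 0 {
        Err(io::Error::new(
            ErrorKind::UnexpectedEof,
            "failed to fill whole buffer",
        ))
    } else {
        Ok(vec!["msg-1".to_string(), "msg-2".to_string()])
    }
}

/// Polls up to `max_attempts` times, logging errors instead of panicking.
fn poll_with_match(max_attempts: u32) -> Option<Vec<String>> {
    for attempt in 0..max_attempts {
        match fake_poll(attempt) {
            Ok(messages) => return Some(messages),
            Err(e) => eprintln!("poll attempt {} failed: {:?}", attempt, e),
        }
    }
    None
}

fn main() {
    match poll_with_match(3) {
        Some(messages) => println!("received {:?}", messages),
        None => eprintln!("giving up after repeated poll errors"),
    }
}
```

In your real loop the same match would wrap consumer.poll(), and the Err arm is a good place to print the full error before deciding whether to retry or exit.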

Evidently your consumer.poll() can fail, and it is failing -- is it possible that your Kafka configuration is off?

The code that powers poll seems to be fetch_messages which doesn't obviously fail in that way (failure to fill a buffer is quite a low level problem).

Looking at the kafka code, it appears to have some debug logging, so consider enabling it (e.g. by setting the environment variable RUST_LOG=debug, or RUST_LOG=kafka=debug to limit output to the kafka crate). You should then be able to see a bit more of how far the kafka code and its dependencies get.

Hi! Sorry for the late reply.
@jofas, I have tried different methods, and even other crates, and I kept getting the same error.
If I try with a Python script, it works fine.

@vados I understand the need for the match in a production environment, noted. Your suggestions also made me learn how logging works in Rust, which is a good thing.

In the end I replaced Redpanda with a Confluent Kafka Docker image, and it worked correctly.
I still don't know the cause of the error, but everything points to the Redpanda config.

Thanks to both of you for your replies.
