Hi,
I have a function where I get messages from Kafka. The recv() call is blocking, so I want to know if we can introduce a timeout and cancel the call (I see that rdkafka doesn't give a way to time out once recv() has been called). If yes, is there any sample code that can be shared?
async fn foo() {
    for i in 0..9 {
        let msg = consumer.recv().await.unwrap();
        println!(
            "message 1st pass:::: i{} ... {:?}",
            i,
            std::str::from_utf8(msg.payload().unwrap())
        )
    }
}
use a combinator to add a timeout to any Future. for tokio, it is tokio::time::timeout; use match or if let to check the result:
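use std::time::Duration;

match tokio::time::timeout(Duration::from_millis(1000), consumer.recv()).await {
    // recv() finished in time; `received` is recv()'s own Result (message or Kafka error)
    Ok(received) => todo!(),
    // the timer fired first; the pending recv() future is dropped, which cancels it
    Err(_) => {
        eprintln!("timeout!");
    }
}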
for other async runtimes, there are similar functions.
the point is, there would be a combinatorial explosion of APIs if every async operation provided its own timeout mechanism. non-async functions can't easily be composed using combinators, so they have no choice but to offer their own timeout parameters. in an async context, however, we build generic abstractions on top of combinators.
in general, the idea is to use an "alternative" combinator together with a "timer" Future. the combinator part can be runtime agnostic, for instance futures_util::future::select (which returns an Either) or futures_lite::future::or, but the timer part is tied to a specific runtime, because it needs a reactor to function. some examples are tokio::time::Sleep, async_io::Timer, async_timer::OneShot, etc.
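to make the "alternative combinator + timer Future" idea concrete, here is a rough std-only sketch. everything in it (Deadline, Or, with_timeout, block_on) is a hypothetical name made up for illustration, not an API from tokio, futures or rdkafka. the timer here simply asks to be polled again in a busy loop, which is exactly the part a real runtime's reactor does properly, so treat it as a model of the mechanism rather than something to ship.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical "timer" future: ready once the deadline has passed.
/// A real runtime registers the deadline with its reactor; this one
/// just requests another poll right away (wasteful, but std-only).
struct Deadline(Instant);

impl Future for Deadline {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if Instant::now() >= self.0 {
            Poll::Ready(())
        } else {
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Hypothetical "alternative" combinator: resolves with whichever
/// of the two futures finishes first (`a` is checked first).
struct Or<A, B> {
    a: Pin<Box<A>>,
    b: Pin<Box<B>>,
}

impl<A, B> Future for Or<A, B>
where
    A: Future,
    B: Future<Output = A::Output>,
{
    type Output = A::Output;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if let Poll::Ready(v) = self.a.as_mut().poll(cx) {
            return Poll::Ready(v);
        }
        self.b.as_mut().poll(cx)
    }
}

/// Generic timeout built from the two pieces above: Some(output) if `fut`
/// wins the race, None if the timer wins (and `fut` is then dropped).
async fn with_timeout<F: Future>(fut: F, limit: Duration) -> Option<F::Output> {
    let deadline = Deadline(Instant::now() + limit);
    let work = async move { Some(fut.await) };
    let timer = async move {
        deadline.await;
        None::<F::Output>
    };
    Or { a: Box::pin(work), b: Box::pin(timer) }.await
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor so the sketch runs without a runtime.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
        thread::park();
    }
}

fn main() {
    // finishes immediately, so the timer never wins
    let fast = block_on(with_timeout(std::future::ready(7), Duration::from_millis(50)));
    // never finishes, so the timer wins and the pending future is dropped
    let slow = block_on(with_timeout(std::future::pending::<i32>(), Duration::from_millis(50)));
    println!("fast = {:?}, slow = {:?}", fast, slow);
}
```

note that with_timeout knows nothing about what it wraps, which is the whole point: the same function works for a Kafka recv(), a socket read, or anything else that is a Future. in real code you would swap Deadline for your runtime's timer and Or for an existing combinator, or just call tokio::time::timeout.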