How to put timeouts to a blocking call?

Hi,

I have a function where I get messages from kafka. the recv() is a blocking call. So I want to know if we can introduce a timeout and cancel the call (I see that rdkafka doesn't give a way to timeout once recv() is made). If yes, then any sample code that can be shared?

async fn foo(){
        for i in 0..9 {
            let msg = consumer.recv().await.unwrap();
            println!(
                "message 1st pass:::: i{} ... {:?}",
                i,
                std::str::from_utf8(msg.payload().unwrap())
            )
        }
}

use a combinator to add time out to any Futures.

for tokio, it is tokio::time::timeout, use match or if let to check the result:

match tokio::time::timeout(Duration::from_millis(1000), consumer.recv()).await {
    Ok(received) => todo!(),
    Err(_) => {
        eprintln!("timeout!");
    }
}

for other async runtimes, there are similar functions.

the point is, there would be a combinatorial explosion of APIs if every async operation provides its own time out mechanism. for non-async functions, they can't be easily composed using combinators, so they don't have a choice. in a async context, however, we build generic abstractions on top of combinators.

in general, the idea is to use a "alternative" combinator together with a "timer" Future. the combinator part can be runtime agnostic, for instance, futures_util::Either or futures_lite::or, but the timer part is tied to a specific runtime, because it needs a reactor to function, some examples are tokio::Sleep, async_io::Timer, async_timer::OneShot, etc.

2 Likes

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.