Consume stream without eternal blocking

Hello,

I use a stream to consume HTTP SSE response, example code with websocket (this code fail just now because demo.piesocket.com have a SSL problem) :

use async_std::task;
use futures_util::StreamExt;
use reqwest::{self, Method};

async fn job() {
    let response = reqwest::Client::new()
        .request(Method::GET, format!("wss://demo.piesocket.com/v3/channel_1?api_key=oCdCMcMPQpbvNjUIzqtvF1d2X2okWpDQj4AwARJuAgtjhzKxVEjQU6IdCjwm&notify_self"))
        .send()
        .await.unwrap();
    let mut stream = response.bytes_stream();
    while let Some(_) = stream.next().await {
        println!("Hello world")
    }
}

fn main() {
    task::block_on(job());
}

In that code, job is blocking on while let Some(_) = stream.next().await {. I would like to be able to stop the loop if my application want to stop. I think to use a channel to send message. But, how can I consume the stream, and sometimes, for example if a timeout is reached, take a look into my channel to see if an interrupt request has been sent ?

Thanks in advance !

With Tokio you would be able to do something like this:

loop {
    tokio::select! {
        msg = stream.next() => {
            println!("Got: {:?}", msg);
        },
        _ = tokio::signal::ctrl_c() => return,
    }
}

or you could abort the task remotely without changing job itself

#[tokio::main]
async fn main() {
    let job_task = tokio::spawn(job);

    tokio::signal::ctrl_c().await;

    job_task.abort();
    job_task.await;
}

I'm not sure how any of this translates to async-std, though I'm surprised that it works together with reqwest in the first place as reqwest is Tokio-specific.

Of course the ctrl_c signal above could be replaced with anything else, e.g. a tokio::time::sleep call.

1 Like

Thanks for your time, i will take a look on it !

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.