Awaiting on stream inside tokio::select! is failing

Hello guys,

I am writing my first Kubernetes client application in Rust. The idea is to watch a stream for changes while also listening for a shutdown signal that ends the loop and the application.

The following is my stream object:

        let namespaces = kube::Api::<Namespace>::all(self.kubeclient.clone());
        let label = "xxx=enabled";
        let lp = ListParams::default().labels(label);
        let watch_stream = watcher(namespaces, lp).applied_objects();
        pin_mut!(watch_stream);
        let mut watch_stream = watch_stream.fuse();

and my loop looks like below:

        loop {
            tokio::select! {
                let Some(event) = watch_stream.try_next() => {
                    println!("Event: {:?}", event);
                }
                // watch for the stop signal
                _ = rx.recv() => {
                    println!("Stop signal received");
                    break;
                }
            }
        }

Compilation error:

error[E0308]: mismatched types
  --> src/proxy/proxymanager.rs:64:21
   |
63 | /             tokio::select! {
64 | |                 let Some(event) = watch_stream.try_next() => {
   | |                     ^^^^^^^^^^^ expected enum `Result`, found enum `Option`
65 | |                     println!("Event: {:?}", event);
66 | |                 }
...  |
71 | |                 }
72 | |             }
   | |_____________- this expression has type `&Result<std::option::Option<Namespace>, kube::kube_runtime::watcher::Error>`
   |
   = note: expected enum `Result<std::option::Option<Namespace>, kube::kube_runtime::watcher::Error>`
              found enum `std::option::Option<_>`

The watcher stream works without errors in a while loop. I am struggling to understand the difference between the two scenarios, and why the compiler reads the return type of watch_stream.try_next() as std::option::Option<_> instead of Result<std::option::Option<Namespace>, kube::kube_runtime::watcher::Error>.

Working code for watching stream events in while loop with no receive signal:

        while let Some(event) = watch_stream.try_next().await? {
            println!("Event: {:?}", event);
        }

Appreciate any kind of help.

Thanks!!


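The future returned by try_next resolves to a Result<Option<T>, E>, so a pattern matched directly against it has to be Ok(...) or Err(...); Some(event) is an Option pattern and cannot match a Result. It works in your while loop because the question mark unwraps the Result first, leaving an Option<T> for the Some(event) pattern to match.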

You can do this instead:

loop {
    tokio::select! {
        Some(event) = watch_stream.next() => {
            let event = event?;
            ...
        }
    }
}
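
To make the difference between the two shapes concrete, here is a minimal sketch that uses only the standard library. The function names (classify_next, classify_try_next, with_question_mark) and the u32/String types are hypothetical stand-ins, not kube or futures APIs; they only mirror the documented result types of next() (Option<Result<T, E>>) and try_next() (Result<Option<T>, E>).

```rust
// Shape yielded by `stream.next().await`: Option<Result<T, E>>.
// A `Some(event)` pattern matches here, and `event` is still a Result.
fn classify_next(item: Option<Result<u32, String>>) -> &'static str {
    match item {
        Some(Ok(_)) => "event",
        Some(Err(_)) => "error",
        None => "end of stream",
    }
}

// Shape yielded by `stream.try_next().await`: Result<Option<T>, E>.
// Only `Ok(..)` / `Err(..)` patterns can match the outer layer.
fn classify_try_next(item: Result<Option<u32>, String>) -> &'static str {
    match item {
        Ok(Some(_)) => "event",
        Ok(None) => "end of stream",
        Err(_) => "error",
    }
}

// What the `while let Some(event) = ... .await?` loop does on each step:
// `?` strips the outer Result, so `Some(..)` can match what is left.
fn with_question_mark(item: Result<Option<u32>, String>) -> Result<&'static str, String> {
    if let Some(_event) = item? {
        Ok("event")
    } else {
        Ok("end of stream")
    }
}

fn main() {
    let item: Option<Result<u32, String>> = Some(Ok(7));
    println!("next shape:     {}", classify_next(item.clone()));
    // `transpose` converts between the two shapes without losing information.
    println!("try_next shape: {}", classify_try_next(item.clone().transpose()));
    println!("with `?`:       {:?}", with_question_mark(item.transpose()));
}
```

Inside tokio::select! there is no place to put the question mark before the pattern is matched, which is why switching to next() and applying ? inside the branch body is the simpler fix. Note also that the branch syntax documented for tokio::select! is pattern = future => handler, with no let keyword.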

It worked, thanks @alice!
