Tokio Streams - use `?` operator on `Result`?

Hi,

I'd like to return early from a lambda in a Stream.for_each.
Essentially I'm trying to find a more idiomatic way to express the following:

let msg: Result<WsMsg, Error> = serde_json::from_str(&msg).map_err(|_| {
    Error::from(HawkError::InvalidServerMessage("The server has replied with an unknown message"))
});
if msg.is_err() {
    return err(msg.unwrap_err());
}
let msg = msg.unwrap();

I tried using the ? operator on the Result, but ? returns early with a Result::Err, whereas the lambda has to return a future (the one created by err), so it fails to compile:

the `?` operator can only be used in a function that returns `Result` or `Option` (or another type that implements `std::ops::Try`)
cannot use the `?` operator in a function that returns `websocket::futures::Failed<_, failure::Error>`
the trait `std::ops::Try` is not implemented for `websocket::futures::Failed<_, failure::Error>`

Is there a better way to return an err early from a Stream?

I'm using tokio 0.1 at the moment.

Thanks for the help in advance!

Best,
ambiso

Does this not work?

let msg: WsMsg = serde_json::from_str(&msg).map_err(|_| {
    Error::from(HawkError::InvalidServerMessage("The server has replied with an unknown message"))
})?;
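
For context, the ? operator expands to an early return of Err(..), so it compiles only when the enclosing function or closure itself returns a Result (or Option). The sketch below shows that with the standard library alone; double_all and double are hypothetical names made up for illustration and are not part of the code in this thread.

```rust
use std::num::ParseIntError;

// Hypothetical helper: doubles every number in `inputs` and stops at the
// first entry that fails to parse.
fn double_all(inputs: &[&str]) -> Result<Vec<i64>, ParseIntError> {
    // The explicit `-> Result<..>` on the closure is what makes `?` legal inside it.
    let double = |s: &str| -> Result<i64, ParseIntError> {
        let n: i64 = s.parse()?; // returns early with `Err(..)` on failure
        Ok(n * 2)
    };
    // Collecting into a `Result` short-circuits on the first `Err`.
    inputs.iter().map(|s| double(*s)).collect()
}
```

If the closure returned anything other than a Result, for example a future, the same ? would be rejected with the error quoted earlier in the thread.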

The lambda's return type is websocket::futures::Failed<std::option::Option<std::sync::Arc<std::sync::RwLock<Book>>>, failure::Error>.

I'd like the stream to produce an error when deserialization fails (i.e. return a future that immediately resolves to an error).

The context of the code looks like this:

stream.from_err().and_then(move |msg| {
    let msg: Result<WsMsg, Error> = serde_json::from_str(&msg).map_err(|_| {
        Error::from(HawkError::InvalidServerMessage("The server has replied with an unknown message"))
    });
    if msg.is_err() {
        return err(msg.unwrap_err());
    }
    let msg = msg.unwrap();
    // update book with msg...
    ok(Some(book))
})
.filter_map(|x| x); // filter the cases where nothing is produced (with ok(None))

What is err()? Does it come from futures?

Yep, that's from futures: futures::future::err

The following does work:

stream.from_err()
.map(move |msg| {
    let msg: WsMsg = serde_json::from_str(&msg).map_err(|_| {
        Error::from(HawkError::InvalidServerMessage("The server has replied with an unknown message"))
    })?;
    // update book with msg...
    Ok(Some(book))
})
.and_then(|x: Result<_, _>| {
    if x.is_err() {
        err(x.unwrap_err())
    } else {
        ok(x.unwrap())
    }
})
.filter_map(|x| x);

Note that I changed the first and_then to a map. The lambda now returns a Result<Option<Arc<RwLock<Book>>>, Error> instead of a Future<Option<Arc<RwLock<Book>>>, Error>, and the second and_then turns that Result back into a future.

However, I feel like there should be some sweeter way to achieve the same!
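
One possible direction, sketched here without the futures crate so that it stays self-contained: move the fallible steps into a plain function that returns Result, where ? is allowed, and keep the stream closure as a thin wrapper around it. WsMsg, HawkError, parse_msg and handle_msg below are simplified stand-ins (assumptions, not the real types), and parse_msg takes the place of serde_json::from_str.

```rust
// Simplified stand-ins for the thread's `HawkError` and `WsMsg` (assumptions).
#[derive(Debug, PartialEq)]
enum HawkError {
    InvalidServerMessage(&'static str),
}

#[derive(Debug)]
struct WsMsg {
    price: u64,
}

// Stand-in for `serde_json::from_str`: treats the message as a plain integer.
fn parse_msg(raw: &str) -> Result<WsMsg, std::num::ParseIntError> {
    Ok(WsMsg { price: raw.trim().parse()? })
}

// Every fallible step lives in a function returning `Result`, so `?` works here.
fn handle_msg(raw: &str) -> Result<Option<u64>, HawkError> {
    let msg = parse_msg(raw).map_err(|_| {
        HawkError::InvalidServerMessage("The server has replied with an unknown message")
    })?;
    // update book with msg... (this sketch just hands back the parsed price)
    Ok(Some(msg.price))
}
```

In futures 0.1, Result implements IntoFuture, so a closure passed to and_then should be able to return such a Result directly, along the lines of .and_then(move |msg| handle_msg(&msg)), provided the error type matches the stream's error type. Where an explicit future is needed, futures::future::result converts a Result into one. Neither variant has been tried against the code in this thread.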

What is the return type of the enclosing function?

It returns the stream wrapped in a future:

let stream = /* ... */;
ok(stream)

The function enclosing that returns a future that yields the stream.
It looks something like this:

pub fn stream_order_book(
) -> impl Future<Item = impl Stream<Item = Arc<RwLock<Book>>, Error = Error>, Error = Error> {
    ClientBuilder::new("wss://example.org/")
        .unwrap()
        .async_connect_secure(None)
        .and_then(|(c, _)| ok(c)) // Ignore headers
        // send some messages ...
        .from_err()
        .and_then(move |c| {
            let stream = c
                .from_err()
                .map(move |msg| {
                    let msg: WsMsg = serde_json::from_str(&msg).map_err(|_| {
                        Error::from(HawkError::InvalidServerMessage(
                            "The server has replied with an unknown message"
                        ))
                    })?;
                    // process message and update book...
                    Ok(Some(book.clone()))
                })
                .and_then(|x: Result<_, _>| {
                    if x.is_err() {
                        err(x.unwrap_err())
                    } else {
                        ok(x.unwrap())
                    }
                })
                .filter_map(|x| x);
            ok(stream) // return a stream of (references to) books
        })
}

Ok, I'm sorry, I'm afraid I don't have enough experience with futures 0.1 streams to find a solution here... I know it looks a lot cleaner with async/await, but I suppose you have your reasons for sticking to 0.1.

Anyway, thanks for the help!
