Tokio Streams - use `?` operator on `Result`?

#1

Hi,

I’d like to return early from a lambda in a Stream.for_each.
Essentially I’m trying to find a more idiomatic way to express the following:

let msg: Result<WsMsg, Error> = serde_json::from_str(&msg).map_err(|_| {
    Error::from(HawkError::InvalidServerMessage("The server has replied with an unknown message"))
});
if msg.is_err() {
    return err(msg.unwrap_err());
}
let msg = msg.unwrap();

I tried using the ? operator on the Result, but ? returns early with a Result::Err, while the closure has to return the future produced by err(), so the compiler rejects it:

the `?` operator can only be used in a function that returns `Result` or `Option` (or another type that implements `std::ops::Try`)
cannot use the `?` operator in a function that returns `websocket::futures::Failed<_, failure::Error>`
the trait `std::ops::Try` is not implemented for `websocket::futures::Failed<_, failure::Error>`
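
For context: `?` is roughly shorthand for a `match` that returns `Err(From::from(e))` from the enclosing function or closure, which is why that function or closure has to return a `Result` (or `Option`) itself. Below is a minimal std-only sketch of that equivalence. The names `ParseError`, `parse_with_question_mark` and `parse_with_match` are made up for illustration, and `str::parse` stands in for `serde_json::from_str`.

```rust
use std::num::ParseIntError;

// Hypothetical error type, standing in for failure::Error from the thread.
#[derive(Debug, PartialEq)]
struct ParseError(String);

impl From<ParseIntError> for ParseError {
    fn from(e: ParseIntError) -> Self {
        ParseError(e.to_string())
    }
}

// Uses `?`: allowed because the function itself returns a Result.
fn parse_with_question_mark(raw: &str) -> Result<u64, ParseError> {
    let n = raw.trim().parse::<u64>()?;
    Ok(n)
}

// Roughly what `?` expands to: an early `return Err(..)` with a From conversion.
fn parse_with_match(raw: &str) -> Result<u64, ParseError> {
    let n = match raw.trim().parse::<u64>() {
        Ok(v) => v,
        Err(e) => return Err(From::from(e)),
    };
    Ok(n)
}
```

Because the early return always produces a `Result::Err`, a closure whose return type is a concrete future type such as `Failed<_, _>` cannot host it.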

Is there a better way to return an err early from a Stream?

I’m using tokio 0.1 at the moment.

Thanks for the help in advance!

Best,
ambiso

#2

Does this not work?

let msg: WsMsg = serde_json::from_str(&msg).map_err(|_| {
    Error::from(HawkError::InvalidServerMessage("The server has replied with an unknown message"))
})?;

#3

The lambda’s return type is websocket::futures::Failed<std::option::Option<std::sync::Arc<std::sync::RwLock<Book>>>, failure::Error>.

I’d like the stream to produce an error when deserialization fails (i.e. return a future that immediately resolves to an error).

The context of the code looks like this:

stream.from_err().and_then(move |msg| {
    let msg: Result<WsMsg, Error> = serde_json::from_str(&msg).map_err(|_| {
        Error::from(HawkError::InvalidServerMessage("The server has replied with an unknown message"))
    });
    if msg.is_err() {
        return err(msg.unwrap_err());
    }
    let msg = msg.unwrap();
    // update book with msg...
    ok(Some(book))
})
.filter_map(|x| x); // filter the cases where nothing is produced (with ok(None))

#4

What is err()? Does this come from futures?

#5

Yep, that’s from futures: https://docs.rs/futures/0.1.26/futures/future/fn.err.html

#6

The following does work:

stream.from_err()
.map(move |msg| {
    let msg: WsMsg = serde_json::from_str(&msg).map_err(|_| {
        Error::from(HawkError::InvalidServerMessage("The server has replied with an unknown message"))
    })?;
    // update book with msg...
    Ok(Some(book))
})
.and_then(|x: Result<_, _>| {
    if x.is_err() {
        err(x.unwrap_err())
    } else {
        ok(x.unwrap())
    }
})
.filter_map(|x| x);

Note that I changed the first and_then to a map. The closure now returns a Result<Option<Arc<RwLock<Book>>>, Error> instead of a future with that item and error type.

However, I feel like there should be some sweeter way to achieve the same!
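
One possibly sweeter variant: in futures 0.1, `and_then` takes a closure whose return type implements `IntoFuture`, and `Result` implements that trait, so the closure passed to `and_then` should be able to return a `Result` directly and use `?` without the separate `map` step. The sketch below only shows the closure shape, using std types so that it runs without the futures or serde crates. `WsMsg`, `HawkError` and `Book` are simplified stand-ins for the real types, `make_handler` and `parse_msg` are hypothetical helpers, and `str::parse` is an assumed placeholder for `serde_json::from_str`.

```rust
use std::sync::{Arc, RwLock};

// Simplified stand-ins for the types in the thread (assumptions, not the real ones).
#[derive(Debug, PartialEq)]
enum HawkError {
    InvalidServerMessage(&'static str),
}

#[derive(Debug)]
struct WsMsg {
    price: u64,
}

#[derive(Debug, Default)]
struct Book {
    last_price: u64,
}

// Placeholder for serde_json::from_str: parses a bare integer as the message.
fn parse_msg(raw: &str) -> Result<WsMsg, HawkError> {
    let price = raw.trim().parse::<u64>().map_err(|_| {
        HawkError::InvalidServerMessage("The server has replied with an unknown message")
    })?;
    Ok(WsMsg { price })
}

// Builds the per-message closure. Its return type is a Result, so `?` is allowed inside.
fn make_handler(
    book: Arc<RwLock<Book>>,
) -> impl FnMut(String) -> Result<Option<Arc<RwLock<Book>>>, HawkError> {
    move |raw: String| -> Result<Option<Arc<RwLock<Book>>>, HawkError> {
        let msg = parse_msg(&raw)?; // returns Err early on a bad message
        book.write().unwrap().last_price = msg.price; // update book with msg
        Ok(Some(book.clone()))
    }
}
```

If that holds for your setup, a closure of this shape could be handed to `and_then` as is, and the `is_err`/`unwrap` step after the `map` would no longer be needed.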

#7

What is the return type of the enclosing function?

#8

It returns the Stream as a future:

let stream = /* ... */;
ok(stream)

The function around that returns a future yielding the stream.
It looks something like this:

pub fn stream_order_book(
) -> impl Future<Item = impl Stream<Item = Arc<RwLock<Book>>, Error = Error>, Error = Error> {
    ClientBuilder::new("wss://example.org/")
        .unwrap()
        .async_connect_secure(None)
        .and_then(|(c, _)| ok(c)) // Ignore headers
        // send some messages ...
        .from_err()
        .and_then(move |c| {
            let stream = c
                .from_err()
                .map(move |msg| {
                    let msg: WsMsg = serde_json::from_str(&msg).map_err(|_| {
                        Error::from(HawkError::InvalidServerMessage(
                            "The server has replied with an unknown message"
                        ))
                    })?;
                    // process message and update book...
                    Ok(Some(book.clone()))
                })
                .and_then(|x: Result<_, _>| {
                    if x.is_err() {
                        err(x.unwrap_err())
                    } else {
                        ok(x.unwrap())
                    }
                })
                .filter_map(|x| x);
            ok(stream) // return a stream of (references to) books
        })
}

#9

Ok, I’m sorry, I’m afraid I don’t have enough experience with futures 0.1 streams to find a solution here… I know it looks a lot cleaner with async/await, but I suppose you have your reasons for sticking to 0.1.

#10

Anyway, thanks for the help!