I'd like to return early from a lambda in a Stream.for_each.
Essentially I'm trying to find a more idiomatic way to express the following:
let msg: Result<WsMsg, HawkError> = serde_json::from_str(&msg).map_err(|_| {
Error::from(HawkError::InvalidServerMessage("The server has replied with an unknown message"))
});
if msg.is_err() {
return err(msg.unwrap_err());
}
let msg = msg.unwrap();
I tried using the ? operator on the Result, but it would return a Result::Err instead of an err:
the `?` operator can only be used in a function that returns `Result` or `Option` (or another type that implements `std::ops::Try`)
cannot use the `?` operator in a function that returns `websocket::futures::Failed<_, failure::Error>`
the trait `std::ops::Try` is not implemented for `websocket::futures::Failed<_, failure::Error>`
Is there a better way to return an err early from a Stream?
let msg: WsMsg = serde_json::from_str(&msg).map_err(|_|
{
Error::from(HawkError::InvalidServerMessage("The server has replied with an unknown message"))
})?;
The lambda's return type is websocket::futures::Failed<std::option::Option<std::sync::Arc<std::sync::RwLock<Book>>>, failure::Error>.
I'd like the stream to produce an error, when deserialization fails (i.e. return a future that will immediately return an error).
The context of the code looks like this:
stream.from_err().and_then(move |msg| {
let msg: Result<WsMsg, HawkError> = serde_json::from_str(&msg).map_err(|_| {
Error::from(HawkError::InvalidServerMessage("The server has replied with an unknown message"))
});
if msg.is_err() {
return err(msg.unwrap_err());
}
let msg = msg.unwrap();
// update book with msg...
ok(Some(book))
})
.filter_map(|x| x); // filter the cases where nothing is produced (with ok(None))
stream.from_err()
.map(move |msg| {
let msg: WsMsg = serde_json::from_str(&msg).map_err(|_| {
Error::from(HawkError::InvalidServerMessage("The server has replied with an unknown message"))
})?;
// update book with msg...
ok(Some(book))
})
.and_then(|x: Result<_, _>| {
if x.is_err() {
err(x.unwrap_err())
} else {
ok(x.unwrap())
}
})
.filter_map(|x| x);
Note that I changed the first and_then to a map. It now returns a Result<Option<Arc<RwLock<Book>>>, Error> instead of a Future<Option<Arc<RwLock<Book>>>, Error>.
However, I feel like there should be some sweeter way to achieve the same!
Ok, I'm sorry, I'm afraid I haven't enough experience with futures 0.1 streams to find a solution here... I know it looks a lot cleaner in async await, but I suppose you have your reasons for sticking to 0.1.