TENX
July 1, 2022, 11:05am
1
Hi,
I'm trying to send a dynamic gRPC message by using rust-protobuf and tonic . Here's my attempt . It's almost there, but with an odd error.
To use it, start the server first, then the client.
And we will see the following output:
request: {"name": "World"}
reply: {"message": "Hello World!"}
Err(
Status {
code: Internal,
message: "protocol error: received message with invalid compression flag: 10 (valid flags are 0 and 1) while receiving response with status: 200 OK",
source: None,
},
)
It says "invalid compression flag". Alright, let's get the packet:
and... I'm lost here.
TENX
July 1, 2022, 12:05pm
2
I've been debugging it for a while. It seems like the Err is returned here:
/// # }
/// ```
pub async fn trailers(&mut self) -> Result<Option<MetadataMap>, Status> {
// Shortcut to see if we already pulled the trailers in the stream step
// we need to do that so that the stream can error on trailing grpc-status
if let Some(trailers) = self.trailers.take() {
return Ok(Some(trailers));
}
// To fetch the trailers we must clear the body and drop it.
while self.message().await?.is_some() {}
// Since we call poll_trailers internally on poll_next we need to
// check if it got cached again.
if let Some(trailers) = self.trailers.take() {
return Ok(Some(trailers));
}
// Trailers were not caught during poll_next and thus lets poll for
// them manually.
let map = future::poll_fn(|cx| Pin::new(&mut self.body).poll_trailers(cx))
it will jump to:
/// # where T: Debug,
/// # D: Decoder<Item = T, Error = Status> + Send + 'static,
/// # {
/// if let Some(next_message) = request.message().await? {
/// println!("{:?}", next_message);
/// }
/// # Ok(())
/// # }
/// ```
pub async fn message(&mut self) -> Result<Option<T>, Status> {
match future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await {
Some(Ok(m)) => Ok(Some(m)),
Some(Err(e)) => Err(e),
None => Ok(None),
}
}
/// Fetch the trailing metadata.
///
/// This will drain the stream of all its messages to receive the trailing
/// metadata. If [`Streaming::message`] returns `None` then this function
then:
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if let State::Error = &self.state {
return Poll::Ready(None);
}
// FIXME: implement the ability to poll trailers when we _know_ that
// the consumer of this stream will only poll for the first message.
// This means we skip the poll_trailers step.
if let Some(item) = self.decode_chunk()? {
return Poll::Ready(Some(Ok(item)));
}
let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) {
Some(Ok(d)) => Some(d),
Some(Err(e)) => {
let _ = std::mem::replace(&mut self.state, State::Error);
let err: crate::Error = e.into();
debug!("decoder inner stream error: {:?}", err);
let status = Status::from_error(err);
and we got the "invalid compression flag" here:
///
/// let mut buf = &b"\x08 hello"[..];
/// assert_eq!(8, buf.get_u8());
/// ```
///
/// # Panics
///
/// This function panics if there is no more remaining data in `self`.
fn get_u8(&mut self) -> u8 {
assert!(self.remaining() >= 1);
let ret = self.chunk()[0];
self.advance(1);
ret
}
/// Gets a signed 8 bit integer from `self`.
///
/// The current position is advanced by 1.
///
/// # Examples
///
It seems like the error arises when tonic try to fetch the trailers and clear the body. But it's not the correct body?
TENX
July 2, 2022, 3:23am
3
Problem solved. In case if anyone interested in, here's the context .
2 Likes
system
Closed
September 30, 2022, 3:24am
4
This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.