Scenario: my app is a gRPC client that needs to initiate two different streaming connections to the same server. If one of them goes bad - returns an error response - both need to be reset.

Let's say Stream 2 goes bad. The problem: if, on receipt of the next message in the Stream 1 loop (calling `Streaming::message()`), I break out of the loop and re-init the stream (calling the method on the service that returns a `Streaming<_>`), the server returns the error "Active streaming subscription already exists." It refuses to create another connection. This occurs even if I call `drop` on the `Streaming` instance.

Thus I am at a loss for how to handle this. I can't kill the thread while it's blocking on `message()` (because there's no way to kill threads), and I can't re-init the stream because `drop` apparently doesn't close it. What to do?
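One pattern that avoids needing to kill a blocked thread is to poll with a timeout and share a stop flag between the two stream loops, so whichever loop sees the error signals the other to exit before you reconnect. Below is a minimal std-only sketch of just that coordination - `recv_timeout` stands in for the blocking `message()` call and the channels simulate the two streams (in async tonic code the analogous tool would be `tokio::select!` on the stream plus a shutdown signal); all names here are illustrative:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration;

// One consumer loop per stream. `rx.recv_timeout` stands in for the blocking
// `Streaming::message()` call: the timeout gives the loop a chance to notice
// that the *other* stream failed, without having to kill the thread.
fn stream_loop(rx: mpsc::Receiver<Result<String, String>>, stop: Arc<AtomicBool>) {
    loop {
        if stop.load(Ordering::Relaxed) {
            return; // the other stream went bad; tear this one down too
        }
        match rx.recv_timeout(Duration::from_millis(20)) {
            Ok(Ok(_msg)) => { /* handle the message */ }
            Ok(Err(_status)) => {
                stop.store(true, Ordering::Relaxed); // signal the sibling loop
                return;
            }
            Err(mpsc::RecvTimeoutError::Timeout) => {} // loop and re-check `stop`
            Err(mpsc::RecvTimeoutError::Disconnected) => return,
        }
    }
}

// Returns true once both loops have shut down after one of them errored.
fn run_two_streams() -> bool {
    let stop = Arc::new(AtomicBool::new(false));
    let (tx1, rx1) = mpsc::channel();
    let (tx2, rx2) = mpsc::channel();

    let h1 = { let s = stop.clone(); thread::spawn(move || stream_loop(rx1, s)) };
    let h2 = { let s = stop.clone(); thread::spawn(move || stream_loop(rx2, s)) };

    tx1.send(Ok("message on stream 1".to_string())).unwrap();
    tx2.send(Err("stream 2 went bad".to_string())).unwrap(); // simulated failure

    h1.join().unwrap();
    h2.join().unwrap();
    stop.load(Ordering::Relaxed)
}

fn main() {
    assert!(run_two_streams());
    println!("both stream loops shut down; safe to reconnect");
}
```

This only solves the client-side teardown ordering, not the server's "subscription already exists" refusal.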
Without knowing anything about gRPC, just scanning the docs:

In general it may take some time for sockets to time out, so you may have to keep retrying periodically for a while. You may need to go all the way back to the `connect` call, if you're not doing that already. Just guessing.

Also: if you haven't already, check the server logs to see if there are errors during shutdown, etc.
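"Keep retrying periodically" is usually done with exponential backoff rather than a fixed sleep, so the first retries are quick and later ones give the server time to release the old subscription. A small sketch (the function name and the base/cap values are illustrative):

```rust
use std::time::Duration;

/// Delay before retry `attempt` (0-based): base, 2x, 4x, ... capped at `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    base.saturating_mul(2u32.saturating_pow(attempt)).min(max)
}

fn main() {
    let base = Duration::from_secs(1);
    let max = Duration::from_secs(30);
    assert_eq!(backoff_delay(0, base, max), Duration::from_secs(1));
    assert_eq!(backoff_delay(3, base, max), Duration::from_secs(8));
    assert_eq!(backoff_delay(10, base, max), Duration::from_secs(30)); // capped
    for attempt in 0..6 {
        println!("attempt {attempt}: wait {:?}", backoff_delay(attempt, base, max));
    }
}
```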
There's no access to the server log - this is a SaaS product.
I should've mentioned there's a 10s sleep in the thread already. Making it longer is an option - but boy is this hacky.
I've used gRPC in other languages (mainly JVM), and there was a way to close a stream from the client. This would result in an immediate server interrupt. I am pretty sure this is simply a weakness in tonic as there seems to be no way to do it.
I think you're right. Is this the same issue you're seeing?:
Opened 29 Jan 2023 UTC:
## Bug Report
The connection does not know how to recover on its own in the event of a break. (Sometimes it can, but most often the connection freezes after disconnecting.)
### Version 0.8.3
```
tonic v0.8.3
tonic-build v0.8.4
```
### Platform
`Linux Home-PC 5.15.0-58-generic #64-Ubuntu SMP Thu Jan 5 11:43:13 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux`
### Description
If the connection between the client and the server is lost, the client can no longer restore it, and the program that uses the client stops working. This happens, for example, after sleeping for a long time or after physically disconnecting from the server.
Further use of the gRPC service is then impossible; every call ends in a timeout error or a broken connection.
```
[2023-01-28T15:11:55Z DEBUG tower::buffer::worker] service.ready=true message=processing request
[2023-01-28T15:11:55Z DEBUG h2::codec::framed_write] send frame=Headers { stream_id: StreamId(19), flags: (0x4: END_HEADERS) }
[2023-01-28T15:11:55Z DEBUG h2::codec::framed_write] send frame=Data { stream_id: StreamId(19) }
[2023-01-28T15:11:55Z DEBUG h2::codec::framed_write] send frame=Data { stream_id: StreamId(19), flags: (0x1: END_STREAM) }
[2023-01-28T15:12:01Z DEBUG hyper::proto::h2::server] stream error: connection error: broken pipe
[2023-01-28T15:12:01Z DEBUG h2::codec::framed_write] send frame=Reset { stream_id: StreamId(19), error_code: CANCEL }
```
or this
```
[2023-01-28T15:23:35Z DEBUG client] send: interval(40)
[2023-01-28T15:23:35Z DEBUG h2::codec::framed_write] send frame=Reset { stream_id: StreamId(25), error_code: CANCEL }
[2023-01-28T15:23:35Z DEBUG tower::buffer::worker] service.ready=true message=processing request
[2023-01-28T15:23:35Z DEBUG h2::codec::framed_write] send frame=Headers { stream_id: StreamId(27), flags: (0x4: END_HEADERS) }
[2023-01-28T15:23:35Z DEBUG h2::codec::framed_write] send frame=Data { stream_id: StreamId(27) }
[2023-01-28T15:23:35Z DEBUG h2::codec::framed_write] send frame=Data { stream_id: StreamId(27), flags: (0x1: END_STREAM) }
[2023-01-28T15:23:36Z ERROR client] status: Cancelled, message: "Timeout expired", details: [], metadata: MetadataMap { headers: {} }
```
I encountered this problem in a working program in a finished product. To study it, I created a simple server and client from the tonic examples, and I get the same errors.
I run the server on a remote machine so that it is possible to physically break the connection between the client and the server.
Server
```rust
use std::{error::Error, net::SocketAddr, str::FromStr};

use log::debug;
use tonic::{metadata::MetadataValue, transport::Server, Request, Response, Status};

// `test_grpc` is the module generated by tonic-build for the example proto.
use test_grpc::{RequestSay, ResponseSay};

struct Service {}

#[tonic::async_trait]
impl test_grpc::say_server::Say for Service {
    async fn hello(&self, request: Request<RequestSay>) -> Result<Response<ResponseSay>, Status> {
        let r = request.into_inner().text;
        debug!("in request: {}", r);
        Ok(Response::new(ResponseSay {
            text: format!("hello {r}"),
        }))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
    env_logger::Builder::new()
        .filter_level(log::LevelFilter::from_str("debug").unwrap())
        .init();

    let s = Service {};
    let key = "secret token";
    let svc = test_grpc::say_server::SayServer::with_interceptor(
        s,
        move |req: Request<()>| -> Result<Request<()>, Status> {
            let token: MetadataValue<_> = key.parse().unwrap();
            match req.metadata().get("authorization") {
                Some(t) if token == t => Ok(req),
                _ => Err(Status::unauthenticated("No valid auth token")),
            }
        },
    );

    let addr = "0.0.0.0:8804".parse::<SocketAddr>().unwrap();
    Server::builder().add_service(svc).serve(addr).await.unwrap();
    Ok(())
}
```
Client
```rust
use std::time::Duration;

use log::{debug, error};
use tokio::time;

// `test_grpc` is the module generated by tonic-build for the example proto.
use test_grpc::RequestSay;

async fn tester_client(sleep: Duration, uri: &str, key: &str) {
    let uri = uri.parse().unwrap();
    debug!("create connect");
    let chan = tonic::transport::Channel::builder(uri)
        .timeout(Duration::from_secs(20))
        .connect_timeout(Duration::from_secs(20))
        //.http2_keep_alive_interval(Duration::from_secs(5))
        //.keep_alive_while_idle(true)
        .connect_lazy();

    let key = key.parse::<tonic::metadata::MetadataValue<_>>().unwrap();
    let mut key = Some(key);
    let mut service = test_grpc::say_client::SayClient::with_interceptor(
        chan,
        move |mut req: tonic::Request<()>| {
            if let Some(secret) = &mut key {
                req.metadata_mut().insert("authorization", secret.clone());
            }
            Ok(req)
        },
    );

    loop {
        let send_text = format!("interval({})", sleep.as_secs_f32() / 60.0);
        debug!("send: {send_text}");
        let res = match service
            .hello(tonic::Request::new(RequestSay {
                text: send_text.clone(),
            }))
            .await
        {
            Ok(r) => r,
            Err(e) => {
                error!("{e:#}");
                continue; // note: retries immediately, without sleeping
            }
        };
        debug!("recv: {}", res.into_inner().text);
        time::sleep(sleep).await;
        println!();
    }
}
```
I have tried several settings. For example, with `.http2_keep_alive_interval(Duration::from_secs(5))` the connection does not break during idle time. But if the connection is physically broken, it can no longer be restored (sometimes tonic reconnects by itself, but most often the connection just hangs).
Perhaps I need to specify some other settings so that a new connection is established when the old one breaks?
In the past we've implemented heartbeats to work around this problem (for sockets in general), and I see a heartbeat PR referenced from the above issue:
risingwavelabs/risingwave PR: `dylan/add_heartbeat_between_frontend_and_compute` → `main`, opened 29 Mar 2024:
## What's changed and what's your intention?
- Resolve https://github.com/risingwavelabs/risingwave/issues/14569
- Related issue https://github.com/risingwavelabs/risingwave/issues/14576 https://github.com/risingwavelabs/risingwave/issues/14030 https://github.com/risingwavelabs/risingwave/issues/14217
- Add a heartbeat between frontend and compute. If some compute nodes are unreachable, mark them unavailable for a few seconds so that batch queries can still work.
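The heartbeat approach in that PR boils down to recording when a peer was last heard from and masking it unavailable once a heartbeat is missed. A std-only sketch of that bookkeeping (the `Liveness` type and timeout value are illustrative, not RisingWave's actual implementation):

```rust
use std::time::{Duration, Instant};

/// Tracks peer liveness from heartbeats: a peer not heard from within
/// `timeout` is considered unavailable until the next heartbeat arrives.
struct Liveness {
    last_seen: Instant,
    timeout: Duration,
}

impl Liveness {
    fn new(timeout: Duration) -> Self {
        Self { last_seen: Instant::now(), timeout }
    }

    /// Call whenever a heartbeat (or any traffic) arrives from the peer.
    fn heartbeat(&mut self) {
        self.last_seen = Instant::now();
    }

    /// Available only if we heard from the peer recently enough.
    fn is_available(&self, now: Instant) -> bool {
        now.duration_since(self.last_seen) < self.timeout
    }
}

fn main() {
    let mut peer = Liveness::new(Duration::from_millis(50));
    assert!(peer.is_available(Instant::now()));

    std::thread::sleep(Duration::from_millis(80));
    assert!(!peer.is_available(Instant::now())); // missed heartbeat: masked out

    peer.heartbeat();
    assert!(peer.is_available(Instant::now())); // recovered
    println!("liveness tracking works");
}
```

The point is that liveness is decided by your own clock, not by waiting for the dead socket to report an error.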
No, not really. I mean there is no way to intentionally close a server-side streaming connection from the client.
However, waiting long enough makes it work.
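If waiting eventually works, the least-bad option may be to make that wait explicit: retry the re-init on an interval until it succeeds or a deadline passes. A std-only sketch (the names are illustrative; in the real client, `op` would be the call that re-creates the `Streaming`):

```rust
use std::time::{Duration, Instant};

/// Retry `op` until it succeeds or `deadline` elapses, sleeping between tries.
fn retry_until<T, E>(
    mut op: impl FnMut() -> Result<T, E>,
    interval: Duration,
    deadline: Duration,
) -> Option<T> {
    let start = Instant::now();
    loop {
        if let Ok(v) = op() {
            return Some(v);
        }
        if start.elapsed() >= deadline {
            return None; // give up: the server never released the subscription
        }
        std::thread::sleep(interval);
    }
}

fn main() {
    // Simulate a re-init that only succeeds on the third try, once the
    // server has dropped the stale subscription.
    let mut tries = 0;
    let result = retry_until(
        || {
            tries += 1;
            if tries >= 3 { Ok(tries) } else { Err("subscription already exists") }
        },
        Duration::from_millis(10),
        Duration::from_secs(1),
    );
    assert_eq!(result, Some(3));
    println!("reconnected after {} tries", result.unwrap());
}
```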