Streaming from Google firestore with gRPC?

Not sure if anyone here has used firestore with Rust... I would love to implement a firestore client but need streaming to work. Using GitHub - gkkachi/firestore-grpc I was able to make normal document list requests successfully. However, listen / stream requests don't really seem to work.

From other clients I was able to get an idea of how to assemble the protos:

Creating the request:

    let req = ListenRequest {
        database: format!("projects/{}/databases/(default)", project_id),
        labels: HashMap::new(),
        target_change: Some(TargetChange::AddTarget(Target {
            target_id: 0x52757374, // "Rust" in hex https://github.com/googleapis/python-firestore/issues/51
            once: false,
            target_type: Some(TargetType::Documents(DocumentsTarget {
                documents: vec![users_collection],
            })),
            resume_type: None,
        })),
    };

And sending it

    let mut req = Request::new(stream::iter(vec![req]));
    let metadata = req.metadata_mut();
    metadata.insert(
        "google-cloud-resource-prefix",
        MetadataValue::from_str(&db).unwrap(),
    );

    println!("sending request");
    let res = get_client(&token).await?.listen(req).await?;
    let mut res = res.into_inner();
    while let Some(msg) = res.next().await {
        println!("getting response");
        dbg!(msg);
    }

(full example in this repo)

The requests seems to go through successfully but I don't receive any streamed content, the request is closed immediately. All I'm getting from the debug lots is

[2021-10-27T14:54:39Z DEBUG h2::codec::framed_write] send frame=GoAway { error_code: NO_ERROR, last_stream_id: StreamId(0) } 
[2021-10-27T14:54:39Z DEBUG h2::proto::connection] Connection::poll; connection error error=GoAway(b"", NO_ERROR, Library) 

Any idea what might be wrong?

Thank you!

If I'm reading this correctly, your client library is telling the server to GO AWAY. You should be able to debug this by reading the library source around bidi streaming responses.

Actually, that's a good question - is Listen actually a bidirectional request or is it just server streaming?

1 Like

Yeah, thank you, you're right. I was holding it wrong. So the input stream is not expected to end, if this happens the connection is closed.

Quick change to get a response: Request::new(stream::iter(vec![req]).chain(stream::pending())).

I still need to figure out the proper way to set things up so that the input stream can be controlled but that was the crucial bit.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.