Google GCP BigQuery client

Guys, the code below is from Google's gcp_bigquery_client. What I can't grasp is this:
in its current form, this program only works for tables that have exactly four columns, named
actor_id, first_name, last_name, and last_update.
Obviously this is ridiculously constrained. How can I make it work on tables with a different number of columns?

#[cfg(test)]
pub mod test {
    use crate::model::dataset::Dataset;
    use crate::model::field_type::FieldType;
    use crate::model::table::Table;
    use crate::model::table_field_schema::TableFieldSchema;
    use crate::model::table_schema::TableSchema;
    use crate::storage::{ColumnType, FieldDescriptor, StreamName, TableDescriptor};
    use crate::{env_vars, Client};
    use prost::Message;
    use std::time::{Duration, SystemTime};
    use tokio_stream::StreamExt;

    #[tokio::test]
    async fn test() -> Result<(), Box<dyn std::error::Error>> {
        let (ref project_id, ref dataset_id, ref table_id, ref sa_key) = env_vars();
        let dataset_id = &format!("{dataset_id}_storage");

        let mut client = Client::from_service_account_key_file(sa_key).await?;

        // Delete the dataset if needed
        client.dataset().delete_if_exists(project_id, dataset_id, true).await;

        // Create dataset
        let created_dataset = client.dataset().create(Dataset::new(project_id, dataset_id)).await?;
        assert_eq!(created_dataset.id, Some(format!("{project_id}:{dataset_id}")));

        // Create table
        let table = Table::new(
            project_id,
            dataset_id,
            table_id,
            TableSchema::new(vec![
                TableFieldSchema::new("actor_id", FieldType::Int64),
                TableFieldSchema::new("first_name", FieldType::String),
                TableFieldSchema::new("last_name", FieldType::String),
                TableFieldSchema::new("last_update", FieldType::Timestamp),
            ]),
        );
        let created_table = client
            .table()
            .create(
                table
                    .description("A table used for unit tests")
                    .label("owner", "me")
                    .label("env", "prod")
                    .expiration_time(SystemTime::now() + Duration::from_secs(3600)),
            )
            .await?;
        assert_eq!(created_table.table_reference.table_id, table_id.to_string());

        let field_descriptors = vec![
            FieldDescriptor {
                name: "actor_id".to_string(),
                number: 1,
                typ: ColumnType::Int64,
            },
            FieldDescriptor {
                name: "first_name".to_string(),
                number: 2,
                typ: ColumnType::String,
            },
            FieldDescriptor {
                name: "last_name".to_string(),
                number: 3,
                typ: ColumnType::String,
            },
            FieldDescriptor {
                name: "last_update".to_string(),
                number: 4,
                typ: ColumnType::Timestamp,
            },
        ];
        let table_descriptor = TableDescriptor { field_descriptors };

        #[derive(Clone, PartialEq, Message)]
        struct Actor {
            #[prost(int32, tag = "1")]
            actor_id: i32,

            #[prost(string, tag = "2")]
            first_name: String,

            #[prost(string, tag = "3")]
            last_name: String,

            #[prost(string, tag = "4")]
            last_update: String,
        }

        let actor1 = Actor {
            actor_id: 1,
            first_name: "John".to_string(),
            last_name: "Doe".to_string(),
            last_update: "2007-02-15 09:34:33 UTC".to_string(),
        };

        let actor2 = Actor {
            actor_id: 2,
            first_name: "Jane".to_string(),
            last_name: "Doe".to_string(),
            last_update: "2008-02-15 09:34:33 UTC".to_string(),
        };

        let stream_name = StreamName::new_default(project_id.clone(), dataset_id.clone(), table_id.clone());
        let trace_id = "test_client".to_string();

        let mut streaming = client
            .storage_mut()
            .append_rows(&stream_name, &table_descriptor, &[actor1, actor2], trace_id)
            .await?;

        while let Some(resp) = streaming.next().await {
            let resp = resp?;
            println!("response: {resp:#?}");
        }

        Ok(())
    }
}

The field_descriptors variable is a Vec. You can construct it dynamically at run time, for example based on user-provided input.

That then leaves the problem of how to dynamically build the row objects (actor1 and actor2 in this case). Since these objects must implement the prost::Message trait, you could try something like the prost-reflect crate.
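For illustration, here is a minimal sketch of building the descriptors at run time from a list of column names and types. Note the ColumnType and FieldDescriptor definitions below are simplified stand-ins for the crate's own types, not the real ones; the key idea is numbering fields 1..=n as the protobuf wire format requires.

```rust
// Simplified stand-ins for gcp_bigquery_client's storage types (assumption,
// for illustration only; use the crate's real types in practice).
#[derive(Debug, Clone, PartialEq)]
enum ColumnType {
    Int64,
    String,
    Timestamp,
}

#[derive(Debug)]
struct FieldDescriptor {
    name: String,
    number: u32,
    typ: ColumnType,
}

// Build descriptors from runtime data, assigning field numbers 1..=n.
fn build_field_descriptors(columns: &[(&str, ColumnType)]) -> Vec<FieldDescriptor> {
    columns
        .iter()
        .enumerate()
        .map(|(i, (name, typ))| FieldDescriptor {
            name: name.to_string(),
            number: (i + 1) as u32,
            typ: typ.clone(),
        })
        .collect()
}

fn main() {
    // The column list could come from user input or a table's fetched schema.
    let columns = vec![
        ("actor_id", ColumnType::Int64),
        ("first_name", ColumnType::String),
        ("last_update", ColumnType::Timestamp),
    ];
    let descriptors = build_field_descriptors(&columns);
    assert_eq!(descriptors.len(), 3);
    assert_eq!(descriptors[0].number, 1);
    println!("{descriptors:#?}");
}
```

In the real crate you would then construct the TableDescriptor from that Vec, exactly as the hard-coded test does.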

I haven't tried it, but you could probably build compliant data objects by deserializing a JSON-encoded input into a DynamicMessage, which satisfies the trait bound.

Note that reflection and this kind of dynamic typing in Rust, while possible, is not really treated as a first-class use case. Rust emphasizes machine performance and type safety; if this comes across as being "ridiculously constrained", perhaps a language like Python or Java would be more suitable for your needs.

Thank you for your answer. Regarding "ridiculously constrained", you've got the wrong end of the stick: I meant Google's API, not Rust.
As for "perhaps a language like Python or Java would be more suitable for your needs",
that's similar to saying: "Contact your compiler vendor and ask them to implement the feature you are missing."

I see. Well, I'm not that familiar with BigQuery, but I believe its tables do need to have a schema defined, much like a relational SQL database. There are also more dynamic NoSQL databases if you need that kind of flexibility.

Shall I organize a meeting at the company I work for and tell them that?

Rust and SQL don't care about your workplace politics I'm afraid. They are tools with strengths and weaknesses, pick the right tools for the job.

Actually, it doesn't:

^^^ the trait prost::Message is not implemented for DynamicMessage

It is implemented according to the docs.

I'm not sure where you're getting this message from.

Edit: It's possible you need to enable certain features, e.g. serde or prost-reflect-derive

https://docs.rs/crate/prost-reflect/0.14.2/features
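For example, in Cargo.toml (the version and feature names below are assumptions; check the feature list linked above against the version you actually use):

```toml
# Sketch: enabling prost-reflect's serde support for JSON deserialization.
[dependencies]
prost = "0.13"
prost-reflect = { version = "0.14", features = ["serde", "derive"] }
serde_json = "1"
```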

I have those features enabled. Unfortunately, I think the docs are incorrect.

Edit: wait, I can see (in the docs) that the trait is implemented. I don't understand.

In the source code it is implemented too. Try to get the JSON decoding example working, and the resulting DynamicMessage should be usable in your original code.

use prost::Message;
use prost_reflect::{DynamicMessage, DescriptorPool, Value};
use serde_json::de::Deserializer;

let pool = DescriptorPool::decode(include_bytes!("file_descriptor_set.bin").as_ref()).unwrap();
let message_descriptor = pool.get_message_by_name("package.MyMessage").unwrap();

let json = r#"{ "foo": 150 }"#;
let mut deserializer = Deserializer::from_str(json);
let dynamic_message = DynamicMessage::deserialize(message_descriptor, &mut deserializer).unwrap();
deserializer.end().unwrap();

The file_descriptor_set.bin file is in the prost-reflect/src folder if you need it.

Hi, thanks for the reply. I actually got the code example to compile. Unfortunately it still says that the message trait is not implemented for the DynamicMessage:

use gcp_bigquery_client::storage::{FieldDescriptor, StreamName, TableDescriptor};
use prost::Message;
use prost_reflect::{DynamicMessage, DescriptorPool, Value};
use serde_json::de::Deserializer;
//protoc -I . --descriptor_set_out=descriptor-set.bin foo.proto
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pool = DescriptorPool::decode(include_bytes!("file_descriptor_set.bin").as_ref()).unwrap();
    let message_descriptor = pool.get_message_by_name("dynamic_message.items.Author").unwrap();

    let json = r#"{ "name": "tod" }"#;
    let mut deserializer = Deserializer::from_str(json);
    let dynamic_message: DynamicMessage =
        DynamicMessage::deserialize(message_descriptor, &mut deserializer).unwrap();
    deserializer.end().unwrap();

    assert_eq!(
        dynamic_message.get_field_by_name("name").unwrap().as_ref(),
        &Value::String("tod".to_owned())
    );

    let gcp_sa_key = "key";
    let project_id = "args.project";
    let dataset_id = "args.dataset";
    let table_id = "args.table";

    let mut client = gcp_bigquery_client::Client::from_service_account_key_file(gcp_sa_key).await?;
    let table = client
        .table()
        .get(project_id, dataset_id, table_id, None)
        .await
        .unwrap();

    let stream_name = StreamName::new_default(
        project_id.to_owned(),
        dataset_id.to_owned(),
        table_id.to_owned(),
    );
    let trace_id = "test_client".to_string();

    let field_descriptors: Vec<FieldDescriptor> = Vec::new();
    let table_descriptor = TableDescriptor { field_descriptors };

    let rows = vec![dynamic_message];
    let mut streaming = client
        .storage_mut()
        .append_rows(&stream_name, &table_descriptor, &rows, trace_id.clone())
        .await?;
    println!("Hello, world!");
    Ok(())
}

OK, that was caused by incompatible crate versions: the one the GCP client depended on versus the one I was importing in my Cargo.toml. Tbh, the error message was completely misleading. I believe there is work to be done on that front.
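For anyone hitting the same wall: when two versions of prost end up in the dependency graph, prost::Message from each version is a distinct trait, so a DynamicMessage implementing one does not satisfy a bound on the other, and the compiler reports "the trait is not implemented" even though the docs say it is. Aligning the versions in Cargo.toml fixes it; `cargo tree -d` lists duplicated crates. A sketch (the version numbers here are assumptions; match whatever gcp-bigquery-client actually pulls in):

```toml
# Sketch: pin prost/prost-reflect to the versions gcp-bigquery-client uses.
# Check the real versions with `cargo tree -d` before pinning.
[dependencies]
gcp-bigquery-client = "0.20"
prost = "0.12"
prost-reflect = { version = "0.12", features = ["serde"] }
```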

Thank you for your help.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.