Guys, the code below is from the gcp_bigquery_client crate (a Rust client for Google BigQuery). What I cannot grasp is this:
In its current form, this program only works for tables that have exactly four columns, named
actor_id, first_name, last_name, and last_update.
Obviously this is far too constrained. How can it be changed so that it works on tables with a different number of columns?
/// Test module for appending rows through the storage client.
///
/// The column layout used here is written out three times, and the three copies
/// are kept in the same order with the same names:
///
/// 1. the `TableSchema` passed to `Table::new` (what the created table looks like),
/// 2. the `FieldDescriptor` list wrapped in a `TableDescriptor` (what is handed to
///    `append_rows` alongside the rows),
/// 3. the prost `Actor` message (the Rust value that gets encoded for each row).
///
/// To exercise a table with a different number of columns, change all three
/// together: add or remove a `TableFieldSchema`, the matching `FieldDescriptor`,
/// and the matching `#[prost(..., tag = "N")]` field on the row struct.
/// NOTE(review): each `FieldDescriptor::number` below equals the prost `tag` of
/// the corresponding struct field; presumably they must agree. Confirm against
/// the storage module before relying on it.
#[cfg(test)]
pub mod test {
    use crate::model::dataset::Dataset;
    use crate::model::field_type::FieldType;
    use crate::model::table::Table;
    use crate::model::table_field_schema::TableFieldSchema;
    use crate::model::table_schema::TableSchema;
    use crate::storage::{ColumnType, FieldDescriptor, StreamName, TableDescriptor};
    use crate::{env_vars, Client};
    use prost::Message;
    use std::time::{Duration, SystemTime};
    use tokio_stream::StreamExt;

    /// Recreates the `<dataset>_storage` dataset, creates a four-column table in it,
    /// appends two `Actor` rows via `append_rows`, and prints every response
    /// received on the returned stream.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by client construction, dataset creation,
    /// table creation, the `append_rows` call, or any item of the response stream.
    #[tokio::test]
    async fn test() -> Result<(), Box<dyn std::error::Error>> {
        let (ref project_id, ref dataset_id, ref table_id, ref sa_key) = env_vars();
        // Use a dedicated dataset name so this test works in its own "<name>_storage" dataset.
        let dataset_id = &format!("{dataset_id}_storage");

        let mut client = Client::from_service_account_key_file(sa_key).await?;

        // Delete the dataset if needed
        client.dataset().delete_if_exists(project_id, dataset_id, true).await;

        // Create dataset
        let created_dataset = client.dataset().create(Dataset::new(project_id, dataset_id)).await?;
        assert_eq!(created_dataset.id, Some(format!("{project_id}:{dataset_id}")));

        // Create table (copy 1 of the column layout: the table schema).
        let table = Table::new(
            project_id,
            dataset_id,
            table_id,
            TableSchema::new(vec![
                TableFieldSchema::new("actor_id", FieldType::Int64),
                TableFieldSchema::new("first_name", FieldType::String),
                TableFieldSchema::new("last_name", FieldType::String),
                TableFieldSchema::new("last_update", FieldType::Timestamp),
            ]),
        );

        let created_table = client
            .table()
            .create(
                table
                    .description("A table used for unit tests")
                    .label("owner", "me")
                    .label("env", "prod")
                    .expiration_time(SystemTime::now() + Duration::from_secs(3600)),
            )
            .await?;
        assert_eq!(created_table.table_reference.table_id, table_id.to_string());

        // Copy 2 of the column layout: one descriptor per column, numbered 1..=4
        // in the same order as the schema above.
        let field_descriptors = vec![
            FieldDescriptor {
                name: "actor_id".to_string(),
                number: 1,
                typ: ColumnType::Int64,
            },
            FieldDescriptor {
                name: "first_name".to_string(),
                number: 2,
                typ: ColumnType::String,
            },
            FieldDescriptor {
                name: "last_name".to_string(),
                number: 3,
                typ: ColumnType::String,
            },
            FieldDescriptor {
                name: "last_update".to_string(),
                number: 4,
                typ: ColumnType::Timestamp,
            },
        ];
        let table_descriptor = TableDescriptor { field_descriptors };

        // Copy 3 of the column layout: the row type that is encoded and sent.
        // `actor_id` is a 64-bit integer so that it agrees with the `Int64`
        // column declared in both the schema and the descriptor.
        #[derive(Clone, PartialEq, Message)]
        struct Actor {
            #[prost(int64, tag = "1")]
            actor_id: i64,
            #[prost(string, tag = "2")]
            first_name: String,
            #[prost(string, tag = "3")]
            last_name: String,
            // The timestamp column is sent as text here.
            #[prost(string, tag = "4")]
            last_update: String,
        }

        let actor1 = Actor {
            actor_id: 1,
            first_name: "John".to_string(),
            last_name: "Doe".to_string(),
            last_update: "2007-02-15 09:34:33 UTC".to_string(),
        };
        let actor2 = Actor {
            actor_id: 2,
            first_name: "Jane".to_string(),
            last_name: "Doe".to_string(),
            last_update: "2008-02-15 09:34:33 UTC".to_string(),
        };

        let stream_name = StreamName::new_default(project_id.clone(), dataset_id.clone(), table_id.clone());
        let trace_id = "test_client".to_string();

        let mut streaming = client
            .storage_mut()
            .append_rows(&stream_name, &table_descriptor, &[actor1, actor2], trace_id)
            .await?;

        // Drain the response stream; any error item aborts the test.
        while let Some(resp) = streaming.next().await {
            let resp = resp?;
            println!("response: {resp:#?}");
        }

        Ok(())
    }
}