I am writing an AWS Lambda function that consumes S3 events, ETLs the matching files, and uploads the results to a Postgres database. I'm using tokio_postgres from within the tokio Lambda code, and I want to use Postgres's COPY IN functionality to upload in efficient batches, feeding it a String that is already in the Postgres text format.
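For reference, the payload String follows the COPY text format: one row per line, columns separated by tabs, \N for NULL. The column values here are made up purely for illustration:

let text = String::from(
    "42\t2021-06-01 12:00:00+00\tsome-event\n\
     43\t2021-06-01 12:00:05+00\t\\N\n",
);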
I'm having trouble with all of the async code and figuring out exactly what I need to do. I think I need to wrap the bytes I'm sending in an Arc, since the data is actually sent from a different thread, but at present I'm getting this error:
error[E0277]: the trait bound `Arc<std::str::Bytes<'_>>: bytes::buf::buf_impl::Buf` is not satisfied
What's the correct way to do this? Here is my current function:
use std::sync::Arc;
use futures::{pin_mut, SinkExt};

// copy_in() uses Postgres's bulk COPY command to upload text directly into
// our DB table. Returns the number of rows created, or an error. This
// owns and consumes the text String.
async fn copy_in(client: &mut tokio_postgres::Client, text: String) -> Result<u64, Box<dyn std::error::Error>> {
    let q = format!("COPY ingestion.ritevents( {} ) FROM stdin", etllib::postgres_columns());
    let sink = client.copy_in(q.as_str()).await?;
    // The data is ultimately written out on the connection's task, and the
    // sink must be pinned before we can call send()/finish() on it.
    pin_mut!(sink);
    let data = text.bytes();
    let safe_ref = Arc::new(data);
    sink.as_mut().send(safe_ref.clone()).await?;
    sink.as_mut().finish().await?;
    eprintln!("I did it!");
    Ok(1)
}
Without the Arc, I was getting errors about data needing to have a 'static lifetime, but I've reached the point where I don't know what the next step is in figuring this out.
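One thing I notice is that the error mentions std::str::Bytes, which is the iterator returned by String::bytes(), not the bytes crate's Bytes type. My current guess from the tokio_postgres docs is that the sink wants a value implementing bytes::Buf, and that converting the owned String into bytes::Bytes (which owns its buffer, so it's 'static) would sidestep both the trait bound and the lifetime errors without any Arc. Something like this untested sketch (same table and etllib helper as above):

use bytes::Bytes;
use futures::{pin_mut, SinkExt};

async fn copy_in(client: &mut tokio_postgres::Client, text: String) -> Result<u64, Box<dyn std::error::Error>> {
    let q = format!("COPY ingestion.ritevents( {} ) FROM stdin", etllib::postgres_columns());
    let sink = client.copy_in(q.as_str()).await?;
    pin_mut!(sink);
    // Bytes::from(String) takes ownership of the allocation, so the value is
    // 'static and implements bytes::Buf; nothing borrows from the local text.
    sink.as_mut().send(Bytes::from(text)).await?;
    // finish() completes the COPY and reports how many rows were written.
    let rows = sink.as_mut().finish().await?;
    Ok(rows)
}

But I haven't been able to convince myself that this is the intended pattern.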
Suggestions?
Jack Neely