AWS Lambda and Tokio Postgres

I am writing an AWS Lambda that consumes S3 events, ETLs the matching files, and uploads the results to a Postgres database. I'm using tokio_postgres from within the Tokio-based Lambda code, and I want to use Postgres's COPY FROM STDIN functionality to upload in efficient batches. I'm feeding it a String that is already in the Postgres text format.

I'm having trouble with all of the async code and figuring out exactly what I need to do. I think I need to wrap the bytes I'm sending in an Arc, since the data is actually sent from a different thread. But I'm currently getting this error:

error[E0277]: the trait bound `Arc<std::str::Bytes<'_>>: bytes::buf::buf_impl::Buf` is not satisfied

What's the correct way to do this?

// copy_in() uses Postgres's bulk COPY command to upload text directly into
// our DB table.  Returns the number of rows created or an error.  This
// owns and consumes the text String.
async fn copy_in(client: &mut tokio_postgres::Client, text: String) -> Result<u64, Box<dyn std::error::Error>> {
    let q = format!("COPY ingestion.ritevents( {} ) FROM stdin", etllib::postgres_columns());
    let sink = client.copy_in(q.as_str()).await?;

    // The sink runs in a different thread, so we need to pin the data
    // structure so that it cannot be moved.
    pin_mut!(sink);
    let data = text.bytes();
    let safe_ref = Arc::new(data);

    sink.as_mut().send(safe_ref.clone()).await?;
    sink.as_mut().finish().await?;
    eprintln!("I did it!");
    Ok(1)
}

Without the Arc I was getting errors saying the data needed a 'static lifetime. But now I'm stuck and don't know what to try next.

Suggestions?
Jack Neely

So, client.copy_in() returns a CopyInSink<U>, which is a Sink<U> where U must implement the Buf trait:

pub async fn copy_in<T, U>(&self, statement: &T) -> Result<CopyInSink<U>, Error>
where
    T: ?Sized + ToStatement,
    U: Buf + 'static + Send,

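The 'static bound is what caused your original lifetime error: it means send() needs a value it can own outright, not a borrow of a local variable. So you don't need an Arc; just pass exclusive ownership of the data. (The Arc version fails for a separate reason: text.bytes() returns std::str::Bytes, an iterator over the string's bytes that borrows text, not a byte buffer, so neither it nor an Arc wrapped around it implements Buf.)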
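To see the shape of that bound in isolation, here is a std-only sketch. It cannot pull in the bytes crate, so it swaps in AsRef<[u8]> as a stand-in for bytes::Buf (an assumption: the two traits are not equivalent, and not every AsRef<[u8]> type implements Buf). The helper accepts_owned_buffer is hypothetical; it only shows which kinds of values pass a `Send + 'static` requirement.

```rust
// Hypothetical helper with the same shape as copy_in's bound
// `U: Buf + 'static + Send`, using AsRef<[u8]> as a std-only stand-in for Buf.
fn accepts_owned_buffer<U: AsRef<[u8]> + Send + 'static>(buf: U) -> usize {
    // Report how many bytes the buffer exposes.
    buf.as_ref().len()
}

fn main() {
    let text = String::from("1\talpha\n2\tbeta\n");

    // An owned Vec<u8> satisfies every part of the bound: it is Send and borrows nothing.
    let owned = accepts_owned_buffer(text.clone().into_bytes());
    println!("owned Vec<u8>: {owned} bytes"); // prints "owned Vec<u8>: 15 bytes"

    // A string literal is a &'static str, so it satisfies 'static as well.
    let literal = accepts_owned_buffer("3\tgamma\n");
    println!("static literal: {literal} bytes"); // prints "static literal: 8 bytes"

    // None of these compile, mirroring the errors in the question:
    // accepts_owned_buffer(text.bytes());   // str::Bytes is an iterator, not AsRef<[u8]>
    // accepts_owned_buffer(text.as_str());  // borrows the local `text`, so not 'static
    // accepts_owned_buffer(std::sync::Arc::new(text.bytes())); // still an iterator, still borrowed
}
```

The practical takeaway matches the answer: hand the sink something it owns, rather than a reference or a shared pointer to borrowed data.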

Change it to be like this:

    pin_mut!(sink);
    let data = bytes::Bytes::from(text);  // <--- create a Bytes, taking ownership of the String

    sink.as_mut().send(data).await?;   // <--- pass ownership
    let rows = sink.as_mut().finish().await?;  // finish() reports the number of rows copied
    eprintln!("I did it!");
    Ok(rows)
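One way to picture why moving ownership into send() works: the sink may process a buffer after the caller's stack frame is gone, possibly on another thread. The std-only sketch below models that with a hypothetical MockSink that forwards each Vec<u8> over a channel to a worker thread, which counts newline-terminated rows. This is not how tokio_postgres's CopyInSink is implemented; it only illustrates why a `Send + 'static` requirement steers you toward passing owned buffers.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical sink: buffers are consumed on a separate worker thread,
// so each one must be owned (Send + 'static), not borrowed from the caller.
struct MockSink {
    tx: mpsc::Sender<Vec<u8>>,
    worker: thread::JoinHandle<usize>,
}

impl MockSink {
    fn new() -> Self {
        let (tx, rx) = mpsc::channel::<Vec<u8>>();
        // The worker counts newline-terminated rows across every buffer it
        // receives, and stops once the sender is dropped.
        let worker = thread::spawn(move || {
            rx.iter()
                .map(|buf| buf.iter().filter(|&&b| b == b'\n').count())
                .sum::<usize>()
        });
        MockSink { tx, worker }
    }

    // Taking Vec<u8> by value moves the buffer to the worker thread.
    fn send(&self, buf: Vec<u8>) {
        self.tx.send(buf).expect("worker thread hung up");
    }

    // Dropping the sender closes the channel; then wait for the row count.
    fn finish(self) -> usize {
        drop(self.tx);
        self.worker.join().expect("worker thread panicked")
    }
}

fn main() {
    let sink = MockSink::new();
    let text = String::from("1\talpha\n2\tbeta\n");
    sink.send(text.into_bytes()); // `text` is moved and can no longer be used here
    sink.send(b"3\tgamma\n".to_vec());
    println!("rows: {}", sink.finish()); // prints "rows: 3"
}
```

Because each buffer is moved, no Arc or lifetime juggling is needed: the worker simply becomes the buffer's owner.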

Thanks for the help! The buffer handling you nailed was exactly what I was stuck on. Your code suggestions do work.

Does the bytes::Bytes::from(text) conversion copy the underlying string, or does it reuse the String's existing buffer? I'm trying to avoid unnecessary copies of the data to keep my Lambda's memory footprint small.

Thankfully, the Bytes and String types support this conversion without copying the contents!

Here's the relevant code from the bytes crate:


impl From<String> for Bytes {
    fn from(s: String) -> Bytes {
        Bytes::from(s.into_bytes())
    }
}

impl From<Bytes> for Vec<u8> {
    fn from(bytes: Bytes) -> Vec<u8> {
        let bytes = mem::ManuallyDrop::new(bytes);
        unsafe { (bytes.vtable.to_vec)(&bytes.data, bytes.ptr, bytes.len) }
    }
}

(from bytes.rs in the bytes crate source)

And Rust's String::into_bytes() is documented as avoiding a copy: "This consumes the String, so we do not need to copy its contents."
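The String half of that claim can be checked with the standard library alone: after into_bytes(), the resulting Vec<u8> points at the same heap allocation, with the same capacity, that the String used. This small sketch (the function name into_bytes_reuses_buffer is hypothetical) only checks the String to Vec<u8> step, not the later Vec<u8> to Bytes step inside the bytes crate.

```rust
// Returns true if String::into_bytes() kept the same heap buffer.
fn into_bytes_reuses_buffer(text: String) -> bool {
    let ptr_before = text.as_ptr();
    let cap_before = text.capacity();
    // Consumes `text`; the Vec<u8> takes over its allocation.
    let bytes: Vec<u8> = text.into_bytes();
    bytes.as_ptr() == ptr_before && bytes.capacity() == cap_before
}

fn main() {
    let text = String::from("1\talpha\n2\tbeta\n");
    println!("{}", into_bytes_reuses_buffer(text)); // prints "true"
}
```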

