Async Streaming Lines out of a Rusoto_Rds download_db_log_file_portion

Hello Rust friends,

I'm trying to write a program that sends lines of text down a data processing pipeline, going from RDS database logs to Elasticsearch. The way Rusoto's download_db_log_file_portion works is that you have to keep calling it, passing back the marker from the previous response, until it reports that no additional data is pending. Here's a function that downloads the whole thing:

use rusoto_rds::{DownloadDBLogFilePortionMessage, Rds, RdsClient};

pub async fn download_full_db_log_file(
    rds_client: &RdsClient,
    db_instance_identifier: &str,
    log_file_name: &str,
) -> String {
    let mut buffer = String::new();
    let mut marker: Option<String> = None;

    loop {
        let download_message = DownloadDBLogFilePortionMessage {
            db_instance_identifier: String::from(db_instance_identifier),
            log_file_name: String::from(log_file_name),
            marker,
            number_of_lines: None,
        };

        // Panics on any API error; real code would propagate it with `?`.
        let result = rds_client
            .download_db_log_file_portion(download_message)
            .await
            .unwrap();

        // A portion may come back without data, so don't unwrap it.
        if let Some(data) = result.log_file_data {
            buffer.push_str(&data);
        }

        // Keep paging while AWS reports more data; treat a missing flag as "done".
        if result.additional_data_pending.unwrap_or(false) {
            marker = result.marker;
        } else {
            break;
        }
    }

    buffer
}

However, if the log file is huge, this will use a ton of memory, because every portion is appended to one String before anything is returned.
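The memory growth comes from concatenating every portion. The same paging logic can be turned around so the consumer pulls one line at a time and only the current portion's lines are held in memory. Below is a minimal synchronous sketch using a plain Iterator (no async yet); `Portion` and the `fetch` closure are hypothetical stand-ins for `DownloadDBLogFilePortionDetails` and a blocking call to the API, and it assumes each portion ends on a line boundary.

```rust
use std::collections::VecDeque;

// Hypothetical stand-in for rusoto_rds's DownloadDBLogFilePortionDetails.
struct Portion {
    log_file_data: Option<String>,
    marker: Option<String>,
    additional_data_pending: Option<bool>,
}

// Yields one line at a time; only the current portion's lines are buffered.
// `fetch` is a hypothetical blocking call: given the previous marker, it returns the next portion.
struct PortionLines<F> {
    fetch: F,
    marker: Option<String>,
    pending: VecDeque<String>,
    done: bool,
}

impl<F: FnMut(Option<String>) -> Portion> PortionLines<F> {
    fn new(fetch: F) -> Self {
        PortionLines { fetch, marker: None, pending: VecDeque::new(), done: false }
    }
}

impl<F: FnMut(Option<String>) -> Portion> Iterator for PortionLines<F> {
    type Item = String;

    fn next(&mut self) -> Option<String> {
        loop {
            if let Some(line) = self.pending.pop_front() {
                return Some(line);
            }
            if self.done {
                return None;
            }
            // Fetch the next portion only once the previous one's lines are used up.
            let portion = (self.fetch)(self.marker.take());
            self.marker = portion.marker;
            self.done = !portion.additional_data_pending.unwrap_or(false);
            if let Some(data) = portion.log_file_data {
                // Assumes portions end on line boundaries.
                self.pending.extend(data.lines().map(String::from));
            }
        }
    }
}
```

The state kept between calls here (marker, leftover lines, done flag) is the same state an async Stream implementation has to keep between calls to poll_next; the only extra piece in the async version is the in-flight request future.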

All I really want is to stream each individual line of the file out for downstream processing. My first attempt was to use async-stream (https://github.com/tokio-rs/async-stream) to yield each "page", but I ran into a recursion limit error. I think the fact that I'm polling two kinds of things (the AWS API pages and the future for each request) is throwing it for a loop.
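Whichever way the pages end up being produced, turning them into lines needs a little care if a portion can end in the middle of a line. I haven't confirmed whether RDS guarantees line-aligned portions, so treat that as an assumption either way. Here is a sketch of a small, hypothetical `LineSplitter` helper that carries the incomplete tail of one portion over to the next:

```rust
// Hypothetical helper: reassembles complete lines from chunks that may end mid-line.
#[derive(Default)]
struct LineSplitter {
    partial: String, // text after the last '\n' seen so far
}

impl LineSplitter {
    // Appends a chunk and returns every line it completed, without the line ending.
    fn push(&mut self, chunk: &str) -> Vec<String> {
        self.partial.push_str(chunk);
        let mut lines = Vec::new();
        while let Some(pos) = self.partial.find('\n') {
            let line: String = self.partial.drain(..=pos).collect();
            lines.push(line.trim_end_matches(|c: char| c == '\n' || c == '\r').to_string());
        }
        lines
    }

    // Call after the last portion: returns a final line that had no trailing '\n', if any.
    fn finish(self) -> Option<String> {
        if self.partial.is_empty() {
            None
        } else {
            Some(self.partial)
        }
    }
}
```

Each portion's log_file_data would go through `push`, and `finish` would be called once additional_data_pending comes back false.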

Next I tried implementing Stream myself, but I don't see how I can store the current Future that I'm polling:

impl Stream for RdsLogsStreamer<'_> {
    type Item = String;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) ->
        Poll<Option<Self::Item>>
    {
        let download_message = DownloadDBLogFilePortionMessage {
            db_instance_identifier: String::from(self.db_instance_identifier),
            log_file_name: String::from(self.log_file_name),
            marker: self.marker.clone(),
            number_of_lines: None
        };

        let result = self.rds_client
            .download_db_log_file_portion(download_message)
            .as_mut()
            .poll(cx);
        // The previous 4 lines, while they compile, don't work properly. They create
        // a new Future each time poll_next is called, instead of waiting for the "current"
        // Future to finish. Since the Future's Output refers to some private types, I can't
        // store it in my own struct. (Or can I? Am I missing something?)

So I've hit a bit of a wall here. Does anyone have any suggestions? I get the feeling I'm overthinking my approach. Thank you!
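The usual pattern here is to keep the in-flight request as a boxed future inside the struct, create it only when there isn't one, poll that same future on every call, and clear it once it resolves. Below is a std-only sketch of that state machine. `MockClient`, `Page`, and `YieldOnce` are hypothetical stand-ins for `RdsClient`, `DownloadDBLogFilePortionDetails`, and a request that isn't ready on its first poll; `poll_next` is written as an inherent method with the same signature as `futures::Stream::poll_next` so the example runs without extra crates.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for rusoto's DownloadDBLogFilePortionDetails.
struct Page {
    log_file_data: Option<String>,
    marker: Option<String>,
    additional_data_pending: Option<bool>,
}

// The stored future may borrow the client for 'a (a bare `dyn Future` would mean 'static).
type PageFuture<'a> = Pin<Box<dyn Future<Output = Page> + Send + 'a>>;

// Returns Pending once before completing, like a request still in flight.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Hypothetical mock client: the marker is just the index of the next page.
struct MockClient {
    pages: Vec<&'static str>,
}

impl MockClient {
    fn download_portion(&self, marker: Option<String>) -> PageFuture<'_> {
        Box::pin(async move {
            YieldOnce(false).await;
            let i: usize = marker.map_or(0, |m| m.parse().unwrap());
            Page {
                log_file_data: Some(self.pages[i].to_string()),
                marker: Some((i + 1).to_string()),
                additional_data_pending: Some(i + 1 < self.pages.len()),
            }
        })
    }
}

struct LogStreamer<'a> {
    client: &'a MockClient,
    marker: Option<String>,
    future: Option<PageFuture<'a>>, // the request currently in flight, if any
    done: bool,
}

impl<'a> LogStreamer<'a> {
    // Same shape as futures::Stream::poll_next; LogStreamer is Unpin, so `&mut *self` is fine.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<String>> {
        let this = &mut *self;
        if this.done {
            return Poll::Ready(None);
        }
        // Start a request only if none is in flight; otherwise keep polling the same one.
        if this.future.is_none() {
            let client: &'a MockClient = this.client;
            this.future = Some(client.download_portion(this.marker.take()));
        }
        let page = match this.future.as_mut().unwrap().as_mut().poll(cx) {
            Poll::Ready(page) => page,
            Poll::Pending => return Poll::Pending,
        };
        // This request is finished; clear it so the next call starts a new one.
        this.future = None;
        this.marker = page.marker;
        this.done = !page.additional_data_pending.unwrap_or(false);
        Poll::Ready(Some(page.log_file_data.unwrap_or_default()))
    }
}

// A waker that does nothing: good enough for the busy-polling loop below.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn collect_pages(client: &MockClient) -> Vec<String> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut streamer = LogStreamer { client, marker: None, future: None, done: false };
    let mut out = Vec::new();
    loop {
        match Pin::new(&mut streamer).poll_next(&mut cx) {
            Poll::Ready(Some(page)) => out.push(page),
            Poll::Ready(None) => return out,
            Poll::Pending => {} // a real executor would sleep until woken
        }
    }
}
```

Because `YieldOnce` returns Pending on its first poll, a version that built a fresh future on every call would never make progress; reusing the stored one is what lets each page complete. An `impl Stream` version would have the same body, with an executor such as Tokio doing the waiting instead of the busy loop in `collect_pages`.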

Here's a complete example of one approach I hoped would work:

use rusoto_rds::{RdsClient, Rds, DownloadDBLogFilePortionMessage};
use std::future::Future;
use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};

struct PaginateDbLogFile<'a, T> {
    buffer: String,
    rds_client: &'a RdsClient,
    marker: Option<String>,
    future: Option<Pin<Box<dyn Future<Output=T>>>>
}

impl<T> Stream for PaginateDbLogFile<'_, T> {
    type Item = String;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
    {

        if self.future.is_none() {
            // If there is no future, make a new one. This should happen on first
            // call, but later self.future will be mutated each time AWS signals that there's
            // a new page of data available, after we are finished polling the current page

            let result = self.rds_client.download_db_log_file_portion(DownloadDBLogFilePortionMessage{
                db_instance_identifier: String::from("mydatabase"),       
                log_file_name: String::from("mylogfile.1"),       
                marker: None,
                number_of_lines: None      
            });

            self.future = Some(Box::pin(result)); // <---- Line that errors with type error
            // Note that I added Box::pin here at the compilers suggestion, but it doesn't
            // work with or without it
        }

        Poll::Ready(None)
    }
}

#[tokio::main]
async fn main() {
    println!("doing nothing yet");

    let client = RdsClient::new(rusoto_core::region::Region::UsWest2);
}

But this doesn't compile; I get:

error[E0271]: type mismatch resolving `<std::pin::Pin<std::boxed::Box<dyn core::future::future::Future<Output = std::result::Result<rusoto_rds::generated::DownloadDBLogFilePortionDetails, rusoto_core::error::RusotoError<rusoto_rds::generated::DownloadDBLogFilePortionError>>> + std::marker::Send>> as core::future::future::Future>::Output == T`
  --> src/main.rs:33:32
   |
14 | impl<T> Stream for PaginateDbLogFile<'_, T> {
   |      - this type parameter
...
33 |             self.future = Some(Box::pin(result)); // <---- Line that errors with type error
   |                                ^^^^^^^^^^^^^^^^ expected type parameter `T`, found enum `std::result::Result`
   |
   = note: expected type parameter `T`
                        found enum `std::result::Result<rusoto_rds::generated::DownloadDBLogFilePortionDetails, rusoto_core::error::RusotoError<rusoto_rds::generated::DownloadDBLogFilePortionError>>`
   = help: type parameters must be constrained to match other types
   = note: for more information, visit https://doc.rust-lang.org/book/ch10-02-traits.html#traits-as-parameters
   = note: required for the cast to the object type `dyn core::future::future::Future<Output = T>`

I'm not entirely sure what it wants me to do, or whether storing a pinned Future in a struct is even possible, so any guidance would be great. Thanks!

It's definitely possible to store a pinned future; the issue is that the output type doesn't match. The future resolves to a

Result<rusoto_rds::generated::DownloadDBLogFilePortionDetails, rusoto_core::error::RusotoError<rusoto_rds::generated::DownloadDBLogFilePortionError>>

but your struct says it resolves to the generic parameter T, which is chosen by whoever uses PaginateDbLogFile, not by your poll_next.
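Put differently, inside `impl<T> Stream for PaginateDbLogFile<'_, T>` the body has to work for every T a caller might pick, so it can't store a future whose output is specifically rusoto's Result. Below is a std-only sketch of the two usual ways out; `PortionResult`, `Concrete`, `Generic`, and `poll_once` are hypothetical names, with `PortionResult` standing in for the real Result type.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-in for Result<DownloadDBLogFilePortionDetails, RusotoError<DownloadDBLogFilePortionError>>.
type PortionResult = Result<String, String>;

// Option 1: name the concrete output type, so there is no T for a caller to choose.
struct Concrete<'a> {
    future: Option<Pin<Box<dyn Future<Output = PortionResult> + Send + 'a>>>,
}

impl<'a> Concrete<'a> {
    fn start(&mut self, fut: impl Future<Output = PortionResult> + Send + 'a) {
        self.future = Some(Box::pin(fut));
    }
}

// Option 2: keep T, but have the caller hand in the future, so T is whatever they pass.
struct Generic<'a, T> {
    future: Option<Pin<Box<dyn Future<Output = T> + Send + 'a>>>,
}

impl<'a, T> Generic<'a, T> {
    fn new(fut: impl Future<Output = T> + Send + 'a) -> Self {
        Generic { future: Some(Box::pin(fut)) }
    }
}

// Polls a stored future once, using a waker that does nothing.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn poll_once<T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send + '_>>) -> Poll<T> {
    let waker = Waker::from(Arc::new(NoopWake));
    fut.as_mut().poll(&mut Context::from_waker(&waker))
}
```

Since the struct in the question builds the request itself inside poll_next, the first option (a concrete output type) is the more natural fit there.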

Alright, I'm not entirely sure why I can't use generics here, but I switched to concrete types by guessing at what rusoto_rds re-exports those private types as. Now I'm getting lifetime issues:

use core::future::Future;
use futures::stream::Stream;
use rusoto_rds::{
    RdsClient, DescribeDBLogFilesMessage, DescribeDBLogFilesDetails, DownloadDBLogFilePortionDetails, DownloadDBLogFilePortionMessage, Rds,
    DownloadDBLogFilePortionError
};
use rusoto_core::RusotoError;
use std::pin::Pin;
use core::task::{Context, Poll};

type DownloadDbLogFilePortionFuture = dyn Future<Output = Result<DownloadDBLogFilePortionDetails, RusotoError<DownloadDBLogFilePortionError>>>;

pub struct RdsLogsStreamer<'a> {
    rds_client: &'a RdsClient,
    db_instance_identifier: &'a str,
    log_file_name: &'a str,
    marker: Option<String>,
    future: Option<Pin<Box<DownloadDbLogFilePortionFuture>>>,
    additional_data_pending: bool
}

impl<'a> RdsLogsStreamer<'a> {
    pub fn new(rds_client: &'a RdsClient, db_instance_identifier: &'a str, log_file_name: &'a str) -> RdsLogsStreamer<'a> {
        RdsLogsStreamer {
            rds_client: rds_client,
            db_instance_identifier: db_instance_identifier,
            log_file_name: log_file_name,
            marker: None,
            future: None,
            additional_data_pending: true
        }
    }
}

impl<'a> Stream for RdsLogsStreamer<'a> {
    type Item = String;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
    {
        let download_message = DownloadDBLogFilePortionMessage {
            db_instance_identifier: String::from(self.db_instance_identifier),
            log_file_name: String::from(self.log_file_name),
            marker: self.marker.clone(),
            number_of_lines: None
        };

        if !self.additional_data_pending {
            return Poll::Ready(None);
        }

        if self.future.is_none() {
            self.future = Some(self.rds_client.download_db_log_file_portion(download_message)); // <--- this line errors
        }

        match &mut self.future {
            Some(ref mut future) => {
                match future.as_mut().poll(cx) {
                    Poll::Ready(data) => {
                        let data = data.unwrap();

                        self.marker = data.marker;
                        self.additional_data_pending = data.additional_data_pending.unwrap();

                        Poll::Ready(Some(data.log_file_data.unwrap()))
                    },
                    Poll::Pending => Poll::Pending
                }
            },
            None => Poll::Pending
        }
    }
}

This fails to compile with:

error[E0495]: cannot infer an appropriate lifetime for autoref due to conflicting requirements
   --> src/awshelpers.rs:123:48
    |
123 |             self.future = Some(self.rds_client.download_db_log_file_portion(download_message));
    |                                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |
note: first, the lifetime cannot outlive the lifetime `'a` as defined on the impl at 106:6...
   --> src/awshelpers.rs:106:6
    |
106 | impl<'a> Stream for RdsLogsStreamer<'a> {
    |      ^^
note: ...so that reference does not outlive borrowed content
   --> src/awshelpers.rs:123:32
    |
123 |             self.future = Some(self.rds_client.download_db_log_file_portion(download_message));
    |                                ^^^^^^^^^^^^^^^
    = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the expression is assignable
   --> src/awshelpers.rs:123:27
    |
123 |             self.future = Some(self.rds_client.download_db_log_file_portion(download_message));
    |                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    = note: expected  `std::option::Option<std::pin::Pin<std::boxed::Box<(dyn core::future::future::Future<Output = std::result::Result<rusoto_rds::generated::DownloadDBLogFilePortionDetails, rusoto_core::error::RusotoError<rusoto_rds::generated::DownloadDBLogFilePortionError>>> + 'static)>>>`
               found  `std::option::Option<std::pin::Pin<std::boxed::Box<dyn core::future::future::Future<Output = std::result::Result<rusoto_rds::generated::DownloadDBLogFilePortionDetails, rusoto_core::error::RusotoError<rusoto_rds::generated::DownloadDBLogFilePortionError>>>>>>`

error: aborting due to previous error

For more information about this error, try `rustc --explain E0495`.

I think it has to do with the function's definition, though I don't see which part of it is 'static:

fn download_db_log_file_portion<'life0, 'async_trait>(
    &'life0 self,
    input: DownloadDBLogFilePortionMessage
) -> Pin<Box<dyn Future<Output = Result<DownloadDBLogFilePortionDetails, RusotoError<DownloadDBLogFilePortionError>>> + Send + 'async_trait>> where
    'life0: 'async_trait,
    Self: 'async_trait, 
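The 'static comes from the struct rather than from rusoto: a `Box<dyn Trait>` with no lifetime written, as in the `DownloadDbLogFilePortionFuture` alias, defaults to `Box<dyn Trait + 'static>`. The future returned here is only valid for `'async_trait`, which cannot outlive the borrow of the client (`'life0: 'async_trait`), so it can't be stored as `'static` when `rds_client` is a `&'a RdsClient`. Below is a std-only sketch with a hypothetical `Client` whose method has the same lifetime shape, stored in a hypothetical `Holder` whose alias names the lifetime explicitly.

```rust
use std::future::Future;
use std::pin::Pin;

// Hypothetical client whose method mirrors the async_trait-generated signature:
// the returned future borrows &self.
struct Client {
    id: u32,
}

impl Client {
    fn fetch<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = u32> + Send + 'async_trait>>
    where
        'life0: 'async_trait,
    {
        Box::pin(async move { self.id })
    }
}

// Without `+ 'a`, `Box<dyn Future<...>>` in a type alias means `+ 'static`, which a
// future borrowing a non-'static &Client cannot satisfy (the E0495 error in this thread).
type FetchFuture<'a> = Pin<Box<dyn Future<Output = u32> + Send + 'a>>;

struct Holder<'a> {
    client: &'a Client,
    future: Option<FetchFuture<'a>>,
}

impl<'a> Holder<'a> {
    fn start(&mut self) {
        // Copy the &'a Client out of self so the new future borrows for 'a.
        let client: &'a Client = self.client;
        self.future = Some(client.fetch());
    }
}
```

Applied to RdsLogsStreamer, that would presumably mean `type DownloadDbLogFilePortionFuture<'a> = Pin<Box<dyn Future<Output = Result<DownloadDBLogFilePortionDetails, RusotoError<DownloadDBLogFilePortionError>>> + Send + 'a>>;` together with `future: Option<DownloadDbLogFilePortionFuture<'a>>`; I haven't compiled that against rusoto itself. The `+ Send` is optional (a Send future can be stored as a non-Send trait object) but keeps the stream usable across threads. Separately, once it compiles, the streamer still needs to set `self.future = None` after the future resolves; otherwise the next poll_next polls a finished future again.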

I think I'm going to hold off on getting this working with Streams unless there's something obvious I'm missing, since what I'm trying to do doesn't seem to be well supported or documented yet. At any rate, it's kind of close, in case that helps anyone in the future.
