Hello Rust friends,
I'm trying to write a program that sends lines of text down a data processing pipeline, from RDS database logs to Elasticsearch. The way Rusoto's `download_db_log_file_portion` works, you have to keep calling it, passing back the returned marker, until it reports that no additional data is pending. Here's a function that downloads the whole thing:
```rust
use rusoto_rds::{DownloadDBLogFilePortionMessage, Rds, RdsClient};

/// Downloads an entire RDS log file into one String by following the pagination marker.
pub async fn download_full_db_log_file(
    rds_client: &RdsClient,
    db_instance_identifier: &str,
    log_file_name: &str,
) -> String {
    let mut buffer = String::new();
    let mut marker: Option<String> = None;
    loop {
        let download_message = DownloadDBLogFilePortionMessage {
            db_instance_identifier: String::from(db_instance_identifier),
            log_file_name: String::from(log_file_name),
            marker,
            number_of_lines: None,
        };
        // Each call returns one page plus a marker pointing at the next page.
        let result = rds_client
            .download_db_log_file_portion(download_message)
            .await
            .unwrap();
        if let Some(data) = result.log_file_data {
            buffer.push_str(&data);
        }
        if result.additional_data_pending.unwrap_or(false) {
            marker = result.marker;
        } else {
            break;
        }
    }
    buffer
}
```
However, if the log file is huge, this will use a ton of memory, because the whole file ends up in a single `String`.
All I really want is to stream each individual line of the file out for downstream processing. My first attempt was to use async-stream (https://github.com/tokio-rs/async-stream) to yield each "page", but I ran into a recursion limit error. I think the fact that I'm polling two kinds of things, the AWS API pages and the actual future, is throwing it for a loop.
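For reference, the "one page in memory at a time" idea can be sketched synchronously as an `Iterator` that holds only the current page and fetches the next one when it runs out of lines. This is a minimal sketch, not Rusoto code: `Page`, `LogLines` and `mock_fetch` are hypothetical, with `Page` mirroring the `log_file_data`, `marker` and `additional_data_pending` fields the loop above reads. Being synchronous, it sidesteps the Future-storing question and only shows the state a stream has to carry between calls (marker, done flag, leftover lines).

```rust
// Hypothetical stand-in for one page of download_db_log_file_portion output.
struct Page {
    log_file_data: Option<String>,
    marker: Option<String>,
    additional_data_pending: Option<bool>,
}

// Yields one line at a time; only the current page is held in memory.
struct LogLines<F: FnMut(Option<String>) -> Page> {
    fetch: F,                            // hypothetical page fetcher, called with the current marker
    marker: Option<String>,
    done: bool,                          // set once a page reports no more data pending
    pending: std::vec::IntoIter<String>, // lines of the current page not yet yielded
}

impl<F: FnMut(Option<String>) -> Page> LogLines<F> {
    fn new(fetch: F) -> Self {
        LogLines { fetch, marker: None, done: false, pending: Vec::new().into_iter() }
    }
}

impl<F: FnMut(Option<String>) -> Page> Iterator for LogLines<F> {
    type Item = String;

    fn next(&mut self) -> Option<String> {
        loop {
            // Drain what is left of the current page first.
            if let Some(line) = self.pending.next() {
                return Some(line);
            }
            if self.done {
                return None;
            }
            // Fetch the next page, passing along the marker from the previous one.
            let page = (self.fetch)(self.marker.take());
            let data = page.log_file_data.unwrap_or_default();
            self.pending = data.lines().map(String::from).collect::<Vec<_>>().into_iter();
            self.marker = page.marker;
            self.done = !page.additional_data_pending.unwrap_or(false);
        }
    }
}

// Hypothetical fetcher: two pages, then no more data pending.
fn mock_fetch(marker: Option<String>) -> Page {
    let (data, more) = match marker.as_deref() {
        None => ("a\nb\n", true),
        _ => ("c\n", false),
    };
    Page {
        log_file_data: Some(data.to_string()),
        marker: Some("next".to_string()),
        additional_data_pending: Some(more),
    }
}
```

`LogLines::new(mock_fetch).collect::<Vec<String>>()` yields `a`, `b` and `c` while never holding more than one page. An async version keeps the same three pieces of state; `futures::stream::unfold` from the `futures` crate is one way to express that without writing `poll_next` by hand.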
Next I tried implementing Stream myself, but I don't see how I can store the current Future that I'm polling:
```rust
impl Stream for RdsLogsStreamer<'_> {
    type Item = String;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let download_message = DownloadDBLogFilePortionMessage {
            db_instance_identifier: String::from(self.db_instance_identifier),
            log_file_name: String::from(self.log_file_name),
            marker: self.marker.clone(),
            number_of_lines: None,
        };
        let result = self.rds_client
            .download_db_log_file_portion(download_message)
            .as_mut()
            .poll(cx);
        // The statement above compiles, but it doesn't work properly: every call to
        // poll_next creates a new Future instead of waiting for the "current" one to
        // finish. Since the Future's Output type refers to some private objects, I can't
        // store it in my own struct. (Or can I? Am I missing something?)
        todo!() // rest of poll_next omitted
    }
}
```
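For what it's worth, the future can be stored if it is boxed: `Pin<Box<dyn Future<Output = ...>>>` is a nameable type even when the concrete future type is not. The other half of the problem is that the Rusoto call borrows the client, so a future stored next to that client would borrow its own struct; giving each request its own clone of the client avoids that. The sketch below shows the pattern with std only, so it is not a drop-in answer: `Page`, `MockClient`, `LogLineStream` and `collect_lines` are hypothetical, `poll_next` is an inherent method with the same signature as `futures::Stream::poll_next` rather than a real `Stream` impl, and it assumes `RdsClient` is cheap to clone (Rusoto's generated clients implement `Clone`, as far as I know).

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll, Wake, Waker};

// Hypothetical stand-in for Rusoto's DownloadDBLogFilePortionDetails.
struct Page {
    log_file_data: Option<String>,
    marker: Option<String>,
    additional_data_pending: Option<bool>,
}

// Hypothetical client. Assumption: like RdsClient, it is cheap to clone.
#[derive(Clone)]
struct MockClient {
    pages: Arc<Vec<String>>,
}

impl MockClient {
    // Takes &self, so the future it returns borrows the client.
    async fn fetch(&self, marker: Option<String>) -> Page {
        let mut first = true; // Pending on the first poll, like a slow network call
        std::future::poll_fn(|cx| {
            if std::mem::take(&mut first) {
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            Poll::Ready(())
        })
        .await;
        let i: usize = marker.map_or(0, |m| m.parse().unwrap());
        Page {
            log_file_data: self.pages.get(i).cloned(),
            marker: Some((i + 1).to_string()),
            additional_data_pending: Some(i + 1 < self.pages.len()),
        }
    }
}

// Boxing erases the future's concrete type, so it can be named and stored in a field.
type PageFuture = Pin<Box<dyn Future<Output = Page>>>;

struct LogLineStream {
    client: MockClient,
    marker: Option<String>,
    done: bool,
    in_flight: Option<PageFuture>,       // request currently being polled, if any
    pending: std::vec::IntoIter<String>, // lines of the last page not yet yielded
}

impl LogLineStream {
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<String>> {
        let this = self.get_mut(); // fine: every field is Unpin
        loop {
            if let Some(line) = this.pending.next() {
                return Poll::Ready(Some(line));
            }
            if this.done {
                return Poll::Ready(None);
            }
            // Start a request only if none is in flight; otherwise re-poll the stored one.
            if this.in_flight.is_none() {
                let client = this.client.clone(); // owned, so the future doesn't borrow `this`
                let marker = this.marker.take();
                let fut: PageFuture = Box::pin(async move { client.fetch(marker).await });
                this.in_flight = Some(fut);
            }
            // ready! returns Poll::Pending early, leaving the future stored for the next call.
            let page = ready!(this.in_flight.as_mut().unwrap().as_mut().poll(cx));
            this.in_flight = None;
            let data = page.log_file_data.unwrap_or_default();
            this.pending = data.lines().map(String::from).collect::<Vec<_>>().into_iter();
            this.marker = page.marker;
            this.done = !page.additional_data_pending.unwrap_or(false);
        }
    }
}

// No-op waker plus a busy-poll loop, so the sketch runs without an async runtime.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn collect_lines(mut stream: LogLineStream) -> Vec<String> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(line)) => out.push(line),
            Poll::Ready(None) => return out,
            Poll::Pending => {} // a real executor would wait for the wake-up instead
        }
    }
}
```

The mock request returns `Pending` on its first poll, so if `poll_next` built a fresh future on every call, as the original snippet does, `collect_lines` would spin forever; with pages `"a\nb"` and `"c"` this version returns `["a", "b", "c"]`. In a real crate the body of `poll_next` would go into `impl Stream for ...`, the async block would call `download_db_log_file_portion` on the cloned client, and if the stream has to be `Send` (for example to move it into a spawned task) the alias would need `+ Send` as well.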
So I've hit a bit of a wall here. Does anyone have any suggestions? I get the feeling I'm overthinking my approach. Thank you!