How do I manipulate the result of a sqlx fetch which returns a pinned, boxed Stream?

I am trying to fetch data from my database using sqlx and I can't figure out what to do with the result of the .fetch() streaming method. If I try fetch_all things become easy, but that's not why I'm here.

Here is the documentation of the method in question: Executor in sqlx - Rust

I have a rough idea of what's going on (from reading the documentation of Pin and Box), but I have no idea how to proceed. I want to return this result from my fetcher function to other functions that will do the actual data decoding/processing, but the compiler rejects everything I have tried, and I don't know how to interpret the various errors I get.

Could someone please explain to me what's going on here, and how I can actually use this result without iterating over it in the same function that calls fetch()?

A Stream is the asynchronous version of an Iterator. You can manipulate it using the StreamExt trait from futures_util, which gives you async versions of the common iterator methods. You can ignore the specifics of the Pin<Box<...>> wrapper when using StreamExt.

I am not sure what you mean by "use the result without iterating it". Perhaps you can clarify?

The function in question returns a stream where each item is a Result<Row, _> - the Result is there in case an error happens while you are processing the rows (for instance if the database gets disconnected).

I think you misread what I wrote. I know how to iterate the result using StreamExt in the immediate function, right after calling .fetch. What I do not know is how to return the Stream from the function so that I can use it somewhere else in the program.

You should be able to use the return type -> impl Stream<Item = YourItemType>

This fails because of lifetimes:

36 |     conn: &Pool<Sqlite>,
   |     ^^^^  ------------- this data with an anonymous lifetime `'_`...
   |     |
   |     ...is captured here...
38 | ) -> impl Stream<Item = sqlx::Result<MyType>> {
   |      ------------------------------------- ...and is required to live as long as `'static` here

You could try returning

futures_util::stream::LocalBoxStream<'_, Result<Row, /* ... */>>

which is roughly equivalent to the return type of fetch (fetch itself returns BoxStream, the variant that is also Send).
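For reference, here is a minimal sketch of what a boxed return type looks like. It is not sqlx code: Iterator stands in for Stream, a plain Box stands in for Pin<Box<...>>, and FakePool, BoxIter, LocalBoxIter, fetch_ids, fetch_ids_local and assert_send are hypothetical names made up for the illustration. In futures_util, BoxStream<'a, T> is an alias for Pin<Box<dyn Stream<Item = T> + Send + 'a>>, and LocalBoxStream is the same thing without the Send bound.

```rust
use std::rc::Rc;

// Hypothetical aliases mirroring the shape of BoxStream / LocalBoxStream,
// with Iterator standing in for Stream.
type BoxIter<'a, T> = Box<dyn Iterator<Item = T> + Send + 'a>;
type LocalBoxIter<'a, T> = Box<dyn Iterator<Item = T> + 'a>;

// Hypothetical stand-in for Pool<Sqlite>.
struct FakePool {
    ids: Vec<i64>,
}

// The boxed iterator borrows `pool`; the 'a in the alias records that borrow,
// so the value can be returned to a caller and consumed there.
fn fetch_ids<'a>(pool: &'a FakePool) -> BoxIter<'a, Result<i64, String>> {
    Box::new(pool.ids.iter().map(|id| Ok::<i64, String>(*id)))
}

// Capturing an Rc makes the closure non-Send, so only the "local" alias fits.
fn fetch_ids_local<'a>(
    pool: &'a FakePool,
    offset: Rc<i64>,
) -> LocalBoxIter<'a, Result<i64, String>> {
    Box::new(pool.ids.iter().map(move |id| Ok::<i64, String>(*id + *offset)))
}

// Compiles only when T is Send; usable as a compile-time check.
fn assert_send<T: Send>(_value: &T) {}
```

Calling assert_send(&fetch_ids(&pool)) compiles, while the same call on the result of fetch_ids_local does not. That is the practical difference between the two aliases: pick the Send one if the stream may be moved to another thread.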

You should be able to tie them together with a lifetime: take &'a Pool<Sqlite> as the parameter and return impl Stream<Item = ...> + 'a.
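To make the lifetime suggestion concrete, here is a rough sketch of the signature shape. It uses Iterator as a synchronous stand-in for Stream so that it needs no external crates; FakePool, MyType's fields, fetch_items and process are hypothetical names invented for the example, and the String error type is only a placeholder for sqlx::Error.

```rust
// Hypothetical stand-in for Pool<Sqlite>: it just owns some rows.
struct FakePool {
    rows: Vec<(i64, String)>,
}

#[derive(Debug, PartialEq)]
struct MyType {
    id: i64,
    name: String,
}

// The returned iterator borrows `pool`, so the opaque return type has to say
// so: `+ 'a` ties it to the `&'a FakePool` borrow.
// With sqlx the same shape would read roughly:
//   fn fetch_items<'a>(conn: &'a Pool<Sqlite>)
//       -> impl Stream<Item = sqlx::Result<MyType>> + 'a
fn fetch_items<'a>(pool: &'a FakePool) -> impl Iterator<Item = Result<MyType, String>> + 'a {
    pool.rows.iter().map(|(id, name)| {
        if name.is_empty() {
            // Placeholder for a per-row decoding or connection error.
            Err(format!("row {} has an empty name", id))
        } else {
            Ok(MyType { id: *id, name: name.clone() })
        }
    })
}

// A separate function does the processing; it never sees the pool.
// Returns the decoded items plus the number of rows that failed.
fn process<I>(items: I) -> (Vec<MyType>, usize)
where
    I: Iterator<Item = Result<MyType, String>>,
{
    let mut decoded = Vec::new();
    let mut failed = 0;
    for item in items {
        match item {
            Ok(value) => decoded.push(value),
            Err(_) => failed += 1,
        }
    }
    (decoded, failed)
}
```

The caller keeps the pool alive and hands the result of fetch_items(&pool) to process. With a real Stream, the for loop would become a while let Some(item) = stream.next().await loop using StreamExt.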
