MongoDB cursor and response streaming with Actix-Web

I'm really new to Rust (as in, this is my first Rust project) and come from a Node.js background. I have a project where I want to return a very large response body from a MongoDB aggregation pipeline to the client as a response stream.

The documentation for the cursor says it can be used like any other stream. However, the HttpResponse builder's streaming function isn't behaving nicely with it, and I'm not quite sure what I'm doing wrong here. Any advice to point me in the right direction would be really appreciated.

use actix_web::{
    get,
    web::{self, Json},
    HttpResponse,
};
use futures::stream::{StreamExt, TryStreamExt};

//import dependency
use crypto::digest::Digest;
use crypto::sha2::Sha256;
use mongodb::bson::{self, doc};
use mongodb::Client;
use mongodb::options::AggregateOptions;


#[get("/data")]
async fn get_sensor_data(client: web::Data<Client>) -> HttpResponse {
    let collection = client.database("rustcms").collection("data");

    let pipeline = vec![
       // pipeline goes here
    ];
    let options = AggregateOptions::builder().allow_disk_use(true).batch_size(500).build();
    // `aggregate` resolves to a Result wrapping the cursor
    let cursor = collection.aggregate(pipeline, options).await;

    match cursor {
        Ok(_) => HttpResponse::Ok().streaming(cursor.unwrap()),
        Err(err) => {
            println!("Error while getting, {:?}", err);
            HttpResponse::InternalServerError().finish()
        }
    }
}

What error are you getting when you attempt to stream the contents of the cursor?

type mismatch resolving `<Cursor<Document> as Stream>::Item == Result<Bytes, _>`
expected enum `Result<actix_web::web::Bytes, _>`
   found enum `Result<mongodb::bson::Document, mongodb::error::Error>`

Which makes me think that I'm conflating two different uses of the word "stream," or at least that one of them expects only a certain kind of stream (namely a byte stream). I did some googling on how to turn the cursor (or even just the data in it) into a byte stream, but I couldn't find anything that looked terribly helpful.

From Actix-web's documentation:

Streaming response body

Response body can be generated asynchronously. In this case, body must implement the stream trait Stream<Item=Result<Bytes, Error>>.

This means that the Stream implementation bound is not satisfied because the Item associated types are different. What you need is to map both the Ok and the Err variants of the Result so that it conforms to what Actix-web expects.
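As a rough illustration of that mapping, here is a std-only sketch that uses an Iterator as a synchronous stand-in for a Stream. `Doc`, `DbError`, `collect_body`, and `to_byte_items` are hypothetical names made up for this example; they are not part of Actix-web or the MongoDB driver, and the real code would use the async equivalents.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical stand-in for `mongodb::bson::Document`.
#[derive(Debug, Clone, PartialEq)]
pub struct Doc {
    pub name: String,
}

/// Hypothetical stand-in for `mongodb::error::Error`.
#[derive(Debug)]
pub struct DbError(pub String);

impl fmt::Display for DbError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "database error: {}", self.0)
    }
}

impl Error for DbError {}

/// Hypothetical stand-in for the framework's body constraint: it only
/// accepts items of type `Result<Vec<u8>, Box<dyn Error>>`.
pub fn collect_body<I>(items: I) -> Result<Vec<u8>, Box<dyn Error>>
where
    I: Iterator<Item = Result<Vec<u8>, Box<dyn Error>>>,
{
    let mut body = Vec::new();
    for chunk in items {
        // Stop at the first failed item and return its error.
        body.extend(chunk?);
    }
    Ok(body)
}

/// Adapts each item so it satisfies `collect_body`:
/// the `Ok` side becomes bytes, the `Err` side is boxed.
pub fn to_byte_items<I>(docs: I) -> impl Iterator<Item = Result<Vec<u8>, Box<dyn Error>>>
where
    I: Iterator<Item = Result<Doc, DbError>>,
{
    docs.map(|item| {
        item.map(|doc| doc.name.into_bytes())
            .map_err(|err| Box::new(err) as Box<dyn Error>)
    })
}
```

Passing the unadapted iterator of `Result<Doc, DbError>` to `collect_body` fails to compile with the same kind of Item mismatch as in the error above. Whether the Err side really needs converting depends on the error bound the framework places on the stream.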

No, Stream is not the problem, the elements of the stream are. Actix expects a stream that contains Result<actix_web::web::Bytes, E> as elements, but you have a stream of Result<mongodb::bson::Document, mongodb::error::Error> elements. That's a type mismatch. You need to somehow convert the Document to Bytes. I think the easiest way would be to convert the Document to a RawDocumentBuf and wrap that in Bytes. You can use StreamExt::map to convert the elements, i.e.:

use futures::stream::StreamExt;
use mongodb::bson::raw::RawDocumentBuf;
use actix_web::web::Bytes;

let cursor = collection
    .aggregate(pipeline, options)
    .await
    .map(|r| r.map(|d| Bytes::from(RawDocumentBuf::from_document(&d).into_bytes())));

(untested).

I think that got me a lot closer, but I'm still struggling with juggling some of the typing here.

Plugging the code above onto the end of my cursor gave me the error

mismatched types
expected reference `&mongodb::bson::Document`
   found reference `&Result<mongodb::bson::Document, mongodb::error::Error>`

Which I took to mean that we simply needed to unwrap the &d so that we could access the document itself (if this is wrong, sorry, I'm still really new to Rust). I had one other error like that and ended up with the following.

let stream = collection.aggregate(pipeline, options)
        .await
        .map(|r| r.map(|d| Bytes::from(RawDocumentBuf::from_document(&d.unwrap()).unwrap().into_bytes())));

which fixed some of the typing issues but not my ultimate problem. The resulting body has a type of

let stream: Result<Map<Cursor<Document>, impl Fn(Result<Document, Error>) -> Bytes>, Error>

Which still isn't right. I'm getting the error

the trait bound `Result<futures::stream::Map<mongodb::Cursor<mongodb::bson::Document>, [closure@src/api/user.rs:31:24: 31:27]>, mongodb::error::Error>: futures::Stream` is not satisfied
the following other types implement trait `futures::Stream`:
  local_channel::mpsc::Receiver<T>
  tokio_util::sync::poll_semaphore::PollSemaphore
  actix_codec::framed::Framed<T, U>
  Box<S>
  futures::futures_channel::mpsc::Receiver<T>
  UnboundedReceiver<T>
  trust_dns_resolver::name_server::connection_provider::ConnectionResponse
  tokio_util::either::Either<L, R>
and 100 others

Which makes it seem that somewhere along the way it stopped being a stream.

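That code is not correct. You are mapping the result of the aggregation; you need to map the cursor:

let stream = collection.aggregate(pipeline, options)
        .await
        // take the cursor out of the Result first (panics if the aggregation failed)
        .unwrap()
        // r is now Result<Document, Error>, so d is already a Document and needs no unwrap
        .map(|r| r.map(|d| Bytes::from(RawDocumentBuf::from_document(&d).unwrap().into_bytes())));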
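To make the difference between the two `map` calls concrete, here is a small std-only sketch, again with an Iterator standing in for the stream. `open_cursor`, `still_wrapped`, and `doubled` are hypothetical names for illustration only; `open_cursor` plays the role of the awaited `aggregate` call.

```rust
/// Hypothetical stand-in for a fallible call that yields a sequence,
/// playing the role of `aggregate(...).await` returning a cursor.
pub fn open_cursor(fail: bool) -> Result<std::vec::IntoIter<i32>, String> {
    if fail {
        Err("connection refused".to_string())
    } else {
        Ok(vec![1, 2, 3].into_iter())
    }
}

/// Here the outer `map` is `Result::map`: its closure runs at most once and
/// receives the whole cursor. The value returned is still a `Result`, so it
/// cannot be used where an iterator (or stream) is required.
pub fn still_wrapped(fail: bool) -> Result<impl Iterator<Item = i32>, String> {
    open_cursor(fail).map(|cursor| cursor.map(|n| n * 2))
}

/// Handling the outer `Result` first (`?` here, `match` or `unwrap` in a
/// handler) leaves a bare cursor, so `map` is the per-element adapter.
pub fn doubled(fail: bool) -> Result<Vec<i32>, String> {
    let cursor = open_cursor(fail)?;
    Ok(cursor.map(|n| n * 2).collect())
}
```

In a handler, matching on the `Result` and returning an error response in the `Err` arm would avoid the panic that `unwrap` causes when the aggregation fails.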
