How to use mongodb::Cursor with mongodb aggregate?

This is a very simple project for learning how to use MongoDB with Rust. I'm using the official MongoDB Rust driver (mongodb/mongo-rust-driver on GitHub). The problem is that when I use aggregate, I cannot read the result:

// main.rs
use mongodb::bson::{doc, Bson};
use mongodb::{options::AggregateOptions, options::ClientOptions, Client};
use std::error::Error;
use tokio;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {

   // Load the MongoDB connection string:
   let client_uri = "mongodb://127.0.0.1:27017";

   // A Client is needed to connect to MongoDB:
   let mut options = ClientOptions::parse(&client_uri).await?;
   options.app_name = Some("testing".to_string());
   let client = Client::with_options(options)?;
   
   // get the collection here
   let items = client.database("my_database").collection("inventory");

   // aggregate options and pipeline
   let pipeline = vec![doc! {"$match": {"name": "FOO"}}];
   let options = AggregateOptions::builder().allow_disk_use(true).build();

   // I'm using tokio for async-await library
   let data = items
      .aggregate(pipeline, options)
      .await
      .map_err(|e| println!("{}", e));

    // data is a Result<mongodb::Cursor> type
    match data {
       // the binding must be mutable, because next() takes &mut self
       Ok(mut cursor) => {

          // I want to iterate over the returned documents here
          // this doesn't compile
          while let Some(doc) = cursor.next().await {
              println!("{}", doc?)
          }
       },
       Err(e) => println!("{:?}", e),
    }

    Ok(())
}

The code above fails to compile. The compiler complains that the cursor has no next() method:

 while let Some(doc) = cursor.next().await {
   |                          ^^^^ method not found in `mongodb::Cursor`

I read the documentation for mongodb::Cursor and for the aggregate method on mongodb::Collection.

As you can see, the aggregate method should return Result<Cursor>. As the documentation states:

A cursor can be used like any other Stream. The simplest way is just
to iterate over the documents it yields:

while let Some(doc) = cursor.next().await {
  println!("{}", doc?)
}

So why doesn't it work? Does anyone know how to work with mongodb aggregate in Rust?

My dependencies in Cargo.toml:

[dependencies]
tokio = { version = "0.2", features = ["macros", "rt-threaded"]  }
serde = { version = "1.0", features = ["derive"] }
mongodb = "1.2.0"

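Never mind, I just found it! Just add use tokio::stream::StreamExt; at the top of the file and the rest is good to go. The next() method comes from the StreamExt trait, so it is only available once that trait is in scope, and mongodb::Cursor is just an implementation of futures::stream::Stream. As the documentation says: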

...all the other methods that an Stream has are available on Cursor as
well. This includes all of the functionality provided by StreamExt,
which provides similar functionality to the standard library Iterator
trait.

// main.rs
use mongodb::bson::{doc, Bson};
use mongodb::{options::AggregateOptions, options::ClientOptions, Client};
use std::error::Error;
use tokio;

// don't forget this!
use tokio::stream::StreamExt;
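
For anyone wondering why a missing `use` produces "method not found": in Rust, a trait's methods can only be called when the trait itself is in scope. Here is a minimal std-only sketch of that rule. The `driver` module, `Cursor`, `CursorExt`, `next_doc` and `drain` are all hypothetical stand-ins made up for illustration; they are not part of the mongodb or tokio APIs, and the sketch is synchronous to keep it dependency-free.

```rust
// Hypothetical stand-ins for a driver crate; none of these names come from mongodb.
mod driver {
    /// Plays the role of a cursor: it holds data but has no inherent `next_doc` method.
    pub struct Cursor {
        docs: std::vec::IntoIter<String>,
    }

    impl Cursor {
        pub fn new(docs: Vec<String>) -> Self {
            Cursor { docs: docs.into_iter() }
        }
    }

    /// Plays the role of an extension trait such as `StreamExt`.
    pub trait CursorExt {
        fn next_doc(&mut self) -> Option<String>;
    }

    impl CursorExt for Cursor {
        fn next_doc(&mut self) -> Option<String> {
            self.docs.next()
        }
    }
}

use driver::Cursor;
// Removing this import makes `cursor.next_doc()` fail to compile,
// even though the impl above exists.
use driver::CursorExt;

/// Collects every remaining document from the cursor.
fn drain(mut cursor: Cursor) -> Vec<String> {
    let mut out = Vec::new();
    while let Some(doc) = cursor.next_doc() {
        out.push(doc);
    }
    out
}

fn main() {
    let cursor = Cursor::new(vec!["FOO".to_string(), "BAR".to_string()]);
    for doc in drain(cursor) {
        println!("{}", doc);
    }
}
```

The same thing happens with the real driver: the Stream implementation is there all along, but next() lives on the extension trait, so the compiler cannot see it until the trait is imported.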

Note that there is an alpha release of the mongodb driver, version 2.0.0-alpha, that supports Tokio 1.x.
