Converting non-thread-safe iterators to a stream

I'm looking to optimize GitCommitLoader, which uses the gitoxide crate to loop through all commits and return a stream of Documents. Each Document consists of the git commit hash, the author name, and the commit message.

#[async_trait]
impl Loader for GitCommitLoader {
    async fn load(
        mut self,
    ) -> Result<
        Pin<Box<dyn Stream<Item = Result<Document, LoaderError>> + Send + 'static>>,
        LoaderError,
    > {
        let repo = self.repo.to_thread_local();

        let commits_iter = repo
            .rev_walk(Some(repo.head_id().unwrap().detach()))
            .all()
            .unwrap()
            .filter_map(Result::ok)
            .map(|oid| {
                let commit = oid.object().unwrap();
                let commit_id = commit.id;
                let author = commit.author().unwrap();
                let email = author.email.to_string();
                let name = author.name.to_string();
                let message = format!("{}", commit.message().unwrap().title);

                let mut document = Document::new(format!(
                    "commit {commit_id}\nAuthor: {name} <{email}>\n\n    {message}"
                ));
                let mut metadata = HashMap::new();
                metadata.insert("commit".to_string(), Value::from(commit_id.to_string()));

                document.metadata = metadata;
                Ok(document)
            });

        // TODO: This is a temporary solution to collect all the docs as can't share it between threads
        let documents = commits_iter.collect::<Vec<_>>();

        Ok(Box::pin(stream::iter(documents)))
    }
}

While this works and returns a stream, it collects all the git commits into a Vec first, which won't scale for large git repositories. Given that commits_iter is not thread safe, what is the best way to return a stream of documents efficiently?

The general solution to this problem is:

  • Run the iterator on a single thread...
  • ...and send its items on a channel to become the async stream that can be used from anywhere.

My current favorite channel library is flume, which supports both blocking and async operations on both ends, including implementing Stream, so using it would look something like this:

async fn load(
    mut self,
) -> Result<
    impl Stream<Item = Result<Document, LoaderError>> + Unpin + Send + 'static,
    LoaderError,
> {
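    // Capacity 1 keeps the worker from running far ahead of the consumer.
    let (tx, rx) = flume::bounded(1);
    std::thread::spawn(move || {
        // Build the thread-local repo and the commit iterator here, inside
        // the thread, so the non-Send values never cross a thread boundary.
        let commits_iter = todo!();
        for document in commits_iter {
            if tx.send(document).is_err() {
                break; // stream must have been dropped early
            }
        }
    });

    Ok(rx.into_stream())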
}
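
For something that compiles without gitoxide or flume, here is a minimal sketch of the same pattern using only the standard library. It assumes a blocking consumer rather than an async Stream, and `commit_titles` and `spawn_producer` are hypothetical names made up for illustration: the `Rc` inside the iterator just stands in for the non-thread-safe repository handle.

```rust
use std::rc::Rc;
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical stand-in for the commit walk. It captures an `Rc`, so the
/// iterator is not `Send` and must be created on the thread that drives it.
fn commit_titles(count: u32) -> impl Iterator<Item = String> {
    let prefix = Rc::new(String::from("commit"));
    (0..count).map(move |i| format!("{} {}", prefix, i))
}

/// Runs the iterator on a dedicated thread and returns the receiving end.
fn spawn_producer(count: u32) -> Receiver<String> {
    // Capacity 1: at most one item is buffered ahead of the consumer.
    let (tx, rx) = sync_channel(1);
    thread::spawn(move || {
        // Only `count` and `tx` cross the thread boundary; both are `Send`.
        for title in commit_titles(count) {
            if tx.send(title).is_err() {
                break; // receiver was dropped, stop walking
            }
        }
    });
    rx
}
```

Iterating the returned `Receiver` (for example with `for title in spawn_producer(3)`) yields items in order and ends once the worker finishes; dropping the receiver early makes the next `send` fail, so the worker exits instead of walking the rest of the history.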

Using a channel works.