How to stream via actix web/any other crate

How can I achieve the same result as the code below? The code below returns an error: I can't move state out because it is an Arc.

I already know a workaround: writing a futures::Stream implementation manually (topic),
but for that I would have to recv from the receiver and then hand the same item back as the response inside the Stream impl.
I am a beginner and not yet proficient in async Rust.

In the code below, the UnboundedReceiver or Receiver from the futures crate could be returned directly as the HttpResponse body, since these already implement Stream.
Any other workarounds, refactors, or suggestions are appreciated.

use std::{convert::Infallible, io, path::Path, sync::Mutex, time::Duration};

use actix_web::{
    get,
    web::{Bytes, Data},
    App, HttpResponse, HttpServer,
};
use futures::channel::mpsc::UnboundedReceiver;
use notify_debouncer_full::{
    new_debouncer,
    notify::{RecursiveMode, Watcher},
};

const FILE: &'static str = "path\\to\\file\\or\\folder";

#[get("/")]
async fn handler(state: Data<Mutex<UnboundedReceiver<Result<Bytes, Infallible>>>>) -> HttpResponse {
    // The next line does not compile: the receiver cannot be moved out of the shared state (an Arc).
    HttpResponse::Ok().streaming(state.into_inner().into_inner().unwrap())
}

#[actix_web::main]
async fn main() -> io::Result<()> {
    let (tx, rx) = futures::channel::mpsc::unbounded::<Result<_, Infallible>>();
    let mut watcher = new_debouncer(Duration::from_secs(1), None, move |_events_res| {
        tx.unbounded_send(Ok(Bytes::from_static(b"hello"))).unwrap();
    })
    .unwrap();

    watcher
        .watcher()
        .watch(Path::new(FILE), RecursiveMode::Recursive)
        .unwrap();

    watcher.cache().add_root(FILE, RecursiveMode::Recursive);

    let data = Data::new(Mutex::new(rx));

    HttpServer::new(move || App::new().app_data(data.clone()))
        .bind("127.0.0.1:8080")?
        .run()
        .await
}

Forgive me if the question is not clear; please ask for clarification.
What I am trying to build is a simple server that reports filesystem events. Any ideas or suggestions regarding this are also welcome.

It looks like you want to watch the filesystem for any changes (e.g. files being added/removed), and then stream these events to a web client over a long-lived connection.

One potential problem with using a message channel like this is that each message only gets delivered once, so if two or more clients are connected at the same time, only one of them will receive the notification and you don't know who it will be. Also, the message is lost after sending it out, since you don't store it anywhere.
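
To make the single-delivery problem concrete, here is a minimal sketch that uses only the standard library's channel rather than the futures one (the function name `deliver_once` and the message text are made up for illustration). Two "clients" share one receiver behind a Mutex, roughly as in the question, and only the first one to read gets the event:

```rust
use std::sync::{mpsc, Arc, Mutex};

// Hypothetical helper: sends one event and lets two clients try to read it.
fn deliver_once() -> (Option<&'static str>, Option<&'static str>) {
    let (tx, rx) = mpsc::channel::<&'static str>();

    // Both clients share the same receiver.
    let shared_rx = Arc::new(Mutex::new(rx));
    let client_a = Arc::clone(&shared_rx);
    let client_b = Arc::clone(&shared_rx);

    tx.send("file changed").unwrap();

    // Whoever reads first consumes the message; the other finds the channel empty.
    let got_a = client_a.lock().unwrap().try_recv().ok();
    let got_b = client_b.lock().unwrap().try_recv().ok();
    (got_a, got_b)
}

fn main() {
    let (a, b) = deliver_once();
    assert_eq!(a, Some("file changed"));
    assert_eq!(b, None);
}
```

With real concurrent clients, which one wins would depend on scheduling.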

One possible solution is to store all events in a buffer somewhere using a background task, and then read from the buffer in your handler function. You could also push events to an external server like Redis and read from it when needed.
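
A rough sketch of the buffer idea, again with only the standard library and threads instead of async tasks (the `EventLog` type, its methods, and the event strings are assumptions for illustration, not part of actix-web or notify). A background thread drains the channel into a shared log, and each client reads from its own cursor, so every client sees every event:

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Hypothetical shared, append-only log of events.
#[derive(Clone, Default)]
struct EventLog {
    events: Arc<Mutex<Vec<String>>>,
}

impl EventLog {
    fn push(&self, event: String) {
        self.events.lock().unwrap().push(event);
    }

    // Returns every event stored at or after `cursor`, plus the new cursor.
    fn read_from(&self, cursor: usize) -> (Vec<String>, usize) {
        let events = self.events.lock().unwrap();
        let start = cursor.min(events.len());
        (events[start..].to_vec(), events.len())
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<String>();
    let log = EventLog::default();

    // Background task: move every event from the channel into the log.
    let writer_log = log.clone();
    let writer = thread::spawn(move || {
        for event in rx {
            writer_log.push(event);
        }
    });

    // Stand-in for the file watcher callback.
    tx.send("created a.txt".to_string()).unwrap();
    tx.send("removed b.txt".to_string()).unwrap();
    drop(tx); // closing the channel lets the writer thread finish
    writer.join().unwrap();

    // Two clients starting from the beginning both see the same events.
    let (first_client, cursor) = log.read_from(0);
    let (second_client, _) = log.read_from(0);
    assert_eq!(first_client, second_client);
    assert_eq!(cursor, 2);

    // A client that is already caught up gets nothing new.
    let (later, _) = log.read_from(cursor);
    assert!(later.is_empty());
}
```

In a real server the log would grow without bound, so it would need some trimming policy, and a handler would have to wait for new entries instead of reading once. A broadcast-style channel is another way to get the same fan-out.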

If I understand you correctly, you want to create an infinite loop in your handler that can continuously read events and then yield each one to Actix. You can look at the async_stream crate for an example of something like this:

This GitHub post gives an Actix-specific example:

Thanks for your kind response, but I have dropped the idea of using actix-web streaming. I couldn't make it work. Sorry for the trouble. I'll try using WebSockets or some other approach.
