Build an actix-web endpoint to see live changes of my redis cache

I want to implement a REST endpoint with actix-web that shows live changes to a key-value pair in my Redis cache. I cannot find a way to do this. How can I implement it?

It should therefore be an endpoint that shows in real time whether there is new data in the key-value pair.

You can create streaming HTTP responses in actix-web with HttpResponseBuilder::streaming. On the server side you need to listen to Redis keyspace notifications. If you are using the redis crate, you can create an async Pub/Sub connection, subscribe to the keyspace notifications you need, and turn the Pub/Sub connection into a stream.

Ok, thanks. I will try it.

Where can I subscribe to a specific channel? Because I only want to see data from one key.

You should be able to psubscribe your Pub/Sub connection to the keyspace notifications for your key. Something like __keyspace@your-db__:your-key should stream all change events of your-key, I think. Note that you need to enable keyspace notifications in your Redis instance.
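
To make the channel name concrete, here is a minimal sketch that builds it from a database index and a key. The helper name keyspace_channel is hypothetical and not part of the redis crate; the example database index and key are assumptions too.

```rust
/// Builds the Pub/Sub channel on which Redis publishes keyspace
/// notifications for one key in one logical database.
///
/// `keyspace_channel` is a hypothetical helper for illustration only.
/// Notifications are off by default; they are enabled through the
/// `notify-keyspace-events` setting (for example with the value "KEA").
fn keyspace_channel(db: u32, key: &str) -> String {
    // Format: __keyspace@<db>__:<key>
    // Note: PSUBSCRIBE treats its argument as a glob-style pattern, so a key
    // containing `*`, `?` or `[` would be interpreted as a pattern there.
    format!("__keyspace@{}__:{}", db, key)
}
```

For a key user:42 in database 0 this yields __keyspace@0__:user:42, which is the string you would pass to psubscribe.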

So do I call get_async_pubsub in the same place where I call GET, since I pass the key there as well?
Something like:

//init is the connection function
init().get(key).get_async_pubsub().await;

And in my REST endpoint something like:

HttpResponse::Ok().stream(init().subscribe(key).into_on_message())

Oh, I should read the documentation... I need to use keyspace notifications on specific keys.

Do you know how to convert the Pub/Sub channel so that it can be passed to HttpResponseBuilder::streaming? This does not seem trivial, and I cannot find a solution such as using map(). I thought that just passing the output of the Pub/Sub channel would be enough, but it didn't work. I'm not very familiar with streams in Rust.

You can think of Stream as the async version of Iterator. If you look at the definition of the into_on_message method that you linked, you'll see that it returns impl Stream<Item = Msg>, a.k.a. a stream where each item is of type Msg.

Now, if you go to the documentation for Msg, you'll find the get_payload_bytes method. Usually, when you are converting models from one dependency (Msg from the redis crate, in this case) to interact with another dependency (actix-web in your case), you'll have to use a type that is common to the two interfaces. When the two dependencies have nothing in common at all, that usually boils down to using byte slices ([u8]).

Now, going to the actix-web side of things, you'll see that the streaming method has a generic stream parameter that must implement Stream<Item = Result<Bytes, E>> + 'static.

So, to complete the conversion we need to build Bytes from the byte slice that we got from the Redis message. Go to its documentation and you'll find several ways to build one from a byte slice. Notice that you can't simply pass along the reference to the byte slice that get_payload_bytes returned: that slice only lives as long as the Msg it borrows from, while Bytes needs data that it owns (or that is borrowed for 'static). That's what the 'static lifetime is telling you.

Great, thanks!

I tried it with to_owned() and that does not work.

let mut subscribe: PubSub = RedisCache::create_client().get_async_pubsub().await.unwrap();
subscribe.psubscribe("").await.unwrap();
let pubsub_stream = subscribe.into_on_message();
let byte_msg = pubsub_stream.next().await.unwrap().get_payload_bytes();
let owned_data = byte_msg.to_owned().as_slice();
// this does not work either
// let owned_data = web::Bytes::from(raw_msg.to_owned().as_slice());
the trait bound `&[u8]: futures::Stream` is not satisfied
the following other types implement trait `futures::Stream`:
[...]

Your error message looks like you are trying to stream a single message and not the whole message stream. You can map the elements from your redis stream to Bytes with StreamExt::map and pass that mapped stream to actix-web.

I already tried that

let mut pubsub_stream = pubsub_conn.into_on_message();
let mapped_stream = pubsub_stream.map(|msg| {
        let bytes = msg.get_payload_bytes();
        bytes::Bytes::from(bytes)
});

And this returns

expected `{closure@dashboard_api.rs:54:43}` to be a closure that returns `Result<Bytes, _>`, but it returns `Bytes`
expected enum `Result<bytes::Bytes, _>`
 found struct `bytes::Bytes`
required for `{closure@src/api/dashboard_api.rs:54:43: 54:48}` to implement `futures_util::fns::FnOnce1<Msg>`

What happens if you return Ok(Bytes::from(bytes))?

consider specifying the generic arguments: ::<bytes::Bytes, E>

But the compiler does not know what E is.

It shouldn't care about the error variant. Did you put the question mark at the end, perhaps?

No

E can be any type that implements Into<Box<dyn Error>> + 'static.
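
To show why the compiler asks for the error type, here is a small standard-library sketch with the same kind of bound. The function accept_item is a hypothetical stand-in, not actix-web's API, and Vec<u8> stands in for Bytes. A bare Ok(...) says nothing about E, so the caller has to name some type that satisfies the bound.

```rust
use std::error::Error;

/// Hypothetical stand-in for an API that accepts `Result<_, E>` items with
/// the bound `E: Into<Box<dyn Error>> + 'static`.
fn accept_item<E>(item: Result<Vec<u8>, E>) -> Result<Vec<u8>, Box<dyn Error>>
where
    E: Into<Box<dyn Error>> + 'static,
{
    // Any E that satisfies the bound can be converted into a boxed error.
    item.map_err(Into::into)
}
```

Calling accept_item(Ok(vec![1])) on its own fails to compile, because nothing pins down E. Writing accept_item(Ok::<Vec<u8>, std::io::Error>(vec![1])) compiles, and so would any other error type that converts into Box<dyn Error>, such as String.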

That worked but now I have a problem with borrowing.

56 |     let mapped_stream = pubsub_stream.map(|msg| {
   |                                            --- binding `msg` declared here
57 |         let bytes = msg.get_payload_bytes();
   |                     ^^^--------------------
   |                     |
   |                     borrowed value does not live long enough
   |                     argument requires that `msg` is borrowed for `'static`
58 |         Ok::<bytes::Bytes, RedisError>(bytes::Bytes::from(bytes))
59 |     });
   |     - `msg` dropped here while still borrowed

Could you try constructing the bytes instance you return with Bytes::copy_from_slice?
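
As a rough illustration of why copying helps, here is a standard-library analogue of the mapping step. Everything in it is a stand-in: the Msg struct is hypothetical and only mimics get_payload_bytes, an Iterator plays the role of the Stream, Vec<u8> plays the role of Bytes, and Infallible is an arbitrary error type chosen so the example compiles without the redis crate.

```rust
use std::convert::Infallible;

/// Hypothetical stand-in for `redis::Msg`: it owns a payload and only
/// hands out a borrowed view of it.
struct Msg {
    payload: Vec<u8>,
}

impl Msg {
    fn get_payload_bytes(&self) -> &[u8] {
        &self.payload
    }
}

/// Maps every message to an owned copy of its payload, wrapped in `Ok`
/// with an explicitly named error type.
fn map_messages(
    messages: impl Iterator<Item = Msg>,
) -> impl Iterator<Item = Result<Vec<u8>, Infallible>> {
    messages.map(|msg| {
        // `msg` is dropped at the end of this closure, so the borrowed slice
        // cannot be returned; copying it produces data the result owns.
        let owned = msg.get_payload_bytes().to_vec();
        Ok::<Vec<u8>, Infallible>(owned)
    })
}
```

In the real handler the same closure shape would go into StreamExt::map, with Bytes::copy_from_slice(msg.get_payload_bytes()) in place of to_vec() and RedisError as the error type, as in the snippet above.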

(sorry for this laborious process, I'm traveling right now and only have my mobile with me)

No errors anymore. Thank you for your patience!