I want to implement a REST endpoint with actix-web that shows live changes to a key-value pair in my Redis cache. I can't find a way to do this. How can I implement it?
It should therefore be an endpoint that shows in real time whether there is new data in the key-value pair.
You should be able to psubscribe your Pub/Sub connection to the keyspace notifications for your key. Something like __keyspace@your-db__:your-key * should stream all change events of your-key, I think. Note that you need to enable keyspace notifications in your redis instance.
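To make the channel name concrete, here is a minimal sketch of a helper that builds it. The function name keyspace_channel is made up for illustration; the __keyspace@<db>__:<key> layout is the one Redis uses for keyspace notifications, which are enabled through the notify-keyspace-events setting. Note that the payload of each message on that channel is the name of the event (for example set or del), not the new value of the key.

```rust
/// Hypothetical helper: builds the Redis keyspace-notification channel
/// name for `key` in the database with index `db`.
fn keyspace_channel(db: u32, key: &str) -> String {
    format!("__keyspace@{}__:{}", db, key)
}
```

The returned string is what would be handed to psubscribe. Since psubscribe treats its argument as a glob pattern, a key containing characters such as * or ? would probably need escaping first.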
Do you know how to convert the PubSub channel so that it can be passed to HttpResponseBuilder::streaming? This seems non-trivial, and I can't find a solution such as using map(). I thought that just passing the output of the PubSub channel would be enough, but it didn't work. I'm not very familiar with streams in Rust.
You can think of Stream as the async version of Iterator. If you look at the definition of the into_on_message method that you linked, you'll see that it returns impl Stream<Item = Msg>, a.k.a. a stream where each item is of type Msg.
Now, if you go to the documentation for Msg, you'll find the get_payload_bytes method. Usually, when you are converting models from one dependency (Msg from the redis crate, in this case) to interact with another dependency (actix-web in your case), you'll have to use a type that is common to the two interfaces; that usually boils down to byte slices ([u8]) when the two dependencies have nothing else in common.
Now, going to the actix-web side of things, you'll see that the streaming method has a generic stream parameter that must implement Stream<Item = Result<Bytes, E>> + 'static.
So, to complete the conversion we need to build Bytes from the byte slice that we got from the Redis message. Go to its documentation and you'll find several ways to build one from a byte slice. Notice that you can't simply take the reference to the byte slice that get_payload_bytes returned, because Bytes needs data that's owned; that's what the 'static lifetime is telling you.
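As a rough illustration of what "owned" means here, the sketch below uses only the standard library, with Vec<u8> standing in for Bytes. Both function names are invented for this example. In the bytes crate, the call that plays the role of to_vec would be Bytes::copy_from_slice.

```rust
/// Compiles only for values that hold no short-lived borrows,
/// which is what a `'static` bound asks for.
fn require_static<T: 'static>(value: T) -> T {
    value
}

/// Copies a borrowed payload into an owned buffer.
fn owned_payload(borrowed: &[u8]) -> Vec<u8> {
    // `to_vec` allocates and copies, so the result no longer
    // depends on the lifetime of `borrowed`.
    require_static(borrowed.to_vec())
}
```

Passing the borrowed slice itself to require_static would be rejected by the compiler, because the slice cannot outlive the buffer it points into.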
I tried it with to_owned() and that does not work.
let mut subscribe: PubSub = RedisCache::create_client().get_async_pubsub().await.unwrap();
subscribe.psubscribe("").await.unwrap();
let pubsub_stream = subscribe.into_on_message();
let byte_msg = pubsub_stream.next().await.unwrap().get_payload_bytes();
let owned_data = byte_msg.to_owned().as_slice();
// this does not work either
// let owned_data = web::Bytes::from(raw_msg.to_owned().as_slice());
the trait bound `&[u8]: futures::Stream` is not satisfied
the following other types implement trait `futures::Stream`:
[...]
Your error message looks like you are trying to stream a single message and not the whole message stream. You can map the elements from your redis stream to Bytes with StreamExt::map and pass that mapped stream to actix-web.
let mut pubsub_stream = pubsub_conn.into_on_message();
let mapped_stream = pubsub_stream.map(|msg| {
    let bytes = msg.get_payload_bytes();
    bytes::Bytes::from(bytes)
});
And this returns
expected `{closure@dashboard_api.rs:54:43}` to be a closure that returns `Result<Bytes, _>`, but it returns `Bytes`
expected enum `Result<bytes::Bytes, _>`
found struct `bytes::Bytes`
required for `{closure@src/api/dashboard_api.rs:54:43: 54:48}` to implement `futures_util::fns::FnOnce1<Msg>`
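The error says the closure has to return a Result, because streaming expects items of type Result<Bytes, E>. A minimal sketch of the idea, using a plain Iterator and Vec<u8> as stand-ins for the stream and for Bytes (the function name wrap_items and the choice of Infallible as the error type are assumptions for illustration):

```rust
use std::convert::Infallible;

/// Stand-in for the mapped stream: every item is wrapped in `Ok`,
/// with the error type spelled out so the compiler can infer it.
fn wrap_items(items: Vec<Vec<u8>>) -> impl Iterator<Item = Result<Vec<u8>, Infallible>> {
    items
        .into_iter()
        .map(|item| Ok::<Vec<u8>, Infallible>(item))
}
```

In the real handler the same Ok::<_, E>(...) wrapper would go around the Bytes value inside the map closure, with E being any error type that streaming accepts.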
That worked but now I have a problem with borrowing.
56 | let mapped_stream = pubsub_stream.map(|msg| {
| --- binding `msg` declared here
57 | let bytes = msg.get_payload_bytes();
| ^^^--------------------
| |
| borrowed value does not live long enough
| argument requires that `msg` is borrowed for `'static`
58 | Ok::<bytes::Bytes, RedisError>(bytes::Bytes::from(bytes))
59 | });
| - `msg` dropped here while still borrowed
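A likely cause, as far as I can tell: Bytes::from only accepts a borrowed slice when it is a &'static [u8], so the compiler asks for msg to live for 'static, while msg is dropped at the end of the closure. Copying the payload before msg goes away avoids that. The sketch below shows the shape of the fix with the standard library only; the Msg struct is a stand-in for redis::Msg, Vec<u8> is a stand-in for Bytes, and to_owned_payloads is an invented name.

```rust
use std::convert::Infallible;

/// Stand-in for `redis::Msg`: owns its payload and lends it out.
struct Msg {
    payload: Vec<u8>,
}

impl Msg {
    fn get_payload_bytes(&self) -> &[u8] {
        &self.payload
    }
}

/// Maps each message to an owned copy of its payload.
fn to_owned_payloads(
    messages: impl Iterator<Item = Msg>,
) -> impl Iterator<Item = Result<Vec<u8>, Infallible>> {
    messages.map(|msg| {
        // Copy the bytes before `msg` is dropped at the end of the closure.
        // With the `bytes` crate this would be
        // `Bytes::copy_from_slice(msg.get_payload_bytes())`.
        Ok::<Vec<u8>, Infallible>(msg.get_payload_bytes().to_vec())
    })
}
```

In the original closure, that would mean replacing bytes::Bytes::from(bytes) with bytes::Bytes::copy_from_slice(bytes), or with bytes::Bytes::from(bytes.to_vec()), and keeping the Ok wrapper as it is.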