Redis PubSub struct and mutable reference to own field

Hi, I'm learning rust and I'm stuck on a problem.

I'm trying to write a redis "consumer" which would eventually subscribe to a channel and listen to the messages using PubSub. So far I got this (because under the hood PubSub holds &'a mut Connection)

use redis::{Client, Connection, PubSub, RedisError};
use std::marker::PhantomData;

pub struct Consumer<'a, T> {
    con: Connection,
    pubsub: PubSub<'a>,
    phantom: PhantomData<T>,
}

impl<'a, T> Consumer<'a, T> {
    fn new(redis_url: &str) -> Result<Self, RedisError> {
        let _client = Client::open(redis_url)?;
        let mut con = _client.get_connection()?;
        let mut pubsub = con.as_pubsub();

        let phantom = PhantomData;

        Ok(Self {
            con,
            pubsub,
            phantom,
        })
    }
}

However this doesn't compile because

error[E0597]: `con` does not live long enough

and

error[E0505]: cannot move out of `con` because it is borrowed

The issue seems to be that my struct has a field holding mutable reference to the other field.

Is there any common workaround for this type of issue?

Storing references in a struct is a common beginner pitfall. Why do you want to structure your data like that?

At the end I would like to have a Consumer struct so that I could write code like this

let consumer = Consumer::new("redis://127.0.0.1");
consumer.subscribe("channel1");
for msg in consumer {
  println!("{:?}", msg);
};

where consumer would rely on Pubsub.get_message. Therefore I need to initiate PubSub somehow. Unfortunately, if I understand it correctly, the library doesn't provide "into" method to create "owned" sync PubSub. I think async PubSub version is ok. I would be thankful for any suggestion how to fix it.

I'm not familiar with Redis, but it looks to me like based on the way the type is set up that a connection can't (or shouldn't) be used for other things while a pubsub subscription is active. You could just not store the connection, and instead pass a connection into your Consumer::new method

1 Like

Thanks for the suggestion! This works

pub struct Consumer<'a, T> {
    // con: Connection,
    pubsub: PubSub<'a>,
    phantom: PhantomData<T>,
}

impl<'a, T> Consumer<'a, T> {
    fn new(redis_url: &str, con: &'a mut Connection) -> Result<Self, RedisError> {
        // let _client = Client::open(redis_url)?;
        // let mut con = _client.get_connection()?;
        let mut pubsub = con.as_pubsub();

        let phantom = PhantomData;

        Ok(Self {
            // con,
            pubsub,
            phantom,
        })
    }
}

but I would like have a wrapper such that user doesn't even have to create Connection. Should I create some additional struct or are there any smart pointers which could help me?

That can never work. You can never (even indirectly) create a (useful) self-referential type in safe Rust.* Self-referential types lead to dangling pointers. After all, if such a value were moved, then the self-reference would be invalidated (as it would still point to the previous location of the value).

That could only be the case if the library didn't require the current borrowing scheme. Smart pointers can't keep borrowed data alive through a borrow. Nothing can keep borrowed data alive through a borrow. Borrows don't keep stuff alive – by-value, owning bindings do.

If the library were designed in such a way that one type would accept a reference-counted smart pointer (Rc or Arc) to the other, then using such a smart pointer would help. In this case, it does not.


*: there are some tricks to instantiate self-referential types in safe Rust. They are useless though; eg. one common scheme results in a value that mutably borrows itself forever, so it can't be moved or destroyed.

I would personally not consider it worth it in this scenario, but one option is to create a macro that creates the connection and consumer.

use redis::{Client, Connection, PubSub, RedisError, RedisResult, ToRedisArgs};
use std::marker::PhantomData;

pub struct Consumer<'a, T> {
    pubsub: PubSub<'a>,
    phantom: PhantomData<T>,
}

impl<'a, T> Consumer<'a, T> {
    fn new(con: &'a mut Connection) -> Result<Self, RedisError> {
        let pubsub = con.as_pubsub();

        let phantom = PhantomData;

        Ok(Self { pubsub, phantom })
    }

    pub fn subscribe<A: ToRedisArgs>(&mut self, channel: A) -> RedisResult<()> {
        self.pubsub.subscribe(channel)
    }

    /// Simple way to get the for loop below to compile.
    pub fn messages(&mut self) -> Vec<()> {
        Vec::new()
    }
}

macro_rules! consumer {
    ($output:ident: $type:ty = $url:expr) => {
        let client = Client::open($url)?;
        let mut connection = client.get_connection()?;
        let mut $output: $type = Consumer::new(&mut connection)?;
    };
}

fn main() -> RedisResult<()> {
    consumer!(consumer: Consumer<()> = "redis://127.0.0.1");

    consumer.subscribe("channel1")?;
    for msg in consumer.messages() {
        println!("{:?}", msg);
    }

    Ok(())
}

Thank you for the macro_rules tip!

I'm considering also this approach a helper Subscription having only Iterator trait, although it is a bit less elegant.

use redis::{Client, Connection, PubSub, RedisError};
use std::marker::PhantomData;

pub struct Subscription<'a, T> {
    pubsub: PubSub<'a>,
    phantom: PhantomData<T>,
}

impl<'a, T> Subscription<'a, T> {
    fn new(con: &'a mut Connection, channel: &str) -> Result<Self, RedisError> {
        let mut pubsub = con.as_pubsub();
        let phantom = PhantomData;
        pubsub.subscribe(channel)?;

        Ok(Self { pubsub, phantom })
    }
}

impl<'a, T> Iterator for Subscription<'a, T> {
    type Item = T;
    fn next(&mut self) -> Option<Self::Item> {
        unimplemented!()
    }
}

pub struct Consumer<T> {
    con: Connection,
    phantom: PhantomData<T>,
}

impl<T> Consumer<T> {
    pub fn new(redis_url: &str) -> Result<Self, RedisError> {
        let _client = Client::open(redis_url)?;
        let con = _client.get_connection()?;
        let phantom = PhantomData;

        Ok(Self { con, phantom })
    }
    pub fn subscription(&mut self, channel: &str) -> Result<Subscription<T>, RedisError> {
        Subscription::new(&mut self.con, channel)
    }
}

so that I can

let mut consumer: Consumer<i32> = Consumer::new("localhost").unwrap();
for x in consumer.subscription("ok").unwrap() {
    println!("{}", x)
}

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.