Reduce `Rc<RefCell<T>>` usage

Hi, I have the following code to simulate the behavior of a data server:

use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

/// A single data server, modelled as a counter that is incremented once per push.
#[derive(Debug)]
struct OneServer(u32);

/// A group of shared, individually mutable servers.
#[derive(Default, Debug)]
struct Servers {
    servers: Vec<Rc<RefCell<OneServer>>>,
}

impl Servers {
    /// Pushes a payload to every server in the group.
    ///
    /// The payload itself is currently unused; each server's counter is
    /// incremented by one.
    ///
    /// # Panics
    ///
    /// Panics if any server in the group is already borrowed elsewhere.
    fn push_data(&self, _data: Vec<u8>) {
        for server in &self.servers {
            server.borrow_mut().0 += 1;
        }
    }

    /// Appends `server` to the group.
    fn add_server(&mut self, server: Rc<RefCell<OneServer>>) {
        self.servers.push(server);
    }
}

/// Routes registrations and data pushes to the server group stored under an
/// `inst_type` key.
struct Dispatcher {
    servers: HashMap<String, Rc<RefCell<Servers>>>,
}

impl Dispatcher {
    /// Creates a dispatcher with no server groups.
    fn new() -> Self {
        Self {
            servers: HashMap::new(),
        }
    }

    /// Adds `server` to the group stored under `inst_type`.
    ///
    /// # Panics
    ///
    /// Panics if no group exists for `inst_type`, or if that group is
    /// currently borrowed elsewhere.
    fn register(&self, inst_type: &str, server: Rc<RefCell<OneServer>>) {
        println!("register: debug servers: {:?}", self.servers);
        self.group(inst_type).borrow_mut().add_server(server);
    }

    /// Pushes `data` to every server in the group stored under `inst_type`.
    ///
    /// # Panics
    ///
    /// Panics if no group exists for `inst_type`, or if the group is
    /// mutably borrowed elsewhere.
    fn push_data(&self, inst_type: &str, data: Vec<u8>) {
        println!("push_tick: debug servers: {:?}", self.servers);
        // `Servers::push_data` takes `&self`, so a shared borrow is enough;
        // an exclusive borrow here would panic whenever another shared
        // borrow of the group is alive.
        self.group(inst_type).borrow().push_data(data);
    }

    /// Looks up the group for `inst_type`, panicking with the offending key
    /// when it is missing.
    fn group(&self, inst_type: &str) -> &RefCell<Servers> {
        self.servers.get(inst_type).unwrap_or_else(|| {
            panic!("no server group registered for inst_type {:?}", inst_type)
        })
    }
}

fn main() {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build_local(&mut Default::default())
        .unwrap();

    runtime.block_on(async {
        let mut dispatcher = Dispatcher::new();
        dispatcher.servers.insert(
            "stock".to_string(),
            Rc::new(RefCell::new(Servers::default())),
        );

        let dispatcher = Rc::new(dispatcher);
        let dispatcher_clone = dispatcher.clone();

        // worker 1.
        runtime.spawn_local(async move {
            loop {
                tokio::time::sleep(std::time::Duration::from_millis(500)).await;
                dispatcher_clone.push_data("stock", vec![]);
                println!("call push tick complete.")
            }
        });

        // worker 2.
        loop {
            let dispatcher_clone = dispatcher.clone();
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
            runtime.spawn_local(async move {
                let server = Rc::new(RefCell::new(OneServer(0)));
                dispatcher_clone.register("stock", server);
                println!("call register complete.");
            });
        }
    });
}

The core idea is that I have a HashMap<String, Rc<RefCell<Servers>>>. Worker 2 calls register to add a server to the matching Servers entry, and worker 1 periodically pushes data to the servers.

I chose tokio's LocalRuntime because I don't want to use locks: the data will be updated really fast, so locking is likely to cause performance issues. But I have to use Rc<RefCell<Servers>> and Rc<RefCell<OneServer>> to gain interior mutability, and I'm not really sure whether there is a better way to achieve this. Nested Rc<RefCell<T>> doesn't seem good to me.

Here is the repo for the code: GitHub - WindSoilder/tmp_aaa

1 Like

Spoiler alert: RefCell is the single-threaded version of Mutex.

The usual way to solve this kind of problem is the "don't communicate by sharing, share by communicating" mantra. The venerable actor pattern: Actors with Tokio – Alice Ryhl.

The sample code gets a bit bigger with a new third worker task (the actor) and a message type to send over the actor's channel. But in return, you get rid of all of the reference counting and runtime borrow checking.

use std::collections::HashMap;

/// A single data server, modelled as a counter that is incremented once per push.
#[derive(Debug)]
struct OneServer(u32);

/// A group of servers owned directly by the group.
#[derive(Default, Debug)]
struct Servers {
    servers: Vec<OneServer>,
}

impl Servers {
    /// Pushes a payload to every server in the group.
    ///
    /// The payload itself is currently unused; each server's counter is
    /// incremented by one.
    fn push_data(&mut self, _data: Vec<u8>) {
        for server in &mut self.servers {
            server.0 += 1;
        }
    }

    /// Appends `server` to the group.
    fn add_server(&mut self, server: OneServer) {
        self.servers.push(server);
    }
}

/// Owns every server group, keyed by `inst_type`, and routes registrations
/// and data pushes to the matching group.
struct Dispatcher {
    servers: HashMap<String, Servers>,
}

impl Dispatcher {
    /// Creates a dispatcher with no server groups.
    fn new() -> Self {
        Self {
            servers: HashMap::new(),
        }
    }

    /// Adds `server` to the group stored under `inst_type`.
    ///
    /// # Panics
    ///
    /// Panics if no group exists for `inst_type`.
    fn register(&mut self, inst_type: &str, server: OneServer) {
        println!("register: debug servers: {:?}", self.servers);
        self.group_mut(inst_type).add_server(server);
    }

    /// Pushes `data` to every server in the group stored under `inst_type`.
    ///
    /// # Panics
    ///
    /// Panics if no group exists for `inst_type`.
    fn push_data(&mut self, inst_type: &str, data: Vec<u8>) {
        println!("push_tick: debug servers: {:?}", self.servers);
        self.group_mut(inst_type).push_data(data);
    }

    /// Looks up the group for `inst_type`, panicking with the offending key
    /// when it is missing so a bad request is easy to trace.
    fn group_mut(&mut self, inst_type: &str) -> &mut Servers {
        self.servers.get_mut(inst_type).unwrap_or_else(|| {
            panic!("no server group registered for inst_type {:?}", inst_type)
        })
    }
}

/// Requests sent over the channel to the dispatcher actor task; each variant
/// is forwarded to the `Dispatcher` method of the same purpose.
enum Msg {
    /// Add `server` to the group stored under `inst_type`
    /// (handled by `Dispatcher::register`).
    Register {
        inst_type: String,
        server: OneServer,
    },

    /// Push `data` to the group stored under `inst_type`
    /// (handled by `Dispatcher::push_data`).
    PushData {
        inst_type: String,
        data: Vec<u8>,
    },
}

/// Runs the actor-based simulation on a single-threaded local Tokio runtime.
///
/// The dispatcher is moved into one task that owns it exclusively and applies
/// the messages it receives. A second task sends a `PushData` message every
/// 500 ms, and the main future spawns a short-lived task every 500 ms that
/// sends a `Register` message. Neither sending loop terminates.
fn main() {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build_local(&Default::default())
        .unwrap();

    runtime.block_on(async {
        // `Duration` is `Copy`, so every task can take its own copy.
        let period = std::time::Duration::from_millis(500);

        let mut dispatcher = Dispatcher::new();
        dispatcher
            .servers
            .insert(String::from("stock"), Servers::default());

        // dispatcher actor: the only task that touches `dispatcher`.
        let (tx, mut inbox) = tokio::sync::mpsc::channel(1);
        runtime.spawn_local(async move {
            while let Some(msg) = inbox.recv().await {
                match msg {
                    Msg::Register { inst_type, server } => dispatcher.register(&inst_type, server),
                    Msg::PushData { inst_type, data } => dispatcher.push_data(&inst_type, data),
                }
            }
        });

        // worker 1: periodic data push.
        let push_tx = tx.clone();
        runtime.spawn_local(async move {
            loop {
                tokio::time::sleep(period).await;
                let tick = Msg::PushData {
                    inst_type: "stock".to_string(),
                    data: vec![],
                };
                push_tx.send(tick).await.unwrap();
                println!("call push tick complete.")
            }
        });

        // worker 2: periodic registration, each one in its own task.
        loop {
            tokio::time::sleep(period).await;
            let register_tx = tx.clone();
            runtime.spawn_local(async move {
                let request = Msg::Register {
                    inst_type: "stock".to_string(),
                    server: OneServer(0),
                };
                register_tx.send(request).await.unwrap();
                println!("call register complete.");
            });
        }
    });
}
4 Likes

Wow! Thanks!
The new implementation is really cool! This way, I can also use the default multi-thread Runtime.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.