Currently learning Rust and getting my hands dirty with Tokio as I was following along with this article on Tokio actors. I found myself in a scenario where an actor has its own state and methods to update its own state, but soon found out that going down this approach doesn't allow for concurrency on the actor's end, and since an actor is not thread-safe (holds a Receiver), I ended up shifting its state into the run_my_actor
function and something about that doesn't sit right with me.
I'm looking for pointers on where I messed up/a better approach for this. I'm still fairly new to Rust so I appreciate any guidance.
Below is an example code snippet, omitted some functions and modified for brevity.
struct MyActor {
name: String,
receiver: mpsc::Receiver<ActorMessage>,
processed_ids: Arc<RwLock<HashSet<u64>>>,
}
enum ActorMessage {
Ping { respond_to: oneshot::Sender<String> },
ProcessMessage { id: u64, body: String },
}
impl MyActor {
fn handle_message(&mut self, msg: ActorMessage) {
match msg {
ActorMessage::ProcessMessage { id, body } => {
self.processed_ids.write().unwrap().insert(id);
}
ActorMessage::Ping { respond_to } => {
let _ = respond_to.send(format!("Pong from {}", self.name));
}
}
}
async fn purge_old_messages(&mut self) {
// Sleep to simulate a long-running task
tokio::time::sleep(Duration::from_secs(30)).await;
self.processed_ids.write().unwrap().clear();
}
}
async fn run_my_actor(mut actor: MyActor) {
let mut ticker = tokio::time::interval(Duration::from_secs(60));
ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
// TODO: Allow processing of `Ping` while `purge_old_messages` is running
loop {
select! {
_ = ticker.tick() => {
actor.purge_old_messages().await;
}
Some(msg) = actor.receiver.recv() => {
// Cannot tokio spawn here as actor is not thread safe
actor.handle_message(msg);
}
}
}
}
// Omitted Handle code