I built this sample project to understand async and multithreading better. The original application is a Twitch chat sniffer: it reads messages, parses them, and saves the ones that meet certain conditions. I'm struggling to understand the idiomatic way of working with Tokio and with async functions in general.
I have shortened the project to the bare minimum to show how I'm dealing with multithreading.
extern crate tokio;
extern crate crossbeam;
extern crate rayon;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
// Which types of tasks could be sent for a CPU-bound work
#[derive(Debug)]
enum ProcessorWork {
    Work(String),
    // etc...
}
// Which events may occur for the event loop
#[derive(Debug)]
enum AppEvent {
    Message(String),
    ExternalAction {
        action: Action,
        // Responder sends an action result back to the sender
        responder: tokio::sync::oneshot::Sender<ActionResult>,
    },
    // etc...
}
// Which external actions may occur
#[derive(Debug)]
enum Action {
Ws,
AddToShared(String),
Kill,
// etc...
}
#[derive(Debug)]
enum ActionResult {
Ok,
Err,
// etc...
}
// Spawn action generator.
// By action generator, I mean any listener for user input.
// The real application uses a UNIX socket to communicate with a dedicated CLI.
fn spawn_action_gen(emitter: crossbeam::channel::Sender<AppEvent>) {
    let _ = tokio::spawn(async move {
        loop {
            // Wait for an external action...
            tokio::time::sleep(Duration::from_millis(4_000)).await;
            // Some async reading...
            tokio::time::sleep(Duration::from_micros(5)).await;
            // Send the app event...
            let (tx, rx) = tokio::sync::oneshot::channel();
            let _ = emitter.send(AppEvent::ExternalAction {
                action: Action::Ws,
                responder: tx,
            });
            // Wait for the result of the external action...
            let _ = rx.await.unwrap();
            tokio::time::sleep(Duration::from_micros(5)).await;
        }
    });
}
// Spawn a WebSocket task with relatively high traffic.
// To be honest, I still don't know what counts as high traffic; let's say 1 message per millisecond.
// In the real project this spawns a WebSocket connection to Twitch joining many chats (more than 30, some with a lot of viewers online).
fn spawn_ws(
    emitter: crossbeam::channel::Sender<AppEvent>,
) -> tokio::sync::mpsc::Sender<Action> {
    let (cmd_sender, mut cmd_receiver) = tokio::sync::mpsc::channel(16);
    let _ = tokio::spawn(async move {
        // Emulate heavy traffic coming in over the WebSocket
        let mut interval = tokio::time::interval(Duration::from_millis(1));
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    // Send the received message to the event loop...
                    let _ = emitter.send(AppEvent::Message("message".to_owned()));
                },
                Some(_action) = cmd_receiver.recv() => {
                    // Handle the external action...
                    tokio::time::sleep(Duration::from_millis(3)).await;
                }
            }
        }
    });
    cmd_sender
}
// Spawn a work processor.
// It starts a dedicated thread that owns a Rayon pool; the main aim is to process CPU-intensive tasks.
// In the real application this is where the users' messages from the Twitch chats (received over the WebSocket and forwarded by the event loop) are mostly parsed and analyzed.
fn spawn_processor(
    shared_vec: Arc<RwLock<Vec<String>>>,
) -> crossbeam::channel::Sender<ProcessorWork> {
    let (work_sender, work_receiver) = crossbeam::channel::bounded(64);
    let _ = std::thread::spawn(move || {
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(2)
            .build()
            .unwrap();
        for work in work_receiver {
            // Take a read lock only to check the shared state before spawning work onto the pool.
            let lock = shared_vec.read().unwrap();
            if !lock.is_empty() {
                pool.spawn(move || {
                    match work {
                        ProcessorWork::Work(_m) => {
                            // Do some work and save to the DB...
                            std::thread::sleep(Duration::from_millis(100));
                        }
                    }
                })
            }
        }
    });
    work_sender
}
// I want to use only one thread for the Tokio runtime,
// because I think that's enough just to read messages from the chats.
#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Some shared data.
    // In the real application this stores editable "match patterns" for messages.
    // The real type is Arc<RwLock<Vec<Arc<RwLock<FnvHashMap<String, MatchPattern>>>>>>
    let shared_vec = Arc::new(RwLock::new(vec![]));
    // Channels for the event loop.
    let (event_sender, event_receiver) = crossbeam::channel::bounded(64);
    spawn_action_gen(event_sender.clone());
    let ws_cmd_sender = spawn_ws(event_sender.clone());
    let work_sender = spawn_processor(shared_vec.clone());
    let (kill_tx, kill_rx) = tokio::sync::oneshot::channel();
    // Wrap kill_tx in Arc<Mutex<Option<...>>> so the "kill switch" can be shared between threads.
    let kill_tx = Arc::new(Mutex::new(Some(kill_tx)));
    // Spawn the event loop.
    // The goal is quite straightforward: route every event to the right handler.
    let _ = std::thread::spawn(move || {
        while let Ok(e) = event_receiver.recv() {
            match e {
                AppEvent::Message(m) => {
                    let _ = work_sender.send(ProcessorWork::Work(m));
                }
                AppEvent::ExternalAction { action, responder } => {
                    match action {
                        Action::Ws => {
                            let _ = ws_cmd_sender.blocking_send(action);
                            let _ = responder.send(ActionResult::Ok);
                        }
                        Action::AddToShared(s) => {
                            let mut lock = shared_vec.write().unwrap();
                            lock.push(s);
                            let _ = responder.send(ActionResult::Ok);
                        }
                        Action::Kill => {
                            // Take the sender out of the Option so a repeated Kill can't panic.
                            if let Some(tx) = kill_tx.lock().unwrap().take() {
                                let _ = tx.send(());
                            }
                        }
                    }
                }
            }
        }
    });
    let _ = kill_rx.await;
}
The points I'm most interested in:
- Is it okay to pass messages from async code to sync code via crossbeam's synchronous channels? (I sketched the alternative I have in mind right after this list.)
- How much overhead is there (if any) when I use blocking_send on Tokio channels, and should I avoid it?
- Does the RwLock block the event loop in my case?
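To make the first two questions concrete: the alternative I keep comparing my setup against is dropping the crossbeam event channel and using tokio::sync::mpsc end to end, with blocking_recv() on the sync event-loop thread. This is only a minimal, self-contained sketch with made-up names, not code from the real project:
use std::time::Duration;
#[derive(Debug)]
enum Event {
    Message(String),
}
#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Async side: a Tokio mpsc channel instead of a crossbeam one, so the async
    // producer can use send(..).await and yield when the channel is full,
    // instead of blocking the runtime thread.
    let (tx, mut rx) = tokio::sync::mpsc::channel::<Event>(64);
    // Sync side: a plain OS thread drains the channel with blocking_recv(),
    // which parks the thread and needs no runtime.
    let event_loop = std::thread::spawn(move || {
        while let Some(event) = rx.blocking_recv() {
            println!("event loop got: {:?}", event);
        }
    });
    // Async producer, standing in for the WebSocket task.
    for i in 0..5 {
        tokio::time::sleep(Duration::from_millis(1)).await;
        let _ = tx.send(Event::Message(format!("message {i}"))).await;
    }
    // Dropping the last sender closes the channel and ends the sync loop.
    drop(tx);
    // join() blocks, but nothing else needs the runtime at this point.
    let _ = event_loop.join();
}
My (unverified) assumption is that blocking_send/blocking_recv just park the calling thread until the channel is ready, so the cost should be roughly comparable to the crossbeam version, but that is exactly the part I'm not sure about.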
Besides that, I'm very interested in how you would do this and whether you think this approach is fine.
I understand this may be quite a lot of code to read, but as always I'm happy to get any feedback, even the smallest. I also hope this may help somebody else as well.