You need to lock a mutex before you can access its contents, but wrapping the mpsc::Sender in one just to share it is redundant. Instead of storing the sender in your AppState, you can store it separately as an Extension so that every request handler gets its own clone of the Sender, all sending to the same Receiver consumed in that separate task (not an OS thread but a green thread, executed on the same thread pool used by all the other tasks axum spawns to handle incoming requests).
AppState and Extension serve different purposes: the former is meant for sharing app-wide data between handlers, the latter for extracting request-local data[1]. An mpsc (multi-producer, single-consumer) channel is already a synchronisation primitive, one based on message passing; a mutex, on the other hand, is a synchronisation primitive for shared memory. Combining the two adds unnecessary and potentially costly synchronisation overhead. You can cheaply clone the Sender (the producer end) of the channel for each request and send messages to the consuming task directly, without requests first having to contend for the Sender through the mutex.
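The contention argument can be seen even without axum: every producer gets its own cloned Sender and sends concurrently, with no shared handle to lock. A minimal sketch using the standard library's std::sync::mpsc (rather than tokio's channel) so it runs without any dependencies:

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // one consumer, many producers -- no Mutex needed
    let (tx, rx) = mpsc::channel::<String>();

    let handles: Vec<_> = (0..4)
        .map(|i| {
            // each "handler" gets its own cheap clone of the Sender
            let tx = tx.clone();
            thread::spawn(move || {
                tx.send(format!("message from producer {i}")).unwrap();
            })
        })
        .collect();

    // drop the original Sender so the channel closes once all clones are gone
    drop(tx);

    for handle in handles {
        handle.join().unwrap();
    }

    // the single consumer drains everything that was sent
    let received: Vec<String> = rx.iter().collect();
    assert_eq!(received.len(), 4);
    println!("received {} messages", received.len());
}
```

tokio's mpsc::Sender works the same way: Clone is cheap, and the Receiver stays with the one consuming task.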
As for the changes to your code, I was thinking roughly:
// assumed imports, based on the types used below
use std::{
    sync::{Arc, Mutex},
    time::Duration,
};

use axum::{extract::State, routing::post, Extension, Json, Router};
use serde_json::Value;
use sqlx::PgPool;
use tokio::{sync::mpsc, time::sleep};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // let pool = PgPoolOptions::new() <- set up connection pool ...
    let (tx, mut rx) = mpsc::channel::<String>(100);
    let state = AppState {
        d: Arc::new(Mutex::new(vec![])),
        pool,
    };

    // axum = "0.7.9"
    let app = Router::new()
        .route("/start", post(create_user))
        .layer(Extension(tx)) // each handler invocation gets a clone of tx
        .with_state(state);

    // spawn a task (green thread), not an OS thread
    tokio::spawn(async move {
        sleep(Duration::from_millis(3000)).await;
        println!("on_thread");
        // keep receiving until all Senders have been dropped
        while let Some(received) = rx.recv().await {
            println!("{:?}", &received);
        }
    });

    // hyper
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
    axum::serve(listener, app).await?;
    Ok(())
}

async fn create_user(
    // must match the type passed to .with_state(), i.e. AppState, not Arc<AppState>
    State(_state): State<AppState>,
    Extension(tx): Extension<mpsc::Sender<String>>,
    Json(json): Json<Value>,
) -> Json<Value> {
    tx.send("to_thread".to_string()).await.unwrap();
    // the handler declares a Json return type, so return the payload
    Json(json)
}

#[derive(Debug, Clone)]
struct AppState {
    d: Arc<Mutex<Vec<String>>>,
    pool: PgPool,
}
Although you can also share app-wide data through an Extension<Arc<...>>.
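That works because cloning an Arc only bumps a reference count; every handler's Extension ends up pointing at the same allocation. A stdlib-only sketch, where AppConfig is a hypothetical stand-in for your shared data:

```rust
use std::sync::Arc;

#[derive(Debug)]
struct AppConfig {
    greeting: String,
}

fn main() {
    let shared = Arc::new(AppConfig {
        greeting: "hello".to_string(),
    });

    // roughly what happens per request when you register .layer(Extension(shared)):
    // the Arc is cloned, not the AppConfig inside it
    let per_request = Arc::clone(&shared);

    // both handles point at the very same allocation
    assert!(Arc::ptr_eq(&shared, &per_request));
    assert_eq!(per_request.greeting, "hello");
    println!("ok");
}
```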