Axum handler dropping Sender, but why?

Namaskaram, Hello, Hallo, Hola, Bonjour etc. fellow Rustaceans,

I am a fairly new Rustacean, aiming to make Rust my primary language. So far, I've learnt a lot. The learning curve is steep, but whether or not it proves worth it, I have tried some toy programs, and Rust's guarantees plus its performance are alluring, at least for me. Much to see, much to learn, as is always the case.

Question begins below.

I have tested storing a TX (from std::sync::mpsc, tokio::sync::{mpsc, broadcast}, and flume::{unbounded, bounded}) in AppState.

What happens is that the corresponding handler uses the TX and then seems to drop it. But when I am storing the TX in state behind a lock, how can the handler drop it?

Objective: I want to forward the requests I receive in handlers to the task spawned in main.

Kindly pour in your valuable suggestions and insights. Following is my attempt.

use axum::{extract::State, response::IntoResponse, routing::post, Json, Router};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::broadcast;

#[derive(Deserialize, Serialize, Clone, Debug)]
struct Event {
    payload: String,
}

struct AppState {
    TX: Arc<broadcast::Sender<Event>>,
}

impl AppState {
    fn new() -> Self {
        let (TX, _) = broadcast::channel(21);

        let shared_state = Self {
            TX: Arc::new(TX.clone()),
        };

        shared_state
    }
}

#[tokio::main]
async fn main() {
    let app_state = Arc::new(AppState::new());
    let mut sub = app_state.clone().TX.subscribe();

    tokio::spawn(async move {
        println!("BEGIN---------");

        match sub.recv().await {
            Ok(event) => println!("{event:?}"),
            Err(e) => {
                eprintln!("Error | {}", e.to_string());
            }
        }

        println!("END---------");
    });

    let app = Router::new()
        .route("/events", post(root))
        .with_state(app_state);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

#[axum::debug_handler]
async fn root(State(state): State<Arc<AppState>>, Json(payload): Json<Event>) -> impl IntoResponse {
    if let Err(e) = state.TX.send(payload.clone()) {
        eprintln!("Error while sending: {:?}", e.to_string());
    }

    Json(payload)
}

Thank you for your time, patience and grit.

Errata #1

I've removed the Mutex from the code above, but with the Mutex the result was the same: the Sender appeared to get dropped.

I don’t have experience with the libraries you’re using, but I don’t see anything obvious. What behavior are you seeing that makes you believe TX is getting dropped earlier than you expect?


There’s no loop here, so this will only ever process the first message passed through the channel and then end, even if more messages are sent later.
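To make the point about the missing loop concrete, here is a minimal sketch using only std::sync::mpsc and a thread instead of tokio and axum. The names receive_once, receive_all and run are hypothetical helpers made up for this illustration; they are not part of the original program. A receiver that calls recv() once returns after the first message and drops its end of the channel, while a receiver that loops stays alive until every sender is gone.

```rust
use std::sync::mpsc;
use std::thread;

/// Receives exactly one message and returns, like the task in the question.
/// The receiver is dropped when this function returns.
fn receive_once(rx: mpsc::Receiver<String>) -> Vec<String> {
    let mut seen = Vec::new();
    if let Ok(event) = rx.recv() {
        seen.push(event);
    }
    seen
}

/// Keeps receiving until every sender has been dropped.
fn receive_all(rx: mpsc::Receiver<String>) -> Vec<String> {
    let mut seen = Vec::new();
    while let Ok(event) = rx.recv() {
        seen.push(event);
    }
    seen
}

/// Hypothetical helper: sends three events to a worker thread running
/// `worker` and returns the events that worker actually received.
fn run(worker: fn(mpsc::Receiver<String>) -> Vec<String>) -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || worker(rx));
    for i in 0..3 {
        // A send error here means the worker has already dropped its receiver.
        let _ = tx.send(format!("event {i}"));
    }
    drop(tx); // no senders left, so the loop in receive_all can finish
    handle.join().unwrap()
}

fn main() {
    println!("single recv: {:?}", run(receive_once));
    println!("looped recv: {:?}", run(receive_all));
}
```

In the tokio version the same idea would presumably mean wrapping sub.recv().await in a loop inside the spawned task, so that the subscriber lives as long as the server does.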


My Gawd.

From "the" docs - SendError in std::sync::mpsc


Struct std::sync::mpsc::SendError (stable since 1.0.0)

pub struct SendError<T>(pub T);

An error returned from the Sender::send or SyncSender::send function on channels.

A send operation can only fail if the receiving end of a channel is disconnected, implying that the data could never be received. The error contains the data being sent as a payload so it can be recovered.


Interpretation: So the Sender was never getting dropped; the receiving end was. Because recv() was not inside a loop, the spawned task finished after the first message and dropped its receiver. No one to receive, so we fail to send.
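The documented behaviour is easy to reproduce in isolation. The following is a small sketch with std::sync::mpsc only; the function name send_after_receiver_dropped is invented for this example. It drops the receiver first and then sends, which fails even though the sender is still alive, and the error hands the payload back.

```rust
use std::sync::mpsc;

/// Creates a channel, drops the receiving end, then tries to send.
/// The sender itself is still alive when send is called.
fn send_after_receiver_dropped() -> Result<(), mpsc::SendError<String>> {
    let (tx, rx) = mpsc::channel::<String>();
    drop(rx); // simulate the receiving task having finished
    tx.send("hello".to_string())
}

fn main() {
    match send_after_receiver_dropped() {
        Ok(()) => println!("sent"),
        // SendError is a tuple struct wrapping the value that could not be sent.
        Err(mpsc::SendError(payload)) => {
            println!("send failed, payload recovered: {payload}")
        }
    }
}
```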

In retrospect, everything is obvious!

Thank you _/\_ for the quick response @2e71828.