Axum state, websockets, and Sync error

Hi,

There seems to be a conflict with my websocket handler and the application state type. Here is hopefully enough context:

#[debug_handler]
async fn master_ws_handler(
    ws: WebSocketUpgrade,
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
    State(locked_state): State<SharedState>
) -> impl IntoResponse {
    let game = locked_state.clone();
    ws.on_upgrade(move |socket| async move { game.handle_master_socket(socket, addr) } )
}

#[derive(Default)]
pub struct AppState {
    players: Vec<Player>,
    color_used: Vec<bool>,
    game_state: GameState,
    master_socket: Option<WebSocket>
}

pub type SharedState = Arc<RwLock<AppState>>;

// in main...
    let shared_state = SharedState::default();
// in the router:
[...]
        .with_state(Arc::clone(&shared_state))

Here is the error I'm getting:

error[E0277]: `(dyn hyper::upgrade::Io + std::marker::Send + 'static)` cannot be shared between threads safely
   --> src/main.rs:140:9
    |
140 |     ws: WebSocketUpgrade,
    |         ^^^^^^^^^^^^^^^^ `(dyn hyper::upgrade::Io + std::marker::Send + 'static)` cannot be shared between threads safely
    |
    = help: the trait `Sync` is not implemented for `(dyn hyper::upgrade::Io + std::marker::Send + 'static)`
    = help: the trait `FromRequestParts<S>` is implemented for `WebSocketUpgrade`
    = note: required for `Unique<(dyn hyper::upgrade::Io + std::marker::Send + 'static)>` to implement `Sync`
    = note: required because it appears within the type `Box<dyn Io + Send>`
    = note: required because it appears within the type `Rewind<Box<dyn Io + Send>>`
    = note: required because it appears within the type `Upgraded`
    = note: required because it appears within the type `AllowStd<Upgraded>`
    = note: required because it appears within the type `WebSocket<AllowStd<Upgraded>>`
    = note: required because it appears within the type `WebSocketStream<Upgraded>`
    = note: required because it appears within the type `WebSocket`
    = note: required because it appears within the type `Option<WebSocket>`
note: required because it appears within the type `AppState`
   --> src/state.rs:24:12
    |
24  | pub struct AppState {
    |            ^^^^^^^^
    = note: required for `std::sync::RwLock<AppState>` to implement `Sync`
    = note: required for `Arc<std::sync::RwLock<AppState>>` to implement `std::marker::Send`
    = note: required for `WebSocketUpgrade` to implement `FromRequestParts<Arc<std::sync::RwLock<AppState>>>`
    = help: see issue #48214

I'm not sure why an Option<Websocket> within the Player type would prevent the Websocket from being used in a handler function.

Anything obvious I'm doing wrong? Thanks for any input! Been stymieing me for a couple weeks now.

The problem arises because Axum's WebSocket is not Sync. Therefore, it isn't safe to put it in a SharedState and then pass a ref to this SharedState to your Axum router.

Now, you may be thinking that you've protected this SharedState with a RwLock so there shouldn't be a problem, but RwLock does not turn a !Sync type into a Sync one! The outer type will simply inherit the !Sync trait from the type it wraps.

The simplest solution is to replace RwLock with a Mutex. Because a mutex guarantees exclusive access to an underlying datatype, wrapped !Sync types become Sync. You could also keep the RwLock and add an arc-mutex to master_socket in AppState, i.e. add a second lock protecting a subset of your data. Depending on your performance requirements, either option could be preferable.

On another note, it seems that you are using RwLock from the standard library and not tokio's async version. This is fine if you're not holding a write lock across .await points, but if you are, you should use the tokio version instead. See the docs for more details.

1 Like

Thanks much, yep a switch to Mutex fixed it. This app will have low concurrency so this should work. Great explanation, thanks again!

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.