Tokio semaphore not working

Hello, I am trying to use a Tokio Semaphore to limit the number of users accessing my streaming API, but whenever I make an API call the number of available permits only ever drops by 1. Is there something I have misunderstood here? Here's my code:

use axum::extract::State;
use axum::response::IntoResponse;
use axum::routing::*;
use axum::Router;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::Semaphore;

use futures::prelude::*;
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;

use axum_streams::*;

#[derive(Debug, Clone, Deserialize, Serialize)]
struct MyTestStructure {
    some_test_field: String,
}

fn generate() -> Vec<MyTestStructure> {
    (0..1000)
        .map(|x| MyTestStructure {
            some_test_field: format!("test {x}"),
        })
        .collect()
}

fn source_stream() -> impl Stream<Item = MyTestStructure> {
    // Simulating a stream with a plain vector and throttling to show how it works
    stream::iter(generate()).throttle(std::time::Duration::from_millis(50))
}

async fn json_nl_stream(State(sem): State<Arc<Semaphore>>) -> impl IntoResponse {
    let permit = sem.clone().acquire_owned().await.unwrap();
    log::info!("Acquired permit! Permit left: {}", sem.available_permits());
    StreamBodyAs::json_nl(source_stream())
}

#[tokio::main]
async fn main() {
    env_logger::init();
    let sem = Arc::new(Semaphore::new(2));

    let app = Router::new()
        .route("/json-nl-stream", get(json_nl_stream))
        .with_state(sem.clone());

    let addr = SocketAddr::from(([127, 0, 0, 1], 8080));

    log::info!("Starting server on {addr}");

    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}

Output when I ran curl localhost:8080/json-nl-stream twice:

Acquired permit! Permit left: 1
Acquired permit! Permit left: 1

I understand that Tokio can handle many tasks, but I am looking to learn some of the fundamental Tokio sync structures to get a better understanding.

Any help would be appreciated!

In json_nl_stream, the permit is released as soon as the function returns:


async fn json_nl_stream(State(sem): State<Arc<Semaphore>>) -> impl IntoResponse {
    let permit = sem.clone().acquire_owned().await.unwrap(); // acquire the permit
    log::info!("Acquired permit! Permit left: {}", sem.available_permits());
    StreamBodyAs::json_nl(source_stream())
} // permit is dropped here, which releases it when the function returns

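You probably need the permit to stay around until the Stream is dropped. In your code, the permit only lives until the handler returns, which is right after the stream is created and before any of it has been sent to the client. That is why every request logs 1 permit left.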

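You could create a struct that implements Stream and has the stream and the permit as fields. Your poll_next implementation would just delegate to the inner stream, and the permit would be released when the struct is dropped, i.e. when the response body is finished or the client disconnects.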
