Hello, I am trying to use a Tokio Semaphore to limit the number of users accessing my streaming API at the same time, but for some reason, whenever I make an API call, the available-permit count only ever drops by 1, even across multiple calls. Is there something I have misunderstood here? Here's my code:
use axum::extract::State;
use axum::response::IntoResponse;
use axum::routing::*;
use axum::Router;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::Semaphore;
use futures::prelude::*;
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;
use axum_streams::*;

#[derive(Debug, Clone, Deserialize, Serialize)]
struct MyTestStructure {
    some_test_field: String,
}

fn generate() -> Vec<MyTestStructure> {
    (0..1000)
        .map(|x| MyTestStructure {
            some_test_field: format!("test {x}"),
        })
        .collect()
}

fn source_stream() -> impl Stream<Item = MyTestStructure> {
    // Simulating a stream with a plain vector and throttling to show how it works
    stream::iter(generate()).throttle(std::time::Duration::from_millis(50))
}

async fn json_nl_stream(State(sem): State<Arc<Semaphore>>) -> impl IntoResponse {
    let permit = sem.clone().acquire_owned().await.unwrap();
    log::info!("Acquired permit! Permit left: {}", sem.available_permits());
    StreamBodyAs::json_nl(source_stream())
}

#[tokio::main]
async fn main() {
    env_logger::init();
    let sem = Arc::new(Semaphore::new(2));
    let app = Router::new()
        .route("/json-nl-stream", get(json_nl_stream))
        .with_state(sem.clone());
    let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
    log::info!("Starting server on {addr}");
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}
Output when I ran curl localhost:8080/json-nl-stream twice:
Acquired permit! Permit left: 1
Acquired permit! Permit left: 1
I understand that Tokio can handle many tasks, but I am trying to learn some of the fundamental Tokio sync primitives to get a better understanding.
Any help would be appreciated!
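
For reference, the same "Permit left: 1" pattern can be reproduced without Tokio or axum. The sketch below is only an illustration, under the assumption that the cause is the permit being released when it is dropped: in the handler above, `permit` is a local variable, so it goes out of scope when the handler returns the response, which is before the body is streamed to the client. `ToySemaphore`, `ToyPermit`, `ToyBody`, and both handler functions are hypothetical names made up for this example; they are std-only stand-ins and not Tokio's implementation.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a semaphore: just a counter of free permits.
struct ToySemaphore {
    available: AtomicUsize,
}

/// Hypothetical stand-in for an owned permit: gives its slot back on drop.
struct ToyPermit {
    sem: Arc<ToySemaphore>,
}

impl ToySemaphore {
    fn new(permits: usize) -> Arc<Self> {
        Arc::new(Self { available: AtomicUsize::new(permits) })
    }

    fn available_permits(&self) -> usize {
        self.available.load(Ordering::SeqCst)
    }

    /// Non-blocking acquire; returns None when no permit is free.
    fn try_acquire_owned(sem: &Arc<Self>) -> Option<ToyPermit> {
        let mut current = sem.available.load(Ordering::SeqCst);
        while current > 0 {
            match sem.available.compare_exchange(
                current,
                current - 1,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => return Some(ToyPermit { sem: Arc::clone(sem) }),
                Err(actual) => current = actual,
            }
        }
        None
    }
}

impl Drop for ToyPermit {
    fn drop(&mut self) {
        // Releasing the permit is tied to the value being dropped.
        self.sem.available.fetch_add(1, Ordering::SeqCst);
    }
}

/// Mirrors the handler in the question: the permit is a local variable.
fn handler_dropping_permit(sem: &Arc<ToySemaphore>) -> usize {
    let _permit = ToySemaphore::try_acquire_owned(sem).expect("a permit should be free");
    // The count is read first, then `_permit` is dropped as the function returns.
    sem.available_permits()
}

/// Hypothetical response body that owns the permit for as long as it lives.
struct ToyBody {
    _permit: ToyPermit,
}

/// Variant that moves the permit into the returned value instead.
fn handler_keeping_permit(sem: &Arc<ToySemaphore>) -> (usize, ToyBody) {
    let permit = ToySemaphore::try_acquire_owned(sem).expect("a permit should be free");
    let left = sem.available_permits();
    (left, ToyBody { _permit: permit })
}

fn main() {
    let sem = ToySemaphore::new(2);

    // Two "requests" in a row: each one sees 1 permit left.
    let first = handler_dropping_permit(&sem);
    let second = handler_dropping_permit(&sem);
    println!("dropping the permit on return: {first}, {second}");

    // Keeping the permit inside the returned body holds the slot.
    let (a, body_a) = handler_keeping_permit(&sem);
    let (b, body_b) = handler_keeping_permit(&sem);
    println!("keeping the permit in the body: {a}, {b}");

    drop(body_a);
    drop(body_b);
    println!("after both bodies are dropped: {}", sem.available_permits());
}
```

If that assumption holds for the real handler, the permit would need to live as long as the response stream does (for example by moving it into the stream that is passed to `StreamBodyAs::json_nl`), rather than only as long as the handler function runs.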