Hello!
I am implementing last-in first-out scheduler/queue/server, that would accept incoming request, put into queue for processing and return result when ready. Queue depth is limited and I want most recent requests to be processed first.
This is naive implementation with axum, queue vector held in arc::mutex
and oneshot channels for giving responses back. Also worker thread is just one, and it takes work from the queue as soon as it's ready.
I hit a wall here. When I bombard it with requests at rate higher than it's able to process, as it would happen in reality (see test.sh
) AND let it complete fully, server won't be processing any further requests afterwards. Like I would start test.sh
again and server just hangs.
One interesting thing, as soon as I make work()
not async and for instance, use std:sleep instead of tokio's sleep, program works as expected. Because it's a prototype for the future backend where
work()
will be complex async function, I'd prefer to keep it so.
Can you spot obvious errors?
Would you do it like this, or there is better way of creating such LIFO scheduler?
Thank you
Cargo.toml
[package]
name = "lifo-play"
version = "0.1.0"
edition = "2021"
[dependencies]
axum = "0.8.1"
serde = { version = "1.0.217", features = ["serde_derive"] }
serde_json = "1.0.138"
tokio = { version = "1.43.0", features = ["full"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
src/main.rs
use std::{sync::{Arc, Mutex}, time::Duration};
use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::post, serve, Json, Router};
use serde::{Deserialize, Serialize};
use tokio::{net::TcpListener, sync::oneshot::{channel, Sender}, time};
use tracing::debug;
use tracing_subscriber::EnvFilter;
#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();
let state = LifoQueueWorker::default();
let app = Router::new()
.route("/", post(handle))
.with_state(state.clone());
tokio::spawn(async move {
state.clone().run().await;
});
let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
debug!("server starting");
serve(listener, app.into_make_service()).await.unwrap();
}
async fn handle(mut s: State<LifoQueueWorker>, Json(req): Json<Req>) -> Result<impl IntoResponse, StatusCode> {
let (tx, rx) = channel::<Resp>();
debug!("working {:?}", req);
s.push((req, tx));
let resp = rx.await.unwrap();
debug!("finished {:?}", resp);
Ok(Json(resp))
}
async fn work(req: WorkReq) {
let resp = Resp {resp: req.0.req };
time::sleep(Duration::from_secs(2)).await;
req.1.send(resp).unwrap();
}
#[derive(Default, Clone)]
struct LifoQueueWorker {
q: Arc<Mutex<Vec<WorkReq>>>
}
impl LifoQueueWorker {
fn push(&mut self, req: WorkReq) {
let mut q = self.q.lock().unwrap();
q.push(req);
if q.len() > 10 {
let dequed_req = q.remove(0);
debug!("too many requests, dequeuing {:?}", dequed_req.0);
let resp = Resp { resp: dequed_req.0.req };
dequed_req.1.send(resp).unwrap();
}
}
fn pop(&mut self) -> Option<WorkReq> {
let mut q = self.q.lock().unwrap();
q.pop()
}
async fn run(&mut self) {
loop {
if let Some(r) = self.pop() {
work(r).await;
}
}
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Req {
req: String,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Resp {
resp: String,
}
type WorkReq = (Req, Sender<Resp>);
test.sh
#!/bin/bash
for i in `seq 0 100`
do
p='{"req": "num '${i}'"}'
curl localhost:3000 -H "Content-Type: application/json" -d "${p}" &
sleep 0.2
done