LIFO queue with tokio & axum hangs when idle

Hello!

I am implementing a last-in, first-out scheduler/queue/server that accepts incoming requests, puts them into a queue for processing, and returns the result when it is ready. The queue depth is limited, and I want the most recent requests to be processed first.

This is a naive implementation with axum: the queue is a Vec held in an Arc<Mutex>, and oneshot channels carry the responses back. There is also just a single worker, which takes work from the queue as soon as it is available.

I hit a wall here. When I bombard it with requests at a rate higher than it can process, as would happen in reality (see test.sh), AND let it run to completion, the server won't process any further requests afterwards: if I start test.sh again, the server just hangs.

One interesting thing: as soon as I make work() non-async and, for instance, use std::thread::sleep instead of tokio's sleep, the program works as expected. Because this is a prototype for a future backend where work() will be a complex async function, I'd prefer to keep it async.
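
For reference, the non-async variant that works looks roughly like this (just a sketch; work_blocking is only a placeholder name):

    // placeholder for the non-async variant of work()
    fn work_blocking(req: WorkReq) {
        let resp = Resp { resp: req.0.req };
        // blocking sleep instead of tokio's async sleep
        std::thread::sleep(Duration::from_secs(2));
        req.1.send(resp).unwrap();
    }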

Can you spot any obvious errors?
Would you do it like this, or is there a better way to build such a LIFO scheduler?
Thank you

Cargo.toml

[package]
name = "lifo-play"
version = "0.1.0"
edition = "2021"

[dependencies]
axum = "0.8.1"
serde = { version = "1.0.217", features = ["serde_derive"] }
serde_json = "1.0.138"
tokio = { version = "1.43.0", features = ["full"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }

src/main.rs

use std::{sync::{Arc, Mutex}, time::Duration};
use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::post, serve, Json, Router};
use serde::{Deserialize, Serialize};
use tokio::{net::TcpListener, sync::oneshot::{channel, Sender}, time};
use tracing::debug;
use tracing_subscriber::EnvFilter;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .init();

    let state = LifoQueueWorker::default();

    let app = Router::new()
        .route("/", post(handle))
        .with_state(state.clone());

    tokio::spawn(async move {
        state.clone().run().await;
    });

    let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();

    debug!("server starting");
    serve(listener, app.into_make_service()).await.unwrap();
}

async fn handle(mut s: State<LifoQueueWorker>, Json(req): Json<Req>) -> Result<impl IntoResponse, StatusCode> {
    let (tx, rx) = channel::<Resp>();
    debug!("working  {:?}", req);
    s.push((req, tx));
    let resp = rx.await.unwrap();
    debug!("finished {:?}", resp);
    Ok(Json(resp))
}

async fn work(req: WorkReq) {
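    // echo the request string back after simulating 2 seconds of async work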
    let resp = Resp {resp: req.0.req };
    time::sleep(Duration::from_secs(2)).await;
    req.1.send(resp).unwrap();
}

#[derive(Default, Clone)]
struct LifoQueueWorker {
    q: Arc<Mutex<Vec<WorkReq>>>
}

impl LifoQueueWorker {
    fn push(&mut self, req: WorkReq) {
        let mut q = self.q.lock().unwrap();
        q.push(req);
        if q.len() > 10 {
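            // over capacity: drop the oldest request and answer it immediately with its own input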
            let dequed_req = q.remove(0);
            debug!("too many requests, dequeuing {:?}", dequed_req.0);
            let resp = Resp { resp: dequed_req.0.req };
            dequed_req.1.send(resp).unwrap();
        }
    }

    fn pop(&mut self) -> Option<WorkReq> {
        let mut q = self.q.lock().unwrap();
        q.pop()
    }

    async fn run(&mut self) {
        loop {
            if let Some(r) = self.pop() {
                work(r).await;
            }
        }
    }
}

#[derive(Debug, Serialize, Deserialize, Clone)]
struct Req {
    req: String,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
struct Resp {
    resp: String,
}

type WorkReq = (Req, Sender<Resp>);

test.sh

#!/bin/bash

for i in `seq 0 100`
do
    p='{"req": "num '${i}'"}'
    curl localhost:3000 -H "Content-Type: application/json" -d "${p}" &
    sleep 0.2
done

Think about what happens when the queue is empty and the if is false. Under that condition, this code is equivalent in scheduling to:

    async fn run(&mut self) {
        loop {}
    }

which spins in an infinite loop without ever suspending (yielding to the async executor). The simplest possible fix is to yield explicitly:

    async fn run(&mut self) {
        loop {
            if let Some(r) = self.pop() {
                work(r).await;
            } else {
                tokio::task::yield_now().await;
            }
        }
    }

But it would be better to arrange things so that the task suspends until it actually has work to do. I don't know what would be best for that; if it weren't for the LIFO requirement, I would suggest using an async channel.
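
One option that keeps the LIFO order and the existing Vec/Mutex is to add a tokio::sync::Notify, so that push() wakes the worker and run() suspends while the queue is empty. A rough, untested sketch (only the notify field and the wake/wait calls are new; the depth-limiting code would stay where it is):

    use tokio::sync::Notify;

    #[derive(Default, Clone)]
    struct LifoQueueWorker {
        q: Arc<Mutex<Vec<WorkReq>>>,
        notify: Arc<Notify>,
    }

    impl LifoQueueWorker {
        fn push(&mut self, req: WorkReq) {
            let mut q = self.q.lock().unwrap();
            q.push(req);
            // ... the existing depth-limiting code goes here ...
            drop(q);
            // wake the worker; if it isn't waiting yet, Notify stores a permit,
            // so the wakeup isn't lost
            self.notify.notify_one();
        }

        async fn run(&mut self) {
            loop {
                // take the most recent request; the MutexGuard is dropped
                // before any await point
                let next = self.q.lock().unwrap().pop();
                match next {
                    Some(r) => work(r).await,
                    // suspend until push() signals that new work has arrived
                    None => self.notify.notified().await,
                }
            }
        }
    }

Because notify_one() stores a permit when no task is waiting, a push that lands between pop() returning None and the notified().await call is not lost.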

That's great, it helps, thank you!
Indeed, now I see quite a few wasted CPU cycles in this loop; I need to think about that.
I am curious why it doesn't loop forever from the get-go, though, since the first batch of requests is processed successfully.