Tokio Semaphore and busy wait

I have two cases where I want to control concurrency:

  1. A function that writes a Vector to a file asynchronously.
  2. A function that spawns off a task in response to a request on a TCP socket.

I followed the blog here

This approach works for the TCP code but fails for the file-write case. This is what happens:

Once the number of concurrent operations exceeds the number of permits, the system freezes. If the limit is set to 15, the 16th concurrent operation hits a wait, but the guard is never released.

The only difference I see is that the TCP code has an else branch that does real work, while the else in the file-write code is a busy wait. Is this the expected behavior?

Any thoughts on how this should be done? Should there be a sleep in the file-write case?

This code works

    loop {
        // Asynchronously wait for an inbound socket.
        let (stream, sock_addr) = listener.accept().await?;
        let p = path_prefix.clone();
        let q = write_every_minute.clone();
        let sem_clone = Arc::clone(&sem);

        tokio::spawn(async move {
            // try_acquire never waits: it returns Err at once when no permit is free.
            let aq = sem_clone.try_acquire();
            if let Ok(_guard) = aq {
                warn!(target: "metrics", "New connection accepted; thread_id to be spawned = {}; new client = {}:{}",
                      thread_id, sock_addr.ip(), sock_addr.port());
                if let Err(e) = process(stream, sock_addr, thread_id, p, q).await {
                    error!(target: "metrics", "failed to process connection; error = {}", e);
                }
            } else {
                // No permit available: reject the client instead of waiting.
                // The log arguments below were cut off in the post and are inferred from the placeholders.
                if let Err(_e) = handshake_agent(stream, sock_addr, false).await {
                    error!(target: "metrics", "Error in rejecting handshake with client {}:{}!",
                           sock_addr.ip(), sock_addr.port());
                } else {
                    error!(target: "metrics", "Rejecting client {}:{}: Too many open sockets",
                           sock_addr.ip(), sock_addr.port());
                }
            }
        });
    }


use std::error::Error;
use std::io;
use tokio::io::Interest;
use tokio::net::TcpStream;

async fn handshake_agent(
    stream: TcpStream,
    sock_addr: std::net::SocketAddr,
    can_connect: bool,
) -> Result<(), Box<dyn Error>> {
    let ready = stream.ready(Interest::WRITABLE).await?;
    if ready.is_writable() {
        // Tell the client whether the connection is accepted or rejected.
        let data: &[u8] = if can_connect { b"Accept" } else { b"Reject" };
        match stream.try_write(data) {
            Ok(n) => {
                // The log arguments were cut off in the post; these are inferred from the placeholders.
                info!(target: "metrics", "Reject Handshake sent to client={}:{}; Sent {}; {} bytes",
                      sock_addr.ip(), sock_addr.port(), String::from_utf8_lossy(data), n);
            }
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                error!(target: "metrics", "Reject Handshake: write WouldBlock error: {}; client={}:{}",
                       e, sock_addr.ip(), sock_addr.port());
                return Err(format!("Reject Handshake: write WouldBlock error; {}.", e).into());
            }
            Err(e) => {
                error!(target: "metrics", "Reject Handshake: error: {}", e);
                return Err(format!("Reject Handshake: error; {}", e).into());
            }
        }
    }
    Ok(())
}

This code does not work

tokio::spawn(async move {
    loop {
        let aq = write_cap_clone.try_acquire();
        if let Ok(_guard) = aq {
            if let Err(e) = process_1(tid, p, v1).await {
                error!("Failed to spawn a file write ; error = {}", e);
            }
        } else {
            /* wait for the permit to be available */
        }
    } // End of loop
}); // End of spawn

Why do you use try_acquire if you want to wait for permits to become available?

If the else with /*wait for the permit to be available*/ does nothing then you're never yielding to the executor, thus blocking the thread.

Thanks. Will try with acquire.await

Replacing try_acquire with acquire().await works.

