Tokio Semaphore and busy wait

Hi
I have two cases where I want to control concurrency:

  1. A function that writes a vector to a file asynchronously.
  2. A function that spawns a task in response to a request on a TCP socket.

I followed the blog here

This approach works for the TCP code but fails for the file-write case. This is what happens:

Once the number of concurrent operations exceeds the number of permits, the system freezes. If the limit is set to 15, the 16th concurrent operation waits, but the guard is never released.

The only difference I see is that in the TCP case there is an else branch that rejects the connection, while in the file-write case the loop is a busy wait. Is this the expected behavior?

Any thoughts on how this should be done? Should there be a sleep in the file-write case?

This code works

   loop {
        // Asynchronously wait for an inbound socket.
        let (stream,sock_addr) = listener.accept().await?;
        let p = path_prefix.clone();
        let q = write_every_minute.clone();
        let sem_clone = Arc::clone(&sem);

        tokio::spawn(async move{
            let aq = sem_clone.try_acquire();
            
            if let Ok(_guard)= aq {
                // The trailing backslash keeps the log message on one line.
                warn!(target:"metrics","New connection accepted ; thread_id to be spawned ={}; \
                          new client = {}:{}",thread_id,sock_addr.ip(),sock_addr.port());
                if let Err(e) = process(stream, sock_addr, thread_id, p,q).await {
                    error!(target:"metrics","failed to process connection; error = {}", e);
                }
            } else {
                if let Err(e) = handshake_agent(stream,sock_addr,false).await{
                    error!(target:"metrics","Error in rejecting handshake with client {}:{}!"
                    ,sock_addr.ip(),sock_addr.port());
                } else {
                    error!(target:"metrics","Rejecting client {}:{}: Too many open sockets"
                    ,sock_addr.ip(),sock_addr.port());
                }
            }

        });
    }

async fn handshake_agent(
    stream: TcpStream,
    sock_addr: std::net::SocketAddr,
    can_connect: bool,
) -> Result<(), Box<dyn Error>> {
    let ready = stream.ready(Interest::WRITABLE).await?;
    let mut err=String::new();
    if ready.is_writable() {
        // Pick the handshake payload up front instead of overwriting a placeholder.
        let data = if can_connect {
            b"Accept".to_vec()
        } else {
            b"Reject".to_vec()
        };
        let ret_val= stream.try_write(&data) ;
        match ret_val {
            Ok(n)=> {
                info!(target:"metrics","Reject Handshake sent to client={}:{}; \
                        Sent {} ;{} bytes",
                      sock_addr.ip(),sock_addr.port(),from_utf8(&data[0..n]).unwrap(),n);
            }
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                error!(target:"metrics","Reject Handshake: write WouldBlock error: {}; client={}:{}",
                       e,sock_addr.ip(),sock_addr.port());
                write!(&mut err, "Reject Handshake: write WouldBlock error; {}.",&e);
                return Err(err.into());
            }
            Err(_err) => {
                error!(target:"metrics","Reject Handshake: error: {}",_err);
                write!(&mut err,"Reject Handshake: error; {}",_err);
                return Err(err.into());
            }
        }
    }
    err.clear();
    
Ok(())
}


This code does not work

tokio::spawn(async move {
loop {
	let aq = write_cap_clone.try_acquire();
	if let Ok(_guard)= aq {
		warn!(target:"metrics","process_id={},thread_id={},write_Cap={}"
		,p,tid,write_cap_clone.available_permits());
		if let Err(e) = process_1(tid, p, v1).await {
			error!("Failed to spawn a file write ; error = {}", e);
		}	
		break;
	} else {
		/* wait for the permit to be available */
	}
} // End of loop 
});  // End of spawn

Why do you use try_acquire if you want to wait for permits to become available?

If the else with /*wait for the permit to be available*/ does nothing then you're never yielding to the executor, thus blocking the thread.

Thanks. Will try with acquire().await.

Replacing try_acquire with acquire().await works.
