Hi
I have two cases where I want to control concurrency
- function that writes a Vector to a file asynchonously .
- function that spawns off a task in response to a request on a TCP socket
I followed the blog here
This appraoch works for the TCP code but fails for the File write case. This is what happens
Once the # of permits exceeds the system freezes. If the limit is set of 15 the 16th concurrent operation hits a wait but the guard is never released
The only difference i see is that in the first case there is an else while in the second case this is a busy wait. Is this the expected behavior.
Any thoughts on how should this be done. Should there be a sleep in the 2nd case ?
This code works
loop {
// Asynchronously wait for an inbound socket.
let (stream,sock_addr) = listener.accept().await?;
let p = path_prefix.clone();
let q = write_every_minute.clone();
let sem_clone = Arc::clone(&sem);
tokio::spawn(async move{
let aq = sem_clone.try_acquire();
if let Ok(_guard)= aq {
warn!(target:"metrics","New connection accepted ; thread_id to be spawned ={};
new client = {}:{}",thread_id,sock_addr.ip(),sock_addr.port());
if let Err(e) = process(stream, sock_addr, thread_id, p,q).await {
error!(target:"metrics","failed to process connection; error = {}", e);
}
} else {
if let Err(e) = handshake_agent(stream,sock_addr,false).await{
error!(target:"metrics","Error in rejecting handshake with client {}:{}!"
,sock_addr.ip(),sock_addr.port());
} else {
error!(target:"metrics","Rejecting client {}:{}: Too many open sockets"
,sock_addr.ip(),sock_addr.port());
}
}
});
}
async fn handshake_agent(stream: TcpStream,
sock_addr:std::net::SocketAddr,can_connect: bool)
-> Result<(),Box<dyn Error>>{
let ready = stream.ready(Interest::WRITABLE).await?;
let mut err=String::new();
if ready.is_writable() {
let mut data = vec![0 as u8;8];
if can_connect == true {
data = b"Accept".to_vec();
} else {
data = b"Reject".to_vec();
}
let ret_val= stream.try_write(&data) ;
match ret_val {
Ok(n)=> {
info!(target:"metrics","Reject Handshake snt to client={}:{};
Sent {} ;{} bytes",
sock_addr.ip(),sock_addr.port(),from_utf8(&data[0..n]).unwrap(),n);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
error!(target:"metrics","Reject Handshake: write WouldBlock error: {}; client={}:{}",
e,sock_addr.ip(),sock_addr.port());
write!(&mut err, "Reject Handshake: write WouldBlock error; {}.",&e);
return Err(err.into());
}
Err(_err) => {
error!(target:"metrics","Reject Handshake: error: {}",_err);
write!(&mut err,"Reject Handshake: error; {}",_err);
return Err(err.into());
}
}
}
err.clear();
Ok(())
}
This code does not work
tokio::spawn(async move {
loop {
let aq = write_cap_clone.try_acquire();
if let Ok(_guard)= aq {
warn!(target:"metrics","process_id={},thread_id={},write_Cap={}"
,p,tid,write_cap_clone.available_permits());
if let Err(e) = process_1(tid, p, v1).await {
error!("Failed to spawn a file write ; error = {}", e);
}
break;
} else {
/ *wait for the permit to be available*/
}
} // End of loop
}); // End of spawn