Hi all,
I've found my issue, but I'm trying to understand why my UDP socket Arc clone() is going crazy and chews up all my memory very, very quickly:
Arc socket clone count: 2
...
Arc socket clone count: 145430
Arc socket clone count: 145431
Arc socket clone count: 145432
Arc socket clone count: 145433
Arc socket clone count: Stopping sip daemon...
145434
Arc socket clone count: 145435
Stopped sentrypeer
Arc socket clone count: 145436
Arc socket clone count: 145437
Arc socket clone count: 145438
Arc socket clone count: 145439
Tokio received a oneshot message to shutdown: "Please shutdown :-)"
Arc socket clone count: 145440
Arc socket clone count: 145421
Arc socket clone count: 145338
Arc socket clone count: 145328
Arc socket clone count: 145312
Arc socket clone count: 145307
Arc socket clone count: 145279
Arc socket clone count: 145242
Arc socket clone count: 145227
Arc socket clone count: 145206
Arc socket clone count: 145192
Arc socket clone count: 145180
Arc socket clone count: 145124
This is the code:
fn handle_udp_connection(
udp_socket: Arc<UdpSocket>,
sentrypeer_config: SentryPeerConfig,
addr: SocketAddr,
) -> impl Future<Output = i32> {
async move {
let mut buf = [0; 1024];
let (bytes_read, peer_addr) = udp_socket.recv_from(&mut buf).await.unwrap();
let debug_mode = (unsafe { *sentrypeer_config.p }).debug_mode;
let verbose_mode = (unsafe { *sentrypeer_config.p }).verbose_mode;
let sip_responsive_mode = (unsafe { *sentrypeer_config.p }).sip_responsive_mode;
if debug_mode || verbose_mode {
eprintln!("Received UDP packet from: {}", peer_addr);
}
if log_sip_packet(
sentrypeer_config,
buf.to_vec(),
bytes_read,
peer_addr,
addr,
"UDP",
) != libc::EXIT_SUCCESS
{
eprintln!("Failed to log SIP packet");
}
if debug_mode || verbose_mode {
eprintln!(
"Received: {:?}",
String::from_utf8_lossy(&buf[..bytes_read])
);
}
if sip_responsive_mode {
udp_socket.send_to(SIP_PACKET, peer_addr).await.unwrap();
}
libc::EXIT_SUCCESS
}
}
#[no_mangle]
pub(crate) extern "C" fn run_sip_server(sentrypeer_c_config: *mut sentrypeer_config) -> i32 {
// Assert we're not getting a null pointer
assert!(!sentrypeer_c_config.is_null());
let rt = tokio::runtime::Builder::new_multi_thread()
.thread_name("tls_tokio_runtime")
.enable_all()
.build()
.unwrap();
let handle = rt.handle().clone();
// Create a oneshot channel to send a message to tokio runtime to shutdown
let (tx, rx) = oneshot::channel::<String>();
let sentrypeer_config = SentryPeerConfig {
p: sentrypeer_c_config,
};
// Launch our Tokio runtime from a new thread so we can exit this function
let thread_builder = std::thread::Builder::new().name("sentrypeer_std_thread".to_string());
let debug_mode = (unsafe { *sentrypeer_config.p }).debug_mode;
let verbose_mode = (unsafe { *sentrypeer_config.p }).verbose_mode;
let _std_thread_handle = thread_builder.spawn(move || {
handle.block_on(async move {
let config = load_all_configs(sentrypeer_config).expect("Failed to load all configs");
// UDP
let udp_socket = UdpSocket::bind("0.0.0.0:5060")
.await
.expect("UDP: Failed to bind to address");
let addr = udp_socket.local_addr().unwrap();
// https://github.com/tokio-rs/tokio/discussions/3755#discussioncomment-702928
// UdpSocket docs: This type does not provide a split method, because
// this functionality can be achieved by instead wrapping the socket
// in an [Arc]
let arc_socket = Arc::new(udp_socket);
if debug_mode || verbose_mode {
eprintln!("Listening for incoming UDP connections...");
}
tokio::spawn(async move {
loop {
let socket = arc_socket.clone();
eprintln!("Arc socket clone count: {}", Arc::strong_count(&socket));
tokio::spawn(async move {
if handle_udp_connection(socket, sentrypeer_config, addr).await
!= libc::EXIT_SUCCESS
{
eprintln!("Failed to handle UDP connection");
}
});
}
});
match rx.await {
Ok(msg) => {
if debug_mode || verbose_mode {
eprintln!("Tokio received a oneshot message to shutdown: {:?}", msg);
}
// https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#method.shutdown_background
rt.shutdown_background();
libc::EXIT_SUCCESS
}
Err(_) => {
eprintln!("Failed to receive message to shutdown.");
libc::EXIT_FAILURE
}
}
});
});
// Set the pointer to the oneshot channel
// TODO: Rename this in config.h
unsafe {
(*sentrypeer_c_config).sip_tls_channel = Box::into_raw(Box::new(tx)) as *mut libc::c_void;
}
libc::EXIT_SUCCESS
}
if I move my handle_udp_connection().await
outside of the inner tokio::spawn
, e.g. delete that spawn, just above my match rx.await
, all is fine. Arc count is always 2. The original code is how I handle TCP and TLS, albeit without the Arc clone.
Why? No traffic, just at rest. I know I'm being dumb here and need a second pair of eyes.
Thanks,
Gavin.