Tokio, UDP and Arc socket clone count: 145206

Hi all,

I've found my issue, but I'm trying to understand why my UDP socket Arc clone() is going crazy and chewing up all my memory very, very quickly:

Arc socket clone count: 2
...
Arc socket clone count: 145430
Arc socket clone count: 145431
Arc socket clone count: 145432
Arc socket clone count: 145433
Arc socket clone count: Stopping sip daemon...
145434
Arc socket clone count: 145435
Stopped sentrypeer
Arc socket clone count: 145436
Arc socket clone count: 145437
Arc socket clone count: 145438
Arc socket clone count: 145439
Tokio received a oneshot message to shutdown: "Please shutdown :-)"
Arc socket clone count: 145440
Arc socket clone count: 145421
Arc socket clone count: 145338
Arc socket clone count: 145328
Arc socket clone count: 145312
Arc socket clone count: 145307
Arc socket clone count: 145279
Arc socket clone count: 145242
Arc socket clone count: 145227
Arc socket clone count: 145206
Arc socket clone count: 145192
Arc socket clone count: 145180
Arc socket clone count: 145124

This is the code:

tokio-udp.rs:

fn handle_udp_connection(
    udp_socket: Arc<UdpSocket>,
    sentrypeer_config: SentryPeerConfig,
    addr: SocketAddr,
) -> impl Future<Output = i32> {
    async move {
        let mut buf = [0; 1024];
        let (bytes_read, peer_addr) = udp_socket.recv_from(&mut buf).await.unwrap();
        let debug_mode = (unsafe { *sentrypeer_config.p }).debug_mode;
        let verbose_mode = (unsafe { *sentrypeer_config.p }).verbose_mode;
        let sip_responsive_mode = (unsafe { *sentrypeer_config.p }).sip_responsive_mode;

        if debug_mode || verbose_mode {
            eprintln!("Received UDP packet from: {}", peer_addr);
        }

        if log_sip_packet(
            sentrypeer_config,
            buf.to_vec(),
            bytes_read,
            peer_addr,
            addr,
            "UDP",
        ) != libc::EXIT_SUCCESS
        {
            eprintln!("Failed to log SIP packet");
        }

        if debug_mode || verbose_mode {
            eprintln!(
                "Received: {:?}",
                String::from_utf8_lossy(&buf[..bytes_read])
            );
        }

        if sip_responsive_mode {
            udp_socket.send_to(SIP_PACKET, peer_addr).await.unwrap();
        }

        libc::EXIT_SUCCESS
    }
}

#[no_mangle]
pub(crate) extern "C" fn run_sip_server(sentrypeer_c_config: *mut sentrypeer_config) -> i32 {
    // Assert we're not getting a null pointer
    assert!(!sentrypeer_c_config.is_null());

    let rt = tokio::runtime::Builder::new_multi_thread()
        .thread_name("tls_tokio_runtime")
        .enable_all()
        .build()
        .unwrap();
    let handle = rt.handle().clone();

    // Create a oneshot channel to send a message to tokio runtime to shutdown
    let (tx, rx) = oneshot::channel::<String>();

    let sentrypeer_config = SentryPeerConfig {
        p: sentrypeer_c_config,
    };

    // Launch our Tokio runtime from a new thread so we can exit this function
    let thread_builder = std::thread::Builder::new().name("sentrypeer_std_thread".to_string());

    let debug_mode = (unsafe { *sentrypeer_config.p }).debug_mode;
    let verbose_mode = (unsafe { *sentrypeer_config.p }).verbose_mode;

    let _std_thread_handle = thread_builder.spawn(move || {
        handle.block_on(async move {
            let config = load_all_configs(sentrypeer_config).expect("Failed to load all configs");

            // UDP
            let udp_socket = UdpSocket::bind("0.0.0.0:5060")
                .await
                .expect("UDP: Failed to bind to address");
            let addr = udp_socket.local_addr().unwrap();

            // https://github.com/tokio-rs/tokio/discussions/3755#discussioncomment-702928
            // UdpSocket docs: This type does not provide a split method, because
            // this functionality can be achieved by instead wrapping the socket
            // in an [Arc]
            let arc_socket = Arc::new(udp_socket);

            if debug_mode || verbose_mode {
                eprintln!("Listening for incoming UDP connections...");
            }

            tokio::spawn(async move {
                loop {
                    let socket = arc_socket.clone();
                    eprintln!("Arc socket clone count: {}", Arc::strong_count(&socket));

                    tokio::spawn(async move {
                        if handle_udp_connection(socket, sentrypeer_config, addr).await
                            != libc::EXIT_SUCCESS
                        {
                            eprintln!("Failed to handle UDP connection");
                        }
                    });
                }
            });

            match rx.await {
                Ok(msg) => {
                    if debug_mode || verbose_mode {
                        eprintln!("Tokio received a oneshot message to shutdown: {:?}", msg);
                    }
                    // https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#method.shutdown_background
                    rt.shutdown_background();
                    libc::EXIT_SUCCESS
                }
                Err(_) => {
                    eprintln!("Failed to receive message to shutdown.");
                    libc::EXIT_FAILURE
                }
            }
        });
    });

    // Set the pointer to the oneshot channel
    // TODO: Rename this in config.h
    unsafe {
        (*sentrypeer_c_config).sip_tls_channel = Box::into_raw(Box::new(tx)) as *mut libc::c_void;
    }

    libc::EXIT_SUCCESS
}

If I move my handle_udp_connection().await outside of the inner tokio::spawn (i.e. delete that spawn, just above my match rx.await), all is fine: the Arc count is always 2. The original code is how I handle TCP and TLS, albeit without the Arc clone.

Why? No traffic, just at rest. I know I'm being dumb here and need a second pair of eyes.

Thanks,
Gavin.

Switching it to this version works OK too:

            tokio::spawn(async move {
                loop {
                    let socket = arc_socket.clone();
                    eprintln!("Arc socket clone count: {}", Arc::strong_count(&socket));

                    let handle = tokio::spawn(async move {
                        if handle_udp_connection(socket, sentrypeer_config, addr).await
                            != libc::EXIT_SUCCESS
                        {
                            eprintln!("Failed to handle UDP connection");
                        }
                    });                    
                    let _ = handle.await;                    
                }
            });

So I guess rustc can figure out that the Arc isn't needed anymore and doesn't grow it?

You are cloning the Arc in an infinite loop, what do you expect?

But then it should do the same without the inner tokio::spawn, and it does not.

I guess it's not going out of scope unless I use the handle way or remove tokio::spawn completely.

No, it shouldn't, not if you ignore (i.e. don't await) the returned task handle.

Your understanding of tokio::spawn() is fundamentally wrong.

Thanks for your reply @nerditation

So in the 1st version:

            tokio::spawn(async move {
                loop {
                    let socket = arc_socket.clone();
                    eprintln!("Arc socket clone count: {}", Arc::strong_count(&socket));

                    tokio::spawn(async move {
                        if handle_udp_connection(socket, sentrypeer_config, addr).await
                            != libc::EXIT_SUCCESS
                        {
                            eprintln!("Failed to handle UDP connection");
                        }
                    });                             
                }
            });

The Arc count increments fast and memory runs out.

In this 2nd version, the Arc count stays the same:

            tokio::spawn(async move {
                loop {
                    let socket = arc_socket.clone();
                    eprintln!("Arc socket clone count: {}", Arc::strong_count(&socket));

                    if handle_udp_connection(socket, sentrypeer_config, addr).await
                        != libc::EXIT_SUCCESS
                    {
                        eprintln!("Failed to handle UDP connection");
                    }
                }
            });

Both are within an infinite loop, with the same single .clone().

And with this 3rd version, where I await the spawned task, the Arc count is OK:

            tokio::spawn(async move {
                loop {
                    let socket = arc_socket.clone();
                    eprintln!("Arc socket clone count: {}", Arc::strong_count(&socket));

                    // Handle needed so our Arc count doesn't go wild 
                    // See https://users.rust-lang.org/t/tokio-udp-and-arc-socket-clone-count-145206/122457/2
                    let handle = tokio::spawn(async move {
                        if handle_udp_connection(socket, sentrypeer_config, addr).await
                            != libc::EXIT_SUCCESS
                        {
                            eprintln!("Failed to handle UDP connection");
                        }
                    });                   
                    let _ = handle.await;                    
                }
            });

Is this because in the 2nd and 3rd versions I .await: once directly on handle_udp_connection(), and once on the handle from the other tokio::spawn? That then means the spawned task has finished, so that udp_socket clone goes out of scope and is released by the next loop iteration?

In a lot of the code I've seen, you don't keep a handle and await the task; it just runs.

Thanks.

Correct.

spawn() just puts the task (i.e. a Future + Send + 'static) into the scheduler's queue and returns immediately. It does not wait for the task to finish; instead it returns a "handle" (a JoinHandle) to the newly spawned task.

This handle resolves when the task finishes, so you can await it manually. If you don't, you basically get "fire-and-forget" behavior.

Note that there's usually no point in spawning a future as a task and then immediately awaiting the task (as in your 3rd version); you can just await the future directly. [1]


  1. Technically, these two cases are not exactly equivalent; for instance, a spawned task can be canceled without the parent task being canceled, but let's not worry about that.


The difference is whether you wait for handle_udp_connection to finish before continuing to the next iteration of the loop.

After all, the purpose of tokio::spawn is to start a background task without waiting for it to finish. In the 3rd version, you use the handle to wait for the background task.


Thanks @alice and others in the Tokio Discord for helping me understand that I'm not awaiting anything in my loop like I am with TCP and TLS, so millions of tasks are getting spawned, all waiting for something :slight_smile:

My 1st version was based on this:

Thanks.

Thanks for the explanation again too!

Which only has one spawn too!

@nerditation Just had a thought about removing the tokio::spawn in the loop. If I do that, the outer spawn will run the loop awaiting a connection, then handle it in handle_xxx_connection. If I'm not spawning that bit, I'm only handling one client at a time, right?

In your example, yes, because of the way you use spawn() and the way your handle_connection() is written.

The key to serving multiple clients concurrently is NOT having to wait for a previous request to finish before answering the next ones. This is typically done in a loop with the following structure:

loop {
    let request = wait_for_next_client_to_connect(server_socket);
    spawn(handler(request));
}

The key difference in your example is that you did NOT wait for the next client to arrive before spawning a new task to handle it, so you end up with infinitely many tasks.

For connection-based transports such as TCP or QUIC, each client gets a dedicated connection, so the main loop uses accept() to wait for the next client and then passes the returned connection to the request handler, where the actual data can be read and parsed (asynchronously from the main loop).

However, UDP isn't connection-based and has no equivalent of accept(); the only thing the main loop can wait for is receive() of a data packet, and that's what a typical UDP-based server should do:

loop {
    // client_id is usually derived from the client socket address
    let (client_id, request_packet) = wait_to_receive_next_packet(server_socket);
    spawn(handler(client_id, request_packet, server_socket));
}

Note that in this example the server_socket is passed to the handler so it can send reply packets to the client, not to receive new request packets.

Note also that this example is only suitable for use cases where each client request is a single UDP packet, e.g. the traditional DNS protocol. For advanced use cases, you must "de-multiplex" the incoming packets in the main loop, e.g. based on the client id, and then use some form of "streams", such as async channels, to send the packets to handlers (which can be pre-spawned tasks or tasks that are spawned on demand). See this udp-server crate for an example implementation.


Thanks for the detailed reply @nerditation. I hadn't actually thought about more than a single-packet reply at this point, as I've never needed it. When I do need it for UDP, I'll have a read of the example you linked to. Much appreciated!

I've just cleaned up my blocking UDP logic and tested it with a tokio sleep. No more blocking!