Freezing in single-threaded !Send session

Over the past year and a half, I've been working on a layer-4 distributed post-quantum networking protocol. Thanks to Rust, it is highly efficient and on par with Cloudflare's QUIC. Now that it's nearing completion, I've finally been able to debug some of the larger parts of the program. One such part is the session-level module.

Each session gets spawned via tokio's spawn_local to ensure that each session is single-threaded.

The SessionManager creates a Session and then wraps it inside an Rc<RefCell<Session>>. The SessionManager then places a clone inside a HashMap keyed by an IpAddr or a u64 CID. The original then gets spawned locally as described above. An important note: when constructing the Session, a clone of the SessionManager gets passed in (the SessionManager itself being wrapped in an Rc<RefCell<SessionManager>>).
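
In rough, hypothetical code (the names and signatures below only illustrate the shape of the setup, not my actual code), it looks something like this:

use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

// Illustrative skeleton only; the real types carry many more fields.
struct Session {
    // Back-reference so the session can later remove itself from the manager.
    session_manager: Rc<RefCell<SessionManager>>,
}

#[derive(Default)]
struct SessionManager {
    // The real map is keyed by an IpAddr or a u64 CID; a u64 is used here.
    sessions: HashMap<u64, Rc<RefCell<Session>>>,
}

impl SessionManager {
    fn create_and_spawn(this: &Rc<RefCell<Self>>, cid: u64) {
        let session = Rc::new(RefCell::new(Session {
            session_manager: this.clone(),
        }));
        // One clone lives in the map for lookups by CID...
        this.borrow_mut().sessions.insert(cid, session.clone());
        // ...and the original is driven on the current thread. spawn_local
        // requires running inside a tokio::task::LocalSet.
        tokio::task::spawn_local(run_session(session));
    }
}

async fn run_session(_session: Rc<RefCell<Session>>) {
    // The session's event loop (registration, login, keep-alives) lives here.
}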

The ephemeral Diffie-Hellman key exchange works (registration phase), and then the login stage works. After login, both nodes play ping-pong with keep-alives. The program uses about 3-5 MB of RAM in total. Then, mysteriously, after 3 minutes or so the client node freezes and its memory usage spikes to 130 MB. The server node does not do this and continues to function as expected.

A note about the purpose of having a SessionManager inside a Session: sometimes I need to remove the Session from the HashMap inside the SessionManager in order to process a disconnect. From a bird's-eye view, this is what's happening with the references:

The SessionManager spawns a Session locally. The Session has an Rc<RefCell<SessionManager>> inside it. Sometimes the Session calls into the SessionManager to process a disconnect, which requires exclusive &mut access through the RefCell. It seems to work, but I wanted to give you an understanding of the "topology" of the program's architecture.
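
Continuing the hypothetical sketch above, the disconnect path is roughly the following (clear_session here stands in for however the real removal is reached):

impl SessionManager {
    fn clear_session(&mut self, cid: u64) {
        self.sessions.remove(&cid);
    }
}

impl Session {
    fn process_disconnect(&self, cid: u64) {
        // Exclusive access to the manager goes through the RefCell. On a
        // single thread this is fine as long as nothing else is holding a
        // borrow of the manager at this exact moment.
        self.session_manager.borrow_mut().clear_session(cid);
    }
}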

Also: the SessionManager runs on a single thread as well, as does the Server abstraction above it. The protocol, at the layer-4 level, is entirely single-threaded.

Intuitively, might any of you know why my program spontaneously freezes? I know it's difficult without seeing code, but perhaps someone knows of a pitfall of setting up the program like I have.

async basically means cooperative multitasking, even when everything happens on a single OS thread. Code between await points will happen atomically, but more or less anything can happen when await yields control back to the scheduler, including any of the classic concurrency issues like race conditions and deadlocks. That's where I would start looking.
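
For example, on a single-threaded runtime, holding a RefCell borrow across an await point is enough to wedge things. A contrived sketch (not your code, just the pattern):

use std::cell::RefCell;
use std::rc::Rc;
use std::time::Duration;

// The RefMut taken here is still alive while the task is parked at the await,
// so any other task on the same thread that calls borrow_mut() on the same
// cell will panic, and try_borrow_mut() keeps failing until this task resumes.
async fn risky(shared: Rc<RefCell<u32>>) {
    let guard = shared.borrow_mut();
    tokio::time::sleep(Duration::from_millis(200)).await;
    drop(guard);
}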

This is what I've suspected too. I've fixed several freezes caused by not dropping a RefMut<'_, HdpSession> before awaiting a longer task, such as a 200ms async sleep.
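
In each case the fix was to make sure the borrow ends before the await, roughly like this (a minimal sketch, not the real handler):

use std::cell::RefCell;
use std::rc::Rc;
use std::time::Duration;

async fn keep_alive_tick(shared: Rc<RefCell<u32>>) {
    let mut counter = shared.borrow_mut();
    *counter += 1;
    // Explicitly end the borrow before sleeping so other tasks on this
    // thread can take the RefCell while this one is parked.
    std::mem::drop(counter);
    tokio::time::sleep(Duration::from_millis(200)).await;
}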

The other possibility, of course, is that there’s some code in the client that gets itself into an infinite loop that never awaits, which is the problem preemptive multithreading was invented to fix.
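
Hypothetically, something like the first loop below: it never reaches an await, so on a single-threaded runtime nothing else ever gets polled, keep-alives included.

// Never yields back to the scheduler; the whole thread is stuck in here.
async fn busy_loop() {
    loop {
        // do_work();
    }
}

// Inserting an explicit yield gives the other spawn_local tasks a turn.
async fn cooperative_loop() {
    loop {
        // do_work();
        tokio::task::yield_now().await;
    }
}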

Perhaps an easy way to fix it is to switch from Rc<RefCell<_>> to Arc<RwLock<_>>. However, I'll only do this if necessary, because of the performance penalty.

Perhaps you can find a way to do this during debug, to enable you to instrument and localize the fault. Then switch it back to the more-performant single-thread version before release.
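
For instance, something along these lines (just a sketch; the borrow vs. lock call sites still differ, so a small wrapper or macro would be needed on top of the alias):

// Debug builds use the thread-safe wrapper for instrumentation...
#[cfg(debug_assertions)]
pub type Shared<T> = std::sync::Arc<std::sync::RwLock<T>>;

// ...while release builds keep the cheaper single-threaded version.
#[cfg(not(debug_assertions))]
pub type Shared<T> = std::rc::Rc<std::cell::RefCell<T>>;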

Okay, I have played around with disabling certain closures to see which one might be the culprit. When I disable this closure, the program works:

/// This will handle a keep-alive packet. It will automatically send a keep-alive packet back after it sleeps for a period of time
pub async fn handle_keep_alive_packet(session: &HdpSession, header: &LayoutVerified<&[u8], HdpHeader>, _payload: &[u8]) -> PrimaryProcessorResult {
    debug_assert_eq!(packet_flags::cmd::primary::KEEP_ALIVE, header.cmd_primary);
    let mut session = session.inner.try_borrow_mut().expect("[Keep-alive] Attempted to borrow session mut, but is taken");
    if session.state != SessionState::Connected {
        log::error!("Keep alive received, but session not connected. Dropping packet");
        return PrimaryProcessorResult::Void;
    }

    let current_timestamp_ns = session.time_tracker.get_global_time_ns();
    if let Some(ref cnac) = session.cnac.clone() {
        if let Ok(ref latest_drill) = validation::keep_alive::validate_keep_alive(header, cnac) {
            let mut state_container = session.state_container.try_borrow_mut().expect("KA not able to borrow state container");
            if state_container.on_keep_alive_received(header.timestamp.get(), current_timestamp_ns) {
                // wait some time
                std::mem::drop(state_container);
                let time_start_ns = session.time_tracker.get_global_time_ns();
                let time_prev = Instant::now();
                std::mem::drop(session);
                async_std::task::sleep(Duration::from_millis(KEEP_ALIVE_INTERVAL_MS)).await;
                let timestamp = time_start_ns + time_prev.elapsed().as_nanos() as i64;
                PrimaryProcessorResult::ReplyToSender(hdp_packet_crafter::keep_alive::craft_keep_alive_packet(latest_drill, timestamp))
            } else {
                log::trace!("Invalid KEEP_ALIVE window; expired");
                session.session_manager.clear_session(session.implicated_cid.unwrap());
                PrimaryProcessorResult::EndSession("Keep alive arrived too late")
            }
        } else {
            // bad packet
            log::info!("Keep alive invalid!");
            PrimaryProcessorResult::Void
        }
    } else {
        log::trace!("Invalid load state. CNAC is missing. Signalling for shutdown");
        PrimaryProcessorResult::Void
    }
}

Again, when this closure isn't used, the program works fine. Is there anything sketchy in here that may be causing the freeze?

Solved. Wow. It turns out I was using the RefMuts correctly. The problem was in another part of the program entirely: the BytesCodec.

The buffer for the Decoder would run down to zero capacity, and then it would freeze. The default BytesCodec from Tokio doesn't reserve additional buffer space. Something for the developers to look into.

@alice, you're working on tokio, right? My humble suggestion to the developers of tokio would be to add a BUFFER_CAPACITY input for the BytesCodec, and to reserve that amount once the capacity of the Decoder's buffer approaches zero. That bug took almost 24 hours to find, and I was looking in my own program when really it was in the codec all along.

I have not followed this thread closely, but are you using BytesCodec directly instead of through FramedRead / Framed? The FramedRead utility should reserve more capacity in the buffer if necessary.

This is my current code:

let framed = Framed::new(tcp_stream, crate::hdp::codec::BytesCodec::new(CODEC_BUFFER_CAPACITY));

This was the code before, without the modification to the codec:

let framed = Framed::new(tcp_stream, crate::hdp::codec::BytesCodec::new());

So, yes, I used Framed to construct the stream.

And the fix to the decoder:

impl Decoder for BytesCodec {
    type Item = Bytes;
    type Error = io::Error;

    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Bytes>, io::Error> {
        if !buf.is_empty() {
            let len = buf.len();
            // Take everything received so far as a single frame.
            let ret = buf.split_to(len).freeze();
            // split_to leaves the buffer with whatever capacity remains; once
            // that drops below the minimum, reserve enough to top the capacity
            // back up toward the configured size (self.0) so reads never stall
            // on a zero-capacity buffer.
            if buf.capacity() < CODEC_MIN_BUFFER {
                buf.reserve(self.0 - buf.capacity());
            }
            Ok(Some(ret))
        } else {
            Ok(None)
        }
    }
}

Can you submit a bug report to the Tokio repository? Preferably with a test case that fails due to this problem.
