Optimizing QUIC stream latency (need help)

About libp2p_stream and QUIC streams

  1. First of all, I only started learning Rust and libp2p recently. I plan to use libp2p to build a collaboration tool. The problem is latency: the delay between my two computers on the LAN is 1 ms, but when I ping or transmit UDP data packets over libp2p, the delay is 20-30 ms, which is unacceptable. I understand that libp2p itself adds some overhead, but 20-30 ms is too high. I want to bring it down to about 5 ms.
  2. Here is what I have looked at so far:
    I use OpenTelemetry to time each step of the read path:
pub async fn read_packet(stream: &mut Stream) -> Result<(TLVProtocol, Vec<u8>)> {
    // Total time starts
    let total_start = Instant::now();
    info!(target: "app::net::p2p::stream::packet", "Stream Start reading data");

    // 1. Create BufReader
    let buf_reader_start = Instant::now();
    let mut buffered = BufReader::with_capacity(16384, stream); // 16KB buffer
    let buf_reader_time = buf_reader_start.elapsed();
    info!(target: "app::sys::performance", "1. BufReader creation time: {:?}", buf_reader_time);

    // 2. Buffer allocation
    let alloc_start = Instant::now();
    let mut buffer = BytesMut::with_capacity(4096);
    let alloc_time = alloc_start.elapsed();
    info!(target: "app::sys::performance", "2. Buffer allocation time: {:?}", alloc_time);

    // 3. Header resize
    let resize_header_start = Instant::now();
    buffer.resize(4, 0);
    let resize_header_time = resize_header_start.elapsed();
    info!(target: "app::sys::performance", "3. Header adjustment time: {:?}", resize_header_time);

    // 4. Header reading
    let read_header_start = Instant::now();
    match buffered.read_exact(&mut buffer[..4]).await {

The header read in step 4, buffered.read_exact(&mut buffer[..4]).await, is the slow part: its latency fluctuates between 10 ms and 100 ms.
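For reference, the read path can be reduced to the following synchronous sketch of the same length-prefixed framing. It uses std::io in place of the async libp2p stream, and the names read_frame and MAX_FRAME_LEN (with its 1 MiB value) are hypothetical, chosen only for illustration. The header and payload reads are timed separately, as in the instrumented function above.

```rust
use std::io::{self, Read};
use std::time::{Duration, Instant};

// Assumed upper bound on a frame, so a corrupt header cannot trigger a huge allocation.
const MAX_FRAME_LEN: usize = 1024 * 1024;

/// Reads one frame: a 4-byte big-endian length followed by that many payload bytes.
/// Returns the payload plus the time spent in the header read and in the payload read.
/// Hypothetical helper; a blocking stand-in for the async stream in the post.
fn read_frame<R: Read>(reader: &mut R) -> io::Result<(Vec<u8>, Duration, Duration)> {
    // The header timer also covers any time spent waiting for the bytes to arrive.
    let header_start = Instant::now();
    let mut header = [0u8; 4];
    reader.read_exact(&mut header)?;
    let header_time = header_start.elapsed();

    let len = u32::from_be_bytes(header) as usize;
    if len > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "frame too large"));
    }

    let payload_start = Instant::now();
    let mut payload = vec![0u8; len];
    reader.read_exact(&mut payload)?;
    let payload_time = payload_start.elapsed();

    Ok((payload, header_time, payload_time))
}
```

A timer wrapped around the header read includes however long the first bytes take to arrive, so on its own it cannot tell a slow read apart from a late write on the sending side.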

My streams are built on the libp2p_stream crate. This is where I accept incoming streams and open outgoing ones through the stream control:

stream_result = incoming_streams.next() => {
match stream_result {
Some((peer, stream)) => {

Mode::Dial(_) => {
match control.open_stream(open_peer_id, tlv).await {

Then I use the stream to transmit my application data between the two nodes.

My question now is: besides using libp2p_stream to build custom streams, is there a more efficient way to transmit data, for example something based directly on QUIC streams?

Because there are very few libp2p tutorials, and most of the docs.rs pages have no examples, I don't know what to do next. Should I look for another stream implementation to replace libp2p_stream, or should I focus on the buffered.read_exact(&mut buffer[..4]).await problem, since solving it might fix the latency?

Another problem is throughput. When I test my two nodes with iperf3 over a direct connection, they reach 30-40 Mbps, but through libp2p they only reach 2 Mbps, and packets are lost as soon as the rate exceeds 2 Mbps.

For these reasons, I have been wondering whether the way I construct my streams is incorrect.

Below are my Behaviour and Swarm setup:


pub fn new(
        keypair: &libp2p::identity::Keypair,
        relay_behaviour: relay::client::Behaviour,
    ) -> Result<Self> {
        
        let message_id_fn = |message: &gossipsub::Message| {
            let mut s = DefaultHasher::new();
            message.data.hash(&mut s);
            gossipsub::MessageId::from(s.finish().to_string())
        };
        
        let gossipsub_config = gossipsub::ConfigBuilder::default()
            .heartbeat_interval(Duration::from_secs(10)) // This is set to aid debugging by not cluttering the logs space
            .validation_mode(gossipsub::ValidationMode::Strict) // This sets the kind of message validation. The default is Strict (enforce message signing)
            .message_id_fn(message_id_fn) // content-address messages. No two messages of the same content will be propagated.
            .build()
            .map_err(|msg| std::io::Error::new(std::io::ErrorKind::Other, msg))?; // Temporary hack because "build" does not return a proper "std::error::Error".
        
        let gossipsub = gossipsub::Behaviour::new(
            gossipsub::MessageAuthenticity::Signed(keypair.clone()),
            gossipsub_config,
        )
        .map_err(|e| anyhow::anyhow!("failed to create gossipsub behaviour: {}", e))?;

        let relay_client = relay_behaviour;
        let ping = ping::Behaviour::new(ping::Config::new());
        let identify = identify::Behaviour::new(identify::Config::new(
            GlobalState::global_config().get_global_identify_name()?,
            keypair.public(),
        ));
        let dcutr = dcutr::Behaviour::new(keypair.public().to_peer_id());

        let mdns =
            mdns::tokio::Behaviour::new(mdns::Config::default(), keypair.public().to_peer_id())?;

        let stream = libp2p_stream::Behaviour::new();
        
        Ok(ClientBehaviour {
            relay_client,
            ping,
            identify,
            dcutr,
            gossipsub,
            mdns,
            stream,
        })
    }




let mut swarm =
        libp2p::SwarmBuilder::with_existing_identity(helper::generate_ed25519(secret_key_seed))
            .with_tokio()
            .with_tcp(
                tcp::Config::default().nodelay(true),
                noise::Config::new,
                yamux::Config::default,
            )?
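            // .with_quic_config(|mut config| {
            //     config.keep_alive_interval = Duration::from_secs(5);
            //     config.max_idle_timeout = Duration::from_millis(10_000);
            //     config.max_concurrent_stream_limit = 256;
            //     // set other fields...
            //     config
            // })
            .with_quic_config(|mut config| {
                // Disable support for QUIC draft-29.
                // Use only QUIC version 1 for compatibility and stability.
                config.support_draft_29 = false;

                // Set the handshake timeout to 10 seconds.
                // If the handshake does not complete in time, the connection is terminated so it cannot hang.
                config.handshake_timeout = Duration::from_secs(10);

                // Set the maximum idle timeout to 10,000 ms (10 seconds).
                // A connection with no activity for 10 seconds is closed to free resources.
                config.max_idle_timeout = 10 * 1000;

                // Allow up to 1024 concurrent bidirectional streams.
                // Caps how many streams one connection can have open at once, to prevent resource exhaustion.
                config.max_concurrent_stream_limit = 1024;

                // Send a keep-alive packet every 5 seconds.
                // This keeps the connection active so it is not closed for being idle.
                config.keep_alive_interval = Duration::from_secs(5);

                // Set the per-connection data limit to 100 MB (100,000,000 bytes).
                // Caps the unacknowledged data in flight across all streams of one connection.
                config.max_connection_data = 100_000_000;

                // Set the per-stream data limit to 100 MB (100,000,000 bytes).
                // Caps the unacknowledged data in flight on a single stream.
                config.max_stream_data = 100_000_000;

                config

            })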
            .with_dns()?
            .with_relay_client(noise::Config::new, yamux::Config::default)?
            .with_behaviour(|keypair, relay_behaviour| {
                let behaviour = behaviour_build::ClientBehaviour::new(keypair, relay_behaviour)?;
                Ok(behaviour)
            })?
            .with_swarm_config(|c| {
                c.with_idle_connection_timeout(Duration::from_secs(60))
                    .with_notify_handler_buffer_size(NonZeroUsize::new(32).expect("eeee"))
                    .with_per_connection_event_buffer_size(usize::from(NonZeroUsize::new(64).expect("eeee")))
            })
            .build();

    swarm.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse()?)?;
    swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;


This is the sending side. Each packet is a simulated UDP packet of 200 bytes: the first 4 bytes are magic bytes, followed by some IP addresses and finally the data.

    let tlv_bytes = TLVProtocol::to_bytes(&tlv_packet);
    
    let tlv_length = tlv_bytes.len() as u32;

    let mut stream_buf = BufWriter::new(stream);

    stream_buf.write_all(&tlv_length.to_be_bytes()).await?;

    stream_buf.write_all(&tlv_bytes).await?;
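One detail worth checking in the send path is buffering, since the snippet above does not show a flush call. The following is a minimal synchronous sketch using std::io as a stand-in for the async stream. The helper names write_frame and bytes_delivered_before_and_after_flush are hypothetical. It shows that a 204-byte frame (4-byte length prefix plus 200-byte payload) written through std's BufWriter stays in the writer's buffer until flush is called. Whether the async BufWriter used above behaves the same way is an assumption to verify, not something this sketch proves.

```rust
use std::io::{self, BufWriter, Write};

/// Writes one frame: a 4-byte big-endian length followed by the payload.
/// Hypothetical helper; uses std::io in place of the async libp2p stream.
fn write_frame<W: Write>(writer: &mut W, payload: &[u8]) -> io::Result<()> {
    if payload.len() > u32::MAX as usize {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "payload too large"));
    }
    let len = payload.len() as u32;
    writer.write_all(&len.to_be_bytes())?;
    writer.write_all(payload)?;
    Ok(())
}

/// Returns how many bytes have reached the underlying sink
/// before and after an explicit flush of the BufWriter.
fn bytes_delivered_before_and_after_flush(payload: &[u8]) -> io::Result<(usize, usize)> {
    // A Vec<u8> plays the role of the network stream here.
    let mut buffered = BufWriter::new(Vec::new());
    write_frame(&mut buffered, payload)?;
    let before = buffered.get_ref().len();
    buffered.flush()?;
    let after = buffered.get_ref().len();
    Ok((before, after))
}
```

If the same holds for the async writer, a small frame may sit in the buffer until something else forces it out, which would show up on the receiving side as a slow header read.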

I have tried dynamically sized buffers to manage the buffer queue, but that does not seem to be related to the problem.

I am not sure what the problem is: whether my reading method is wrong, or something else. If you know anything about this, please help me. Thank you.