About optimization Quic stream (Need help)

About libp2p_stream and quic stream

  1. First of all, I just learned rust and libp2p not long ago. I plan to use libp2p to build a collaboration tool. The problem I encounter now is that the delay between my two computers in the LAN is 1ms. When I use libp2p to connect ping or transmit udp data packets, the delay is 20-30ms, which is unacceptable. I want to reduce the delay to about 5ms. I understand that using libp2p itself has performance loss. But 20-30ms is too high.
  2. The problem is that I have several ideas now:
    I use opentelemetry to track performance analysis,
pub async fn read_packet(stream: &mut Stream) -> Result<(TLVProtocol, Vec<u8>)> {
// Total time starts
let total_start = Instant::now();
info!(target: "app::net::p2p::stream::packet", "Stream Start reading data");

// 1. Create BufReader
let buf_reader_start = Instant::now();
let mut buffered = BufReader::with_capacity(16384, stream); // 16KB buffer
let buf_reader_time = buf_reader_start.elapsed();
info!(target: "app::sys::performance", "1. BufReader creation time: {:?}", buf_reader_time);

// 2. Buffer allocation
let alloc_start = Instant::now();
let mut buffer = BytesMut::with_capacity(4096);
let alloc_time = alloc_start.elapsed();
info!(target: "app::sys::performance", "2. Buffer allocation time: {:?}", alloc_time);

// 3. Header resize
let resize_header_start = Instant::now();
buffer.resize(4, 0);
let resize_header_time = resize_header_start.elapsed();
info!(target: "app::sys::performance", "3. Header adjustment time: {:?}", resize_header_time);

// 4. Header reading
let read_header_start = Instant::now();
match buffered.read_exact(&mut buffer[..4]).await {

match buffered.read_exact(&mut buffer[..4]).await The delay of this sentence will fluctuate between 10ms-100ms.

My stream is built based on libp2p_Stream lib. Here is to get the stream controller.

stream_result = incoming_streams.next() => {
match stream_result {
Some((peer, stream)) => {

Mode::Dial(_) => {
match control.open_stream(open_peer_id, tlv).await {

Then I operate the stream to transmit my application data between two nodes.

My question now is, in addition to using libp2p_Stream lib to build custom streams to transmit data, is there a more efficient way to transmit data, such as based on quic_stream.

Because there are too few tutorials related to libp2p, and most of the docs.rs do not have examples, I don’t know what to do now. Is to find another stream to replace my original libp2p_Stream lib, I'd better focus on solving the match buffered.read_exact(&mut buffer[..4]).await problem, which can help me solve the latency problem.

Another problem is that I use iperf3 to test the performance of my two nodes. When they are directly connected, they can reach 30-40Mbps, but when using libp2p, they are only 2Mbps, and packets will be lost as long as they exceed 2M.

Based on the above reasons, I have been wondering if my stream construction method is incorrect.

Below is my Behaviour build and swarm build


pub fn new(
        keypair: &libp2p::identity::Keypair,
        relay_behaviour: relay::client::Behaviour,
    ) -> Result<Self> {
        
        let message_id_fn = |message: &gossipsub::Message| {
            let mut s = DefaultHasher::new();
            message.data.hash(&mut s);
            gossipsub::MessageId::from(s.finish().to_string())
        };
        
        let gossipsub_config = gossipsub::ConfigBuilder::default()
            .heartbeat_interval(Duration::from_secs(10)) // This is set to aid debugging by not cluttering the logs space
            .validation_mode(gossipsub::ValidationMode::Strict) // This sets the kind of message validation. The default is Strict (enforce message signing)
            .message_id_fn(message_id_fn) // content-address messages. No two messages of the same content will be propagated.
            .build()
            .map_err(|msg| std::io::Error::new(std::io::ErrorKind::Other, msg))?; // Temporary hack because "build" does not return a proper "std::error::Error".
        
        let gossipsub = gossipsub::Behaviour::new(
            gossipsub::MessageAuthenticity::Signed(keypair.clone()),
            gossipsub_config,
        )
        .map_err(|e| anyhow::anyhow!("创建 gossipsub 网络行为错误: {}", e))?;

        let relay_client = relay_behaviour;
        let ping = ping::Behaviour::new(ping::Config::new());
        let identify = identify::Behaviour::new(identify::Config::new(
            GlobalState::global_config().get_global_identify_name()?,
            keypair.public(),
        ));
        let dcutr = dcutr::Behaviour::new(keypair.public().to_peer_id());

        let mdns =
            mdns::tokio::Behaviour::new(mdns::Config::default(), keypair.public().to_peer_id())?;

        let stream = libp2p_stream::Behaviour::new();
        
        Ok(ClientBehaviour {
            relay_client,
            ping,
            identify,
            dcutr,
            gossipsub,
            mdns,
            stream,
        })
    }




let mut swarm =
        libp2p::SwarmBuilder::with_existing_identity(helper::generate_ed25519(secret_key_seed))
            .with_tokio()
            .with_tcp(
                tcp::Config::default().nodelay(true),
                noise::Config::new,
                yamux::Config::default,
            )?
            // .with_quic_config(|mut config| {
            //     config.keep_alive_interval = Duration::from_secs(5);
            //     config.max_idle_timeout = Duration::from_millis(10_000);
            //     config.max_concurrent_stream_limit = 256;
            //     // 设置其他字段...
            //     config
            // })
            .with_quic_config(|mut config| {
                // 禁用对 QUIC 协议草案 29 版本的支持
                // 只使用 QUIC 版本 1,保证兼容性和稳定性
                config.support_draft_29 = false;

                // 设置初始握手的超时时间为 5 秒
                // 如果在 5 秒内无法完成握手,连接将被终止,防止握手过程卡住
                config.handshake_timeout = Duration::from_secs(10);

                // 设置最大空闲超时时间为 10,000 毫秒(10 秒)
                // 如果连接在 10 秒内没有任何活动,它将被关闭,以释放资源
                config.max_idle_timeout = 10 * 1000;

                // 设置最大并发双向流的数量为 256
                // 限制单个连接可以同时打开的流的数量,防止资源耗尽
                config.max_concurrent_stream_limit = 1024;

                // 设置保活包发送间隔为 5 秒
                // 每 5 秒发送一个保活包,以保持连接活跃,防止因空闲而被关闭
                config.keep_alive_interval = Duration::from_secs(5);

                // 设置单个连接的最大数据量为 50 MB
                // 限制单个连接可以传输的总数据量,防止单个连接占用过多资源
                config.max_connection_data = 100_000_000;

                // 设置单个流的最大数据量为 30 MB
                // 限制单个流可以传输的数据量,确保资源在多个流之间公平分配
                config.max_stream_data = 100_000_000;

                config

            })
            .with_dns()?
            .with_relay_client(noise::Config::new, yamux::Config::default)?
            .with_behaviour(|keypair, relay_behaviour| {
                let behaviour = behaviour_build::ClientBehaviour::new(keypair, relay_behaviour)?;
                Ok(behaviour)
            })?
            .with_swarm_config(|c| {
                c.with_idle_connection_timeout(Duration::from_secs(60))
                    .with_notify_handler_buffer_size(NonZeroUsize::new(32).expect("eeee"))
                    .with_per_connection_event_buffer_size(usize::from(NonZeroUsize::new(64).expect("eeee")))
            })
            .build();

    swarm.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse()?)?;
    swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;


let mut swarm =
        libp2p::SwarmBuilder::with_existing_identity(helper::generate_ed25519(secret_key_seed))
            .with_tokio()
            .with_tcp(
                tcp::Config::default().nodelay(true),
                noise::Config::new,
                yamux::Config::default,
            )?
            // .with_quic_config(|mut config| {
            //     config.keep_alive_interval = Duration::from_secs(5);
            //     config.max_idle_timeout = Duration::from_millis(10_000);
            //     config.max_concurrent_stream_limit = 256;
            //     // 设置其他字段...
            //     config
            // })
            .with_quic_config(|mut config| {
                // 禁用对 QUIC 协议草案 29 版本的支持
                // 只使用 QUIC 版本 1,保证兼容性和稳定性
                config.support_draft_29 = false;

                // 设置初始握手的超时时间为 5 秒
                // 如果在 5 秒内无法完成握手,连接将被终止,防止握手过程卡住
                config.handshake_timeout = Duration::from_secs(10);

                // 设置最大空闲超时时间为 10,000 毫秒(10 秒)
                // 如果连接在 10 秒内没有任何活动,它将被关闭,以释放资源
                config.max_idle_timeout = 10 * 1000;

                // 设置最大并发双向流的数量为 256
                // 限制单个连接可以同时打开的流的数量,防止资源耗尽
                config.max_concurrent_stream_limit = 1024;

                // 设置保活包发送间隔为 5 秒
                // 每 5 秒发送一个保活包,以保持连接活跃,防止因空闲而被关闭
                config.keep_alive_interval = Duration::from_secs(5);

                // 设置单个连接的最大数据量为 50 MB
                // 限制单个连接可以传输的总数据量,防止单个连接占用过多资源
                config.max_connection_data = 100_000_000;

                // 设置单个流的最大数据量为 30 MB
                // 限制单个流可以传输的数据量,确保资源在多个流之间公平分配
                config.max_stream_data = 100_000_000;

                config

            })
            .with_dns()?
            .with_relay_client(noise::Config::new, yamux::Config::default)?
            .with_behaviour(|keypair, relay_behaviour| {
                let behaviour = behaviour_build::ClientBehaviour::new(keypair, relay_behaviour)?;
                Ok(behaviour)
            })?
            .with_swarm_config(|c| {
                c.with_idle_connection_timeout(Duration::from_secs(60))
                    .with_notify_handler_buffer_size(NonZeroUsize::new(32).expect("eeee"))
                    .with_per_connection_event_buffer_size(usize::from(NonZeroUsize::new(64).expect("eeee")))
            })
            .build();

    swarm.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse()?)?;
    swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

This is the packet sending part. The data packet is just a simulated UDP data packet with a size of 200 bytes. The first 4 bytes are magic bytes, followed by some IPs and finally the data.

    let tlv_bytes = TLVProtocol::to_bytes(&tlv_packet);
    
    let tlv_length = tlv_bytes.len() as u32;

    let mut stream_buf = BufWriter::new(stream);

    stream_buf.write_all(&tlv_length.to_be_bytes()).await?;

    stream_buf.write_all(&tlv_bytes).await?;

I have tried dynamic buffers to manage the buffer queue, but it seems to have nothing to do with this.

I am not sure what the problem is, whether it is my reading method that is wrong, or some other problem. If you know anything about this, please help me. Thank you

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.