About libp2p_stream and quic stream
- First of all, I just learned rust and libp2p not long ago. I plan to use libp2p to build a collaboration tool. The problem I encounter now is that the delay between my two computers in the LAN is 1ms. When I use libp2p to connect ping or transmit udp data packets, the delay is 20-30ms, which is unacceptable. I want to reduce the delay to about 5ms. I understand that using libp2p itself has performance loss. But 20-30ms is too high.
- The problem is that I have several ideas now:
I use opentelemetry to track performance analysis,
pub async fn read_packet(stream: &mut Stream) -> Result<(TLVProtocol, Vec<u8>)> {
// Total time starts
let total_start = Instant::now();
info!(target: "app::net::p2p::stream::packet", "Stream Start reading data");
// 1. Create BufReader
let buf_reader_start = Instant::now();
let mut buffered = BufReader::with_capacity(16384, stream); // 16KB buffer
let buf_reader_time = buf_reader_start.elapsed();
info!(target: "app::sys::performance", "1. BufReader creation time: {:?}", buf_reader_time);
// 2. Buffer allocation
let alloc_start = Instant::now();
let mut buffer = BytesMut::with_capacity(4096);
let alloc_time = alloc_start.elapsed();
info!(target: "app::sys::performance", "2. Buffer allocation time: {:?}", alloc_time);
// 3. Header resize
let resize_header_start = Instant::now();
buffer.resize(4, 0);
let resize_header_time = resize_header_start.elapsed();
info!(target: "app::sys::performance", "3. Header adjustment time: {:?}", resize_header_time);
// 4. Header reading
let read_header_start = Instant::now();
match buffered.read_exact(&mut buffer[..4]).await {
match buffered.read_exact(&mut buffer[..4]).await The delay of this sentence will fluctuate between 10ms-100ms.
My stream is built based on libp2p_Stream lib. Here is to get the stream controller.
stream_result = incoming_streams.next() => {
match stream_result {
Some((peer, stream)) => {
Mode::Dial(_) => {
match control.open_stream(open_peer_id, tlv).await {
Then I operate the stream to transmit my application data between two nodes.
My question now is, in addition to using libp2p_Stream lib to build custom streams to transmit data, is there a more efficient way to transmit data, such as based on quic_stream.
Because there are too few tutorials related to libp2p, and most of the docs.rs do not have examples, I don’t know what to do now. Is to find another stream to replace my original libp2p_Stream lib, I'd better focus on solving the match buffered.read_exact(&mut buffer[..4]).await problem, which can help me solve the latency problem.
Another problem is that I use iperf3 to test the performance of my two nodes. When they are directly connected, they can reach 30-40Mbps, but when using libp2p, they are only 2Mbps, and packets will be lost as long as they exceed 2M.
Based on the above reasons, I have been wondering if my stream construction method is incorrect.
Below is my Behaviour build and swarm build
pub fn new(
keypair: &libp2p::identity::Keypair,
relay_behaviour: relay::client::Behaviour,
) -> Result<Self> {
let message_id_fn = |message: &gossipsub::Message| {
let mut s = DefaultHasher::new();
message.data.hash(&mut s);
gossipsub::MessageId::from(s.finish().to_string())
};
let gossipsub_config = gossipsub::ConfigBuilder::default()
.heartbeat_interval(Duration::from_secs(10)) // This is set to aid debugging by not cluttering the logs space
.validation_mode(gossipsub::ValidationMode::Strict) // This sets the kind of message validation. The default is Strict (enforce message signing)
.message_id_fn(message_id_fn) // content-address messages. No two messages of the same content will be propagated.
.build()
.map_err(|msg| std::io::Error::new(std::io::ErrorKind::Other, msg))?; // Temporary hack because "build" does not return a proper "std::error::Error".
let gossipsub = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(keypair.clone()),
gossipsub_config,
)
.map_err(|e| anyhow::anyhow!("创建 gossipsub 网络行为错误: {}", e))?;
let relay_client = relay_behaviour;
let ping = ping::Behaviour::new(ping::Config::new());
let identify = identify::Behaviour::new(identify::Config::new(
GlobalState::global_config().get_global_identify_name()?,
keypair.public(),
));
let dcutr = dcutr::Behaviour::new(keypair.public().to_peer_id());
let mdns =
mdns::tokio::Behaviour::new(mdns::Config::default(), keypair.public().to_peer_id())?;
let stream = libp2p_stream::Behaviour::new();
Ok(ClientBehaviour {
relay_client,
ping,
identify,
dcutr,
gossipsub,
mdns,
stream,
})
}
let mut swarm =
libp2p::SwarmBuilder::with_existing_identity(helper::generate_ed25519(secret_key_seed))
.with_tokio()
.with_tcp(
tcp::Config::default().nodelay(true),
noise::Config::new,
yamux::Config::default,
)?
// .with_quic_config(|mut config| {
// config.keep_alive_interval = Duration::from_secs(5);
// config.max_idle_timeout = Duration::from_millis(10_000);
// config.max_concurrent_stream_limit = 256;
// // 设置其他字段...
// config
// })
.with_quic_config(|mut config| {
// 禁用对 QUIC 协议草案 29 版本的支持
// 只使用 QUIC 版本 1,保证兼容性和稳定性
config.support_draft_29 = false;
// 设置初始握手的超时时间为 5 秒
// 如果在 5 秒内无法完成握手,连接将被终止,防止握手过程卡住
config.handshake_timeout = Duration::from_secs(10);
// 设置最大空闲超时时间为 10,000 毫秒(10 秒)
// 如果连接在 10 秒内没有任何活动,它将被关闭,以释放资源
config.max_idle_timeout = 10 * 1000;
// 设置最大并发双向流的数量为 256
// 限制单个连接可以同时打开的流的数量,防止资源耗尽
config.max_concurrent_stream_limit = 1024;
// 设置保活包发送间隔为 5 秒
// 每 5 秒发送一个保活包,以保持连接活跃,防止因空闲而被关闭
config.keep_alive_interval = Duration::from_secs(5);
// 设置单个连接的最大数据量为 50 MB
// 限制单个连接可以传输的总数据量,防止单个连接占用过多资源
config.max_connection_data = 100_000_000;
// 设置单个流的最大数据量为 30 MB
// 限制单个流可以传输的数据量,确保资源在多个流之间公平分配
config.max_stream_data = 100_000_000;
config
})
.with_dns()?
.with_relay_client(noise::Config::new, yamux::Config::default)?
.with_behaviour(|keypair, relay_behaviour| {
let behaviour = behaviour_build::ClientBehaviour::new(keypair, relay_behaviour)?;
Ok(behaviour)
})?
.with_swarm_config(|c| {
c.with_idle_connection_timeout(Duration::from_secs(60))
.with_notify_handler_buffer_size(NonZeroUsize::new(32).expect("eeee"))
.with_per_connection_event_buffer_size(usize::from(NonZeroUsize::new(64).expect("eeee")))
})
.build();
swarm.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse()?)?;
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
let mut swarm =
libp2p::SwarmBuilder::with_existing_identity(helper::generate_ed25519(secret_key_seed))
.with_tokio()
.with_tcp(
tcp::Config::default().nodelay(true),
noise::Config::new,
yamux::Config::default,
)?
// .with_quic_config(|mut config| {
// config.keep_alive_interval = Duration::from_secs(5);
// config.max_idle_timeout = Duration::from_millis(10_000);
// config.max_concurrent_stream_limit = 256;
// // 设置其他字段...
// config
// })
.with_quic_config(|mut config| {
// 禁用对 QUIC 协议草案 29 版本的支持
// 只使用 QUIC 版本 1,保证兼容性和稳定性
config.support_draft_29 = false;
// 设置初始握手的超时时间为 5 秒
// 如果在 5 秒内无法完成握手,连接将被终止,防止握手过程卡住
config.handshake_timeout = Duration::from_secs(10);
// 设置最大空闲超时时间为 10,000 毫秒(10 秒)
// 如果连接在 10 秒内没有任何活动,它将被关闭,以释放资源
config.max_idle_timeout = 10 * 1000;
// 设置最大并发双向流的数量为 256
// 限制单个连接可以同时打开的流的数量,防止资源耗尽
config.max_concurrent_stream_limit = 1024;
// 设置保活包发送间隔为 5 秒
// 每 5 秒发送一个保活包,以保持连接活跃,防止因空闲而被关闭
config.keep_alive_interval = Duration::from_secs(5);
// 设置单个连接的最大数据量为 50 MB
// 限制单个连接可以传输的总数据量,防止单个连接占用过多资源
config.max_connection_data = 100_000_000;
// 设置单个流的最大数据量为 30 MB
// 限制单个流可以传输的数据量,确保资源在多个流之间公平分配
config.max_stream_data = 100_000_000;
config
})
.with_dns()?
.with_relay_client(noise::Config::new, yamux::Config::default)?
.with_behaviour(|keypair, relay_behaviour| {
let behaviour = behaviour_build::ClientBehaviour::new(keypair, relay_behaviour)?;
Ok(behaviour)
})?
.with_swarm_config(|c| {
c.with_idle_connection_timeout(Duration::from_secs(60))
.with_notify_handler_buffer_size(NonZeroUsize::new(32).expect("eeee"))
.with_per_connection_event_buffer_size(usize::from(NonZeroUsize::new(64).expect("eeee")))
})
.build();
swarm.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse()?)?;
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
This is the packet sending part. The data packet is just a simulated UDP data packet with a size of 200 bytes. The first 4 bytes are magic bytes, followed by some IPs and finally the data.
let tlv_bytes = TLVProtocol::to_bytes(&tlv_packet);
let tlv_length = tlv_bytes.len() as u32;
let mut stream_buf = BufWriter::new(stream);
stream_buf.write_all(&tlv_length.to_be_bytes()).await?;
stream_buf.write_all(&tlv_bytes).await?;
I have tried dynamic buffers to manage the buffer queue, but it seems to have nothing to do with this.
I am not sure what the problem is, whether it is my reading method that is wrong, or some other problem. If you know anything about this, please help me. Thank you