Difference between Stream .for_each and .for_each_concurrent

Hi,
I'm working on a dynamic DNS server. After refactoring my code to process the UDP frames concurrently, I'm not sure whether it will even make a difference in practice. The cause of my confusion is the Mutex I have to lock in order to use the stream's sink to send data back.

The old code:

    let addr = SocketAddr::from((settings.dns_address, settings.dns_port));
    let udp_socket = UdpSocket::bind(&addr).await.unwrap();
    let mut dns_stream = UdpFramed::new(udp_socket, DnsMessageCodec::new());

    let udp_server = tokio::spawn(async move {
        loop {
            debug!("Waiting for DNS queries...");
            let (query, addr) = dns_stream.next().map(|e| e.unwrap()).await.unwrap();

            debug!("DNS query received: {:?}", query);
            let response = match query {
                QueryMessage::StandardQuery(query) => {
                    let records = storage.lock().await;
                    handle_standard_query(&records, query)
                }
                _ => panic!("Not Implemented"),
            };

            debug!("DNS response: {:?}", response);
            dns_stream.send((response, addr)).await.unwrap();
        }
    });

The code after refactoring:

    let addr = SocketAddr::from((ip, port));
    let udp_socket = UdpSocket::bind(&addr).await.unwrap();
    let (dns_sink, dns_stream) = UdpFramed::new(udp_socket, DnsMessageCodec::new()).split();

    info!(
        "DNS server now listening on: {ip}:{port}",
        ip = ip,
        port = port
    );

    let sink = &Arc::new(Mutex::new(dns_sink));
    dns_stream
        .for_each_concurrent(100, |result| async move {
            let (query, addr) = result.unwrap();

            //handle_dns_query(query, addr, storage_clone.clone(), sink) .await;
            debug!("DNS query received: {:?}", query);
            let response = match query {
                QueryMessage::StandardQuery(query) => {
                    //let records = storage.lock().await;
                    let records = storage.read().await;
                    handle_standard_query(&records, query)
                }
                // FIXME response with not implemented error
                _ => panic!("Not Implemented"), // TODO: not implemented response
            };

            debug!("DNS response: {:?}", response);
            let mut sink = sink.lock().await;
            sink.send((response, addr)).await.unwrap();
        })
        .await;

The full code can be found on GitHub: kunerd/koppeln (a minimalistic standalone dynamic DNS server).

To quote the docs:

This is similar to StreamExt::for_each, but the futures produced by the closure are run concurrently (but not in parallel– this combinator does not introduce any threads).
The closure provided will be called for each item this stream produces, yielding a future. That future will then be executed to completion concurrently with the other futures produced by the closure.
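
To make "concurrently, but not in parallel" concrete, here is a minimal std-only sketch. It is not how the futures crate implements for_each or for_each_concurrent; it only models the ordering difference on a single thread. The names YieldOnce, handle, poll_all and demo are made up for this illustration, and YieldOnce stands in for any .await that is not ready on the first poll.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type Log = Rc<RefCell<Vec<String>>>;
type Task = Pin<Box<dyn Future<Output = ()>>>;

// Waker that does nothing; enough here because poll_all re-polls every task.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical stand-in for an `.await` that is pending on its first poll.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

// Hypothetical handler: logs, waits once, logs again.
async fn handle(id: u32, log: Log) {
    log.borrow_mut().push(format!("start {id}"));
    YieldOnce(false).await;
    log.borrow_mut().push(format!("end {id}"));
}

// Polls every unfinished task in turn on the current thread until all are done.
fn poll_all(mut tasks: Vec<Task>) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    while !tasks.is_empty() {
        tasks.retain_mut(|task| task.as_mut().poll(&mut cx).is_pending());
    }
}

fn demo(concurrent: bool) -> Vec<String> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    let tasks: Vec<Task> = (1..=3)
        .map(|id| Box::pin(handle(id, log.clone())) as Task)
        .collect();
    if concurrent {
        // Models for_each_concurrent: all handlers are in flight at once.
        poll_all(tasks);
    } else {
        // Models for_each: each handler finishes before the next one starts.
        for task in tasks {
            poll_all(vec![task]);
        }
    }
    let entries = log.borrow().clone();
    entries
}

fn main() {
    println!("sequential: {:?}", demo(false));
    println!("concurrent: {:?}", demo(true));
}
```

In the sequential run each "start" is followed directly by its own "end". In the concurrent run all three handlers start before any of them ends, even though no second thread is involved.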

Thank you for your help. I read the docs for both functions after reading the async book, where I first heard about for_each_concurrent(). Maybe my heading is a bit misleading, because my question is not so much about the difference between for_each and for_each_concurrent() in general, but about the difference in my specific example above. From my understanding, my newer code (the one using for_each_concurrent) would process every incoming DNS message concurrently, preparing the response and initiating the sending of the answer message. But isn't locking the Mutex some kind of bottleneck here?
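
One way to reason about the lock is a small std-only model of the same shape: each handler first prepares its response, then takes a lock only for the send step. This is a sketch under assumptions, not the tokio Mutex or the real sink. Acquire, YieldOnce, handle, poll_all and run_demo are hypothetical names, and the Cell<bool> flag merely plays the role of the lock on a single thread.

```rust
use std::cell::{Cell, RefCell};
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type Log = Rc<RefCell<Vec<String>>>;
type Task = Pin<Box<dyn Future<Output = ()>>>;

// Waker that does nothing; enough here because poll_all re-polls every task.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical stand-in for an `.await` that is pending on its first poll.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

// Hypothetical stand-in for `sink.lock().await`: ready only when the flag
// is free, and marks it as taken when it succeeds.
struct Acquire(Rc<Cell<bool>>);
impl Future for Acquire {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0.replace(true) {
            Poll::Pending // someone else holds the lock
        } else {
            Poll::Ready(())
        }
    }
}

async fn handle(id: u32, sink_busy: Rc<Cell<bool>>, log: Log) {
    log.borrow_mut().push(format!("prepare {id}"));
    YieldOnce(false).await; // e.g. waiting for the storage read lock
    Acquire(sink_busy.clone()).await; // lock taken only after preparing
    log.borrow_mut().push(format!("send start {id}"));
    YieldOnce(false).await; // the send itself is in progress
    log.borrow_mut().push(format!("send end {id}"));
    sink_busy.set(false); // release, like dropping the guard
}

// Polls every unfinished task in turn on the current thread until all are done.
fn poll_all(mut tasks: Vec<Task>) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    while !tasks.is_empty() {
        tasks.retain_mut(|task| task.as_mut().poll(&mut cx).is_pending());
    }
}

fn run_demo() -> Vec<String> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    let sink_busy = Rc::new(Cell::new(false));
    let tasks: Vec<Task> = (1..=3)
        .map(|id| Box::pin(handle(id, sink_busy.clone(), log.clone())) as Task)
        .collect();
    poll_all(tasks);
    let entries = log.borrow().clone();
    entries
}

fn main() {
    for entry in run_demo() {
        println!("{entry}");
    }
}
```

In this model all three "prepare" steps overlap, while the "send start"/"send end" pairs never interleave. The posted code has the same shape, since sink.lock() is only awaited after the response has been built, so the lock serializes the sends but not the query handling. Whether that matters in practice depends on how long a send holds the lock, which this sketch does not measure.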
