Can't make Quinn/Quic multiple connections to a server

I have tried this multiple ways (as previous questions indicate), but now I've just tried to fairly simply modify one of the provided examples. I still can only make one live connection to a quinn server. Code provided at end. I even tried to include a connection::close() in the client thinking maybe I could just get successive connections. But I end up with only one live connection that doesn't close. I am really confused at what I'm doing wrong.

use std::{error::Error, net::SocketAddr};
use quinn::{Endpoint, VarInt};
use rustls::pki_types::{CertificateDer};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    rustls::crypto::ring::default_provider().install_default().expect("Failed to install rustls crypto provider");
    let addr = "127.0.0.1:5000".parse().unwrap();
    let server1_cert = run_server(addr)?;

    let client1 = make_client_endpoint(
        "127.0.0.1:5005".parse().unwrap(),
        &[&server1_cert],
    )?;
    let client2 = make_client_endpoint(
        "127.0.0.1:5006".parse().unwrap(),
        &[&server1_cert],
    )?;
    let client3 = make_client_endpoint(
        "127.0.0.1:5007".parse().unwrap(),
        &[&server1_cert],
    )?;

    let c1=    run_client(&client1, addr);
    let c2=    run_client(&client2, addr);
    let c3=    run_client(&client3, addr);
    c1.await;
    c2.await;
    c3.await;

    // Make sure the server has a chance to clean up
    client1.wait_idle().await;
    client2.wait_idle().await;
    client3.wait_idle().await;

    Ok(())
}

/// Runs a QUIC server bound to given address and returns server certificate.
fn run_server(
    addr: SocketAddr,
) -> Result<CertificateDer<'static>, Box<dyn Error + Send + Sync + 'static>> {
    let (endpoint, server_cert) = make_server_endpoint(addr)?;
    // accept a single connection
    tokio::spawn(async move {
        let connection = endpoint.accept().await.unwrap().await.unwrap();
        println!(
            "[server] incoming connection: addr={}",
            connection.remote_address()
        );
    });

    Ok(server_cert)
}

/// Attempt QUIC connection with the given server address.
async fn run_client(endpoint: &Endpoint, server_addr: SocketAddr) {
    let connect = endpoint.connect(server_addr, "localhost").unwrap();
    let cx = connect.await.unwrap();
    println!("[client] connected: addr={}", cx.remote_address());
    cx.close(VarInt::from_u32(2),b"closed");
}

//#![cfg(any(feature = "rustls-aws-lc-rs", feature = "rustls-ring"))]
// Commonly used code in most examples.

use quinn::{ClientConfig, ServerConfig};
use rustls::pki_types::{PrivatePkcs8KeyDer};

use std::{ sync::Arc};
use std::net::Ipv4Addr;

/// Constructs a QUIC endpoint configured for use a client only.
///
/// ## Args
///
/// - server_certs: list of trusted certificates.
#[allow(unused)]
pub fn make_client_endpoint(
    bind_addr: SocketAddr,
    server_certs: &[&[u8]],
) -> Result<Endpoint, Box<dyn Error + Send + Sync + 'static>> {
    let client_cfg = configure_client(server_certs)?;
    let mut endpoint = Endpoint::client(bind_addr)?;
    endpoint.set_default_client_config(client_cfg);
    Ok(endpoint)
}

/// Constructs a QUIC endpoint configured to listen for incoming connections on a certain address
/// and port.
///
/// ## Returns
///
/// - a stream of incoming QUIC connections
/// - server certificate serialized into DER format
#[allow(unused)]
pub fn make_server_endpoint(
    bind_addr: SocketAddr,
) -> Result<(Endpoint, CertificateDer<'static>), Box<dyn Error + Send + Sync + 'static>> {
    let (server_config, server_cert) = configure_server()?;
    let endpoint = Endpoint::server(server_config, bind_addr)?;
    Ok((endpoint, server_cert))
}

/// Builds default quinn client config and trusts given certificates.
///
/// ## Args
///
/// - server_certs: a list of trusted certificates in DER format.
fn configure_client(
    server_certs: &[&[u8]],
) -> Result<ClientConfig, Box<dyn Error + Send + Sync + 'static>> {
    let mut certs = rustls::RootCertStore::empty();
    for cert in server_certs {
        certs.add(CertificateDer::from(*cert))?;
    }

    Ok(ClientConfig::with_root_certificates(Arc::new(certs))?)
}

/// Returns default server configuration along with its certificate.
fn configure_server(
) -> Result<(ServerConfig, CertificateDer<'static>), Box<dyn Error + Send + Sync + 'static>> {
    let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()]).unwrap();
    let cert_der = CertificateDer::from(cert.cert);
    let priv_key = PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der());

    let mut server_config =
        ServerConfig::with_single_cert(vec![cert_der.clone()], priv_key.into())?;
    let transport_config = Arc::get_mut(&mut server_config.transport).unwrap();
    transport_config.max_concurrent_uni_streams(10_u8.into());

    Ok((server_config, cert_der))
}

#[allow(unused)]
pub const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-29"];

Look at run_server() and read what it does.

1 Like

I'm still thinking it must be something I did. However, I do have a single server accepting multiple connections using s2n_quic using mostly sample code. Code below:

use std::net::SocketAddr;
use std::path::Path;
use std;
use s2n_quic::{Server, client::Connect, provider::tls, Client};


#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {

    let tls = tls::default::Server::builder()
        .with_trusted_certificate(Path::new("/home/newuser/Dev/DisMember/V0/certs/ca-cert.pem"))?
        .with_certificate(Path::new("/home/newuser/Dev/DisMember/V0/certs/server-cert.pem"), Path::new("/home/newuser/Dev/DisMember/V0/certs/server-key.pem"))?
        .with_client_authentication()?
        .build()?;
    let mut server = Server::builder()
        .with_tls(tls)?
        .with_io("127.0.0.1:4433")?
        .start()?;



    let serv = tokio::spawn(async move{while let Some(mut connection) = server.accept().await {
        // spawn a new task for the connection
        tokio::spawn(async move {
            eprintln!("Connection accepted from {:?}", connection.remote_addr());

            while let Ok(Some(mut stream)) = connection.accept_bidirectional_stream().await {
                // spawn a new task for the stream
                tokio::spawn(async move {
                    eprintln!("Stream opened from {:?}", stream.connection().remote_addr());

                    // echo any data back to the stream
                    while let Ok(Some(data)) = stream.receive().await {
                        stream.send(data).await.expect("stream should be open");
                    }
                });
            }
        });
    }});
    let cli = tokio::spawn(async move {
        let ctls = tls::default::Client::builder()
            .with_certificate(Path::new("/home/newuser/Dev/DisMember/V0/certs/ca-cert.pem")).unwrap()
            .with_client_identity(Path::new("/home/newuser/Dev/DisMember/V0/certs/client-cert.pem"), Path::new("/home/newuser/Dev/DisMember/V0/certs/client-key.pem")).unwrap()
            .build().unwrap();
        let client = Client::builder()
            .with_tls(ctls).unwrap()
            .with_io("0.0.0.0:0").unwrap()
            .start().unwrap();
        let addr: SocketAddr = "127.0.0.1:4433".parse().unwrap();
        let connect = Connect::new(addr).with_server_name("localhost");
        let mut connection = client.connect(connect).await.unwrap();
        connection.keep_alive(true).unwrap();
        let stream = connection.open_bidirectional_stream().await.unwrap();
        let (mut receive_stream, mut send_stream) = stream.split();

        // spawn a task that copies responses from the server to stdout
        tokio::spawn(async move {
            let mut stdout = tokio::io::stdout();
            let _ = tokio::io::copy(&mut receive_stream, &mut stdout).await;
        });

        // copy data from stdin and send it to the server
        send_stream.send("testing from1\n".as_bytes().into()).await.unwrap();
    });
    let cli2= tokio::spawn(async move {
        let ctls = tls::default::Client::builder()
            .with_certificate(Path::new("/home/newuser/Dev/DisMember/V0/certs/ca-cert.pem")).unwrap()
            .with_client_identity(Path::new("/home/newuser/Dev/DisMember/V0/certs/client-cert.pem"), Path::new("/home/newuser/Dev/DisMember/V0/certs/client-key.pem")).unwrap()
            .build().unwrap();
        let client = Client::builder()
            .with_tls(ctls).unwrap()
            .with_io("0.0.0.0:0").unwrap()
            .start().unwrap();
        let addr: SocketAddr = "127.0.0.1:4433".parse().unwrap();
        let connect = Connect::new(addr).with_server_name("localhost");
        let mut connection = client.connect(connect).await.unwrap();
        connection.keep_alive(true).unwrap();
        let stream = connection.open_bidirectional_stream().await.unwrap();
        let (mut receive_stream, mut send_stream) = stream.split();

        // spawn a task that copies responses from the server to stdout
        tokio::spawn(async move {
            let mut stdout = tokio::io::stdout();
            let _ = tokio::io::copy(&mut receive_stream, &mut stdout).await;
        });

        // copy data from stdin and send it to the server
        send_stream.send("testing from2\n".as_bytes().into()).await.unwrap();
    });
    let cli3= tokio::spawn(async move {
        let ctls = tls::default::Client::builder()
            .with_certificate(Path::new("/home/newuser/Dev/DisMember/V0/certs/ca-cert.pem")).unwrap()
            .with_client_identity(Path::new("/home/newuser/Dev/DisMember/V0/certs/client-cert.pem"), Path::new("/home/newuser/Dev/DisMember/V0/certs/client-key.pem")).unwrap()
            .build().unwrap();
        let client = Client::builder()
            .with_tls(ctls).unwrap()
            .with_io("0.0.0.0:0").unwrap()
            .start().unwrap();
        let addr: SocketAddr = "127.0.0.1:4433".parse().unwrap();
        let connect = Connect::new(addr).with_server_name("localhost");
        let mut connection = client.connect(connect).await.unwrap();
        connection.keep_alive(true).unwrap();
        let stream = connection.open_bidirectional_stream().await.unwrap();
        let (mut receive_stream, mut send_stream) = stream.split();

        // spawn a task that copies responses from the server to stdout
        tokio::spawn(async move {
            let mut stdout = tokio::io::stdout();
            let _ = tokio::io::copy(&mut receive_stream, &mut stdout).await;
        });

        // copy data from stdin and send it to the server
        send_stream.send("testing from3\n".as_bytes().into()).await.unwrap();
    });

    serv.await.expect("serv failed");
    cli.await.expect("cli failed");
    cli2.await.expect("cli2 failed");
    cli3.await.expect("cli2 failed");
    Ok(())
}

It spawns a tokio thread to accept a connection. What's keeping it from taking multiple connections? I know what the comment says, but what keeps the code from taking multiple connections?

Guess you need another hint; when familiar with Rust/C++ this looks trivial to read.
-accept 1 connection
-print
-drop connection
-drop endpoint

1 Like

You don't have any loop in the original run_server - it just accepts one connection and then reaches the end of the task.

1 Like

Gotcha. Why doesn't it close the connection when it's done?

What result are you seeing that makes you think it doesn't?

When run, the application just keeps running. If the connection is closed the application should just end as the spawned threads having nothing to do. But I'm guessing I'm wrong

So after digging through what worked and what didn't in quin and s2_quic the issue seems to be where I had the loop. In previous questions with code posted, I had the same issue of not accepting multiple connections even with a loop. There was a loop, but I had it in the wrong place.

Thank you all!!!

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.