Concurrent lookup of multiple DNS records using tokio

Hi folks,

I'm very new to Rust, but I hope what I'm trying to achieve isn't too difficult.

I want to perform concurrent DNS lookups and print the results as JSON. I'm using trust-dns-resolver to do the lookups.

I have managed to query them one after the other, which for a list of ~100 domains took ~6s (on my local machine).

I would like to query them concurrently and then print all the results.

Here's what I have so far...

main.rs

use trust_dns_resolver::TokioAsyncResolver;
use trust_dns_resolver::config::{ResolverConfig,ResolverOpts};
use tokio::runtime::Runtime;
use crate::lookup::Result;

mod lookup;

fn main() {
    let hosts = vec![String::from("example.org."), String::from("rust-lang.org."), String::from("reddit.com.")];

    let io_loop = Runtime::new().unwrap();

    let resolver = io_loop.block_on(async {
        TokioAsyncResolver::tokio(
            ResolverConfig::default(),
            ResolverOpts::default())
    }).unwrap();

    let mut all_results : Vec<Result> = Vec::new();

    for host in hosts {
        let result : Result = lookup::query(host.clone(), &io_loop, &resolver);
        all_results.push(result);
    }

    println!("{}", serde_json::to_string(&all_results).unwrap());
}

lookup.rs

use tokio::runtime::Runtime;
use trust_dns_resolver::AsyncResolver;
use trust_dns_resolver::name_server::GenericConnection;
use trust_dns_resolver::name_server::GenericConnectionProvider;
use trust_dns_resolver::proto::xfer::DnsRequestOptions;
use trust_dns_resolver::proto::rr::RecordType;
use serde::{Serialize};
use trust_dns_resolver::name_server::TokioRuntime;

#[derive(Serialize,Clone)]
pub struct Record {
    ttl: i32,
    #[serde(rename = "type")]
    rtype: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    ip: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    ipv6: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    target: Option<String>,
}

#[derive(Serialize)]
pub struct Result {
    host: String,
    records: Vec<Record>
}

pub fn query(
    target: String,
    io_loop: &Runtime,
    resolver: &AsyncResolver<GenericConnection, GenericConnectionProvider<TokioRuntime>>
    ) -> Result {

    let lookup_a_future = resolver.lookup_ip(target.clone());
    let lookup_aaaa_future = resolver.ipv6_lookup(target.clone());
    let lookup_cname_future = resolver.lookup(target.clone(), RecordType::CNAME, DnsRequestOptions::default());

    let a_records: Vec<Record> = match io_loop.block_on(lookup_a_future) {
        Ok(response) => {response.into_iter().map(|address| {
            Record {
                ttl: 60, // TODO::FIX ME, get real value
                rtype: String::from("A"),
                ip: Some(address.to_string()),
                ipv6: None,
                target: None,
            }
        }).collect()},
        Err(_) => {Vec::new()},
    };

    let aaaa_records: Vec<Record> = match io_loop.block_on(lookup_aaaa_future) {
        Ok(response) => {response.into_iter().map(|address| {
            Record {
                ttl: 60, // TODO::FIX ME, get real value
                rtype: String::from("AAAA"),
                ip: None,
                ipv6: Some(address.to_string()),
                target: None,
            }
        }).collect()},
        Err(_) => {Vec::new()},
    };

    let cname_records: Vec<Record> = match io_loop.block_on(lookup_cname_future) {
        Ok(response) => {response.into_iter().map(|address| {
            Record {
                ttl: 60, // TODO::FIX ME, get real value
                rtype: String::from("CNAME"),
                ip: None,
                ipv6: None,
                target: Some(address.to_string()),
            }
        }).collect()},
        Err(_) => {Vec::new()},
    };

    let all_records = [
        a_records,
        aaaa_records,
        cname_records
    ].concat();

    Result {
        host: target.clone(),
        records: all_records
    }
}

Many thanks
J.A

Instead of having query take a &Runtime as argument and call block_on, I would make it an async fn and have it use .await. Then, I would call runtime.spawn(lookup::query(...)) for each host in a loop like this:

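// Spawn one task per host; the tasks start running immediately.
let mut joins = Vec::new();
for host in hosts {
    joins.push(runtime.spawn(lookup::query(...)));
}
// Wait for each task in turn and collect its result.
let mut all_results = Vec::new();
for join in joins {
    all_results.push(runtime.block_on(join).unwrap());
}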

The AsyncResolver will probably need to be wrapped in an Arc like this:

pub async fn query(
    target: String,
    resolver: Arc<AsyncResolver<...>>,
) -> Result

Hi :slight_smile:

I see your avatar all over the various Rust channels - thank you for all of the effort you put into helping the community grow!

I think I'm getting there, but clearly missing something to do with async

I'm now running this behind an actix web server, so the setup has changed a little but is largely the same

I'm getting a panic from the rt.block_on call in handlers.rs:

for join in joins {
    all_results.push(rt.block_on(join).unwrap());
}

Included all code below for clarity

main.rs

use actix_web::{web,App, HttpServer};

mod handlers;
mod lookup;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    std::env::set_var("RUST_LOG", "actix_web=debug");

    let address = "0.0.0.0";
    let port = 8080;
    println!("Starting server on port: {}", port);

    HttpServer::new(move || App::new().route("/resolve", web::post().to(handlers::resolve)))
        .shutdown_timeout(1)
        .bind((address, port))?
        .run()
        .await
}

handlers.rs

use actix_web::{web, Error, HttpResponse};
use tokio::runtime::Handle;
use trust_dns_resolver::TokioAsyncResolver;
use trust_dns_resolver::config::{ResolverConfig,ResolverOpts};
use std::sync::Arc;
use crate::lookup::{query, QueryResult};

// Handler for POST /resolve
pub async fn resolve(
    host_list: web::Json<Vec<String>>,
) -> Result<HttpResponse, Error> {

    let results = do_lookup(host_list).await;

    Ok(HttpResponse::Ok()
        .content_type("application/json")
        .body(serde_json::to_string(&results).unwrap()))
}

async fn do_lookup(host_list: web::Json<Vec<String>>) -> Vec<QueryResult> {
    let rt = Handle::current();

    let resolver = TokioAsyncResolver::tokio(
        ResolverConfig::default(),
        ResolverOpts::default()
    ).unwrap();

    let mut joins = Vec::new();
    for host in host_list.iter() {
        joins.push(rt.spawn(query(host.to_string(), Arc::new(resolver.clone()))));
    }
    let mut all_results = Vec::new();

    for join in joins {
        all_results.push(rt.block_on(join).unwrap());
    }

    all_results
}

lookup.rs

use std::sync::Arc;
use serde::{Serialize};
use trust_dns_resolver::AsyncResolver;
use trust_dns_resolver::name_server::{GenericConnection,GenericConnectionProvider,TokioRuntime};
use trust_dns_resolver::proto::rr::RecordType;
use trust_dns_resolver::proto::xfer::DnsRequestOptions;

#[derive(Serialize,Clone)]
pub struct Record {
    ttl: i32,
    #[serde(rename = "type")]
    rtype: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    ip: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    ipv6: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    target: Option<String>,
}

#[derive(Serialize)]
pub struct QueryResult {
    host: String,
    records: Vec<Record>
}

pub async fn query(
    target: String,
    resolver: Arc<AsyncResolver<GenericConnection, GenericConnectionProvider<TokioRuntime>>>,
    ) -> QueryResult {

    let options = DnsRequestOptions::default();

    let lookup_a_future = resolver.lookup(target.clone(), RecordType::A, options);

    let a_records: Vec<Record> = match lookup_a_future.await {
        Ok(response) => {response.into_iter().map(|address| {
            Record {
                ttl: 60, // TODO::FIX ME, get real value
                rtype: String::from("A"),
                ip: Some(address.to_string()),
                ipv6: None,
                target: None,
            }
        }).collect()},
        Err(_) => {Vec::new()},
    };

    let all_records = [
        a_records,
    ].concat();

    QueryResult {
        host: target,
        records: all_records
    }
}

Thanks again!

AHA!

I changed that block to

for join in joins {
    let r = join.await;
    all_results.push(r.unwrap());
}

and it's working :slight_smile:

Hi,

Everything is working great, but it's not as fast as I expected...

The same problem implemented in JavaScript (on Node.js) takes ~7s, compared to ~35s in Rust, when querying a list of ~17,000 domains.

Are you able to recommend any optimisations from looking at the code above?

My ideas (although they might be completely misguided :joy: )

  • Creating a resolver in main and passing it down as a Mutex in app_data, as currently the code keeps cloning the immutable resolver, which I imagine isn't great... I can't quite get this to work though
  • I wonder if I'm understanding tokio correctly - is there any way to increase the number of worker threads to fire off more concurrent requests? (I don't actually know at the moment how many there are - is it just the number of CPU cores?)
  • In lookup::query, I run 3 separate lookups (for A, AAAA, and CNAME records). Perhaps I might be better off spawning each of these individually for each host, so that my do_lookup function calls

let mut joins = Vec::new();
for host in host_list.iter() {
    joins.push(rt.spawn(query_a_records(host.to_string(), Arc::new(resolver.clone()))));
    joins.push(rt.spawn(query_aaaa_records(host.to_string(), Arc::new(resolver.clone()))));
    joins.push(rt.spawn(query_cname_records(host.to_string(), Arc::new(resolver.clone()))));
}

(I'm coming mostly from PHP/Node.js, so much of this is very new to me. Thanks for your help so far - there's certainly a lot to learn!)
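
On the first idea in the list above: a Mutex is probably not needed, since the code already calls the lookup methods through a shared reference (an Arc). What may be worth changing is Arc::new(resolver.clone()), which makes a fresh copy of the resolver for every task; creating one Arc up front and cloning the handle shares a single instance instead. Here is a std-only sketch of the difference, with FakeResolver and shared_handles as hypothetical stand-ins. How expensive cloning the real resolver is has not been measured here.

```rust
use std::sync::Arc;

// Hypothetical stand-in for the real resolver.
#[derive(Clone)]
struct FakeResolver {
    name_servers: Vec<String>,
}

// Wrap the resolver in an Arc once, then hand out cheap handle clones.
fn shared_handles(resolver: FakeResolver, n: usize) -> Vec<Arc<FakeResolver>> {
    let shared = Arc::new(resolver);
    // Arc::clone only bumps a reference count; the resolver itself is not copied.
    (0..n).map(|_| Arc::clone(&shared)).collect()
}

fn main() {
    let resolver = FakeResolver {
        name_servers: vec![String::from("192.0.2.53")],
    };
    let handles = shared_handles(resolver, 3);

    // Every handle points at the same underlying resolver.
    assert!(Arc::ptr_eq(&handles[0], &handles[2]));
    println!(
        "{} handles share one resolver using {:?}",
        Arc::strong_count(&handles[0]),
        handles[0].name_servers
    );
}
```

In the actix setup, the same idea would mean building the resolver once at startup and passing the Arc (or a clone of it) into each handler, rather than constructing a new resolver on every request.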