ThreadPool Loop w/ std::net TCP Connections Timing Out?

Hi there, I'm still relatively new to Rust and am having trouble talking to some devices on my network. Whenever one of these devices (not my design) receives a command over a TCP connection, it responds and immediately closes the connection (at least, that's what I believe from what I've seen in Wireshark and from my attempts to send multiple commands over a single TCP connection).

Basically, I'm scanning a range of IP addresses on this network every 5 minutes (using JobScheduler). For each device, I need to send a command to determine its type, then open another connection to collect data, receive the response, do some data transformation, and immediately send an HTTP request to my local GraphQL API.

Right now, my attempt uses the ThreadPool library, but I'm willing to bet that the way I'm handling the networking is wrong, because it's not gathering all of the devices and I have no idea why. Like I said, I'm still very new to Rust (NodeJS background, ha). I'm planning to refactor toward either Tokio or Rayon, depending on which fits better.

Here is my code:

use reqwest::blocking::Client as ReqwestClient;
use serde_json::{json, Value};
use std::io::{self, Read, Write};
use std::net::{Ipv4Addr, SocketAddr, TcpStream};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use threadpool::ThreadPool;
use job_scheduler::{Job, JobScheduler};

fn main() {
    let mut sched_scanner = JobScheduler::new();
    
    // every 5 minutes
    let expr_scanner = "0 0/5 * * * *";

    sched_scanner.add(Job::new(expr_scanner.parse().unwrap(), move || {
        crawler_main();
    }));
}

fn crawler_main() {
    let start_ip: u32 = u32::from(Ipv4Addr::from_str("192.168.1.1").unwrap());
    let end_ip: u32 = u32::from(Ipv4Addr::from_str("192.168.1.255").unwrap());

    let pool = ThreadPool::new(25);

    let client = ReqwestClient::new();

    for ip_address in start_ip..=end_ip {
        let ip = u32_to_ip(ip_address);
        let addr: SocketAddr = format!("{}:1337", ip).parse().unwrap();

        let shared_client = Mutex::new(Arc::new(client.clone()));
        let shared_addr = Mutex::new(Arc::new(addr));

        pool.execute(move || {
            let addr = shared_addr.lock().unwrap();
            
            match TcpStream::connect_timeout(&addr, Duration::from_millis(200)) {
                Ok(&mut stream) => {
                    let type_response = send_command(&addr, r#"{"get": "type"}"#);
                    
                    let merged_json_stats = match type_response.as_str() {
                        "1.3" => {
                            let v13_stats = send_command(&addr, r#"{"get": "weather", "date": "2023-10-15"}"#);
                            
                            normalize_stats(v13_stats)
                        },
                        "2.0" => {
                            let v2_stats = send_command(&addr, r#"{"get": "weather", "day": "today"}"#);
                            
                            normalize_stats(v2_stats)
                        },
                        _ => println!("Error: Type not found.")
                    };
                    
                    let query_obj = json!({
                        "query": r#"
                            mutation ingest($json: String!) {
                                weatherIngest(input: { payload: $json })
                            }
                        "#,
                        "variables": {
                            "json": merged_json_stats.to_string(),
                        }
                    });
                    
                    let res = shared_client
                        .lock()
                        .unwrap()
                        .post("http://localhost:1339")
                        .json(&query_obj)
                        .send()
                        .unwrap();
    
                    let bytes = res.bytes().unwrap();
                    let response = std::str::from_utf8(&bytes).unwrap();
                    // check if response has "OK" in it
                    if !response.contains("\"OK\"") {
                        println!("Error sending data to collector API: {}", response);
                    }
                }
                Err(e) => {
                    println!("Could not connect to {}: {}", ip, e);
                }
            }
        });
    }

    pool.join();

    dbg!("Collection complete.");
}

fn normalize_stats(response: String) -> Value {
    // do some stuff
    json!({
        "temperature": 30,
        "wind_speed": 1.5,
        "wind_dir": 1.1,
    })
}

fn send_command(addr: &std::net::SocketAddr, command: &str) -> String {
    let mut stream = TcpStream::connect_timeout(addr, Duration::from_millis(200)).unwrap();

    let _ = write_data(&mut stream, command.as_bytes());
    let res = read_data(&mut stream).unwrap();

    // remove null bytes
    let res = res.replace('\0', "");

    res
}

fn write_data(stream: &mut impl Write, data: &[u8]) -> io::Result<()> {
    stream.write_all(data).unwrap();

    stream.flush()
}

const MESSAGE_BUFFER_SIZE: usize = 8;

fn read_data(buf: &mut impl Read) -> io::Result<String> {
    let mut recvd: Vec<u8> = vec![];

    let mut rx_bytes = [0u8; MESSAGE_BUFFER_SIZE];

    loop {
        let bytes_read = buf.read(&mut rx_bytes)?;

        recvd.extend_from_slice(&rx_bytes[..bytes_read]);

        if bytes_read < MESSAGE_BUFFER_SIZE {
            break;
        }
    }

    String::from_utf8(recvd).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidData,
            "Received invalid UTF-8 data from server",
        )
    })
}

Converting the start and end addresses to u32 and back is totally unnecessary. For example, if you are only scanning a /24 subnet, you can just do this:

for x in 1..=255 {
    let addr = SocketAddrV4::new(Ipv4Addr::new(192, 168, 1, x), 1337);
}

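Wrapping the address in Mutex::new(Arc::new(addr)) is unnecessary and wrong. The socket address doesn't need to be shared: SocketAddr is Copy and Send, so it can simply be moved into the closure that is sent to the thread pool. Neither Arc nor Mutex is needed.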

As for the web client, I don't know much about it, but even if it does need to be shared, the correct way to share a resource between threads is Arc<Mutex<...>>, not the other way around.

Also, don't create new resources inside the loop, namely Arc::new() and client.clone(). Create the shared resource once, outside the loop, and only clone the Arc inside it:

// outside of the loop, create the `Arc` to the shared resource
let client = ReqwestClient::new();
let client = Arc::new(Mutex::new(client));
// inside the loop, just clone the `Arc`, which lets you share the resource between threads
for .... {
    let addr = SocketAddrV4::new(....);
    let client = Arc::clone(&client);
    // the `move` keyword moves the variables into the closure, which is then sent to the thread pool to execute
    pool.execute(move || {
        //...
    });
}
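Here is a self-contained sketch of the same pattern that runs with only the standard library. It swaps in two stand-ins that are not from the original code: std::thread::spawn in place of ThreadPool::execute, and a hypothetical FakeClient in place of the reqwest client. The address is moved into each closure by value, while the client is created once and shared through Arc<Mutex<...>>.

```rust
use std::net::{Ipv4Addr, SocketAddrV4};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the HTTP client: it only records which
/// device each "request" came from.
#[derive(Default)]
struct FakeClient {
    posted: Vec<SocketAddrV4>,
}

impl FakeClient {
    fn post(&mut self, from: SocketAddrV4) {
        self.posted.push(from);
    }
}

/// Scans 192.168.1.1 ..= 192.168.1.{last_octet} and returns what was "posted".
fn scan(last_octet: u8) -> Vec<SocketAddrV4> {
    // Created once, outside the loop.
    let client = Arc::new(Mutex::new(FakeClient::default()));
    let mut handles = Vec::new();

    for x in 1..=last_octet {
        // SocketAddrV4 is Copy + Send, so it can be moved into the closure as-is.
        let addr = SocketAddrV4::new(Ipv4Addr::new(192, 168, 1, x), 1337);
        // Cloning the Arc only bumps a reference count; all threads share one client.
        let client = Arc::clone(&client);
        // Stand-in for pool.execute(move || { ... }).
        handles.push(thread::spawn(move || {
            // ... talk to the device at `addr` here (omitted) ...
            client.lock().unwrap().post(addr);
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    // Threads finish in any order, so sort the results by IP before returning.
    let mut posted = std::mem::take(&mut client.lock().unwrap().posted);
    posted.sort_by_key(|a| *a.ip());
    posted
}

fn main() {
    let posted = scan(255);
    println!("posted {} results", posted.len()); // prints "posted 255 results"
}
```

Each thread holds the lock only for the duration of post, so the per-device network work (omitted here) would still run in parallel.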

connect_timeout returns io::Result<TcpStream>, so don't use a reference in the pattern; bind the value directly:

match ... {
    Ok(mut stream) => {
        //...
    },
}
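To illustrate binding by value, here is a sketch of a send_command variant that keeps the TcpStream from the match arm and uses it directly, rather than connecting a second time. It assumes the device reads one newline-terminated command (the real protocol isn't shown), and spawn_fake_device is a hypothetical local stand-in for a device that replies and then closes the connection. Because the device closes after replying, the client reads until EOF with read_to_string. The 2-second read timeout is also an assumption.

```rust
use std::io::{self, BufRead, BufReader, Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a device: reads one newline-terminated command,
/// writes a reply, then drops the socket, which closes the connection.
fn spawn_fake_device() -> io::Result<(SocketAddr, thread::JoinHandle<()>)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    let handle = thread::spawn(move || {
        if let Ok((stream, _)) = listener.accept() {
            let mut reader = BufReader::new(stream);
            let mut command = String::new();
            if reader.read_line(&mut command).is_ok() {
                let _ = reader.get_mut().write_all(b"2.0");
            }
        } // `reader` (and the TcpStream inside it) is dropped here
    });
    Ok((addr, handle))
}

fn send_command(addr: &SocketAddr, command: &str) -> io::Result<String> {
    // connect_timeout returns io::Result<TcpStream>; bind the stream by value.
    match TcpStream::connect_timeout(addr, Duration::from_millis(200)) {
        Ok(mut stream) => {
            // Assumed timeout so a silent device can't block the worker forever.
            stream.set_read_timeout(Some(Duration::from_secs(2)))?;
            stream.write_all(format!("{command}\n").as_bytes())?;
            // The device closes the connection after replying, so read until EOF.
            let mut reply = String::new();
            stream.read_to_string(&mut reply)?;
            // Strip any NUL bytes from the reply.
            Ok(reply.replace('\0', ""))
        }
        Err(e) => {
            eprintln!("could not connect to {addr}: {e}");
            Err(e)
        }
    }
}

fn main() -> io::Result<()> {
    let (addr, device) = spawn_fake_device()?;
    let reply = send_command(&addr, r#"{"get": "type"}"#)?;
    device.join().unwrap();
    println!("type: {reply}"); // prints "type: 2.0"
    Ok(())
}
```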

Adding tracing/logging to your program can help you figure out what went wrong.

For I/O-bound tasks like network communication, tokio is generally better suited; rayon is typically used for CPU-bound tasks.


Thank you very much for your detailed reply! I'll implement these changes and report back as soon as I can. I had a feeling I was doing quite a few unnecessary things, and after reading your explanation, it makes sense why they're unnecessary and wrong.