Hi there, I'm still relatively new to Rust and am having trouble talking to some devices on my network. When one of these devices (not my design) receives a command over a TCP connection, it responds and then immediately closes the connection (at least, that is what I believe based on what I've seen in Wireshark and on my failed attempts to send multiple commands over a single TCP connection).
Basically, I'm scanning a range of IP addresses on this network (every 5 minutes, using JobScheduler). For each address I need to send a command to determine the device type, then open another connection to collect data, receive the response, do some data transformation, and immediately send an HTTP request to my local GraphQL API.
Right now my attempt uses the ThreadPool library, but I'm willing to bet that my approach to the networking is wrong, because it isn't gathering data from all of the devices and I have no idea why -- like I said, I'm still very new to Rust (NodeJS background, ha). I'd like to refactor towards either Tokio or Rayon, whichever turns out to be the better fit.
Here is my code:
use reqwest::blocking::Client as ReqwestClient;
use serde_json::{json, Value};
use std::io::{self, Read, Write};
use std::net::{Ipv4Addr, SocketAddr, TcpStream};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use threadpool::ThreadPool;
fn main() {
let mut sched_scanner = JobScheduler::new();
// every 5 minutes
let expr_scanner = "0 0/5 * * * *";
sched_scanner.add(Job::new(expr_scanner.parse().unwrap(), move || {
crawler_main();
}));
}
fn crawler_main() {
let start_ip: u32 = u32::from(Ipv4Addr::from_str("192.168.1.1").unwrap());
let end_ip: u32 = u32::from(Ipv4Addr::from_str("192.168.1.255").unwrap());
let pool = ThreadPool::new(25);
let client = ReqwestClient::new();
for ip_address in start_ip..=end_ip {
let ip = u32_to_ip(ip_address);
let addr: SocketAddr = format!("{}:1337", ip).parse().unwrap();
let shared_client = Mutex::new(Arc::new(client.clone()));
let shared_addr = Mutex::new(Arc::new(addr));
pool.execute(move || {
let addr = shared_addr.lock().unwrap();
match TcpStream::connect_timeout(&addr, Duration::from_millis(200)) {
Ok(&mut stream) => {
let type_response = send_command(&addr, r#"{"get": "type"}"#);
let merged_json_stats = match type_response.as_str() {
"1.3" => {
let v13_stats = send_command(&addr, r#"{"get": "weather", "date": "2023-10-15"}"#);
normalize_stats(v13_stats)
},
"2.0" => {
let v2_stats = send_command(&addr, r#"{"get": "weather", "day": "today"}"#);
normalize_stats(v2_stats)
},
_ => println!("Error: Type not found.")
};
let query_obj = json!({
"query": r#"
mutation ingest($json: String!) {
weatherIngest(input: { payload: $json })
}
"#,
"variables": {
"json": merged_json_stats.to_string(),
}
});
let res = shared_client
.lock()
.unwrap()
.post("http://localhost:1339")
.json(&query_obj)
.send()
.unwrap();
let bytes = res.bytes().unwrap();
let response = std::str::from_utf8(&bytes).unwrap();
// check if response has "OK" in it
if !response.contains("\"OK\"") {
println!("Error sending data to collector API: {}", response);
}
}
Err(e) => {
println!("Could not connect to {}: {}", ip, e);
}
}
});
}
pool.join();
dbg!("Collection complete.");
}
/// Converts a raw device response into the JSON object sent to the collector.
///
/// Currently a placeholder: `response` is not inspected and a fixed object
/// with `temperature`, `wind_speed` and `wind_dir` fields is returned.
fn normalize_stats(response: String) -> Value {
    // do some stuff
    let normalized: Value = json!({
        "temperature": 30,
        "wind_speed": 1.5,
        "wind_dir": 1.1,
    });
    normalized
}
fn send_command(addr: &std::net::SocketAddr, command: &str) -> String {
let mut stream = TcpStream::connect_timeout(addr, Duration::from_millis(200)).unwrap();
let _ = write_data(&mut stream, command.as_bytes());
let res = read_data(&mut stream).unwrap();
// remove null bytes_read
let _ = res.replace('\0', "");
res
}
/// Writes all of `data` to `stream` and then flushes it.
///
/// # Errors
///
/// Returns the first I/O error raised by either the write or the flush.
fn write_data(stream: &mut impl Write, data: &[u8]) -> io::Result<()> {
    stream.write_all(data)?;
    stream.flush()
}
/// Size in bytes of the scratch buffer filled by each individual `read` call.
const MESSAGE_BUFFER_SIZE: usize = 8;

/// Reads from `buf` until end-of-stream and returns everything received as a
/// UTF-8 string.
///
/// A read that returns fewer bytes than the scratch buffer holds is NOT taken
/// as end-of-message: a stream may hand data over in arbitrarily small pieces,
/// so only a zero-length read (EOF) ends the loop.
///
/// # Errors
///
/// Returns any I/O error from the underlying reader (reads interrupted by a
/// signal are retried), or an `InvalidData` error if the received bytes are
/// not valid UTF-8.
fn read_data(buf: &mut impl Read) -> io::Result<String> {
    let mut recvd: Vec<u8> = Vec::new();
    let mut rx_bytes = [0u8; MESSAGE_BUFFER_SIZE];
    loop {
        match buf.read(&mut rx_bytes) {
            Ok(0) => break,
            Ok(bytes_read) => recvd.extend_from_slice(&rx_bytes[..bytes_read]),
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    String::from_utf8(recvd).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidData,
            "Received invalid UTF-8 data from server",
        )
    })
}