I am writing a Modbus TCP server. The server spawns a task to update registers at a regular interval. It also listens for connections on an ip:port
and accepts them. I have tried using tokio::sync::Mutex instead of std::sync::Mutex, but that creates some problems of its own. Something about futures not being able to …
This is the error I get:
error[E0728]: `await` is only allowed inside `async` functions and blocks
--> src/bin/server.rs:171:46
|
170 | accept_tcp_connection(stream, socket_addr, move |_addr| {
| ------------ this is not `async`
171 | let service = service.lock().await.clone();
| ^^^^^ only allowed inside `async` functions and blocks
Here is my code
// server.rs
use tokio::net::TcpStream;
use core::net::SocketAddr;
use tokio::net::TcpListener;
use tokio_modbus::{
prelude::*,
server::tcp::{accept_tcp_connection, Server},
server::Service,
};
use std::collections::HashMap;
use std::future;
use std::fs::File;
use serde::Deserialize;
use rand::Rng;
use csv::ReaderBuilder;
use std::sync::Arc;
use tokio::sync::Mutex;
/// One row of the register configuration CSV.
///
/// Fields are filled positionally by `load_config`. Note that the CSV stores
/// `signed` in column 13 and `decimal` in column 14, i.e. in the opposite
/// order to the declaration below.
#[derive(Debug, Deserialize, Clone)]
struct Config {
    /// Category label (CSV column 0).
    category: String,
    /// Entry identifier (CSV column 1); used as the key of the config map.
    code: String,
    /// Holding-register address under which this entry's value is stored.
    address_in: u16,
    /// CSV column 3 — presumably the Modbus function code for reads; TODO confirm.
    func_type_read: u16,
    /// CSV column 4 — presumably the register address for writes; TODO confirm.
    address_out: u16,
    /// CSV column 5 — presumably the Modbus function code for writes; TODO confirm.
    func_type_write: u16,
    /// CSV column 6.
    var_dimension: u16,
    /// CSV column 7.
    var_length: u16,
    /// CSV column 8.
    bit_position: u16,
    /// Divisor used when mapping `min_value`/`max_value` to register values:
    /// `(bound - b_value) / a_value`.
    a_value: f64,
    /// Offset subtracted from `min_value`/`max_value` before dividing by `a_value`.
    b_value: f64,
    /// CSV column 11.
    var_encoding: u16,
    var_type: bool, // same as `type` in the csv but `type` is a reserved keyword
    /// CSV column 14.
    decimal: u16,
    /// CSV column 13.
    signed: bool,
    /// Lower bound used when generating register values (before the a/b mapping).
    min_value: f64,
    /// Upper bound used when generating register values (before the a/b mapping).
    max_value: f64,
    /// Unit label (CSV column 17).
    measure_unit: String,
}
async fn load_config(file_path: &str) -> HashMap<String, Config> {
// Load the configuration from the CSV file
let mut configs = HashMap::new();
let file = File::open(file_path).unwrap();
let mut reader = ReaderBuilder::new().from_reader(file);
for result in reader.records() {
let record = result.unwrap();
let config = Config {
category: record[0].to_string(),
code: record[1].to_string(),
address_in: record[2].parse().unwrap(),
func_type_read: record[3].parse().unwrap(),
address_out: record[4].parse().unwrap(),
func_type_write: record[5].parse().unwrap(),
var_dimension: record[6].parse().unwrap(),
var_length: record[7].parse().unwrap(),
bit_position: record[8].parse().unwrap(),
a_value: record[9].parse().unwrap(),
b_value: record[10].parse().unwrap(),
var_encoding: record[11].parse().unwrap(),
var_type: record[12].parse().unwrap(),
signed: record[13].parse().unwrap(),
decimal: record[14].parse().unwrap(),
min_value: record[15].parse().unwrap(),
max_value: record[16].parse().unwrap(),
measure_unit: record[17].to_string()
};
configs.insert(config.code.clone(), config);
}
configs
}
/// Server state: the loaded configuration plus the current register values.
///
/// `Clone` produces an independent copy of both maps.
#[derive(Clone)]
struct ModbusServer {
    /// Configuration entries keyed by their `code`.
    configs: HashMap<String, Config>,
    /// Current value of each holding register, keyed by register address.
    holding_registers: HashMap<u16, f64>,
}
impl ModbusServer {
async fn new(file_path: &str) -> Self {
let configs = load_config(file_path).await;
let mut holding_registers = HashMap::new();
for config in configs.values() {
holding_registers.insert(config.address_in, 0.0);
}
ModbusServer { configs, holding_registers }
}
}
impl Service for ModbusServer {
    type Request = Request<'static>;
    type Future = future::Ready<Result<Response, Exception>>;

    /// Handles one client request against this instance's register map.
    ///
    /// * `ReadHoldingRegisters(addr, cnt)` returns the `cnt` registers starting
    ///   at `addr`. If any address in that span is not configured, or the span
    ///   runs past the end of the 16-bit address space, the reply is
    ///   `Exception::IllegalDataAddress`.
    /// * Every other request is answered with `Exception::IllegalFunction`.
    ///
    /// Write requests are not supported: `call` only receives `&self`, so
    /// storing values would need interior mutability around the register map.
    fn call(&self, req: Self::Request) -> Self::Future {
        match req {
            Request::ReadHoldingRegisters(addr, cnt) => {
                let words: Option<Vec<u16>> = (0..cnt)
                    .map(|offset| {
                        // `checked_add` rejects spans that would overflow `u16`
                        // instead of panicking (debug) or wrapping (release).
                        let reg_addr = addr.checked_add(offset)?;
                        self.holding_registers
                            .get(&reg_addr)
                            // Float-to-int `as` saturates at the u16 bounds and maps NaN to 0.
                            .map(|value| *value as u16)
                    })
                    .collect();
                match words {
                    Some(values) => future::ready(Ok(Response::ReadHoldingRegisters(values))),
                    None => {
                        println!("SERVER: Exception::IllegalDataAddress");
                        future::ready(Err(Exception::IllegalDataAddress))
                    }
                }
            }
            _ => {
                println!("SERVER: Exception::IllegalFunction");
                future::ready(Err(Exception::IllegalFunction))
            }
        }
    }
}
/// Endlessly refreshes every configured holding register with a random value.
///
/// Each pass locks the shared server once, writes a fresh value for every
/// config entry, releases the lock, and then sleeps for 100 ms. The function
/// never returns.
///
/// For each entry the value is drawn uniformly from the closed range spanned by
/// `(min_value - b_value) / a_value` and `(max_value - b_value) / a_value`.
/// Entries whose bounds are not finite (e.g. `a_value == 0`) are skipped and
/// keep their previous register value.
async fn update_registers(server: &mut Arc<Mutex<ModbusServer>>) {
    loop {
        {
            let mut guard = server.lock().await;
            // The RNG is created after the `.await` above and dropped at the end
            // of this block, so it is never held across an await point. The
            // thread-local generator is not `Send`; keeping it alive across an
            // `.await` would make this future impossible to `tokio::spawn`.
            let mut rng = rand::thread_rng();
            // Reborrow through the guard once so the two fields can be borrowed
            // independently (read configs, write registers).
            let state = &mut *guard;
            for config in state.configs.values() {
                let first = (config.min_value - config.b_value) / config.a_value;
                let second = (config.max_value - config.b_value) / config.a_value;
                // A negative `a_value` flips the bounds; `gen_range` panics on an
                // inverted range, so order them explicitly.
                let (low, high) = if first <= second {
                    (first, second)
                } else {
                    (second, first)
                };
                // NaN or infinite bounds cannot be sampled.
                if !(low.is_finite() && high.is_finite()) {
                    continue;
                }
                let new_value = rng.gen_range(low..=high);
                state.holding_registers.insert(config.address_in, new_value);
            }
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// initialising
let file_path = "config.csv";
let modbus_server = ModbusServer::new(file_path).await;
let shared_modbus_server = Arc::new(Mutex::new(modbus_server));
let shared_modbus_server_clone = shared_modbus_server.clone(); // to be used later, otherwise the value is moved when spawning threads
// Define a task to update registers every second
let update_task = async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
loop {
interval.tick().await;
update_registers(&mut shared_modbus_server.clone()).await;
}
};
// Spawn the update task
tokio::spawn(update_task);
// listening for the client connections
let listener = TcpListener::bind("127.0.0.1:5020").await?;
let server = Server::new(listener);
// connecting to the client
let on_connected = move |stream: TcpStream, socket_addr: SocketAddr| {
let shared_modbus_server = shared_modbus_server_clone.clone();
async move {
let service = shared_modbus_server.clone();
accept_tcp_connection(stream, socket_addr, move |_addr| {
let service = service.lock().await.clone();
Ok::<_, std::io::Error>(Some(service))
})
}
};
let on_process_error = |err| {
eprintln!("Error: {:?}", err);
};
server.serve(&on_connected, on_process_error).await?;
Ok(())
}
I have tried using tokio::task::spawn for this, but I am unable to move forward.