Unable to use accept_tcp_connection from tokio-modbus

I am writing a Modbus TCP server. The server spawns a task to update registers at a regular interval; it also listens on an ip:port and accepts client connections. I have tried using tokio::sync::Mutex instead of std::sync::Mutex, but that creates some problems of its own: something about futures not being able to be sent between threads safely (that error appears further down).

This is the error I get:

  error[E0728]: `await` is only allowed inside `async` functions and blocks
   --> src/bin/server.rs:171:46
    |
170 |             accept_tcp_connection(stream, socket_addr, move |_addr| {
    |                                                        ------------ this is not `async`
171 |                 let service = service.lock().await.clone();
    |                                              ^^^^^ only allowed inside `async` functions and blocks


Here is my code:

// server.rs
use tokio::net::TcpStream;
use core::net::SocketAddr;
use tokio::net::TcpListener;
use tokio_modbus::{
    prelude::*,
    server::tcp::{accept_tcp_connection, Server},
    server::Service,
};
use std::collections::HashMap;
use std::future;
use std::fs::File;
use serde::Deserialize;
use rand::Rng;
use csv::ReaderBuilder;
use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(Debug, Deserialize, Clone)]
struct Config {
    category: String,
    code: String,
    address_in: u16,
    func_type_read: u16,
    address_out: u16,
    func_type_write: u16,
    var_dimension: u16,
    var_length: u16,
    bit_position: u16,
    a_value: f64,
    b_value: f64,
    var_encoding: u16,
    var_type: bool, // same as `type` in the csv but `type` is a reserved keyword
    decimal: u16,
    signed: bool,
    min_value: f64,
    max_value: f64,
    measure_unit: String
}

async fn load_config(file_path: &str) -> HashMap<String, Config> {
    // Load the configuration from the CSV file
    let mut configs = HashMap::new();
    let file = File::open(file_path).unwrap();
    let mut reader = ReaderBuilder::new().from_reader(file);
    for result in reader.records() {
        let record = result.unwrap();
        let config = Config {
            category: record[0].to_string(),
            code: record[1].to_string(),
            address_in: record[2].parse().unwrap(),
            func_type_read: record[3].parse().unwrap(),
            address_out: record[4].parse().unwrap(),
            func_type_write: record[5].parse().unwrap(),
            var_dimension: record[6].parse().unwrap(),
            var_length: record[7].parse().unwrap(),
            bit_position: record[8].parse().unwrap(),
            a_value: record[9].parse().unwrap(),
            b_value: record[10].parse().unwrap(),
            var_encoding: record[11].parse().unwrap(),
            var_type: record[12].parse().unwrap(),
            signed: record[13].parse().unwrap(),
            decimal: record[14].parse().unwrap(),
            min_value: record[15].parse().unwrap(),
            max_value: record[16].parse().unwrap(),
            measure_unit: record[17].to_string()
        };
        configs.insert(config.code.clone(), config);
    }
    configs
}
#[derive(Clone)]
struct ModbusServer {
    configs: HashMap<String, Config>,
    holding_registers: HashMap<u16, f64>,
}
impl ModbusServer {
    async fn new(file_path: &str) -> Self {
        let configs = load_config(file_path).await;
        let mut holding_registers = HashMap::new();
        for config in configs.values() {
            holding_registers.insert(config.address_in, 0.0);
        }
        ModbusServer { configs, holding_registers }
    }
}
impl Service for ModbusServer {
    type Request = Request<'static>;
    type Future = future::Ready<Result<Response, Exception>>;
    fn call(&self, req: Self::Request) -> Self::Future { // handles requests from clients
        match req {
            Request::ReadHoldingRegisters(addr, cnt) => {
                let mut values = vec![0; cnt as usize];
                for i in 0..cnt {
                    let reg_addr = addr + i;
                    if let Some(value) = self.holding_registers.get(&reg_addr) {
                        values[i as usize] = *value as u16;
                    } else {
                        println!("SERVER: Exception::IllegalDataAddress");
                        return future::ready(Err(Exception::IllegalDataAddress));
                    }
                }
                future::ready(Ok(Response::ReadHoldingRegisters(values)))
            }
            // Request::WriteMultipleRegisters(addr, values) => {
            //     for (i, value) in values.iter().enumerate() {
            //         let reg_addr = addr + i as u16;
            //         self.holding_registers.borrow_mut().insert(reg_addr, *value as f64);
            //     }
            //     future::ready(Ok(Response::WriteMultipleRegisters(addr, values.len() as u16)))
            // }
            _ => {
                println!("SERVER: Exception::IllegalFunction");
                future::ready(Err(Exception::IllegalFunction))
            }
        }
    }
}

async fn update_registers(server: &mut Arc<Mutex<ModbusServer>>) {
    let mut rng = rand::thread_rng();
    loop {
        // Lock the mutex before accessing the ModbusServer
        let locked_server = server.lock().await;
        let configs = locked_server.configs.values().cloned().collect::<Vec<_>>();
        drop(locked_server); // Drop the lock before the loop
        for config in configs {
            let min_value = (config.min_value - config.b_value) / config.a_value;
            let max_value = (config.max_value - config.b_value) / config.a_value;
            let new_value = rng.gen_range(min_value..=max_value);
            let mut locked_server = server.lock().await;
            locked_server.holding_registers.insert(config.address_in, new_value);
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    // initialising
    let file_path = "config.csv";
    let modbus_server = ModbusServer::new(file_path).await;
    let shared_modbus_server = Arc::new(Mutex::new(modbus_server));
    let shared_modbus_server_clone = shared_modbus_server.clone(); // to be used later, otherwise the value is moved when spawning threads

    
    // Define a task to update registers every second
    let update_task = async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
        loop {
            interval.tick().await;
            update_registers(&mut shared_modbus_server.clone()).await;
        }
    };

    // Spawn the update task
    tokio::spawn(update_task);

    // listening for the client connections
    let listener = TcpListener::bind("127.0.0.1:5020").await?;
    let server = Server::new(listener);
    

    // connecting to the client
    let on_connected = move |stream: TcpStream, socket_addr: SocketAddr| {
        let shared_modbus_server = shared_modbus_server_clone.clone();
        async move {
            let service = shared_modbus_server.clone();
            accept_tcp_connection(stream, socket_addr, move |_addr| {
                let service = service.lock().await.clone();
                Ok::<_, std::io::Error>(Some(service))
            })
        }
    };


    let on_process_error = |err| {
        eprintln!("Error: {:?}", err);
    };

    server.serve(&on_connected, on_process_error).await?;
    Ok(())
}

I have tried using tokio::task::spawn for this, but am unable to move forward.

I probably can't give you a solution (I'm not at all familiar with this crate), but have you tried moving this line:

                let service = service.lock().await.clone();

above the call to accept_tcp_connection? That way you wouldn't be doing an await in the callback.

I assume you've seen it, but just in case, there is the tcp-server example to compare with. The example looks a lot simpler, and there is no async operation being done to create the new_service parameter there.
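
If I remember that example right, its connection setup is roughly this (ExampleService is the example's own service type, and I may be paraphrasing details):

    let new_service = |_socket_addr| Ok::<_, std::io::Error>(Some(ExampleService::new()));
    let on_connected = |stream, socket_addr| async move {
        accept_tcp_connection(stream, socket_addr, new_service)
    };

The point is that new_service is built synchronously, so nothing inside the callback needs an await.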

I have tried moving that line above the accept_tcp_connection() call, but then I end up getting a

error: future cannot be sent between threads safely
   --> src/bin/server.rs:158:5
    |
158 |     tokio::spawn(update_task);
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `{async block@src/bin/server.rs:149:23: 155:6}`, the trait `Send` is not implemented for `Rc<UnsafeCell<ReseedingRng<rand_chacha::chacha::ChaCha12Core, OsRng>>>`, which is required by `{async block@src/bin/server.rs:149:23: 155:6}: Send`
note: future is not `Send` as this value is used across an await
   --> src/bin/server.rs:124:43
    |
121 |     let mut rng = rand::thread_rng();
    |         ------- has type `ThreadRng` which is not `Send`
...
124 |         let locked_server = server.lock().await;
    |                                           ^^^^^ await occurs here, with `mut rng` maybe used later
note: required by a bound in `tokio::spawn`
   --> /home/arnav/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.38.0/src/task/spawn.rs:166:21
    |
164 |     pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
    |            ----- required by a bound in this function
165 |     where
166 |         F: Future + Send + 'static,
    |                     ^^^^ required by this bound in `spawn`


for the tokio::spawn(update_task) line.

The reference code you provided is for a server that accepts a connection from the client and then performs whatever operations the client instructs it to do. In my case, I want to do that and also have the server run update_registers() simultaneously. I believe this is why the example looks a lot simpler, without any async code.

I think you need a channel; try looking at tokio::sync::mpsc.

What you'll want to do is create the channel, move the receiver inside the callback, spawn the task, await it, and then send the task's result down the channel, where it can be read, stored, and finally moved into your async block.
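
A rough sketch of the general pattern (the names and the (u16, f64) message type are illustrative, not taken from your code):

    use std::collections::HashMap;
    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        // one task owns the registers; everyone else sends updates over the channel
        let (tx, mut rx) = mpsc::channel::<(u16, f64)>(32);

        // owner task: applies (address, value) updates as they arrive
        tokio::spawn(async move {
            let mut holding_registers: HashMap<u16, f64> = HashMap::new();
            while let Some((addr, value)) = rx.recv().await {
                holding_registers.insert(addr, value);
            }
        });

        // any other task can update a register without taking a lock
        tx.send((100, 42.0)).await.unwrap();
    }

No mutex is needed with this shape, since only the owning task ever touches the map directly.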

Note that moving the line above the call to accept_tcp_connection was the correct fix for the first problem. For reference, here is the code I'm using with that fix:

            // connecting to the client
            let on_connected =
                move |stream: TcpStream, socket_addr: SocketAddr| {
                    let service = shared_modbus_server_clone.clone();
                    async move {
                        let service = service.lock().await;
                        accept_tcp_connection(
                            stream,
                            socket_addr,
                            move |_addr| {
                                Ok::<_, std::io::Error>(Some(service.clone()))
                            },
                        )
                    }
                };

For the second problem, @binarycat may be right about using a channel, but I didn't really understand that solution and I'm afraid you'll still get errors when using a channel if you don't solve the root cause of the Send error. So I tried solving it.

The error is telling us that ThreadRng is not Send, yet it is used across an await. This is a fundamental constraint when using Tokio and other work-stealing executors. At an await, Tokio may move the task to another thread. So values used across an await must be Send so they can be transferred along with the task to the other thread.
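
Here is a minimal, self-contained illustration of that constraint, using Rc (another non-Send type) in place of ThreadRng:

    use std::rc::Rc;
    use std::time::Duration;

    // Not Send: the Rc is still alive across the .await, so
    // tokio::spawn(not_send()) is rejected by the compiler.
    async fn not_send() {
        let rc = Rc::new(5);
        tokio::time::sleep(Duration::from_millis(1)).await;
        println!("{rc}");
    }

    // Send: the Rc is dropped before the .await, so the task can be
    // moved to another worker thread at the await point.
    async fn is_send() {
        {
            let rc = Rc::new(5);
            println!("{rc}");
        } // rc dropped here, before the await
        tokio::time::sleep(Duration::from_millis(1)).await;
    }

    #[tokio::main]
    async fn main() {
        not_send().await; // fine: block_on does not require Send
        tokio::spawn(is_send()).await.unwrap();
        // tokio::spawn(not_send()); // this line is the one that fails to compile
    }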

ThreadRng is specifically documented to not be Send. I tried StdRng instead, since it is Send and is documented to be fairly similar to ThreadRng. This change solved the Send error and your main program ran without errors:

-            let mut rng = rand::thread_rng();
+            let mut rng = StdRng::from_entropy();

I also had to add these imports:

    use rand::{rngs::StdRng, SeedableRng};
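
As a self-contained sanity check (rand 0.8 API), this compiles and runs where the ThreadRng version would not, because StdRng may be held across the .await in the spawned task:

    use rand::{rngs::StdRng, Rng, SeedableRng};
    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        tokio::spawn(async {
            // StdRng is Send, so it can stay alive across .await points
            let mut rng = StdRng::from_entropy();
            loop {
                let v: f64 = rng.gen_range(0.0..1.0);
                println!("{v}");
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        });
        tokio::time::sleep(Duration::from_millis(500)).await;
    }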

If you instead need ThreadRng because of its performance or strength, it is possible to create one instance of ThreadRng per iteration of the outer loop in update_registers. I did this by first computing the new_value for each config.address_in in a separate block, where the ThreadRng is created and dropped inside that block so the compiler can see that it is not used across an await. The collected updates are then applied in a second loop, where an await is done to lock the server. This also compiles and runs OK:

        async fn update_registers(server: &mut Arc<Mutex<ModbusServer>>) {
            loop {
                // Lock the mutex before accessing the ModbusServer
                let locked_server = server.lock().await;
                let configs =
                    locked_server.configs.values().cloned().collect::<Vec<_>>();
                drop(locked_server); // Drop the lock before the loop
                let updates: Vec<(u16, f64)> = {
                    let mut rng = rand::thread_rng();
                    configs
                        .iter()
                        .map(|config| {
                            let min_value = (config.min_value - config.b_value)
                                / config.a_value;
                            let max_value = (config.max_value - config.b_value)
                                / config.a_value;
                            let new_value =
                                rng.gen_range(min_value..=max_value);
                            (config.address_in, new_value)
                        })
                        .collect()
                };
                for (address_in, new_value) in updates {
                    let mut locked_server = server.lock().await;
                    locked_server
                        .holding_registers
                        .insert(address_in, new_value);
                }
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            }
        }

That may or may not be exactly what you want, but hopefully it gives you enough ideas to get past these problems.

A completely different thought is that you're using the wrong mutex -- you should be using std::sync::Mutex (not tokio::sync::Mutex) for protecting the two hashmaps:

        struct ModbusServer {
            configs: HashMap<String, Config>,
            holding_registers: HashMap<u16, f64>,
        }

Initially I just assumed you needed the Tokio mutex, but looking more closely I see that you don't, and therefore you should not be using it. The Tokio mutex is needed only in the rare cases where you must hold a lock across awaits, which is tricky and can be very inefficient. You don't need that here, but because you are using the Tokio mutex you have to await the lock, which creates the other problems. With the std mutex you won't need to await the lock, so some of the problems you were having will simply go away.
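
As a sketch of what the connection callback looks like with the std mutex (not compile-tested; this assumes shared_modbus_server_clone is now an Arc<std::sync::Mutex<ModbusServer>>):

    // lock() is synchronous now, and the guard is a temporary that is
    // dropped at the end of the statement, before any .await.
    let on_connected = move |stream: TcpStream, socket_addr: SocketAddr| {
        let shared = shared_modbus_server_clone.clone();
        async move {
            let service = shared.lock().unwrap().clone();
            accept_tcp_connection(stream, socket_addr, move |_addr| {
                Ok::<_, std::io::Error>(Some(service.clone()))
            })
        }
    };

update_registers changes the same way: each server.lock().await becomes server.lock().unwrap(), and no guard should be held across an await.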

For more on when to use each type of mutex, see Which kind of mutex should you use? in the Tokio docs.
