Basic Tokio file I/O problem

As an exercise, I want to use a nested async loop to receive messages from multiple peers (accepted in the outer loop) and write each message to a peer-specific log file (in the inner loop). Examples illustrating the use of tokio::fs::File are few in the Tokio crate, so I guess my problem must be boringly trivial...

In semi-pseudo code what I want to achieve looks like this:

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {

    loop {
        
        // Asynchronously wait for an inbound socket.
        let (mut socket, peer_addr) = listener.accept().await?;

        let filepath = format!("/tmp/peer-{}.log", &peer_addr);
        let peer_logfile = Path::new(&filepath);

        tokio::spawn(async move {

            let mut buf = [0; 1024];

            let client_logfile = File::create(&peer_logfile);

            loop {

                < receive peer msg from socket into &buf>;

                client_logfile.write_all(&buf);   // Problem here! Question: how to?

            }
        });
    }
}

Help much appreciated!

You didn't tell me your error message, but from experience I'm guessing you need this import:

use tokio::io::AsyncWriteExt;

Note that it is not recommended to use stack buffers like your buf = [0; 1024] in async functions, because they don't actually end up on the stack. A local that is still in use after an .await becomes a field of the future, and a spawned future is stored on the heap as a whole. Such a buffer makes your future object massive, so it is recommended to use either Vec<u8> or Box<[u8]> instead.
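To make the size difference concrete, here is a small sketch that needs no runtime, because it only measures the futures and never polls them. The names tick, array_future_size and vec_future_size are made up for illustration, and the exact byte counts depend on the compiler, so treat the numbers as indicative only.

```rust
use std::mem::size_of_val;

// A trivial await point, so that each buffer below is held across an `.await`.
async fn tick() {}

/// Size in bytes of a future that keeps a 1024-byte array across an `.await`.
fn array_future_size() -> usize {
    let fut = async {
        let buf = [0u8; 1024]; // stored inline in the future's state
        tick().await;
        buf[0] // used after the await, so `buf` must live inside the future
    };
    size_of_val(&fut)
}

/// Size in bytes of a future that keeps a `Vec<u8>` of the same length.
fn vec_future_size() -> usize {
    let fut = async {
        let buf = vec![0u8; 1024]; // only the Vec handle lives in the future
        tick().await;
        buf[0]
    };
    size_of_val(&fut)
}
```

Printing both values should show the array version at 1024 bytes or more and the Vec version at a few dozen bytes, since the Vec keeps its 1024 bytes in a separate heap allocation.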

Your code has a suspicious lack of .await. Are you using the File from std?

Thanks for the feedback - I'm learning... Here is the complete (yet not too long) file, followed by the err msg I get.

In Cargo.toml:
futures = "0.3"
tokio = { version = "0.2.11", features = ["full"] }

//! A "hello world" echo server [from Tokio][echo-example]
//!
//! This server will create a TCP listener, accept connections in a loop, and
//! write back everything that's read off of each TCP connection.
//!
//! Because the Tokio runtime uses a thread pool, each TCP connection is
//! processed concurrently with all other TCP connections across multiple
//! threads.
//!
//! To see this server in action, you can run this in one terminal:
//!
//!     cargo +nightly run --example echo
//!
//! and in another terminal you can run:
//!
//!     nc localhost 3000
//!
//! Each line you type in to the `netcat` terminal should be echo'd back to
//! you! If you open up multiple terminals with `netcat` instances connected
//! to the same address you should be able to see them all make progress simultaneously.
//!
//! [echo-example]: https://github.com/tokio-rs/tokio/blob/master/tokio/examples/echo.rs

use futures::future::{FutureExt, TryFutureExt};
use tokio;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::fs::File;
use tokio::net::TcpListener;
// use tokio::prelude::*;

use std::{
    error::Error,
    net::SocketAddr,

    path::Path,
};

use tracing::{debug, info, info_span, trace_span, warn};
use tracing_futures::Instrument;

const ADDR_DEFAULT: &str = "127.0.0.1:3000";


#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {

    let addr = ADDR_DEFAULT
        .to_string()
        .parse::<SocketAddr>()?;

    let mut listener = TcpListener::bind(&addr).await?;

    println!("Listening on {}", addr);

    loop {
        // Asynchronously wait for an inbound socket.
        let (mut socket, peer_addr) = listener.accept().await?;

        println!("Peer addr: {}", peer_addr);

        let filepath = format!("/tmp/echo-client-{}.log", &peer_addr);
        let client_logfile_path = Path::new(&filepath);

        tokio::spawn(async move {

            let mut buf = vec![0u8; 1024];

            let client_logfile = File::create(&client_logfile_path);

            // In a loop, read data from the socket and write to file.

            loop {

                let n: usize = socket
                    .read(&mut buf)
                    .map(|bytes| {
                        if let Ok(n) = bytes {
                            println!("bytes_read: {}", n);
                        }

                        bytes
                    })
                    .map_err(|error| {
                        error
                    })
                    .await
                    .expect("failed to read data from socket");

                if n == 0 {
                    return;
                }

                // write to file

                client_logfile.and_then(|mut file| file.write_all(&buf)).await;

            }
        })
    }
}

This is the error msg I get:

/home/stustd/.cargo/bin/cargo +nightly run --color=always --package cxtelemetry --bin echo-server
warning: dependency (clap) specified without providing a local path, Git repository, or version to use. This will be considered an error in future versions
warning: unused manifest key: dependencies.clap.versionw
   Compiling cxtelemetry v0.1.0 (/home/stustd/projects/cxtelemetry)
error[E0515]: cannot return value referencing function parameter `file`
   --> src/bin/echo-server.rs:190:56
    |
190 |                     client_logfile.and_then(|mut file| file.write_all(&buf));
    |                                                        ----^^^^^^^^^^^^^^^^
    |                                                        |
    |                                                        returns a value referencing data owned by the current function
    |                                                        `file` is borrowed here

error[E0382]: use of moved value: `client_logfile`
   --> src/bin/echo-server.rs:190:21
    |
162 |             let client_logfile = File::create(&client_logfile_path);
    |                 -------------- move occurs because `client_logfile` has type `impl core::future::future::Future`, which does not implement the `Copy` trait
...
190 |                     client_logfile.and_then(|mut file| file.write_all(&buf));
    |                     ^^^^^^^^^^^^^^ value moved here, in previous iteration of loop

error[E0597]: `filepath` does not live long enough
   --> src/bin/echo-server.rs:139:45
    |
139 |         let client_logfile_path = Path::new(&filepath);
    |                                   ----------^^^^^^^^^-
    |                                   |         |
    |                                   |         borrowed value does not live long enough
    |                                   argument requires that `filepath` is borrowed for `'static`
...
219 |     }
    |     - `filepath` dropped here while still borrowed

error: aborting due to 3 previous errors

Some errors have detailed explanations: E0382, E0515, E0597.
For more information about an error, try `rustc --explain E0382`.
error: could not compile `cxtelemetry`.

To learn more, run the command again with --verbose.

Process finished with exit code 101

Perhaps someone could have a look at it. Thanks.

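You should not use and_then like that. File::create returns a future, so await it once before the loop to get the file itself; calling and_then inside the loop consumes that future on the first iteration, which is what E0382 is telling you. Handle any errors where they happen and unwrap the result using either the question mark operator, a match, or a call to unwrap. If you are not familiar with error handling in Rust, you should read chapter nine of the Rust book.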
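As a rough sketch of that control flow, here is the same loop written with the synchronous std::fs::File, so it runs without a runtime. This is a stand-in and not the Tokio code itself: log_stream is a hypothetical helper, and its source parameter plays the role of the socket.

```rust
use std::fs::File;
use std::io::{self, Read, Write};
use std::path::Path;

/// Copies everything from `source` into a freshly created log file and
/// returns the number of bytes written. `source` stands in for the socket.
fn log_stream<R: Read>(mut source: R, log_path: &Path) -> io::Result<usize> {
    // Create the file once, before the loop, and unwrap the Result with `?`.
    let mut log = File::create(log_path)?;
    let mut buf = vec![0u8; 1024];
    let mut total = 0;

    loop {
        // `read` reports how many bytes it actually placed in `buf`.
        let n = source.read(&mut buf)?;
        if n == 0 {
            return Ok(total); // end of input: the peer closed the connection
        }
        // Write only the bytes just read, not the whole 1024-byte buffer.
        log.write_all(&buf[..n])?;
        total += n;
    }
}
```

With tokio::fs::File the shape stays the same, but each I/O call also needs an .await: File::create(&path).await?, socket.read(&mut buf).await? and file.write_all(&buf[..n]).await?, with AsyncReadExt and AsyncWriteExt in scope.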

The last error occurs because a Path is merely a reference to data owned somewhere else (here, the filepath String), and that kind of value cannot be moved into a tokio::spawn, which requires everything the task captures to be 'static. You should use a PathBuf instead.
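A minimal sketch of the ownership fix, using std::thread::spawn as a stand-in because it places the same 'static requirement on what it captures as tokio::spawn does. The function name path_from_task is made up for the example.

```rust
use std::path::PathBuf;
use std::thread;

/// Builds a per-peer log path, moves it into a spawned task, and hands it
/// back so the caller can see that the task really owned it.
fn path_from_task(peer: &str) -> PathBuf {
    // PathBuf owns its bytes, so it does not borrow from any local String.
    let log_path = PathBuf::from(format!("/tmp/echo-client-{}.log", peer));

    let handle = thread::spawn(move || {
        // `log_path` was moved in here. A `&Path` borrowed from a local
        // String would be rejected at this point with E0597.
        log_path
    });

    handle.join().expect("spawned thread panicked")
}
```

In the echo server this would mean building the PathBuf before tokio::spawn and letting the async move block take ownership of it.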
