Problem moving serde::Serialize structures between threads

Hi, everyone. I'm facing this problem and I can't see how to solve it without using 'static or cloning everything.

use std::time::Duration;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct DataMessage {
    field1: u32,
    field2: String,
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct DataMessage2 {
    field1: i32,
    field2: Vec<u8>,
}

async fn heavy_compute<DataMessageType: serde::Serialize + Send>(
    data_message: DataMessageType,
    handler_data: &[u8],
) -> anyhow::Result<String> {
    tokio::task::spawn_blocking(move || {
        std::thread::sleep(Duration::from_secs(1));

        let buf = serde_json::to_vec(&data_message)?;
        let len1 = buf.len();
        let len2 = handler_data.len();

        Ok(format!("result {} {}", len1, len2).to_string())
    })
    .await?
}

struct Handler {
    handler_data: Vec<u8>,
}

impl Handler {
    pub fn new(data: &[u8]) -> Self {
        Handler {
            handler_data: data.to_vec(),
        }
    }

    pub async fn compute<DataMessageType: serde::Serialize + Send>(
        &self,
        data_message: DataMessageType,
    ) -> anyhow::Result<String> {
        let res = heavy_compute(data_message, self.handler_data.as_slice()).await?;

        Ok(res)
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let handler = Handler::new(&[1, 1, 1, 1]);

    let data_message1 = DataMessage {
        field1: 1,
        field2: "wow".to_string(),
    };

    let data_message2 = DataMessage2 {
        field1: 2,
        field2: vec![2, 2, 2, 2],
    };

    let res1 = handler.compute(data_message1).await?;

    println!("res1 {}", res1);

    let res2 = handler.compute(data_message2).await?;

    println!("res2 {}", res2);

    Ok(())
}

compiler output:

error[E0310]: the parameter type `DataMessageType` may not live long enough
  --> src\main.rs:41:5
   |
41 | /     tokio::task::spawn_blocking(move || {
42 | |         std::thread::sleep(Duration::from_secs(1));
43 | |
44 | |         let buf = serde_json::to_vec(&data_message)?;
...  |
48 | |         Ok(format!("result {} {}", len1, len2).to_string())
49 | |     })
   | |______^ ...so that the type `DataMessageType` will meet its required lifetime bounds
   |
help: consider adding an explicit lifetime bound...
   |
37 | async fn heavy_compute<DataMessageType: serde::Serialize + Send + 'static>(
   |                                                                 +++++++++

error[E0311]: the parameter type `DataMessageType` may not live long enough
  --> src\main.rs:41:5
   |
41 | /     tokio::task::spawn_blocking(move || {
42 | |         std::thread::sleep(Duration::from_secs(1));
43 | |
44 | |         let buf = serde_json::to_vec(&data_message)?;
...  |
48 | |         Ok(format!("result {} {}", len1, len2).to_string())
49 | |     })
   | |______^
   |
note: the parameter type `DataMessageType` must be valid for the anonymous lifetime defined here...
  --> src\main.rs:39:19
   |
39 |     handler_data: &[u8],
   |                   ^^^^^
note: ...so that the type `DataMessageType` will meet its required lifetime bounds
  --> src\main.rs:41:5
   |
41 | /     tokio::task::spawn_blocking(move || {
42 | |         std::thread::sleep(Duration::from_secs(1));
43 | |
44 | |         let buf = serde_json::to_vec(&data_message)?;
...  |
48 | |         Ok(format!("result {} {}", len1, len2).to_string())
49 | |     })
   | |______^
help: consider adding an explicit lifetime bound...
   |
37 ~ async fn heavy_compute<'a, DataMessageType: serde::Serialize + Send + 'a>(
38 |     data_message: DataMessageType,
39 ~     handler_data: &'a [u8],
   |

error[E0521]: borrowed data escapes outside of function
  --> src\main.rs:41:5
   |
39 |       handler_data: &[u8],
   |       ------------  - let's call the lifetime of this reference `'1`
   |       |
   |       `handler_data` is a reference that is only valid in the function body
40 |   ) -> anyhow::Result<String> {
41 | /     tokio::task::spawn_blocking(move || {
42 | |         std::thread::sleep(Duration::from_secs(1));
43 | |
44 | |         let buf = serde_json::to_vec(&data_message)?;
...  |
48 | |         Ok(format!("result {} {}", len1, len2).to_string())
49 | |     })
   | |      ^
   | |      |
   | |______`handler_data` escapes the function body here
   |        argument requires that `'1` must outlive `'static`

Some errors have detailed explanations: E0310, E0311, E0521.
For more information about an error, try `rustc --explain E0310`.

You could get rid of 2 of 3 errors by literally following the compiler's suggestion of adding 'static bounds. The remaining 1 is then trivial to solve by capturing owned data instead of a borrowed slice in the closure. Playground.
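
A minimal sketch of that fix, using only the standard library so it runs without extra crates. It assumes std::thread::spawn as a stand-in for tokio::task::spawn_blocking (both require the closure to be Send + 'static) and std::fmt::Debug as a stand-in for serde::Serialize. The name heavy_compute_owned is hypothetical.

```rust
use std::thread;

// Hypothetical stand-in for `heavy_compute`. `std::thread::spawn`, like
// `tokio::task::spawn_blocking`, requires its closure to be `Send + 'static`.
fn heavy_compute_owned<T>(data_message: T, handler_data: &[u8]) -> String
where
    // `Debug` replaces `serde::Serialize` here (an assumption, to stay std-only).
    T: std::fmt::Debug + Send + 'static,
{
    // Copy the slice into an owned Vec so the closure borrows nothing.
    let handler_data: Vec<u8> = handler_data.to_vec();

    thread::spawn(move || {
        let len1 = format!("{:?}", data_message).len();
        let len2 = handler_data.len();
        format!("result {} {}", len1, len2)
    })
    .join()
    .expect("worker thread panicked")
}

fn main() {
    let data = vec![1u8, 1, 1, 1];
    println!("{}", heavy_compute_owned(42u32, &data));
}
```

The 'static bound on T does not mean the value must live forever, only that the type contains no borrowed data with a shorter lifetime. Owned types such as String and Vec<u8> satisfy it.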

Thank you for the response. About the last one:

let handler_data = handler_data.to_vec();

This allocates a new buffer and copies the data, which is not a good option.

For an application like this, where you have large, unchanging data that you want to use in spawned tasks, the solution is Arc<[u8]>. This way you have a cheaply clonable 'static pointer to the data.

use std::sync::Arc;
use std::time::Duration;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct DataMessage {
    field1: u32,
    field2: String,
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct DataMessage2 {
    field1: i32,
    field2: Vec<u8>,
}

async fn heavy_compute<DataMessageType: serde::Serialize + Send + 'static>(
    data_message: DataMessageType,
    handler_data: Arc<[u8]>,
) -> anyhow::Result<String> {
    tokio::task::spawn_blocking(move || {
        std::thread::sleep(Duration::from_secs(1));

        let buf = serde_json::to_vec(&data_message)?;
        let len1 = buf.len();
        let len2 = handler_data.len();

        Ok(format!("result {} {}", len1, len2))
    })
    .await?
}

struct Handler {
    handler_data: Arc<[u8]>,
}

impl Handler {
    pub fn new(data: Arc<[u8]>) -> Self {
        Handler { handler_data: data }
    }

    pub async fn compute<DataMessageType: serde::Serialize + Send + 'static>(
        &self,
        data_message: DataMessageType,
    ) -> anyhow::Result<String> {
        let res = heavy_compute(data_message, self.handler_data.clone()).await?;

        Ok(res)
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let handler = Handler::new(Arc::new([1, 1, 1, 1]));

    let data_message1 = DataMessage {
        field1: 1,
        field2: "wow".to_string(),
    };

    let data_message2 = DataMessage2 {
        field1: 2,
        field2: vec![2, 2, 2, 2],
    };

    let res1 = handler.compute(data_message1).await?;

    println!("res1 {}", res1);

    let res2 = handler.compute(data_message2).await?;

    println!("res2 {}", res2);

    Ok(())
}

https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=b7a8a702d961c718dcabfe6e5b668414

Arc<[u8]> can be created from Vec<u8> with .into(); this will require copying the bytes but only once on startup.
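
To illustrate the conversion and the cheap clone, here is a small std-only sketch. It again assumes std::thread::spawn as a stand-in for spawn_blocking, and the helper name sum_on_workers is hypothetical.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical helper: hands a clone of the shared buffer to each worker.
fn sum_on_workers(shared: &Arc<[u8]>, workers: usize) -> Vec<u32> {
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            // Cloning the Arc bumps a reference count; the bytes are not copied.
            let data = Arc::clone(shared);
            thread::spawn(move || data.iter().map(|&b| u32::from(b)).sum::<u32>())
        })
        .collect();

    handles
        .into_iter()
        .map(|h| h.join().expect("worker thread panicked"))
        .collect()
}

fn main() {
    let bytes: Vec<u8> = vec![1, 1, 1, 1];
    // One-time copy of the bytes into a reference-counted slice.
    let shared: Arc<[u8]> = bytes.into();

    // Both handles point at the same allocation.
    let alias = Arc::clone(&shared);
    assert!(Arc::ptr_eq(&shared, &alias));

    println!("{:?}", sum_on_workers(&shared, 3));
}
```

Each worker owns its own Arc handle, so the closure captures no borrowed data and satisfies the 'static bound without copying the buffer per task.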


I know that it will allocate, but the 'static bound on the closure passed to task::spawn_blocking means that the closure cannot capture borrowed (non-'static) references.

Copying and allocating is not as bad as people tend to think. There are languages out there, not to be named, that copy a lot more, heap-allocate everything, and still have large user bases. You don't need to worry about a single clone operation unless you have found, by actually benchmarking your code, that it is the slow part among all the work your application is doing.

Given that you called your function heavy_compute, I highly doubt that the time required for the allocation of the vector would be anywhere near dominant.
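
If you want a rough number before deciding, a quick timing sketch along these lines can help. The helper time_copy and the 1 MiB buffer size are assumptions for illustration. A single measurement like this is noisy, so treat it as a sanity check and not as a real benchmark.

```rust
use std::hint::black_box;
use std::time::{Duration, Instant};

// Hypothetical helper: times a single `to_vec` copy of `data`.
fn time_copy(data: &[u8]) -> (Vec<u8>, Duration) {
    let start = Instant::now();
    // `black_box` discourages the optimizer from removing the copy.
    let copy = black_box(data).to_vec();
    let elapsed = start.elapsed();
    (black_box(copy), elapsed)
}

fn main() {
    // 1 MiB buffer, an arbitrary size chosen for illustration.
    let data = vec![1u8; 1 << 20];
    let (copy, elapsed) = time_copy(&data);
    println!("copied {} bytes in {:?}", copy.len(), elapsed);
}
```

Compare the printed time against the duration of the actual work in heavy_compute, and build in release mode so the numbers are meaningful.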


Thank you. Maybe that's a better option.

If you take the most comparable languages, such as C or Go, there you just pass a pointer to the data. If the language lets you avoid allocating an extra buffer, it is better to do it that way, but so far Rust is very unclear to me in this respect, especially when an async runtime like tokio comes into play. Thanks for your help anyway.

Yeah, because C doesn't care about lifetimes (so you'll have memory management errors), and Go automatically heap-allocates everything (so it doesn't have to care about lifetimes).

Congrats, Rust just prevented you from creating a dangling reference.

No, this is not true as-is. Your case is a clear demonstration that pass-by-reference is not always possible to do correctly. Would you rather have correct code, or would you rather "optimize" something based on speculation that you didn't even measure?

It is clear that the code as a whole will be optimized later. My problem was not the extra memory allocation as such, but knowing how to do this at all with Rust + tokio.