Satisfying tokio::spawn 'static lifetime requirement

I am attempting to create a library which uses Runners to run a Processor over a series of inputs. The runner in this example is an async runner built on the tokio runtime, and it removes the complexity of managing the processing from the user. The idea is to let users create their own processors, which are responsible only for doing something with the data.

use async_trait::async_trait;
use futures::{StreamExt, stream::FuturesUnordered};

struct Output {
    response: String,
}

#[async_trait]
trait Processor {
    async fn process(&self, request: &str) -> Output;
}

struct MyProcessor {}

#[async_trait]
impl Processor for MyProcessor {
    async fn process(&self, request: &str) -> Output {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        Output {
            response: format!("Processed: {}", request),
        }
    }
}

struct Runner {}

impl Runner {
    pub async fn run<I, P>(&'static self, input: I, processor: &'static P)
    where
        I: Iterator<Item = String>,
        P: Processor + Sync + Send + 'static,
    {
        let mut workers = FuturesUnordered::new();

        input.for_each(|request| {
            workers.push(tokio::spawn(async move {
                processor.process(&request).await;
            }))
        });

        while let Some(output) = workers.next().await {
            match output {
                Ok(output) => println!("{:?}", output),
                Err(error) => println!("{:?}", error),
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let input = vec!["first".to_string(), "second".to_string(), "third".to_string()].into_iter();
    let my_processor = MyProcessor {};
    let runner = Runner {};
    runner
        .run(input, &my_processor)
        .await;
    println!("Done!");
}

I have tried various options but I am stumped. How can I satisfy the lifetime requirements here?

error[E0597]: `runner` does not live long enough
  --> src/lib.rs:55:5
   |
55 | /     runner
56 | |         .run(input, &my_processor)
   | |                                  ^
   | |                                  |
   | |__________________________________borrowed value does not live long enough
   |                                    argument requires that `runner` is borrowed for `'static`
...
59 |   }
   |   - `runner` dropped here while still borrowed

error[E0597]: `my_processor` does not live long enough
  --> src/lib.rs:56:21
   |
55 | /     runner
56 | |         .run(input, &my_processor)
   | |_____________________^^^^^^^^^^^^^- argument requires that `my_processor` is borrowed for `'static`
   |                       |
   |                       borrowed value does not live long enough
...
59 |   }
   |   - `my_processor` dropped here while still borrowed

&'static self is a red flag. It's basically never what you want – it can only be satisfied if you have something that lives for the 'static lifetime. However, the T: 'static annotation does NOT mean that you need to have a value that lives for the 'static lifetime. It means that it's valid to hold on to the value for the 'static lifetime, i.e., it doesn't have any temporary references.
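To make the distinction concrete, here is a small std-only sketch. It uses std::thread::spawn as a dependency-free stand-in for tokio::spawn (both require what they run to be Send + 'static), and describe and uppercase_in_thread are hypothetical helpers made up for illustration. An owned String satisfies T: 'static even though the local holding it is dropped at the end of main, while a reference to a local does not.

```rust
use std::fmt::Debug;
use std::thread;

// `T: 'static` means "T holds no borrows that could expire",
// not "the value must live forever". Any owned type satisfies it.
fn describe<T: Debug + 'static>(value: T) -> String {
    format!("{:?}", value)
}

// std::thread::spawn requires its closure to be `Send + 'static`, just as
// tokio::spawn requires its future to be. Moving an owned String in is fine.
fn uppercase_in_thread(text: String) -> String {
    let handle = thread::spawn(move || text.to_uppercase());
    handle.join().expect("worker thread panicked")
}

fn main() {
    // OK: `owned` is a short-lived local, but the type String is 'static.
    let owned = String::from("first");
    println!("{}", describe(owned));

    // Rejected by the borrow checker if uncommented: `&local` borrows a
    // local variable, so this `&String` is not 'static.
    // let local = String::from("second");
    // describe(&local);

    println!("{}", uppercase_in_thread(String::from("third")));
}
```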

Accordingly, you must not try to move a temporary reference into your async block. Take the processor by value instead, and swap the spawn() and the for_each(), so that a single spawned task owns the processor and loops over the input.
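A rough sketch of that shape, again using std::thread::spawn in place of tokio::spawn so it runs without extra crates. SyncProcessor and UppercaseProcessor are hypothetical synchronous stand-ins for the async Processor trait, and this is not the linked playground code: the processor is taken by value and moved, together with the input, into one spawned worker that loops over every request.

```rust
use std::thread;

// Hypothetical synchronous stand-in for the async `Processor` trait.
trait SyncProcessor {
    fn process(&self, request: &str) -> String;
}

struct UppercaseProcessor;

impl SyncProcessor for UppercaseProcessor {
    fn process(&self, request: &str) -> String {
        request.to_uppercase()
    }
}

// The processor and the input are moved into ONE spawned worker, which then
// handles every request in a loop. No reference crosses the spawn boundary.
fn run_in_one_worker<I, P>(input: I, processor: P) -> Vec<String>
where
    I: IntoIterator<Item = String> + Send + 'static,
    P: SyncProcessor + Send + 'static,
{
    let handle = thread::spawn(move || {
        input
            .into_iter()
            .map(|request| processor.process(&request))
            .collect::<Vec<_>>()
    });
    handle.join().expect("worker thread panicked")
}

fn main() {
    let input = vec!["first".to_string(), "second".to_string()];
    for output in run_in_one_worker(input, UppercaseProcessor) {
        println!("{}", output);
    }
}
```

Because the iterator is created inside the thread closure here, only I: Send is needed. In the async version, assuming the loop runs inside the spawned async block, the iterator stays alive across .await points inside the spawned future, which is where an extra I::IntoIter: Send bound comes in.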

Use an Arc to share the processor rather than a reference.

use async_trait::async_trait;
use futures::{StreamExt, stream::FuturesUnordered};
use std::sync::Arc;

#[derive(Debug)]
struct Output {
    response: String,
}

#[async_trait]
trait Processor {
    async fn process(&self, request: &str) -> Output;
}

struct MyProcessor {}

#[async_trait]
impl Processor for MyProcessor {
    async fn process(&self, request: &str) -> Output {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        Output {
            response: format!("Processed: {}", request),
        }
    }
}

struct Runner {}

impl Runner {
    pub async fn run<I, P>(&self, input: I, processor: Arc<P>)
    where
        I: Iterator<Item = String>,
        P: Processor + Sync + Send + 'static,
    {
        let mut workers = FuturesUnordered::new();

        input.for_each(|request| {
            // Give each spawned task its own owned handle to the shared processor.
            let processor = Arc::clone(&processor);
            workers.push(tokio::spawn(async move {
                processor.process(&request).await
            }))
        });

        while let Some(output) = workers.next().await {
            match output {
                Ok(output) => println!("{:?}", output),
                Err(error) => println!("{:?}", error),
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let input = vec!["first".to_string(), "second".to_string(), "third".to_string()].into_iter();
    let my_processor = Arc::new(MyProcessor {});
    let runner = Runner {};
    runner
        .run(input, my_processor)
        .await;
    println!("Done!");
}

Thank you for such a fast response!

  1. I didn't realize that you could have multiple trait bounds like that. I understand changing I: Iterator<..> to I: IntoIterator<..>. If the input I: IntoIterator<..> is already bounded by Send, why is it necessary to additionally bound I::IntoIter by Send?

  2. In my implementation, the Runner has all sorts of configuration for the HTTP client, throttling (because async was borking my company's dev servers), etc., and the processor implementation runs a long chain of requests. I was aiming for each processor request to run in its own Tokio task. Does swapping the spawn() and for_each() allow this? Are there any downsides to running all the processes in a single task?

alice! You have come up in so many of my searches recently :rofl: Thanks for the response. I tried this earlier but ran into additional issues, which you fixed in your example, and I got the playground code to work with it. I created the Arc inside the run method instead, to move it out of the library's client code and make things cleaner for users.

As I wrote in a previous response, my implementation of the Runner has other properties on the struct to manage and control processing. This triggers the tokio::spawn 'static requirement on the &self argument of the run() method. The properties are used in the run method directly, and a reference to the reqwest::Client held by the runner is passed to the Processor's process() method. How can I specify that the &self parameter lives as long as 'static?

You don't. You should interpret the 'static requirement as "don't use references" rather than "I need to put 'static on my references".

Anything you want to share across a tokio::spawn boundary should be shared with an Arc, and not with a reference.

Because those are two different types.
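One way to see that these really are two different types: a collection can be Send while the iterator its IntoIterator impl returns is not. Numbers and NumbersIter below are hypothetical types made up purely for illustration. In the tokio case, assuming the spawned block iterates over input inside the task, the iterator is held in the future across .await, so the future can only be Send if I::IntoIter is Send too; I: Send alone says nothing about it.

```rust
use std::rc::Rc;

// A collection that is Send: it only holds a Vec<u32>.
struct Numbers(Vec<u32>);

// Its iterator is NOT Send, because it keeps an Rc alive.
struct NumbersIter {
    items: std::vec::IntoIter<u32>,
    _not_send: Rc<()>,
}

impl Iterator for NumbersIter {
    type Item = u32;
    fn next(&mut self) -> Option<u32> {
        self.items.next()
    }
}

impl IntoIterator for Numbers {
    type Item = u32;
    type IntoIter = NumbersIter;
    fn into_iter(self) -> NumbersIter {
        NumbersIter { items: self.0.into_iter(), _not_send: Rc::new(()) }
    }
}

// Compiles only if T: Send.
fn assert_send<T: Send>() {}

fn sum_numbers(n: Numbers) -> u32 {
    n.into_iter().sum()
}

fn main() {
    assert_send::<Numbers>(); // OK: Numbers is Send
    // assert_send::<NumbersIter>(); // would not compile: Rc<()> is not Send
    println!("{}", sum_numbers(Numbers(vec![1, 2, 3])));
}
```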

No, that's one task that performs many operations in a loop. But with Alice's suggestion, the refcounted pointer to a single Processor is cloneable, so you can just do the spawn-in-a-loop dance again, moving clones of the pointer-to-Processor into each task.
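A std-only sketch of that spawn-in-a-loop dance, which also creates the Arc inside run as the asker described. It uses one OS thread per request where the async code would use one tokio task per request, and SyncProcessor is again a hypothetical synchronous stand-in for Processor.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical synchronous stand-in for the async `Processor` trait.
trait SyncProcessor {
    fn process(&self, request: &str) -> String;
}

struct MyProcessor;

impl SyncProcessor for MyProcessor {
    fn process(&self, request: &str) -> String {
        format!("Processed: {}", request)
    }
}

struct Runner;

impl Runner {
    // The caller passes the processor by value; the Arc is an internal detail.
    fn run<I, P>(&self, input: I, processor: P) -> Vec<String>
    where
        I: IntoIterator<Item = String>,
        P: SyncProcessor + Send + Sync + 'static,
    {
        let processor = Arc::new(processor);
        // Spawn one worker per request, each owning its own Arc clone.
        // Collecting here forces every worker to start before any join.
        let handles: Vec<_> = input
            .into_iter()
            .map(|request| {
                let processor = Arc::clone(&processor);
                thread::spawn(move || processor.process(&request))
            })
            .collect();
        // Join in spawn order, so outputs line up with inputs.
        handles
            .into_iter()
            .map(|handle| handle.join().expect("worker thread panicked"))
            .collect()
    }
}

fn main() {
    let input = vec!["first".to_string(), "second".to_string(), "third".to_string()];
    for output in Runner.run(input, MyProcessor) {
        println!("{}", output);
    }
}
```

Collecting the handles before joining matters: Iterator::map is lazy, so spawning and joining in one chain would start and wait for each worker one at a time.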

If you have fields in the Runner that you wish to share with the spawned tasks, then do this:

struct Runner {
    shared: Arc<RunnerInner>,
}

struct RunnerInner {
    // shared fields go here
}

You can pass clones of the Arc<RunnerInner> across a tokio::spawn boundary.
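Filling in that skeleton, a minimal sketch that assumes two hypothetical shared fields (a prefix string and an atomic counter; the real Runner's configuration is not shown in the thread), with std::thread::spawn standing in for tokio::spawn. run only borrows &self for the duration of the call; each worker receives its own Arc<RunnerInner> clone, which satisfies 'static.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical shared fields standing in for the real configuration.
struct RunnerInner {
    prefix: String,
    processed: AtomicUsize, // counter updated by every worker
}

struct Runner {
    shared: Arc<RunnerInner>,
}

impl Runner {
    fn new(prefix: &str) -> Self {
        Runner {
            shared: Arc::new(RunnerInner {
                prefix: prefix.to_string(),
                processed: AtomicUsize::new(0),
            }),
        }
    }

    // `&self` never crosses the spawn boundary; an owned Arc clone does.
    fn run(&self, input: Vec<String>) -> Vec<String> {
        let handles: Vec<_> = input
            .into_iter()
            .map(|request| {
                let shared = Arc::clone(&self.shared);
                thread::spawn(move || {
                    shared.processed.fetch_add(1, Ordering::SeqCst);
                    format!("{}{}", shared.prefix, request)
                })
            })
            .collect();
        handles
            .into_iter()
            .map(|handle| handle.join().expect("worker thread panicked"))
            .collect()
    }

    fn processed(&self) -> usize {
        self.shared.processed.load(Ordering::SeqCst)
    }
}

fn main() {
    let runner = Runner::new("Processed: ");
    let outputs = runner.run(vec!["first".to_string(), "second".to_string()]);
    println!("{:?}, processed = {}", outputs, runner.processed());
}
```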

I did that already with most of the properties. I think the problem is my interpretation of the reqwest::Client docs, where it says:

You do not have to wrap the Client in an Rc or Arc to reuse it, because it already uses an Arc internally.

Not sure if this is drifting away from the original question; let me know and I'll start a new topic if I'm still having issues.

Ah, well, to make use of the Arc that the client has internally, you need to clone it. Just like how you have to clone an Arc to share it.
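A sketch of that pattern using a hypothetical ApiClient type as a stand-in for reqwest::Client: the handle derives Clone, all of its state sits behind an internal Arc, and each worker gets its own clone rather than a &ApiClient. Arc::ptr_eq shows that the clones share one underlying state. With reqwest itself you would call client.clone() in the same place; std::thread::spawn again stands in for tokio::spawn.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for a handle type like reqwest::Client: the real
// state lives behind an internal Arc, so `clone()` is cheap and shared.
#[derive(Clone)]
struct ApiClient {
    inner: Arc<ClientInner>,
}

struct ClientInner {
    base_url: String,
}

impl ApiClient {
    fn new(base_url: &str) -> Self {
        ApiClient {
            inner: Arc::new(ClientInner { base_url: base_url.to_string() }),
        }
    }

    fn url_for(&self, path: &str) -> String {
        format!("{}/{}", self.inner.base_url, path)
    }

    // True if both handles point at the same shared state.
    fn shares_state_with(&self, other: &ApiClient) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }
}

fn build_urls(client: &ApiClient, paths: Vec<String>) -> Vec<String> {
    let handles: Vec<_> = paths
        .into_iter()
        .map(|path| {
            // Clone the handle (an Arc refcount bump), not a reference to it.
            let client = client.clone();
            thread::spawn(move || client.url_for(&path))
        })
        .collect();
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .collect()
}

fn main() {
    let client = ApiClient::new("https://example.com");
    println!("{:?}", build_urls(&client, vec!["a".to_string(), "b".to_string()]));
}
```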

Yeah, I cloned it before I read your response (after intentionally offending myself with some politically incorrect terms) :rofl:

Just wanted to say thanks to you guys (@alice and @H2CO3) :+1: :tada: for getting back to me so quickly, and how grateful I am that we have the rust user forum. I normally scour the web for answers to my questions (and with Rust I have had a lot) and can figure it out, but after a certain amount of trial and error it's great to know that there are knowledgeable people around to discuss and help out.

Once I get the initial version of the code working, I'll move the Processor implementations for my company into a separate crate, improve the documentation and publish it on crates.io. The input side of the Runner uses the csv and handlebars crates in a renderer to dynamically generate request bodies, so that it can process requests from different sources in bulk.
