How to resolve Pin<Box<dyn Stream + 'to_resolve_lifetime>> when capturing a reference from a function parameter

I want to implement an evaluator for an execution plan. The eval method of the evaluator takes an immutable reference to the plan and returns a Value enum wrapped in a Result.

Here, one variant of Value can be a Stream. The compiler complains that &plan must be given a 'static lifetime, but &plan cannot be 'static.

use futures::{stream, Stream, StreamExt};
use std::{iter::repeat, pin::Pin};

enum Plan {
    Mapping {
        items: Vec<(String, Plan)>,
        prev: Box<Plan>,
    },
    OtherPlan,
}

enum Value {
    Constant(i32),
    List(Vec<Value>),
    Table {
        schema: Vec<String>,
        records: Pin<Box<dyn Stream<Item = Value>>>,
    },
}

struct Evaluator;

impl Evaluator {
    fn eval(&self, input: &Value, plan: &Plan) -> Result<Value, Box<dyn std::error::Error + '_>> {
        match plan {
            Plan::Mapping { items, prev } => {
                // evaluate previous plan `prev`
                //  - the result of evaluation is a stream `records`
                //  - each item of `records` is a Value::List, a.k.a Record
                let (schema, records) = match self.eval(input, prev)? {
                    Value::Table { schema, records } => (schema, records),
                    _ => panic!("result should be a Stream"),
                };

                // do map for each item in previous stream
                let mapping_stream =
                    stream::iter(repeat(items))
                        .zip(records)
                        .map(|(items, value)| {
                            let input = Value::Table {
                                schema: schema.clone(),
                                records: Box::pin(stream::iter(vec![value])),
                            };

                            // vec![value] = mapping record
                            let record: Vec<_> = items
                                .iter()
                                .map(move |(s, p)| self.eval(&input, p))
                                .collect::<Result<Vec<_>, Box<dyn std::error::Error>>>()
                                .unwrap();

                            Value::List(record)
                        });

                Ok(Value::Table {
                    schema: items.iter().map(|(t, _)| t.clone()).collect(),
                    records: Box::pin(mapping_stream),
                })
            }
            Plan::OtherPlan => Ok(Value::Constant(10)),
        }
    }
}

error[E0621]: explicit lifetime required in the type of `plan`
  --> src/recursive.rs:59:30
   |
59 |                     records: Box::pin(mapping_stream),
   |                              ^^^^^^^^^^^^^^^^^^^^^^^^ lifetime `'static` required

For more information about this error, try `rustc --explain E0621`.
warning: `scratch` (lib) generated 1 warning
error: could not compile `scratch` due to previous error; 1 warning emitted

Why are you using a Stream? There's nothing async in the evaluation of the stream as far as I can tell.

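When the compiler tells you that a 'static lifetime is required, what it is really saying is that temporary references are forbidden here: Pin<Box<dyn Stream<Item = Value>>> with no lifetime written defaults to 'static, so the stream cannot borrow from plan. You must use Arc or copy the data instead.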
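
To make the 'static point concrete, here is a minimal sketch. It uses Iterator from the standard library in place of Stream so that it needs no dependencies; that swap is an assumption made for brevity, and the default-lifetime rule is the same for any boxed trait object. The function names are hypothetical.

```rust
// `Box<dyn Iterator<Item = i32>>` is shorthand for
// `Box<dyn Iterator<Item = i32> + 'static>`, so the boxed iterator
// must own everything it uses.
fn doubled_owned(data: Vec<i32>) -> Box<dyn Iterator<Item = i32>> {
    // `data` is moved into the iterator, so nothing is borrowed.
    Box::new(data.into_iter().map(|x| x * 2))
}

// Writing the bound explicitly lets the box borrow from `data`, but the
// lifetime now shows up in the return type and in any type storing the box.
fn doubled_borrowed<'a>(data: &'a [i32]) -> Box<dyn Iterator<Item = i32> + 'a> {
    Box::new(data.iter().map(|x| x * 2))
}

// This variant is rejected by the compiler: the box captures the borrow
// of `data`, yet its type implicitly demands `'static`.
//
// fn doubled_rejected(data: &[i32]) -> Box<dyn Iterator<Item = i32>> {
//     Box::new(data.iter().map(|x| x * 2))
// }
```

The rejected variant is the same situation as records: Box::pin(mapping_stream) in the question, where the stream captures the borrowed items.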

Try:

// requires #[derive(Clone)] on Plan; `records` is already owned, so zip it as-is
stream::iter(repeat(items.clone()))
                        .zip(records)

or change the function's input types to (&self, input: Value, plan: Plan).

Alternatively, if it's really really important to keep these as references, you will be forced to add lifetime annotations to your enum to permit that.

enum Value<'borrowedfromsomewhere> {
    Constant(i32),
    List(Vec<Value<'borrowedfromsomewhere>>),
    Table {
        schema: Vec<String>,
        records: Pin<Box<dyn Stream<Item = Value<'borrowedfromsomewhere>> + 'borrowedfromsomewhere>>,
    },
}

but without running the borrow checker on your code I'm not sure if that is sufficient to make it work. It will certainly "infect" the entire codebase with annotations and severely limit where Value can be used.

After evaluation, the returned Result<Value> holds a stream, which will be written into a sink in an async function.

I used Arc to solve this problem. Thanks.
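
For readers who land here later, one way an Arc-based version could look is sketched below. This is not the poster's actual code, which was not shared. Assumptions: Iterator stands in for Stream so the example needs no crates, eval is a free function instead of a method, errors are plain Strings, and Plan::Range is a hypothetical variant added only so there is a plan that produces a table.

```rust
use std::sync::Arc;

enum Plan {
    Mapping {
        // Shared ownership: the result can hold its own handle to `items`.
        items: Arc<Vec<(String, Plan)>>,
        prev: Box<Plan>,
    },
    Range(i32), // hypothetical source plan: one single-column row per number
    OtherPlan,
}

enum Value {
    Constant(i32),
    List(Vec<Value>),
    Table {
        schema: Vec<String>,
        // No lifetime written, so this box must be `'static`.
        records: Box<dyn Iterator<Item = Value>>,
    },
}

fn eval(input: &Value, plan: &Plan) -> Result<Value, String> {
    match plan {
        Plan::Mapping { items, prev } => {
            let (schema, records) = match eval(input, prev)? {
                Value::Table { schema, records } => (schema, records),
                _ => return Err("previous plan should produce a table".to_string()),
            };
            let out_schema: Vec<String> = items.iter().map(|(name, _)| name.clone()).collect();
            // Clone the Arc (a reference-count bump), not the plans themselves.
            let items = Arc::clone(items);
            // `move` makes the closure own `schema` and `items`, so the
            // resulting iterator borrows nothing from `plan`.
            let mapped = records.map(move |value| {
                let row = Value::Table {
                    schema: schema.clone(),
                    records: Box::new(std::iter::once(value)),
                };
                let cols = items
                    .iter()
                    .map(|(_, p)| eval(&row, p).expect("column plan failed"))
                    .collect();
                Value::List(cols)
            });
            Ok(Value::Table { schema: out_schema, records: Box::new(mapped) })
        }
        Plan::Range(n) => {
            let n = *n;
            Ok(Value::Table {
                schema: vec!["i".to_string()],
                records: Box::new((0..n).map(|i| Value::List(vec![Value::Constant(i)]))),
            })
        }
        Plan::OtherPlan => Ok(Value::Constant(10)),
    }
}
```

Because the returned Value owns everything it needs, the plan can be dropped before the records are consumed. With the futures crate, the same shape should carry over by boxing the stream with Box::pin and adding Send bounds if the sink runs on another task.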
