Function input reference escapes the function body

Hi there, I've tried many ways to fix this and didn't seem to get anywhere. Would appreciate any advice.

I changed my code from synchronous to async, and think I've got it down to one remaining bug, which is a reference lifetime bug.

error[E0521]: borrowed data escapes outside of function
  --> src/expressions.rs:42:22
   |
35 | fn api_call(inputs: &[Series], kwargs: ApiCallKwargs) -> PolarsResult<Series> {
   |             ------  - let's call the lifetime of this reference `'1`
   |             |
   |             `inputs` is a reference that is only valid in the function body
...
42 |             let ca = s.str()?;
   |                      ^^^^^^^^
   |                      |
   |                      `inputs` escapes the function body here
   |                      argument requires that `'1` must outlive `'static`

The code here is the handler function for a Polars plugin (i.e. the interface exposed to Python via PyO3). To my understanding this means that the function signature cannot be changed.

#[polars_expr(output_type=String)]
fn api_call(inputs: &[Series], kwargs: ApiCallKwargs) -> PolarsResult<Series> {
    let s = &inputs[0];
    let name = s.name().to_string();
    let endpoint = &kwargs.endpoint;
    let client = Client::new();
    let response_texts = match s.dtype() {
        DataType::String => {
            let ca = s.str()?;
            let rt = Runtime::new().unwrap();
            let futures: Vec<_> = ca.into_iter().map(|opt_v| {
                let client = client.clone();
                let endpoint = endpoint.clone();
                let name = name.clone();
                tokio::spawn(async move {
                    match opt_v {
                        Some(v) => {
                            let name_owned = name.clone();
                            let mut params = HashMap::new();
                            params.insert(name_owned.as_str(), v);
                            handle_api_response(client, &endpoint, &params).await
                        }
                        None => None
                    }
                })
            }).collect();
            let results = rt.block_on(async {
                join_all(futures).await
            });
            let texts: Vec<Option<String>> = results.into_iter().map(|res| {
                match res {
                    Ok(opt) => opt,
                    Err(_) => None, // Handle the join error if needed
                }
            }).collect();
            StringChunked::from_iter(texts)
        },
        DataType::Int32 => {
            let ca = s.i32()?;
            let rt = Runtime::new().unwrap();
            let futures: Vec<_> = ca.into_iter().map(|opt_v| {
                let client = client.clone();
                let endpoint = endpoint.clone();
                let name = name.clone();
                tokio::spawn(async move {
                    match opt_v {
                        Some(v) => {
                            let name_owned = name.clone();
                            let v_str = v.to_string();
                            let mut params = HashMap::new();
                            params.insert(name_owned.as_str(), v_str.as_str());
                            handle_api_response(client, &endpoint, &params).await
                        }
                        None => None
                    }
                })
            }).collect();
            let results = rt.block_on(async {
                join_all(futures).await
            });
            let texts: Vec<Option<String>> = results.into_iter().map(|res| {
                match res {
                    Ok(opt) => opt,
                    Err(_) => None, // Handle the join error if needed
                }
            }).collect();
            StringChunked::from_iter(texts)
        },
        DataType::Int64 => {
            let ca = s.i64()?;
            let rt = Runtime::new().unwrap();
            let futures: Vec<_> = ca.into_iter().map(|opt_v| {
                let client = client.clone();
                let endpoint = endpoint.clone();
                let name = name.clone();
                tokio::spawn(async move {
                    match opt_v {
                        Some(v) => {
                            let name_owned = name.clone();
                            let v_str = v.to_string();
                            let mut params = HashMap::new();
                            params.insert(name_owned.as_str(), v_str.as_str());
                            handle_api_response(client, &endpoint, &params).await
                        }
                        None => None
                    }
                })
            }).collect();
            let results = rt.block_on(async {
                join_all(futures).await
            });
            let texts: Vec<Option<String>> = results.into_iter().map(|res| {
                match res {
                    Ok(opt) => opt,
                    Err(_) => None, // Handle the join error if needed
                }
            }).collect();
            StringChunked::from_iter(texts)
        },
        dtype => polars_bail!(InvalidOperation:format!("Data type {dtype} not \
             supported for api_call, expected String, Int32, Int64.")),
    };


    // let struct_series = StructChunked::new(&[("response_text", response_texts), ("status_code", status_codes)])?.into_series();
    Ok(response_texts.into_series())
}

Full code: httpolars

It takes an inputs: &[Series] parameter, which "escapes the function body" through its use in the variable ca.

The variable ca is assigned according to the dtype of the Polars series s (the first element of inputs).

Note: a 'series' is a column in a dataframe, sort of like a column in an Excel spreadsheet but with a known dtype. This is why it makes sense to do a match expression on the data type of the series s before calling the str/i32/i64 method.

My aim is to iterate over the values in the series, 'spawn' asynchronous 'futures' (awaitable asynchronous function calls) and then 'join' them all up after they finish.

For reference, here is what the synchronous code looked like (via):

#[polars_expr(output_type=String)]
fn api_call(inputs: &[Series], kwargs: ApiCallKwargs) -> PolarsResult<Series> {
    let s = &inputs[0];
    let name = s.name();
    let endpoint = &kwargs.endpoint;
    let response_texts = match s.dtype() {
        DataType::String => {
            let ca = s.str()?;
            let texts: Vec<Option<String>> = ca.into_iter().map(|opt_v| {
                opt_v.map_or(None, |v| {
                    let mut params = HashMap::new();
                    params.insert(name, v);
                    handle_api_response(endpoint, &params)
                })
            }).collect();
            StringChunked::from_iter(texts)
        },
        DataType::Int32 => {
            let ca = s.i32()?;
            let texts: Vec<Option<String>> = ca.into_iter().map(|opt_v| {
                opt_v.map_or(None, |v| {
                    let mut params = HashMap::new();
                    let v_str = v.to_string();
                    params.insert(name, v_str.as_str());
                    handle_api_response(endpoint, &params)
                })
            }).collect();
            StringChunked::from_iter(texts)
        },
        DataType::Int64 => {
            let ca = s.i64()?;
            let texts: Vec<Option<String>> = ca.into_iter().map(|opt_v| {
                opt_v.map_or(None, |v| {
                    let mut params = HashMap::new();
                    let v_str = v.to_string();
                    params.insert(name, v_str.as_str());
                    handle_api_response(endpoint, &params)
                })
            }).collect();
            StringChunked::from_iter(texts)
        },
        dtype => polars_bail!(InvalidOperation:format!("Data type {dtype} not \
             supported for api_call, expected String, Int32, Int64.")),
    };


    // let struct_series = StructChunked::new(&[("response_text", response_texts), ("status_code", status_codes)])?.into_series();
    Ok(response_texts.into_series())
}

opt_v is an Option<&str> and is captured in the future passed to tokio::spawn. However, tokio::spawn requires the future to be 'static, and the reference in opt_v is not. If you insist on using tokio::spawn you'll have to convert that reference into a String (but you'll lose some performance to the extra allocation).
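To make the 'static point concrete, here is a minimal sketch that uses only the standard library. It assumes std::thread::spawn as a stand-in for tokio::spawn (both place a 'static bound on what they are given), and shout_all is a hypothetical helper, not part of the plugin. The borrowed &str is turned into an owned String before it is moved into the spawned task:

```rust
use std::thread;

/// Upper-cases every present value on its own thread.
/// Assumption: `std::thread::spawn` stands in for `tokio::spawn` here;
/// both require the spawned closure/future to be `'static`.
fn shout_all(values: &[Option<&str>]) -> Vec<Option<String>> {
    let handles: Vec<thread::JoinHandle<Option<String>>> = values
        .iter()
        .map(|opt_v| {
            // Convert the borrowed `&str` into an owned `String` *before*
            // spawning, so the closure no longer borrows from `values`.
            let owned: Option<String> = opt_v.map(str::to_owned);
            thread::spawn(move || owned.map(|v| v.to_uppercase()))
        })
        .collect();

    // A panicked thread is mapped to `None`, mirroring the
    // `Err(_) => None` arm in the question's code.
    handles
        .into_iter()
        .map(|h| h.join().ok().flatten())
        .collect()
}
```

Moving opt_v itself into the closure instead of owned would fail to compile with the same kind of "borrowed data escapes outside of function" error as in the question.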

However, since you're not doing anything async in tokio::spawn, you likely only want to parallelize this. For that you'll be better off with something like rayon, which also allows your closures to capture non-'static references. Edit: it looks like polars already includes an integration with rayon, as it offers the par_iter/par_iter_indexed methods on the ChunkedArray type (the type of ca).
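The reason a blocking parallel API can accept borrowed data is that it does not return until all the work has finished. The sketch below shows this with std::thread::scope, used here as a standard-library stand-in for rayon (an assumption made so the example needs no extra crates). lengths_borrowed is a hypothetical helper:

```rust
use std::thread;

/// Computes the length of every present string in parallel while only
/// borrowing the input. Assumption: `std::thread::scope` stands in for
/// rayon; both block until the work is done, so non-`'static` borrows are fine.
fn lengths_borrowed(values: &[Option<&str>]) -> Vec<Option<usize>> {
    thread::scope(|scope| {
        // Each closure captures `opt_v`, a plain reference into `values`.
        // No `String` conversion is needed.
        let handles: Vec<_> = values
            .iter()
            .map(|opt_v| scope.spawn(move || opt_v.map(|v| v.len())))
            .collect();

        // All threads are joined before `scope` returns.
        handles
            .into_iter()
            .map(|h| h.join().ok().flatten())
            .collect()
    })
}
```

One thread per value is only for illustration; a real thread pool such as rayon's reuses a fixed number of workers.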


Thank you @SkiFire13!

I'm calling an async function in there, the get method on reqwest::Client, within the handle_api_response, no?

It goes fn api_call -> tokio spawn -> async fn handle_api_response -> async fn make_request

The make_request function calls client.get()...await here

Also par_iter does not support i64/f64 dtypes

Edit: thanks for the advice, @SkiFire13, this got me unblocked :tada:

Sorry what? In what sense? Iterators don't care what their exact item type is.

No @paramagnetic, sorry if that was unclear. As I understand it, par_iter here is Polars' own implementation on top of rayon, and I confirmed the limitation myself: I can use par_iter on a string dtype Polars Series but not on the i64/f64 ones. I saw it mentioned here

par_iter is not yet supported for all ChunkedArrays. If you want parallelization you can slice the arrays into as many pieces as there are threads and process those in parallel. This gives you far larger units of work per thread. polars_core::split_ca might give some inspiration. It's private so don't depend on it.

– ritchie46

Commented May 8, 2022 at 9:03
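The slicing approach from the quoted comment can be sketched with the standard library alone. This is not the Polars API: a plain slice of Option<i64> stands in for the ChunkedArray, std::thread::scope stands in for a thread pool, and stringify_in_chunks is a hypothetical name.

```rust
use std::thread;

/// Splits `values` into at most `n_threads` contiguous slices, converts each
/// slice on its own scoped thread, and concatenates the results in order.
/// Assumption: a plain slice stands in for a Polars `ChunkedArray`.
fn stringify_in_chunks(values: &[Option<i64>], n_threads: usize) -> Vec<Option<String>> {
    // Ceiling division gives at most `n` chunks; the final `max(1)` avoids a
    // zero chunk size, which would make `chunks` panic on empty input.
    let n = n_threads.max(1);
    let chunk_size = ((values.len() + n - 1) / n).max(1);

    thread::scope(|scope| {
        let handles: Vec<_> = values
            .chunks(chunk_size)
            .map(|chunk| {
                // One larger unit of work per thread instead of one per value.
                scope.spawn(move || {
                    chunk
                        .iter()
                        .map(|opt_v| opt_v.map(|v| v.to_string()))
                        .collect::<Vec<Option<String>>>()
                })
            })
            .collect();

        // Joining in spawn order preserves the original element order.
        handles
            .into_iter()
            .flat_map(|h| h.join().expect("worker thread panicked"))
            .collect()
    })
}
```

With five values and n_threads = 2, the input is split into slices of three and two elements, so each thread handles a batch rather than a single value.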