Hi there, I've tried many ways to fix this and didn't seem to get anywhere. Would appreciate any advice.
I changed my code from synchronous to async, and I think I've got it down to one remaining bug, which is a reference lifetime error.
error[E0521]: borrowed data escapes outside of function
  --> src/expressions.rs:42:22
   |
35 | fn api_call(inputs: &[Series], kwargs: ApiCallKwargs) -> PolarsResult<Series> {
   |             ------  - let's call the lifetime of this reference `'1`
   |             |
   |             `inputs` is a reference that is only valid in the function body
...
42 |             let ca = s.str()?;
   |                      ^^^^^^^^
   |                      |
   |                      `inputs` escapes the function body here
   |                      argument requires that `'1` must outlive `'static`
The code here is the handler function for a Polars plugin (i.e. the interface exposed to Python via PyO3). To my understanding this means that the function signature cannot be changed.
#[polars_expr(output_type=String)]
fn api_call(inputs: &[Series], kwargs: ApiCallKwargs) -> PolarsResult<Series> {
    let s = &inputs[0];
    let name = s.name().to_string();
    let endpoint = &kwargs.endpoint;
    let client = Client::new();
    let response_texts = match s.dtype() {
        DataType::String => {
            let ca = s.str()?;
            let rt = Runtime::new().unwrap();
            let futures: Vec<_> = ca.into_iter().map(|opt_v| {
                let client = client.clone();
                let endpoint = endpoint.clone();
                let name = name.clone();
                tokio::spawn(async move {
                    match opt_v {
                        Some(v) => {
                            let name_owned = name.clone();
                            let mut params = HashMap::new();
                            params.insert(name_owned.as_str(), v);
                            handle_api_response(client, &endpoint, &params).await
                        }
                        None => None
                    }
                })
            }).collect();
            let results = rt.block_on(async {
                join_all(futures).await
            });
            let texts: Vec<Option<String>> = results.into_iter().map(|res| {
                match res {
                    Ok(opt) => opt,
                    Err(_) => None, // Handle the join error if needed
                }
            }).collect();
            StringChunked::from_iter(texts)
        },
        DataType::Int32 => {
            let ca = s.i32()?;
            let rt = Runtime::new().unwrap();
            let futures: Vec<_> = ca.into_iter().map(|opt_v| {
                let client = client.clone();
                let endpoint = endpoint.clone();
                let name = name.clone();
                tokio::spawn(async move {
                    match opt_v {
                        Some(v) => {
                            let name_owned = name.clone();
                            let v_str = v.to_string();
                            let mut params = HashMap::new();
                            params.insert(name_owned.as_str(), v_str.as_str());
                            handle_api_response(client, &endpoint, &params).await
                        }
                        None => None
                    }
                })
            }).collect();
            let results = rt.block_on(async {
                join_all(futures).await
            });
            let texts: Vec<Option<String>> = results.into_iter().map(|res| {
                match res {
                    Ok(opt) => opt,
                    Err(_) => None, // Handle the join error if needed
                }
            }).collect();
            StringChunked::from_iter(texts)
        },
        DataType::Int64 => {
            let ca = s.i64()?;
            let rt = Runtime::new().unwrap();
            let futures: Vec<_> = ca.into_iter().map(|opt_v| {
                let client = client.clone();
                let endpoint = endpoint.clone();
                let name = name.clone();
                tokio::spawn(async move {
                    match opt_v {
                        Some(v) => {
                            let name_owned = name.clone();
                            let v_str = v.to_string();
                            let mut params = HashMap::new();
                            params.insert(name_owned.as_str(), v_str.as_str());
                            handle_api_response(client, &endpoint, &params).await
                        }
                        None => None
                    }
                })
            }).collect();
            let results = rt.block_on(async {
                join_all(futures).await
            });
            let texts: Vec<Option<String>> = results.into_iter().map(|res| {
                match res {
                    Ok(opt) => opt,
                    Err(_) => None, // Handle the join error if needed
                }
            }).collect();
            StringChunked::from_iter(texts)
        },
        dtype => polars_bail!(InvalidOperation:format!("Data type {dtype} not \
            supported for api_call, expected String, Int32, Int64.")),
    };
    // let struct_series = StructChunked::new(&[("response_text", response_texts), ("status_code", status_codes)])?.into_series();
    Ok(response_texts.into_series())
}
Full code: httpolars
It takes an inputs: &[Series] parameter which "escapes the function body" through its use in a variable ca. The variable ca is assigned according to the dtype of the Polars series s (the first element of inputs).
Note: a 'series' is a column in a dataframe, sort of like a column in an Excel spreadsheet but with a known dtype. This is why it makes sense to do a match expression on the data type of the series s before calling the str/i32/i64 method.
My aim is to iterate over the values in the series, 'spawn' asynchronous 'futures' (awaitable asynchronous function calls) and then 'join' them all up after they finish.
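To make the lifetime problem concrete without the Polars and Tokio dependencies, here is a minimal sketch using only the standard library. It rests on one assumption: std::thread::spawn stands in for tokio::spawn, since both require the spawned work to be 'static, which is the bound the error message mentions. The names fake_api_call, call_all_owned and call_all_scoped are hypothetical and are not part of httpolars. The first function copies each borrowed value into an owned String before spawning; the second borrows directly, which is only accepted because scoped threads are guaranteed to finish before the function returns.

```rust
use std::thread;

// Hypothetical stand-in for the real HTTP call: it only formats the request.
fn fake_api_call(endpoint: &str, name: &str, value: &str) -> Option<String> {
    Some(format!("{endpoint}?{name}={value}"))
}

// Borrowed input (like `inputs: &[Series]`), but every value is copied into an
// owned String before spawning, so nothing borrowed moves into the thread.
fn call_all_owned(values: &[Option<&str>], endpoint: &str, name: &str) -> Vec<Option<String>> {
    let handles: Vec<_> = values
        .iter()
        .copied()
        .map(|opt_v| {
            let owned_v: Option<String> = opt_v.map(|v| v.to_string());
            let endpoint = endpoint.to_string();
            let name = name.to_string();
            // The closure owns all of its captures, so it satisfies `'static`.
            thread::spawn(move || owned_v.and_then(|v| fake_api_call(&endpoint, &name, &v)))
        })
        .collect();

    // A panicked thread is mapped to None, mirroring the join-error arm above.
    handles.into_iter().map(|h| h.join().ok().flatten()).collect()
}

// Same work, but borrowing directly: the scope joins every thread before it
// returns, so the borrows cannot outlive the function body.
fn call_all_scoped(values: &[Option<&str>], endpoint: &str, name: &str) -> Vec<Option<String>> {
    thread::scope(|scope| {
        let handles: Vec<_> = values
            .iter()
            .copied()
            .map(|opt_v| scope.spawn(move || opt_v.and_then(|v| fake_api_call(endpoint, name, v))))
            .collect();
        handles.into_iter().map(|h| h.join().ok().flatten()).collect()
    })
}

fn main() {
    let column = [Some("a"), None, Some("b")];
    let owned = call_all_owned(&column, "http://localhost/api", "letter");
    let scoped = call_all_scoped(&column, "http://localhost/api", "letter");
    assert_eq!(owned, scoped);
    println!("{owned:?}");
}
```

If the analogy holds, then in my async code the borrowed values yielded by ca.into_iter() are what get moved into the spawned task, and that would be why the compiler wants inputs to outlive 'static.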
For reference, here is what the synchronous code looked like (via):
#[polars_expr(output_type=String)]
fn api_call(inputs: &[Series], kwargs: ApiCallKwargs) -> PolarsResult<Series> {
    let s = &inputs[0];
    let name = s.name();
    let endpoint = &kwargs.endpoint;
    let response_texts = match s.dtype() {
        DataType::String => {
            let ca = s.str()?;
            let texts: Vec<Option<String>> = ca.into_iter().map(|opt_v| {
                opt_v.map_or(None, |v| {
                    let mut params = HashMap::new();
                    params.insert(name, v);
                    handle_api_response(endpoint, &params)
                })
            }).collect();
            StringChunked::from_iter(texts)
        },
        DataType::Int32 => {
            let ca = s.i32()?;
            let texts: Vec<Option<String>> = ca.into_iter().map(|opt_v| {
                opt_v.map_or(None, |v| {
                    let mut params = HashMap::new();
                    let v_str = v.to_string();
                    params.insert(name, v_str.as_str());
                    handle_api_response(endpoint, &params)
                })
            }).collect();
            StringChunked::from_iter(texts)
        },
        DataType::Int64 => {
            let ca = s.i64()?;
            let texts: Vec<Option<String>> = ca.into_iter().map(|opt_v| {
                opt_v.map_or(None, |v| {
                    let mut params = HashMap::new();
                    let v_str = v.to_string();
                    params.insert(name, v_str.as_str());
                    handle_api_response(endpoint, &params)
                })
            }).collect();
            StringChunked::from_iter(texts)
        },
        dtype => polars_bail!(InvalidOperation:format!("Data type {dtype} not \
            supported for api_call, expected String, Int32, Int64.")),
    };
    // let struct_series = StructChunked::new(&[("response_text", response_texts), ("status_code", status_codes)])?.into_series();
    Ok(response_texts.into_series())
}