I am trying to use tokio::spawn
to send a stream Stream<Item = Option<Arc<dyn Array>>>
to an Arrow Flight endpoint. However, the passing of the stream to the Flight client gives me a high-kinded lifetime error of which I do not understand the source.
See below for my code (and a repo here: GitHub - nielsmeima/arrow-flight-tokio-higher-kinded-error).
use arrow2::array::{Array, MutableArray};
use arrow_format::flight::{data::FlightData, service::flight_service_client::FlightServiceClient};
use futures::stream::StreamExt;
use std::sync::Arc;
trait MyTrait {}
#[tokio::main]
async fn main() {
let mut client = FlightServiceClient::connect("http://localhost:50051")
.await
.unwrap();
// does not work
let stream = futures::stream::iter::<Vec<Option<Arc<dyn Array>>>>(vec![]);
// also does not work...
let stream =
futures::stream::iter::<Vec<Option<Arc<dyn MyTrait + Send + Sync + 'static>>>>(vec![]);
// but this works... what makes it that dyn error the lifetime of tokio spawn here?
let stream = futures::stream::iter::<Vec<Option<Arc<()>>>>(vec![]);
let fut = async move {
let mapped_stream = stream.map(|_| FlightData {
flight_descriptor: None,
data_header: vec![],
app_metadata: vec![],
data_body: vec![],
});
let result = client.do_put(mapped_stream).await;
println!("{:#?}", result);
};
tokio::spawn(fut);
}
However, this leads to the following error:
higher-ranked lifetime error could not prove for<'r> impl futures::Future<Output = ()>: std::marker::Send
I have a hard time understanding why fut
would not be Send
for the lifetime 'r
. I found the main cause of the error to be the use of trait object as the Item
of the stream (changing to a concrete type removes all errors).
I have two questions:
- Why does the use of trait object (
dyn Array
or my own traitdyn MyTrait + Send + Sync + 'static
) cause the compiler to be unable to provefor<'r>... : Send
. My best guest would that themap
causes type erasure (since it consumesOption<Arc<dyn Array>>>
) making it impossible to proofSend
for themapped_stream
. Is this the correct direction? - How can I help the compiler? Or am I misunderstanding lifetime and/or trait object concepts here and should I take a different approach?
Thanks in advance!