I have been trying, on and off, to convert our project from tokio_core over to tokio and seem to have hit a wall with my current understanding. I'd like to get to tokio 0.2 in the end, but I am first trying to get to tokio 0.1 so that I can use tokio-compat and migrate module by module.
The issue I am facing is finding a way to port tokio_core::reactor::Handle::spawn() to its tokio equivalent.
The current logic is to pass the old tokio_core::Handle to a reference-counted Session, which then wraps Session::spawn() around Handle::spawn() like so:
pub fn spawn<F, R>(&self, f: F)
where
    F: FnOnce(&Handle) -> R + Send + 'static,
    R: IntoFuture<Item = (), Error = ()>,
    R::Future: 'static,
{
    self.0.handle.spawn(f)
}
Then, a thread can spawn a future by calling session.spawn(|| {}) with a closure. My initial effort to port this over to tokio 0.1 was to change from tokio_core::reactor::Handle::spawn() to current_thread::spawn(), so that session.spawn() took futures directly instead of a closure. But this fails when trying to spawn futures from a different thread, with Err(SpawnError { is_shutdown: true }) (as expected, now that I revisit my code). I also can't seem to pass the new tokio::runtime::current_thread::Handle around and spawn tasks with its spawn(), as Send is required for the spawned future (unlike the old tokio_core::reactor::Handle::spawn()).
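To make the closure idea concrete without involving tokio at all, here is a std-only sketch. It only illustrates the general pattern (a Send closure crosses the thread boundary, and the !Send value is built on the receiving thread); it is not how tokio_core or tokio implement spawning. The names Job and run_jobs_on_worker are hypothetical, and Rc<String> merely stands in for a future that is not Send.

```rust
use std::rc::Rc;
use std::sync::mpsc;
use std::thread;

// Hypothetical job type: the closure itself is Send, but the value it
// returns (an Rc) is not. The Rc stands in for a !Send future.
type Job = Box<dyn FnOnce() -> Rc<String> + Send + 'static>;

/// Hypothetical stand-in for a single-threaded executor: runs every job on
/// one worker thread and returns the length of each value built there.
fn run_jobs_on_worker(jobs: Vec<Job>) -> Vec<usize> {
    let (tx, rx) = mpsc::channel::<Job>();

    let worker = thread::spawn(move || {
        let mut lengths = Vec::new();
        // Each closure is called on this thread, so the Rc is created and
        // dropped here and never needs to be Send.
        for job in rx {
            let local: Rc<String> = job();
            lengths.push(local.len());
        }
        lengths
    });

    for job in jobs {
        tx.send(job).expect("worker thread hung up");
    }
    drop(tx); // closing the channel lets the worker loop finish

    worker.join().expect("worker thread panicked")
}

fn main() {
    let mut jobs: Vec<Job> = Vec::new();
    jobs.push(Box::new(|| Rc::new(String::from("fetch"))));
    jobs.push(Box::new(|| Rc::new(String::from("decode"))));
    println!("{:?}", run_jobs_on_worker(jobs)); // prints [5, 6]
}
```

The point of the sketch is that only the closure has to satisfy Send; anything the closure captures must therefore be Send as well, which is where a captured session can become a problem.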
I also tried to wrap the Future from the other thread in a futures::future::lazy, but that doesn't seem to work either, as the wrapped Future doesn't implement Send.
I am now trying to revisit the closure approach that worked with tokio_core, but I am facing issues with the closure:
// Spawn a (!Send) future directly
// pub fn spawn<F>(&self, f: F)
// where
//     F: Future<Item = (), Error = ()> + 'static,
// {
//     // This fails when called from a different thread
//     current_thread::spawn(f)
// }

// Pass in a closure
pub fn spawn<F, R>(&self, f: F)
where
    F: FnOnce() -> R + Send + 'static,
    R: Future<Item = (), Error = ()> + 'static,
{
    // This fails when the Future doesn't implement Send
    // let spawn_res = self.0.handle.spawn(f);
    let mut te = current_thread::TaskExecutor::current();
    let spawn_res = te.spawn_local(Box::new(f));
    match spawn_res {
        Ok(_) => (),
        Err(e) => error!("Session SpawnErr {:?}", e),
    }
}
but I don't seem to be able to pass this closure successfully; I get multiple errors about Sync not being implemented:
error[E0277]: `std::cell::Cell<bool>` cannot be shared between threads safely
   --> audio\src\fetch.rs:156:17
    |
156 |         session.spawn(move || {
    |                 ^^^^^ `std::cell::Cell<bool>` cannot be shared between threads safely
    |
    = help: within `librespot_core::session::SessionInternal`, the trait `std::marker::Sync` is not implemented for `std::cell::Cell<bool>`
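For reference, this kind of error can be reproduced without tokio. The sketch below assumes that Session is an Arc around SessionInternal (an assumption, the definition is not shown here) and uses made-up struct and field names. Arc<T> is only Send when T is both Send and Sync, so a closure that captures an Arc around a struct containing a Cell<bool> cannot satisfy a Send bound. Swapping the Cell for an AtomicBool is shown as one possible direction only, not necessarily the right fix for this codebase.

```rust
use std::cell::Cell;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical stand-ins for SessionInternal; the field name is made up.
struct CellInternal {
    invalid: Cell<bool>,
}
struct AtomicInternal {
    invalid: AtomicBool,
}

// Compiles only if T is Send; used as a compile-time check.
fn assert_send<T: Send>(_: &T) {}

/// Cell<bool> is fine as long as the value never leaves its thread.
fn flag_set_on_same_thread() -> bool {
    let session = Rc::new(CellInternal { invalid: Cell::new(false) });
    session.invalid.set(true);
    session.invalid.get()
}

/// AtomicBool is Sync, so Arc<AtomicInternal> is Send and the closure
/// capturing it can be moved to another thread.
fn flag_set_on_other_thread() -> bool {
    let session = Arc::new(AtomicInternal { invalid: AtomicBool::new(false) });
    let for_thread = Arc::clone(&session);
    let closure = move || for_thread.invalid.store(true, Ordering::SeqCst);
    assert_send(&closure);
    thread::spawn(closure).join().expect("thread panicked");
    session.invalid.load(Ordering::SeqCst)
}

fn main() {
    // Uncommenting the next three lines fails to compile with E0277,
    // because Cell<bool> is not Sync and so Arc<CellInternal> is not Send:
    // let bad = Arc::new(CellInternal { invalid: Cell::new(false) });
    // let closure = move || bad.invalid.set(true);
    // assert_send(&closure);

    println!("same thread: {}", flag_set_on_same_thread());
    println!("other thread: {}", flag_set_on_other_thread());
}
```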
What am I missing?
PS: Thanks for the initial help from gitter and /r/rust
Only two links are allowed, so I removed the links to the docs.