Migrating from `tokio_core` => `tokio`

I have been on and off trying to convert our project from tokio_core over to tokio and seem to have reached a wall with my current understanding. I'd like to get to tokio 0.2 in the end, but trying to first get to tokio 0.1 so I can use tokio-compat and migrate module wise.

The issue I seem to be facing is with finding a way to refactor tokio_core::reactor::Handle::spawn() to the tokio.

The current logic is to pass the old tokio_core::Handle to a reference counted Session that then wraps a Session::spawn() around Handle::spawn() like so:

pub fn spawn<F, R>(&self,f: F)
    F: FnOnce(&Handle) -> R + Send + 'static,
    R: IntoFuture<Item = (), Error = ()>,
    R::Future: 'static,

Then, a thread can spawn a future by calling with a closure session.spawn(|| {}). My initial effort to port this over to tokio 0.1 was to change from tokio_core::reactor::Handle::spawn() to current_thread::spawn(). So instead of a closure, session.spawn() directly took futures. But this fails when trying to spawn futures from a different thread, with Err(SpawnError { is_shutdown: true }) (as expected now that I revisit my code). I can't also seem to pass the new tokio::runtime::current_thread::Handle::spawn and spawn tasks with this, as Send is required for the spawned future (unlike the old tokio_core::reactor::Handle::spawn()).

I also tried to wrap the Future from the other thread in a futures::future::lazy, but that doesn't seem to work as well, as the wrapped Future doesn't implement Send

I am now trying to revisit the closure approach that worked with tokio_core, but I am facing issues with the closure

// Spawn a (!Send) future directly
    // pub fn spawn<F>(&self, f: F)
    // where
    //     F: Future<Item = (), Error = ()> + 'static,
    // {
    //     // This fails when called from a different thread
    //     current_thread::spawn(f)
    // }

// Pass in a closure
pub fn spawn<F, R>(&self,f: F)
    F: FnOnce() -> R + Send + 'static,
    R: Future<Item = (), Error = ()> +  'static,
    // This fails when the Future doesn't implement Send
    // let spawn_res = self.0.handle.spawn(f);

    let mut te = current_thread::TaskExecutor::current();
    let spawn_res = te.spawn_local(Box::new(f));

    match spawn_res {
        Ok(_) => (),
        Err(e) => error!("Session SpawnErr {:?}", e),

but I don't seem to be able to successfully pass this closure, with multiple issues about Sync not being implemented:

error[E0277]: `std::cell::Cell<bool>` cannot be shared between threads safely
   --> audio\src\fetch.rs:156:17
156 |         session.spawn(move || {
    |                 ^^^^^ `std::cell::Cell<bool>` cannot be shared between threads safely
    = help: within `librespot_core::session::SessionInternal`, the trait `std::marker::Sync` is not implemented for `std::cell::Cell<bool>`

What am I missing?

PS: Thanks for the initial help from gitter and /r/rust
Only two links allowed, so removed the links to docs

Hey! I'm the poster from reddit. Generally I will strongly recommend that you build your futures such that they can be sent between threads. Can you perhaps show the data structure that contains the Cell<bool> and how you use it? A future with a cell should be ok, but once you have a &Cell or Arc<Cell> in your future, you're in trouble.

I have managed to get it working, thanks to the tips from @alice over on discord:

so the issue is this: docs.rs/tokio/0.1.22/tokio/runtime/current_thread/struct.Handle.html#impl-Sync
The issue is not present in 0.2: docs.rs/tokio/0.2.13/tokio/runtime/struct.Handle.html
If you want to make it work with 0.1, you can put the Handle in a mutex
So I guess the issue is that the tokio 0.1 current thread handle went halfway to multi-thread safety, so it made your object less thread safe while requiring it should be more thread safe

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.