Future cannot be shared between threads safely with Tokio

I'm writing a plugin for an Elgato streamdeck using this Rust package. It uses Tokio to handle communication to & from the stream deck, with the example here. I based my version nearly directly off of that, with one key difference: I need to send an object and have it live as long as the program.

Here is my code:

    fn run(&mut self) {
        //error points to this line        
        tokio::run(
            self.connect
                .unwrap()
                .map_err(|e| error!("connection failed: {:?}", e))
                .and_then(|socket| {
                    socket
                        .for_each(|message| {
                            handle_message(message, &mut self.conn_mgr.unwrap());
                            Ok(())
                        })
                        .map_err(|e| error!("read error: {:?}", e))
                }),
        );
    }

handle_message requires a mutable ConnectionManager because connections are created or removed based on the messages received.

run is implemented on this struct:

struct Runner {
    //this struct needs to live as long as the program, it manages connections to an API
    conn_mgr : Option<connection_manager::ConnectionManager>,
    //reg_params is used to create connect
    reg_params : Option<RegistrationParams>,
    //this is also used in the function above, but hasn't given me issues (yet?)    
    connect : Option<streamdeck_rs::socket::Connect<GlobalSettings, ActionSettings, MessageFromSd, MessageToSd>>
}

Finally, my error is this:

error[E0277]: `(
    dyn futures::future::Future<Item = tokio_tcp::stream::TcpStream, Error = std::io::Error> 
       + std::marker::Send + 'static)` 
cannot be shared between threads safely

Some of the answers I've seen talk about using a move here to transfer ownership, but based on the error location (that is, not part of a closure), I'm not sure that's relevant. I get the same error message if I move run outside the struct and make the parameter &mut Runner.

Add + Sync to that trait object. Currently it is marked Send, which means it can be moved across threads, but that says nothing about whether it can be shared across threads. If you want to share it across threads, you need Sync:

dyn futures::future::Future<Item = tokio_tcp::stream::TcpStream, Error = std::io::Error> 
       + Send + Sync + 'static

But I suspect that you may not have access to this trait object. In that case, could you please post the complete error message?
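
For anyone unsure of the difference between the two marker traits, here is a minimal sketch using only the standard library. The type aliases and function names are made up for illustration and are not part of streamdeck_rs or Tokio; boxed closures stand in for the boxed future from the error message.

```rust
use std::sync::Arc;
use std::thread;

// Compile-time probes: a call only type-checks if `T` meets the bound.
fn assert_send<T: ?Sized + Send>() {}
fn assert_sync<T: ?Sized + Sync>() {}

// Hypothetical stand-ins for the boxed future in the error message.
type SendOnly = Box<dyn Fn() -> u32 + Send + 'static>;
type SendAndSync = Box<dyn Fn() -> u32 + Send + Sync + 'static>;

fn check_bounds() {
    assert_send::<SendOnly>();
    // assert_sync::<SendOnly>(); // E0277: cannot be shared between threads safely
    assert_send::<SendAndSync>();
    assert_sync::<SendAndSync>();
}

// `Arc<T>` is only `Send` when `T: Send + Sync`, because both threads
// end up holding a shared reference to the same `T`.
fn call_from_two_threads(f: SendAndSync) -> u32 {
    let shared = Arc::new(f);
    let for_worker = Arc::clone(&shared);
    let worker = thread::spawn(move || for_worker());
    let here = shared();
    here + worker.join().unwrap()
}
```

Uncommenting the `assert_sync::<SendOnly>()` line should reproduce the same kind of E0277 error as in the question, since a trait object without `+ Sync` makes no promise about shared access.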

Oh, adding Sync was actually pretty easy (and I do control this one, which hasn't been the case for most of my Rust debugging): unsafe impl Sync for Runner { }

This changes my errors back to lifetime errors, which I ran into before and tried to refactor around. It's pretty verbose.

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
   --> src/main.rs:139:28
    |
139 |                   .and_then( |socket| {
    |  ____________________________^
140 | |                     socket
141 | |                         .for_each(move |message| {
142 | |                             handle_message(message, &mut self.conn_mgr.unwrap());
...   |
145 | |                         .map_err(|e| error!("read error: {:?}", e))
146 | |                 }),
    | |_________________^
    |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 134:5...
   --> src/main.rs:134:5
    |
134 | /     fn run(&mut self) { //, conn_mgr : &mut connection_manager::ConnectionManager) {
135 | |         tokio::run(
136 | |             self.connect
137 | |                 .unwrap()
...   |
147 | |         );
148 | |     }
    | |_____^
    = note: ...so that the types are compatible:
            expected &mut Runner
               found &mut Runner
    = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `futures::future::and_then::AndThen<futures::future::map_err::MapErr<streamdeck_rs::socket::Connect<GlobalSettings, ActionSettings, MessageFromSd, MessageToSd>, [closure@src/main.rs:138:26: 138:66]>, futures::future::map_err::MapErr<futures::stream::for_each::ForEach<streamdeck_rs::socket::StreamDeckSocket<GlobalSettings, ActionSettings, MessageFromSd, MessageToSd>, [closure@src/main.rs:141:35: 144:26 self:&mut Runner], std::result::Result<(), streamdeck_rs::socket::StreamDeckSocketError>>, [closure@src/main.rs:145:34: 145:67]>, [closure@src/main.rs:139:28: 146:18 self:&mut Runner]>` will meet its required lifetime bounds
   --> src/main.rs:135:9
    |
135 |         tokio::run(
    |         ^^^^^^^^^^

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
   --> src/main.rs:141:35
    |
141 |                           .for_each(|message| {
    |  ___________________________________^
142 | |                             handle_message(message, &mut self.conn_mgr.unwrap());
143 | |                             Ok(())
144 | |                         })
    | |_________________________^
    |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 134:5...
   --> src/main.rs:134:5
    |
134 | /     fn run(&mut self) { //, conn_mgr : &mut connection_manager::ConnectionManager) {
135 | |         tokio::run(
136 | |             self.connect
137 | |                 .unwrap()
...   |
147 | |         );
148 | |     }
    | |_____^
    = note: ...so that the types are compatible:
            expected &mut Runner
               found &mut Runner
    = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `futures::future::and_then::AndThen<futures::future::map_err::MapErr<streamdeck_rs::socket::Connect<GlobalSettings, ActionSettings, MessageFromSd, MessageToSd>, [closure@src/main.rs:138:26: 138:66]>, futures::future::map_err::MapErr<futures::stream::for_each::ForEach<streamdeck_rs::socket::StreamDeckSocket<GlobalSettings, ActionSettings, MessageFromSd, MessageToSd>, [closure@src/main.rs:141:35: 144:26 self:&mut Runner], std::result::Result<(), streamdeck_rs::socket::StreamDeckSocketError>>, [closure@src/main.rs:145:34: 145:67]>, [closure@src/main.rs:139:28: 146:18 self:&mut Runner]>` will meet its required lifetime bounds
   --> src/main.rs:135:9
    |
135 |         tokio::run(
    |         ^^^^^^^^^^

error: aborting due to 2 previous errors

Both error messages say that the type must be valid for the static lifetime, but I'm not sure if that can be done on a struct.

The error seems to be that anything given to tokio::run must not contain references, since the future might outlive the data they point to. Either move the data into the closure or, if you need to share it between several places, use an Arc to share the data.
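
A minimal sketch of the Arc approach, assuming a made-up ConnectionManager and using std::thread::spawn in place of tokio::run (both require the closure or future to be Send + 'static). None of the names below come from streamdeck_rs.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for the real ConnectionManager.
#[derive(Default)]
struct ConnectionManager {
    handled: Vec<String>,
}

impl ConnectionManager {
    fn handle_message(&mut self, message: String) {
        self.handled.push(message);
    }
}

// The spawned closure must be 'static, so it gets its own Arc handle
// rather than a borrow of a local variable.
fn process_in_background(
    mgr: &Arc<Mutex<ConnectionManager>>,
    messages: Vec<String>,
) -> thread::JoinHandle<()> {
    let mgr = Arc::clone(mgr);
    thread::spawn(move || {
        for message in messages {
            // Lock only for the duration of one message.
            mgr.lock().unwrap().handle_message(message);
        }
    })
}

// The caller keeps its own handle and can inspect the state afterwards.
fn handled_count(mgr: &Arc<Mutex<ConnectionManager>>) -> usize {
    mgr.lock().unwrap().handled.len()
}
```

The Mutex is there because handle_message needs mutable access; an Arc alone only hands out shared references.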

This is wrong. I meant add Sync to the trait object, the same way Send is added. Implementing Sync with an unsafe impl like that will almost certainly lead to undefined behavior (UB).

Oh I didn't even notice the unsafe impl! Yeah definitely add Sync to your box inside Runner.

I'm fairly new to Rust and I can't figure out what this means. Do you mean to add Sync to my ConnectionManager struct? What is a trait object?

Without the Sync changes I mentioned elsewhere, the full error message is:

error[E0277]: `(dyn futures::future::Future<Item = tokio_tcp::stream::TcpStream, Error = std::io::Error> + std::marker::Send + 'static)` cannot be shared between threads safely
   --> src/main.rs:136:9
    |
136 |         tokio::run(
    |         ^^^^^^^^^^ `(dyn futures::future::Future<Item = tokio_tcp::stream::TcpStream, Error = std::io::Error> + std::marker::Send + 'static)` cannot be shared between threads safely
    |
    = help: the trait `std::marker::Sync` is not implemented for `(dyn futures::future::Future<Item = tokio_tcp::stream::TcpStream, Error = std::io::Error> + std::marker::Send + 'static)`
    = note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<(dyn futures::future::Future<Item = tokio_tcp::stream::TcpStream, Error = std::io::Error> + std::marker::Send + 'static)>`
    = note: required because it appears within the type `std::boxed::Box<(dyn futures::future::Future<Item = tokio_tcp::stream::TcpStream, Error = std::io::Error> + std::marker::Send + 'static)>`
    = note: required because it appears within the type `streamdeck_rs::socket::ConnectState`
    = note: required because it appears within the type `std::option::Option<streamdeck_rs::socket::ConnectState>`
    = note: required because it appears within the type `streamdeck_rs::socket::Connect<GlobalSettings, ActionSettings, MessageFromSd, MessageToSd>`
    = note: required because it appears within the type `std::option::Option<streamdeck_rs::socket::Connect<GlobalSettings, ActionSettings, MessageFromSd, MessageToSd>>`
    = note: required because it appears within the type `Runner`
    = note: required because it appears within the type `&mut Runner`
    = note: required because of the requirements on the impl of `std::marker::Send` for `&&mut Runner`
    = note: required because it appears within the type `[closure@src/main.rs:140:28: 147:18 self:&&mut Runner]`
    = note: required because it appears within the type `futures::future::chain::Chain<futures::future::map_err::MapErr<streamdeck_rs::socket::Connect<GlobalSettings, ActionSettings, MessageFromSd, MessageToSd>, [closure@src/main.rs:139:26: 139:66]>, futures::future::map_err::MapErr<futures::stream::for_each::ForEach<streamdeck_rs::socket::StreamDeckSocket<GlobalSettings, ActionSettings, MessageFromSd, MessageToSd>, [closure@src/main.rs:142:35: 145:26 self:&&mut Runner], std::result::Result<(), streamdeck_rs::socket::StreamDeckSocketError>>, [closure@src/main.rs:146:34: 146:67]>, [closure@src/main.rs:140:28: 147:18 self:&&mut Runner]>`
    = note: required because it appears within the type `futures::future::and_then::AndThen<futures::future::map_err::MapErr<streamdeck_rs::socket::Connect<GlobalSettings, ActionSettings, MessageFromSd, MessageToSd>, [closure@src/main.rs:139:26: 139:66]>, futures::future::map_err::MapErr<futures::stream::for_each::ForEach<streamdeck_rs::socket::StreamDeckSocket<GlobalSettings, ActionSettings, MessageFromSd, MessageToSd>, [closure@src/main.rs:142:35: 145:26 self:&&mut Runner], std::result::Result<(), streamdeck_rs::socket::StreamDeckSocketError>>, [closure@src/main.rs:146:34: 146:67]>, [closure@src/main.rs:140:28: 147:18 self:&&mut Runner]>`
    = note: required by `tokio::runtime::threadpool::run`

error: aborting due to previous error

I'm guessing that you created Runner, could we see the code for it?

Wrap the field with the type std::option::Option<streamdeck_rs::socket::Connect<GlobalSettings, ActionSettings, MessageFromSd, MessageToSd>> in a Mutex.

This works because Mutex can take a value that is only Send and make it Sync, as per the Sync implementation for Mutex (impl<T: ?Sized + Send> Sync for Mutex<T>). This change will affect your code quite a bit, because you will have to lock the mutex every time you want to access it. To alleviate this, you can create a simple wrapper type:

use std::sync::{Mutex, MutexGuard};

struct SyncOption<T>(Mutex<Option<T>>);

impl<T> SyncOption<T> {
    pub fn new(value: Option<T>) -> Self {
        Self(Mutex::new(value))
    }
    
    pub fn get_mut(&mut self) -> &mut Option<T> {
        self.0.get_mut().unwrap()
    }
    
    pub fn lock(&self) -> MutexGuard<'_, Option<T>> {
        self.0.lock().unwrap()
    }
    
    pub fn take(&self) -> Option<T> {
        self.lock().take()
    }
}

You can add on other methods that you would find useful.
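
Here is a rough usage sketch, trimmed to new and take so it stands on its own. The SendOnly alias is a made-up stand-in for Connect (something that is Send but not Sync), and take_on_another_thread is a hypothetical helper that exists only to show the wrapper being shared.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Same shape as the wrapper above, trimmed to the two methods used here.
struct SyncOption<T>(Mutex<Option<T>>);

impl<T> SyncOption<T> {
    fn new(value: Option<T>) -> Self {
        Self(Mutex::new(value))
    }

    // Works through `&self` because the Mutex provides the exclusivity.
    fn take(&self) -> Option<T> {
        self.0.lock().unwrap().take()
    }
}

// Hypothetical stand-in for `Connect`: Send, but not Sync.
type SendOnly = Box<dyn FnMut() -> u32 + Send>;

fn assert_sync<T: Sync>() {}

fn take_on_another_thread() -> (Option<u32>, bool) {
    // Type-checks only because Mutex<T> is Sync whenever T is Send.
    assert_sync::<SyncOption<SendOnly>>();

    let mut calls = 0;
    let job: SendOnly = Box::new(move || {
        calls += 1;
        calls
    });
    let slot = Arc::new(SyncOption::new(Some(job)));
    let remote = Arc::clone(&slot);

    // The worker takes the value out and runs it once.
    let result = thread::spawn(move || remote.take().map(|mut f| f()))
        .join()
        .unwrap();

    // A second take on the original handle finds the slot empty.
    (result, slot.take().is_none())
}
```

Note that this only addresses the Sync error. It does nothing about the 'static requirement, which is a separate problem.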

The full code for Runner is:


struct Runner {
    conn_mgr : Option<connection_manager::ConnectionManager>,
    reg_params : Option<RegistrationParams>,
    connect : SyncOption<streamdeck_rs::socket::Connect<GlobalSettings, ActionSettings, MessageFromSd, MessageToSd>>
}

trait RunStreamdeck {
    fn register(&mut self) -> ();
    fn run(&mut self) -> ();
}

impl RunStreamdeck for Runner {
    fn register(&mut self) {
        let params = RegistrationParams::from_args(env::args()).unwrap();
        self.reg_params = Some(params);
        self.connect = 
            SyncOption::new(
                Some(
                    StreamDeckSocket::<GlobalSettings, ActionSettings, MessageFromSd, MessageToSd>::connect(
                        params.port, 
                        params.event, 
                        params.uuid,
            )));
    }

    fn run(&mut self) {
        tokio::run(
            self.connect
                .take().unwrap()
                .map_err(|e| error!("connection failed: {:?}", e))
                .and_then(|socket| {
                    socket
                        .for_each(|message| {
                            handle_message(message, &mut self.conn_mgr.unwrap());
                            Ok(())
                        })
                        .map_err(|e| error!("read error: {:?}", e))
                }),
        );
    }
}

This gives me lifetime errors on the socket and message closures (I only included the one for socket because, as far as I can tell, the other is basically the same error):

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
   --> src/main.rs:164:27
    |
164 |                   .and_then(|socket| {
    |  ___________________________^
165 | |                     socket
166 | |                         .for_each(|message| {
167 | |                             handle_message(message, &mut self.conn_mgr.unwrap());
...   |
170 | |                         .map_err(|e| error!("read error: {:?}", e))
171 | |                 }),
    | |_________________^
    |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 159:5...
   --> src/main.rs:159:5
    |
159 | /     fn run(&mut self) {
160 | |         tokio::run(
161 | |             self.connect
162 | |                 .take().unwrap()
...   |
172 | |         );
173 | |     }
    | |_____^
    = note: ...so that the types are compatible:
            expected &&mut Runner
               found &&mut Runner
    = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `futures::future::and_then::AndThen<futures::future::map_err::MapErr<streamdeck_rs::socket::Connect<GlobalSettings, ActionSettings, MessageFromSd, MessageToSd>, [closure@src/main.rs:163:26: 163:66]>, futures::future::map_err::MapErr<futures::stream::for_each::ForEach<streamdeck_rs::socket::StreamDeckSocket<GlobalSettings, ActionSettings, MessageFromSd, MessageToSd>, [closure@src/main.rs:166:35: 169:26 self:&&mut Runner], std::result::Result<(), streamdeck_rs::socket::StreamDeckSocketError>>, [closure@src/main.rs:170:34: 170:67]>, [closure@src/main.rs:164:27: 171:18 self:&&mut Runner]>` will meet its required lifetime bounds

I get the same error with or without move. Making conn_mgr a SyncOption made a little bit of sense to me but didn't change the error message, so I left that as an Option for now.

I can pull all this tokio::run code out of this object, drop it into main, and compile, so I know that it's not totally wrong.

You can't borrow from any locals because tokio::run requires 'static.
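
One way to satisfy that, sketched with made-up types: have run take self by value so the closure owns everything it touches. std::thread::spawn stands in for tokio::run here because it has a comparable Send + 'static bound, and the Vec of messages is a placeholder for the real socket stream.

```rust
use std::thread;

// Hypothetical stand-ins; the real Runner holds a Connect future instead of a Vec.
struct ConnectionManager {
    handled: usize,
}

struct Runner {
    conn_mgr: Option<ConnectionManager>,
    messages: Option<Vec<String>>,
}

impl Runner {
    // `self` by value: the fields can be moved out, and nothing below
    // borrows from the caller's stack.
    fn run(self) -> usize {
        let mut conn_mgr = self.conn_mgr.expect("conn_mgr was never set");
        let messages = self.messages.expect("messages were never set");

        // `move` transfers ownership of both values into the closure,
        // which makes the closure 'static.
        let worker = thread::spawn(move || {
            for _message in messages {
                conn_mgr.handled += 1;
            }
            conn_mgr.handled
        });
        worker.join().unwrap()
    }
}
```

With &mut self, calling unwrap() on self.conn_mgr would also try to move the value out from behind a borrow, so taking ownership up front sidesteps that as well.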

As @RustyYato said, a Mutex is the easy way to make something Sync. But be advised that when using futures, you cannot hold a locked Mutex while blocking on a Future. If you try, you'll end up blocking the entire Tokio reactor. Instead, you must use a futures-aware mutex. If you're using futures-0.1, use futures_locks::Mutex. If you're using std::future with async/await, use tokio::sync::Mutex.
