Hi, I'm developing an update system that creates patches between two binaries.
When building patches, I get:
let state = update_state.lock();
| ----- has type `std::sync::MutexGuard<'_, UpdateProgress>` which is not `Send`
...
273 | let res = update_stream.try_for_each(|_state| future::ready(Ok(()))).await;
| ^^^^^^ await occurs here, with `state` maybe used later
...
305 | }
| - `state` is later dropped here
= note: required for the cast from `impl futures::Future<Output = Result<tonic::Response<BuildOutput>, Status>>` to the object type `dyn futures::Future<Output = Result<tonic::Response<BuildOutput>, Status>> + std::marker::Send`
std::sync::Mutex doesn't seem to implement the Send trait.
Do I have to use tokio::sync::MutexGuard?
If so, I don't know how to call lock, because I call it from futures::stream::Stream::poll_next, which is not an async function.
Correct[1]. You can hold multiple references to the Mutex by wrapping it in an Arc, which is Send as long as its contents are Send and Sync.
Note that if you are using a mutex in an asynchronous setup with tokio, tokio provides its own version of Mutex which you should use instead. Locking tokio's mutex is safe across await points.
[1] Wrong. MutexGuard, which is returned when calling mutex.lock() successfully, is not Send.
Have you tried this? Also, chapter 16.3 of the book describes shared-state concurrency with Mutexes. What I wanted you to try (i.e. turning your shared_state into something like Arc<Mutex<_>> to make it Send) is described there as well.
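For readers who want to see the Arc<Mutex<_>> pattern from that chapter in isolation, here is a minimal sketch using only the standard library. `Progress`, `bytes_done` and `run_workers` are hypothetical names made up for this example; they are not taken from speedupdate-rs.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a shared progress type.
#[derive(Default)]
struct Progress {
    bytes_done: u64,
}

/// Spawns `workers` threads that each add `step` bytes to one shared `Progress`.
fn run_workers(workers: u64, step: u64) -> u64 {
    // `Arc<Mutex<Progress>>` is `Send`, so clones of it can move into threads.
    let shared = Arc::new(Mutex::new(Progress::default()));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                // The guard lives only for this closure body and never
                // leaves the thread that acquired it.
                let mut guard = shared.lock().unwrap();
                guard.bytes_done += step;
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Bind the result first so the temporary guard is dropped before `shared`.
    let total = shared.lock().unwrap().bytes_done;
    total
}
```

Calling `run_workers(4, 10)` adds 10 from each of four threads. Note that this only makes the shared value Send; the guard returned by lock() still is not.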
I'm wondering why you can't compile speedupdate-rs. If you look at SharedUpdateProgress here:
you see that it already is a wrapper enabling synchronized access to the underlying data. I can't reproduce the error you posted with the speedupdate crate. To me, the implementation looks fine.
I see, the error arises in a different crate of the workspace.
Could you try putting these two lines into an extra pair of braces, like this:
{
    let state = update_state.lock();
    let progress = state.histogram.progress();
}
and
remove this line, as state is never used?
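The reason the extra braces help can be shown with a small standalone sketch: the guard is dropped at the closing brace, so it is no longer alive at the await and the future stays Send. Everything here is an assumption made for illustration: `Progress`, `read_then_await`, `require_send` and the tiny `block_on` are hypothetical, and `std::future::ready(())` merely stands in for the real `update_stream.next().await`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the shared progress state.
struct Progress {
    percent: u8,
}

/// Compile-time check: only accepts futures that are `Send`.
fn require_send<F: Future + Send>(fut: F) -> F {
    fut
}

/// Reads the progress inside a block so the guard is gone before the await.
async fn read_then_await(state: Arc<Mutex<Progress>>) -> u8 {
    let percent = {
        let guard = state.lock().unwrap();
        guard.percent
    }; // `guard` is dropped here, before any await point

    std::future::ready(()).await; // placeholder for the real stream await
    percent
}

/// Waker that does nothing; enough for futures that never actually wait.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, only suitable for this demonstration.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut: Pin<Box<F>> = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Runs the future after proving at compile time that it is `Send`.
fn demo() -> u8 {
    let state = Arc::new(Mutex::new(Progress { percent: 42 }));
    block_on(require_send(read_then_await(state)))
}
```

If the braces around the guard are removed so that it stays alive until the end of the function, `require_send` should reject the future with the same kind of "future cannot be sent between threads safely" error as in the original post.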
Edit: if you do this, two errors should remain:
Errors
error: future cannot be sent between threads safely
--> server/grpc/src/rpc.rs:210:98
|
210 | async fn build(&self, request: Request<BuildInput>) -> Result<Response<BuildOutput>, Status> {
| __________________________________________________________________________________________________^
211 | | let inner = request.into_inner();
212 | | let repository_path = inner.path;
213 | | let repository = Repository::new(PathBuf::from(repository_path));
... |
306 | | Ok(Response::new(reply))
307 | | }
| |_____^ future created by async block is not `Send`
|
= help: the trait `std::marker::Send` is not implemented for `dyn Stream<Item = Result<SharedUpdateProgress, UpdateError>>`
note: future is not `Send` as this value is used across an await
--> server/grpc/src/rpc.rs:260:50
|
257 | let mut update_stream = workspace.update(&link, goal_version, UpdateOptions::default());
| ----------------- has type `Pin<Box<dyn Stream<Item = Result<SharedUpdateProgress, UpdateError>>>>` which is not `Send`
...
260 | if let Some(state) = update_stream.next().await {
| ^^^^^^ await occurs here, with `mut update_stream` maybe used later
...
307 | }
| - `mut update_stream` is later dropped here
= note: required for the cast from `impl futures::Future<Output = Result<tonic::Response<BuildOutput>, Status>>` to the object type `dyn futures::Future<Output = Result<tonic::Response<BuildOutput>, Status>> + std::marker::Send`
error: future cannot be sent between threads safely
--> server/grpc/src/rpc.rs:210:98
|
210 | async fn build(&self, request: Request<BuildInput>) -> Result<Response<BuildOutput>, Status> {
| __________________________________________________________________________________________________^
211 | | let inner = request.into_inner();
212 | | let repository_path = inner.path;
213 | | let repository = Repository::new(PathBuf::from(repository_path));
... |
306 | | Ok(Response::new(reply))
307 | | }
| |_____^ future created by async block is not `Send`
|
= help: the trait `std::marker::Send` is not implemented for `dyn Stream<Item = Result<SharedBuildProgress, BuildError>>`
note: future is not `Send` as this value is used across an await
--> server/grpc/src/rpc.rs:289:49
|
286 | let mut build_stream = builder.build();
| ---------------- has type `Pin<Box<dyn Stream<Item = Result<SharedBuildProgress, BuildError>>>>` which is not `Send`
...
289 | if let Some(state) = build_stream.next().await {
| ^^^^^^ await occurs here, with `mut build_stream` maybe used later
...
307 | }
| - `mut build_stream` is later dropped here
= note: required for the cast from `impl futures::Future<Output = Result<tonic::Response<BuildOutput>, Status>>` to the object type `dyn futures::Future<Output = Result<tonic::Response<BuildOutput>, Status>> + std::marker::Send`
error: could not compile `speedupdategrpcserver` due to 2 previous errors
warning: build failed, waiting for other jobs to finish...
They arise here:
boxed_local, unlike boxed, doesn't add the Send marker to the Stream trait object. I don't know how to fix this, as I can't figure out the code base that fast (just replacing boxed_local with boxed is not possible, because the compiler can't guarantee that the stream is in fact Send). Maybe you know how to get the Send marker to work here:
Just adding the Send marker to your type definition won't change the fact that boxed_local returns a trait object that doesn't have the Send marker present. This leads to a type mismatch. You must make sure that the stream you return from the build method actually is Send. I don't know how, as I'm unfamiliar with your code base.
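To illustrate why the marker has to be part of the boxed type, here is a sketch using plain `Future` trait objects from the standard library as an analogy for the boxed streams (it does not use the futures crate or any speedupdate code). `LocalJob`, `SendJob`, `local_job`, `send_job`, `require_send` and `block_on` are all hypothetical names for this example.

```rust
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Boxed future without the `Send` marker (analogous to what `boxed_local` yields).
type LocalJob<T> = Pin<Box<dyn Future<Output = T>>>;
/// Boxed future with the `Send` marker (analogous to what `boxed` yields).
type SendJob<T> = Pin<Box<dyn Future<Output = T> + Send>>;

/// Captures an `Rc`, so the future is not `Send` and can only be a `LocalJob`.
fn local_job() -> LocalJob<u32> {
    let value = Rc::new(7);
    Box::pin(async move { *value })
}

/// Captures an `Arc` instead, so the compiler accepts the `+ Send` bound.
fn send_job() -> SendJob<u32> {
    let value = Arc::new(7);
    Box::pin(async move { *value })
}

/// Compile-time check: only accepts values that are `Send`.
fn require_send<T: Send>(value: T) -> T {
    value
}

/// Waker that does nothing; enough for futures that never actually wait.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, only suitable for this demonstration.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut: Pin<Box<F>> = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Runs both jobs; only the `SendJob` passes the `Send` check.
fn demo() -> (u32, u32) {
    let local = block_on(local_job());
    // `require_send(local_job())` would not compile: the trait object has no `Send`.
    let sent = block_on(require_send(send_job()));
    (local, sent)
}
```

Changing the return type of `local_job` to `SendJob<u32>` without replacing the `Rc` fails to compile, which mirrors the situation described above: the marker can only be added once everything captured inside the stream is itself Send.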
produce streams that are not Send. If you build this project, you should see the same two error messages listed under "Errors" above.