Has type `std::sync::MutexGuard<'_, UpdateProgress>` which is not `Send`

Hi, I am developing an update system that creates patches between two binaries.

When building patches, I get

let state = update_state.lock();
    |             ----- has type `std::sync::MutexGuard<'_, UpdateProgress>` which is not `Send`
...
273 |         let res = update_stream.try_for_each(|_state| future::ready(Ok(()))).await;
    |                                                                             ^^^^^^ await occurs here, with `state` maybe used later
...
305 |     }
    |     - `state` is later dropped here
    = note: required for the cast from `impl futures::Future<Output = Result<tonic::Response<BuildOutput>, Status>>` to the object type `dyn futures::Future<Output = Result<tonic::Response<BuildOutput>, Status>> + std::marker::Send`

std::sync::Mutex does not seem to implement the Send trait.
Do I have to use tokio::sync::MutexGuard instead?

If so, I don't know how to call its lock function, because I call it from futures::stream::Stream::poll_next, which is not an async fn.

Correct[1]. Having multiple references to the Mutex can be achieved by wrapping it in an Arc which implements Send.

Note that if you are using a mutex in an asynchronous setup with tokio, tokio provides its own version of Mutex which you should use instead. Locking tokio's mutex is safe across await points.


  1. Wrong. MutexGuard, which is returned when calling mutex.lock() successfully, is not Send.

To be clear, Tokio's Mutex shouldn't be used instead in all cases - there's a good section in the docs on when std's Mutex is a better option.
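As a minimal sketch of the std option, the guard can be confined to a block so that it is dropped before any await point. The names UpdateProgress, record_progress and assert_send below are hypothetical stand-ins, not code from speedupdate-rs, and std::future::ready stands in for real asynchronous work.

```rust
use std::future::Future;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the progress state discussed in this thread.
#[derive(Default)]
pub struct UpdateProgress {
    pub downloaded_bytes: u64,
}

/// Compile-time check: only accepts futures that are `Send`.
pub fn assert_send<F: Future + Send>(fut: F) -> F {
    fut
}

pub async fn record_progress(shared: Arc<Mutex<UpdateProgress>>, bytes: u64) -> u64 {
    // The guard lives only inside this block, so it is dropped before the
    // `.await` below and never becomes part of the future's state.
    let total = {
        let mut state = shared.lock().unwrap();
        state.downloaded_bytes += bytes;
        state.downloaded_bytes
    };

    // Stand-in for real asynchronous work (an assumption for this sketch).
    std::future::ready(()).await;
    total
}
```

Passing record_progress(...) through assert_send compiles only because no std MutexGuard is held across the await. Moving the await inside the block would bring back the error from the first post.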


I already tried std::sync::Mutex and ran into the issue with the Send trait.
So the only way seems to be tokio::sync::Mutex.

But, as I said, I cannot call an async fn from

impl<'a> Stream for UpdatePackageStream<'a> {
    type Item = Result<SharedUpdateProgress, UpdateError>;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        let download_poll = this.download_stream.poll_next_unpin(cx);
        let apply_poll = this.apply_stream.poll_next_unpin(cx);

        match (download_poll, apply_poll) {
            (Poll::Ready(None), Poll::Ready(None)) => Poll::Ready(None),
            (Poll::Pending, Poll::Pending) => Poll::Pending,
            (Poll::Pending, Poll::Ready(None)) => Poll::Pending,
            (Poll::Ready(None), Poll::Pending) => Poll::Pending,
            (Poll::Ready(Some(Err(err))), _) => {
                // Download errors cause the apply thread to be cancelled
                this.apply_stream.cancel();
                Poll::Ready(Some(Err(err)))
            }
            (download_poll, apply_poll) => {
                let mut delta = Progression::default();
                if let Poll::Ready(Some(Ok(download_progress))) = download_poll {
                    this.state.borrow_mut().available = download_progress.available;

                    let mut state = this.shared_state.lock();
                    state.downloading_operation_idx = download_progress.available.operation_idx;
                    delta.downloaded_files = download_progress.delta_downloaded_files;
                    delta.downloaded_bytes = download_progress.delta_downloaded_bytes;
                    this.apply_stream.notify(download_progress.available);
                }
                if let Poll::Ready(Some(apply_progress)) = apply_poll {
                    match apply_progress {
                        Ok(apply_progress) => {
                            this.state.borrow_mut().applied.operation_idx =
                                apply_progress.operation_idx;
                            let mut state = this.shared_state.lock();
                            state.applying_operation_idx = apply_progress.operation_idx;
                            delta.applied_files = apply_progress.delta_applied_files;
                            delta.applied_input_bytes = apply_progress.delta_input_bytes;
                            delta.applied_output_bytes = apply_progress.delta_output_bytes;
                        }
                        Err(ApplyError::OperationFailed { path, slice, cause }) => {
                            warn!("{} failed: {}", path, cause);
                            let mut state = this.state.borrow_mut();
                            state.failures.push(match slice {
                                Some(slice) => metadata::v1::Failure::Slice { path, slice },
                                None => metadata::v1::Failure::Path { path },
                            });
                            delta.failed_files = 1;
                        }
                        Err(ApplyError::Cancelled) => {}
                        Err(ApplyError::PoisonError) => {
                            return Poll::Ready(Some(Err(UpdateError::PoisonError)))
                        }
                    }
                }

                {
                    let mut state = this.shared_state.lock(); //here
                    state.inc_progress(delta);
                }

                Poll::Ready(Some(Ok(this.shared_state.clone())))
            }
        }
    }
}

futures::stream::Stream::poll_next is not an async fn,
so I don't know how to do this.
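For what it is worth, a poll function is synchronous, so a std MutexGuard taken inside it can never be held across an await. The sketch below uses std's Future::poll as a stand-in for Stream::poll_next to stay free of external crates. Progress, BumpProgress and assert_send are hypothetical names invented for this illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

/// Hypothetical shared progress counter.
#[derive(Default)]
pub struct Progress {
    pub applied: u64,
}

/// Hypothetical future that bumps the shared counter on every poll and
/// completes once the counter reaches `target`.
pub struct BumpProgress {
    pub shared: Arc<Mutex<Progress>>,
    pub target: u64,
}

impl Future for BumpProgress {
    type Output = u64;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u64> {
        let this = self.get_mut();

        // Plain blocking lock: the guard is created and dropped inside this
        // synchronous function, so it never crosses an await point.
        let applied = {
            let mut state = this.shared.lock().unwrap();
            state.applied += 1;
            state.applied
        };

        if applied >= this.target {
            Poll::Ready(applied)
        } else {
            // Ask the executor to poll again.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Compile-time check that a value is `Send`.
pub fn assert_send<T: Send>(_: &T) {}
```

Because BumpProgress only stores an Arc<Mutex<_>> and a u64, the type itself is Send. The guard exists only for the duration of one poll call.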

Have you tried this? Also, there is chapter 16.3 from the book which describes shared-state concurrency with Mutexes. What I wanted you to try (i.e. make your shared_state into something like Arc<Mutex<_>> to make it Send) is described there as well.

This looks like it should work. Downloaded speedupdate-rs and compiled the lib crate just fine.

struct UpdatePackageStream<'a> {
    state: Rc<RefCell<StateUpdating>>,
    shared_state: Arc<Mutex<SharedUpdateProgress>>,
    download_stream: DownloadStream<'a>,
    apply_stream: ApplyStream,
}

Something like this?

I'm wondering why you can't compile speedupdate-rs. If you look at SharedUpdateProgress here:

you see that it already is a wrapper enabling synchronized access to the underlying data. I can't reproduce the error you posted with the speedupdate crate. To me, the implementation looks fine.

I get the issue both locally and with GHA: Update release.yml · Ludea/speedupdate-rs@5a41c51 · GitHub

I see, the error arises in a different crate of the workspace.

Could you try putting these two lines into an extra pair of braces, like this:

{
    let state = update_state.lock();
    let progress = state.histogram.progress();
}

and

remove this line, as state is never used?

Edit: if you do this, there should be two remaining errors

Errors
error: future cannot be sent between threads safely
   --> server/grpc/src/rpc.rs:210:98
    |
210 |       async fn build(&self, request: Request<BuildInput>) -> Result<Response<BuildOutput>, Status> {
    |  __________________________________________________________________________________________________^
211 | |         let inner = request.into_inner();
212 | |         let repository_path = inner.path;
213 | |         let repository = Repository::new(PathBuf::from(repository_path));
...   |
306 | |         Ok(Response::new(reply))
307 | |     }
    | |_____^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn Stream<Item = Result<SharedUpdateProgress, UpdateError>>`
note: future is not `Send` as this value is used across an await
   --> server/grpc/src/rpc.rs:260:50
    |
257 |         let mut update_stream = workspace.update(&link, goal_version, UpdateOptions::default());
    |             ----------------- has type `Pin<Box<dyn Stream<Item = Result<SharedUpdateProgress, UpdateError>>>>` which is not `Send`
...
260 |         if let Some(state) = update_stream.next().await {
    |                                                  ^^^^^^ await occurs here, with `mut update_stream` maybe used later
...
307 |     }
    |     - `mut update_stream` is later dropped here
    = note: required for the cast from `impl futures::Future<Output = Result<tonic::Response<BuildOutput>, Status>>` to the object type `dyn futures::Future<Output = Result<tonic::Response<BuildOutput>, Status>> + std::marker::Send`

error: future cannot be sent between threads safely
   --> server/grpc/src/rpc.rs:210:98
    |
210 |       async fn build(&self, request: Request<BuildInput>) -> Result<Response<BuildOutput>, Status> {
    |  __________________________________________________________________________________________________^
211 | |         let inner = request.into_inner();
212 | |         let repository_path = inner.path;
213 | |         let repository = Repository::new(PathBuf::from(repository_path));
...   |
306 | |         Ok(Response::new(reply))
307 | |     }
    | |_____^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn Stream<Item = Result<SharedBuildProgress, BuildError>>`
note: future is not `Send` as this value is used across an await
   --> server/grpc/src/rpc.rs:289:49
    |
286 |         let mut build_stream = builder.build();
    |             ---------------- has type `Pin<Box<dyn Stream<Item = Result<SharedBuildProgress, BuildError>>>>` which is not `Send`
...
289 |         if let Some(state) = build_stream.next().await {
    |                                                 ^^^^^^ await occurs here, with `mut build_stream` maybe used later
...
307 |     }
    |     - `mut build_stream` is later dropped here
    = note: required for the cast from `impl futures::Future<Output = Result<tonic::Response<BuildOutput>, Status>>` to the object type `dyn futures::Future<Output = Result<tonic::Response<BuildOutput>, Status>> + std::marker::Send`

error: could not compile `speedupdategrpcserver` due to 2 previous errors
warning: build failed, waiting for other jobs to finish...

They arise here:

boxed_local, unlike boxed, doesn't add the Send marker to the Stream trait object. I don't know how to fix this, as I can't figure out the code base that fast (just replacing boxed_local with boxed is not possible, because the compiler can't guarantee that the stream is in fact Send). Maybe you know how to get the Send marker to work here:

Like this?

pub type BuildProgressStream<'a> =
    Pin<Box<dyn Stream<Item = Result<SharedBuildProgress, BuildError>> + Send + 'a>>;
  pub fn build(&self) -> BuildProgressStream<'_> {
    |                            ----------------------- expected `Pin<Box<dyn futures::Stream<Item = std::result::Result<SharedBuildProgress, BuildError>> + std::marker::Send>>` because of return type
...
346 |         stream::select(rx_stream, w_stream).boxed_local()
    |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected trait `futures::Stream<Item = std::result::Result<SharedBuildProgress, BuildError>> + std::marker::Send`, found trait `futures::Stream<Item = std::result::Result<SharedBuildProgress, _>>`
    |
    = note: expected struct `Pin<Box<dyn futures::Stream<Item = std::result::Result<SharedBuildProgress, BuildError>> + std::marker::Send>>`
               found struct `Pin<Box<dyn futures::Stream<Item = std::result::Result<SharedBuildProgress, _>>>>`

Like I said:

Just adding the Send marker to your type definition won't change the fact that boxed_local returns a trait object that doesn't have the Send marker present. This leads to a type mismatch. You must make sure that the stream you return from the build method actually is Send. I don't know how, as I'm unfamiliar with your code base.
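To illustrate the point with std only, the sketch below boxes futures instead of streams, but the same rule applies to a boxed Stream. The UpdatePackageStream struct quoted earlier holds an Rc<RefCell<StateUpdating>>, and Rc is never Send, so that may be one reason the stream cannot carry the marker. All names here (LocalBoxFuture, SendBoxFuture, local_counter, shared_counter, assert_send) are hypothetical.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::{Arc, Mutex};

/// Boxed future without the `Send` marker (the shape `boxed_local` returns).
pub type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
/// Boxed future with the `Send` marker (the shape `boxed` returns).
pub type SendBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Captures an `Rc`, so the future is not `Send` and can only be boxed locally.
pub fn local_counter(state: Rc<RefCell<u64>>) -> LocalBoxFuture<'static, u64> {
    Box::pin(async move {
        *state.borrow_mut() += 1;
        std::future::ready(()).await;
        // Bind before returning so the `Ref` temporary is dropped before `state`.
        let value = *state.borrow();
        value
    })
}

/// Captures an `Arc<Mutex<_>>` and drops the guard before the await, so the
/// future is `Send` and the cast to the `Send` trait object is accepted.
pub fn shared_counter(state: Arc<Mutex<u64>>) -> SendBoxFuture<'static, u64> {
    Box::pin(async move {
        let value = {
            let mut guard = state.lock().unwrap();
            *guard += 1;
            *guard
        };
        std::future::ready(()).await;
        value
    })
}

/// Compile-time check that a value is `Send`.
/// `assert_send(&local_counter(..))` would be rejected by the compiler.
pub fn assert_send<T: Send>(_: &T) {}
```

Changing the return type of local_counter to SendBoxFuture produces the same kind of mismatch as in the error above. The type alias only states the requirement, and the captured state has to satisfy it.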

This one: speedupdate-rs/packager.rs at master · Ludea/speedupdate-rs · GitHub?

Exactly. builder.build

and workspace.update

implemented here:

produce streams that are not Send. If you build this project you should see the same two "future cannot be sent between threads safely" errors quoted in full earlier in this thread.

Note that this should instead be written as:

let progress = {
    let state = update_state.lock();
    state.histogram.progress()
};

or

let progress;
{
    let state = update_state.lock();
    progress = state.histogram.progress();
}

Note that the first snippet only works if state.histogram.progress() is left without a trailing semicolon. An extra semicolon there makes progress a unit.

Adding Send to

pub type GlobalProgressStream<'a> =
    Pin<Box<dyn Stream<Item = Result<SharedUpdateProgress, UpdateError>> + Send + 'a>>;

still gives the same issue.

Maybe I have to refactor speedupdate-rs/rpc.rs at master · Ludea/speedupdate-rs · GitHub?
