Cannot move out of `*tx` which is behind a mutable reference

/// Races the creation of an SB receiver link against the `poll_canceled` future of `tx`.
///
/// `select` drives two boxed futures: `create_sb_link_async::<T>(req, fut, inner, enable_cbs)`
/// and a `poll_fn` wrapper around `tx.poll_canceled`. If link creation finishes first, its
/// result is logged with `trace!` and handed to `tx.send`; if the `poll_canceled` future
/// finishes first, nothing is sent. The function returns `Ok(())` on both paths.
///
/// NOTE(review): as posted, this version is reported not to compile ("cannot move out of
/// `*tx` which is behind a mutable reference"). The E0507 output later in the thread points
/// at the `send` call as the move, which suggests `send` takes the sender by value and so
/// cannot be called through `&mut` -- confirm against the `pool::Sender` API.
async fn call_sb_dispatcher_async<T>(
    req: SbConnect,
    fut: T::Future,
    inner: Rc<RefCell<SbDispatcherInner>>,
    tx: &mut pool::Sender<Result<ReceiverLink, GwErrorReport>>,
    enable_cbs: bool,
) -> Result<(), GwErrorReport>
where
    T: Service<SbConnect, Response = client::Client, Error = client::ConnectError>,
{
    // Cloned up front because `req` itself is moved into `create_sb_link_async` below.
    let path = req.path.clone();

    // The closure given to `poll_fn` captures `tx`, so the right-hand future uses it
    // for as long as that future is alive.
    let result = select(Box::pin(create_sb_link_async::<T>(req, fut, inner, enable_cbs)), Box::pin(poll_fn(|cx| tx.poll_canceled(cx)))).await;

    let _ = match result {
        // Link creation completed first; `_` is the unfinished `poll_canceled` future.
        Either::Left((res, _)) => {
            // The `map` / `map_err` closures only log; the `Result` is passed on unchanged.
            // The outcome of `send` is deliberately discarded.
            let _ = tx.send(
                res
                    .map(|link| {
                        trace!("SB receiver link is opened: {:?}", path);
                        link
                    })
                    .map_err(|e| {
                        trace!("SB opening receiver link is failed with {:?}", e);
                        e
                    }),
            );
        },
        // The `poll_canceled` future completed first; nothing is sent.
        Either::Right(_) => {}
    };

    Ok(())
}

I have an issue with a `tx` object that implements neither `Clone` nor `Copy`: I want to use it inside the `poll_fn` closure and also later, outside the closure, when calling `tx.send`.

I am getting the error "cannot move out of `*tx` which is behind a mutable reference" and cannot figure out why, since I'm only trying to pass a reference into the closure. Any idea how to fix this?

Wrap in an Arc<Mutex<_>>

I've tried that approach as well; the same error appears if I just wrap it in Arc<Mutex<_>> and leave everything else unchanged. Is there anything else I might need to add?

Then you can .clone() the Arc as many times as you need to create multiple independent pointers to the same Sender, and move a different one into each place that needs to own one.

 // Use `tx` either inside the closure or outside it, but not both at once.
    let tx_ref = &mut *tx;  // explicit reborrow of `tx`; `tx_ref` is moved into the closure below
    let poll_canceled = Box::pin(poll_fn(move |cx| tx_ref.poll_canceled(cx)));
    let create_link = Box::pin(create_sb_link_async::<T>(req, fut, inner, enable_cbs));
/// Second attempt: same race between link creation and `poll_canceled`, but `tx` is
/// reborrowed explicitly as `tx_ref` and that reborrow is moved into the `poll_fn` closure.
///
/// `left` (link creation) and `right` (the `poll_canceled` wrapper) are boxed, pinned with
/// `pin_mut!`, and raced with `select`. On `Either::Left` the result is logged with `trace!`
/// and passed to `tx.send`; on `Either::Right` nothing is sent. Returns `Ok(())` either way.
///
/// NOTE(review): this is reported to fail with E0505. Per the compiler output quoted in the
/// thread, `*tx` is still borrowed by `tx_ref` (held inside `right`, which is only dropped at
/// the end of the function) when the `tx.send(...)` call tries to move out of `*tx`.
/// The body is kept line-for-line as posted so the quoted compiler output still lines up.
async fn call_sb_dispatcher_async<T>(
    req: SbConnect,
    fut: T::Future,
    inner: Rc<RefCell<SbDispatcherInner>>,
    tx: &mut pool::Sender<Result<ReceiverLink, GwErrorReport>>,
    enable_cbs: bool,
) -> Result<(), GwErrorReport>
where
    T: Service<SbConnect, Response = client::Client, Error = client::ConnectError>,
{
    let path = req.path.clone();

    let tx_ref = &mut *tx;  // explicitly borrow to clarify scope
    let right = Box::pin(poll_fn(move |cx| tx_ref.poll_canceled(cx)));
    let left = Box::pin(create_sb_link_async::<T>(req, fut, inner, enable_cbs));

    //let left = create_sb_link_async::<T>(req, fut, inner, enable_cbs);
    //let right = poll_fn(|cx| tx.clone().lock().unwrap().poll_canceled(cx));

    pin_mut!(left);
    pin_mut!(right);

    let result = select(left, right).await;

    let _ = match result {
        Either::Left((res, _)) => {
            let _ = tx.send(
                res
                    .map(|link| {
                        trace!("SB receiver link is opened: {:?}", path);
                        link
                    })
                    .map_err(|e| {
                        trace!("SB opening receiver link is failed with {:?}", e);
                        e
                    }),
            );
        },
        Either::Right(_) => {}
    };

    Ok(())
}

Did you mean this? It is the same problem.

It would be good to see the full compiler error.

error[E0505]: cannot move out of `*tx` because it is borrowed
   --> gw-core/src/servicebus/dispatcher.rs:193:21
    |
179 |       let tx_ref = &mut *tx;  // explicitly borrow to clarify scope
    |                    -------- borrow of `*tx` occurs here
...
193 |               let _ = tx.send(
    |  _____________________^
194 | |                 res
195 | |                     .map(|link| {
196 | |                         trace!("SB receiver link is opened: {:?}", path);
...   |
202 | |                     }),
203 | |             );
    | |_____________^ move out of `*tx` occurs here
...
209 |   }
    |   - borrow might be used here, when `right` is dropped and runs the destructor for type `Pin<Box<impl futures_util::Future<Output = ()>>>`

Let's use an Arc and Mutex combo:

use std::sync::{Arc, Mutex};


/// Suggested variant in which the sender is shared through `Arc<Mutex<_>>`.
///
/// One clone of the `Arc` is moved into the `poll_fn` closure, which locks the mutex on each
/// poll and calls `poll_canceled`. After `select` resolves, the `Either::Left` arm locks the
/// original `tx` again and passes the (logged) link result to `send`; `Either::Right` sends
/// nothing. Returns `Ok(())` either way.
///
/// NOTE(review): the error output posted after this snippet still reports a move of the
/// `Sender` at the `send` call. A `MutexGuard` only hands out the inner value by reference,
/// so if `send` takes the sender by value it cannot be called through the guard -- confirm
/// `send`'s signature against the `pool::Sender` API.
async fn call_sb_dispatcher_async<T>(
    req: SbConnect,
    fut: T::Future,
    inner: Rc<RefCell<SbDispatcherInner>>,
    tx: Arc<Mutex<pool::Sender<Result<ReceiverLink, GwErrorReport>>>>,
    enable_cbs: bool,
) -> Result<(), GwErrorReport>
where
    T: Service<SbConnect, Response = client::Client, Error = client::ConnectError>,
{
    // Cloned up front because `req` itself is moved into `create_sb_link_async` below.
    let path = req.path.clone();

    // `tx` is wrapped in `Arc<Mutex<_>>`, so each user takes its own clone of the `Arc`
    // and locks the mutex only while it touches the sender.
    let poll_canceled_future = {
        let tx_clone = tx.clone();
        Box::pin(poll_fn(move |cx| {
            // `unwrap` panics if the mutex is poisoned; the guard is released when this
            // closure returns.
            let mut sender = tx_clone.lock().unwrap();
            sender.poll_canceled(cx)
        }))
    };

    let create_link_future = Box::pin(create_sb_link_async::<T>(req, fut, inner, enable_cbs));
    let result = select(create_link_future, poll_canceled_future).await;

    match result {
        // Link creation completed first; `_` is the unfinished `poll_canceled` future.
        Either::Left((res, _)) => {
            // A poisoned mutex is silently skipped here (no send happens in that case).
            if let Ok(mut sender) = tx.lock() {
                // NOTE(review): this is the call the posted error output flags as a move.
                let _ = sender.send(
                    res.map(|link| {
                        trace!("SB receiver link is opened: {:?}", path);
                        link
                    })
                    .map_err(|e| {
                        trace!("SB opening receiver link is failed with {:?}", e);
                        e
                    }),
                );
            }
        },
        // The `poll_canceled` future completed first; nothing is sent.
        Either::Right(_) => {}
    };

    Ok(())
}
    |  _________________________^
195 | |                     res.map(|link| {
196 | |                         trace!("SB receiver link is opened: {:?}", path);
197 | |                         link
...   |
202 | |                     }),
203 | |                 );
    | |_________________^ move occurs because value has type `ntex::channel::pool::Sender<Result<ReceiverLink, ioterror::GwErrorReport>>`, which does not implement the `Copy` trait

Ah, this is becoming very frustrating. I think we probably need to somehow get a reference out instead of unwrapping after the lock, but I'm not sure.

Thanks. I am single-threaded though, can we do without Arc?

You can use Rc<RefCell<T>> in place of Arc<Mutex<T>> in code that is known to the compiler to be single-threaded (i.e. does not have to meet any Send or Sync bounds).

By the way, it would be helpful to include the complete output when you post a compiler error.

1 Like

I see. Rc<RefCell<_>> presents pretty much the same challenge.

/// Single-threaded variant: takes the sender by value and wraps it in `Rc<RefCell<_>>`.
///
/// Link creation (`left`) is raced against a `poll_fn` wrapper around `poll_canceled`
/// (`right`) using `pin_mut!` and `select`. When link creation finishes first, the result is
/// logged with `trace!` and forwarded through `send` on a `borrow_mut()` guard; when `right`
/// finishes first, nothing is sent. Returns `Ok(())` either way.
///
/// NOTE(review): this is reported to fail with E0507 ("cannot move out of dereference of
/// `std::cell::RefMut<...>`") at the `tx_guard.send(...)` calls, i.e. `send` moves the
/// `Sender`, which a `RefMut` guard cannot give up.
async fn call_sb_dispatcher_async<T>(
    req: SbConnect,
    fut: T::Future,
    inner: Rc<RefCell<SbDispatcherInner>>,
    tx: pool::Sender<Result<ReceiverLink, GwErrorReport>>,
    enable_cbs: bool,
) -> Result<(), GwErrorReport>
where
    T: Service<SbConnect, Response = client::Client, Error = client::ConnectError>,
{
    // Cloned up front because `req` itself is moved into `create_sb_link_async` below.
    let path = req.path.clone();
    // Shadows the owned sender with a shared, interior-mutable handle.
    let tx = Rc::new(RefCell::new(tx));

    let left = create_sb_link_async::<T>(req, fut, inner, enable_cbs);
    // The closure borrows `tx`; a shared `borrow()` is taken on every poll.
    let right = poll_fn(|cx| tx.borrow().poll_canceled(cx));

    // Pin both futures on the stack so they can be passed to `select`.
    pin_mut!(left);
    pin_mut!(right);

    let result = select(left, right).await;

    match result {
        // Link creation completed first; `_` is the unfinished `poll_canceled` future.
        Either::Left((res, _)) => {
            match res {
                Ok(link) => {
                    trace!("SB receiver link is opened: {:?}", path);
                    let mut tx_guard = tx.borrow_mut();
                    // NOTE(review): the quoted E0507 error points at this call.
                    let _ = tx_guard.send(Ok(link));
                }
                Err(e) => {
                    trace!("SB opening receiver link failed with {:?}", e);
                    let mut tx_guard = tx.borrow_mut();
                    // Same call shape as the `Ok` arm above.
                    let _ = tx_guard.send(Err(e));
                }
            };
        }
        // The `poll_canceled` future completed first; nothing is sent.
        Either::Right(_) => {}
    };

    Ok(())
}

error[E0507]: cannot move out of dereference of `std::cell::RefMut<'_, ntex::channel::pool::Sender<Result<ReceiverLink, ioterror::GwErrorReport>>>`
   --> gw-core/src/servicebus/dispatcher.rs:192:29
    |
192 |                     let _ = tx_guard.send(Ok(link));
    |                             ^^^^^^^^^^^^^^^^^^^^^^^ move occurs because value has type `ntex::channel::pool::Sender<Result<ReceiverLink, ioterror::GwErrorReport>>`, which does not implement the `Copy` trait

Instead of tx.borrow_mut(), perhaps you can use Rc::get_mut(&mut tx).unwrap().get_mut(). This relies on the Rc no longer being shared when this code is reached (or else the unwrap will panic). I'm not certain whether this is the case with the pinned left/right futures as written, or if you need to wrap them in a block scope or something so they are dropped in time.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.