/// Races creation of a Service Bus receiver link against cancellation of `tx`.
///
/// `select` drives two boxed futures: `create_sb_link_async::<T>(..)` and a
/// `poll_fn` wrapper around `tx.poll_canceled`. If the link future finishes
/// first, its result (success or error) is trace-logged and handed to
/// `tx.send`; if the cancellation future finishes first, nothing is sent.
///
/// Always returns `Ok(())`; the value returned by `tx.send` is discarded.
///
/// NOTE(review): as written this does not compile. rustc reports "cannot move
/// out of `*tx` which is behind a mutable reference" for the `tx.send(..)`
/// call, presumably because `send` takes the sender by value, which a `&mut`
/// parameter cannot provide. Confirm against the `pool::Sender` API.
async fn call_sb_dispatcher_async<T>(
req: SbConnect,
fut: T::Future,
inner: Rc<RefCell<SbDispatcherInner>>,
tx: &mut pool::Sender<Result<ReceiverLink, GwErrorReport>>,
enable_cbs: bool,
) -> Result<(), GwErrorReport>
where
T: Service<SbConnect, Response = client::Client, Error = client::ConnectError>,
{
// Cloned up front because `req` is moved into the link future below.
let path = req.path.clone();
// Whichever of the two futures completes first decides the outcome. The
// `poll_fn` closure captures `tx` for as long as the boxed future is alive.
let result = select(Box::pin(create_sb_link_async::<T>(req, fut, inner, enable_cbs)), Box::pin(poll_fn(|cx| tx.poll_canceled(cx)))).await;
let _ = match result {
// The link future finished first: log the outcome and forward it as-is.
Either::Left((res, _)) => {
// The send result is intentionally ignored.
let _ = tx.send(
res
.map(|link| {
trace!("SB receiver link is opened: {:?}", path);
link
})
.map_err(|e| {
trace!("SB opening receiver link is failed with {:?}", e);
e
}),
);
},
// The cancellation future finished first: there is nothing to send.
Either::Right(_) => {}
};
Ok(())
}
I have an issue with the tx object, which implements neither Clone nor Copy: I want to use it inside the poll_fn closure and also later, outside the closure, when calling tx.send.
I am getting the error "cannot move out of *tx which is behind a mutable reference" and cannot figure out why, since I'm only trying to pass a reference into the closure. Any idea how to fix this?
I've tried that approach as well; the same error appears if I just wrap it in Arc<Mutex<_>> and leave everything else unchanged. Is there anything else I might need to add?
Then you can .clone() the Arc as many times as you need to create multiple independent pointers to the same Sender, and move a different one into each place that needs to own one.
// Use tx only within the closure or outside but not both
let tx_ref = &mut *tx; // explicitly borrow to clarify scope
let poll_canceled = Box::pin(poll_fn(move |cx| tx_ref.poll_canceled(cx)));
let create_link = Box::pin(create_sb_link_async::<T>(req, fut, inner, enable_cbs));
error[E0505]: cannot move out of `*tx` because it is borrowed
--> gw-core/src/servicebus/dispatcher.rs:193:21
|
179 | let tx_ref = &mut *tx; // explicitly borrow to clarify scope
| -------- borrow of `*tx` occurs here
...
193 | let _ = tx.send(
| _____________________^
194 | | res
195 | | .map(|link| {
196 | | trace!("SB receiver link is opened: {:?}", path);
... |
202 | | }),
203 | | );
| |_____________^ move out of `*tx` occurs here
...
209 | }
| - borrow might be used here, when `right` is dropped and runs the destructor for type `Pin<Box<impl futures_util::Future<Output = ()>>>
use std::sync::{Arc, Mutex};
/// Variant of the dispatcher call that shares the sender through
/// `Arc<Mutex<..>>`.
///
/// Races `create_sb_link_async::<T>(..)` against a `poll_fn` future that locks
/// the mutex and polls `poll_canceled` on the sender. If the link future
/// finishes first, its result is trace-logged and passed to `send` on the
/// locked sender; if the cancellation future finishes first, nothing is sent.
///
/// Always returns `Ok(())`; the value returned by `send` is discarded.
///
/// NOTE(review): `sender` below is a `MutexGuard`, so `sender.send(..)` can
/// only reach the sender through a reference. If `send` consumes the sender by
/// value (TODO confirm against the `pool::Sender` API), this call cannot
/// compile.
async fn call_sb_dispatcher_async<T>(
req: SbConnect,
fut: T::Future,
inner: Rc<RefCell<SbDispatcherInner>>,
tx: Arc<Mutex<pool::Sender<Result<ReceiverLink, GwErrorReport>>>>,
enable_cbs: bool,
) -> Result<(), GwErrorReport>
where
T: Service<SbConnect, Response = client::Client, Error = client::ConnectError>,
{
// Cloned up front because `req` is moved into the link future below.
let path = req.path.clone();
// `tx` is wrapped in `Arc<Mutex<..>>` so that a clone of the handle can be
// moved into the cancellation closure while the original stays usable below.
let poll_canceled_future = {
let tx_clone = tx.clone();
Box::pin(poll_fn(move |cx| {
// Locks on every poll; `unwrap` panics if the mutex is poisoned.
let mut sender = tx_clone.lock().unwrap();
sender.poll_canceled(cx)
}))
};
let create_link_future = Box::pin(create_sb_link_async::<T>(req, fut, inner, enable_cbs));
let result = select(create_link_future, poll_canceled_future).await;
match result {
// The link future finished first: log the outcome and forward it as-is.
Either::Left((res, _)) => {
// If the lock is poisoned the result is silently dropped (no send).
if let Ok(mut sender) = tx.lock() {
// The send result is intentionally ignored.
let _ = sender.send(
res.map(|link| {
trace!("SB receiver link is opened: {:?}", path);
link
})
.map_err(|e| {
trace!("SB opening receiver link is failed with {:?}", e);
e
}),
);
}
},
// The cancellation future finished first: there is nothing to send.
Either::Right(_) => {}
};
Ok(())
}
| _________________________^
195 | | res.map(|link| {
196 | | trace!("SB receiver link is opened: {:?}", path);
197 | | link
... |
202 | | }),
203 | | );
| |_________________^ move occurs because value has type `ntex::channel::pool::Sender<Result<ReceiverLink, ioterror::GwErrorReport>>`, which does not implement the `Copy` trait
Ah, this is becoming very frustrating. I think we probably need to somehow get a reference out instead of unwrapping after the lock, but I'm not sure.
You can use Rc<RefCell<T>> in place of Arc<Mutex<T>> in code that is known to the compiler to be single-threaded (i.e. does not have to meet any Send or Sync bounds).
By the way, it would be helpful to include the complete output when you post a compiler error.
I see. Rc<RefCell> is pretty much the same challenge.
/// Races opening a Service Bus receiver link against cancellation of `tx`
/// and forwards the link result through `tx` when the link future wins.
///
/// # Parameters
/// - `req`, `fut`, `inner`, `enable_cbs`: passed unchanged to
///   `create_sb_link_async::<T>`.
/// - `tx`: sender that receives `Ok(link)` or `Err(e)`; it is consumed by
///   `send`.
///
/// # Returns
/// Always `Ok(())`. A failed `send` is deliberately ignored, and nothing is
/// sent when the cancellation future completes first.
async fn call_sb_dispatcher_async<T>(
    req: SbConnect,
    fut: T::Future,
    inner: Rc<RefCell<SbDispatcherInner>>,
    tx: pool::Sender<Result<ReceiverLink, GwErrorReport>>,
    enable_cbs: bool,
) -> Result<(), GwErrorReport>
where
    T: Service<SbConnect, Response = client::Client, Error = client::ConnectError>,
{
    // Cloned up front because `req` is moved into the link future below.
    let path = req.path.clone();

    // `send` takes the sender by value, so every borrow of `tx` has to end
    // before that call. Keeping both futures inside this block guarantees the
    // cancellation future (which borrows `tx`) is dropped before `tx` is moved;
    // no `Rc<RefCell<_>>` wrapper is needed.
    let outcome = {
        let left = create_sb_link_async::<T>(req, fut, inner, enable_cbs);
        let right = poll_fn(|cx| tx.poll_canceled(cx));
        pin_mut!(left);
        pin_mut!(right);
        let winner = select(left, right).await;
        match winner {
            // Keep only the link result; the unfinished cancellation future is
            // discarded together with the rest of `winner`.
            Either::Left((res, _)) => Some(res),
            // Cancellation finished first: there is nothing to forward.
            Either::Right(_) => None,
        }
    };

    if let Some(res) = outcome {
        match &res {
            Ok(_) => trace!("SB receiver link is opened: {:?}", path),
            Err(e) => trace!("SB opening receiver link failed with {:?}", e),
        }
        // Best-effort delivery: the send result is intentionally ignored.
        let _ = tx.send(res);
    }

    Ok(())
}
error[E0507]: cannot move out of dereference of `std::cell::RefMut<'_, ntex::channel::pool::Sender<Result<ReceiverLink, ioterror::GwErrorReport>>>`
--> gw-core/src/servicebus/dispatcher.rs:192:29
|
192 | let _ = tx_guard.send(Ok(link));
| ^^^^^^^^^^^^^^^^^^^^^^^ move occurs because value has type `ntex::channel::pool::Sender<Result<ReceiverLink, ioterror::GwErrorReport>>`, which does not implement the `Copy` trait
Instead of tx.borrow_mut(), perhaps you can use Rc::get_mut(&mut tx).unwrap().get_mut(). This relies on the Rc no longer being shared when this code is reached (or else the unwrap will panic). I'm not certain whether this is the case with the pinned left/right futures as written, or if you need to wrap them in a block scope or something so they are dropped in time.