Mutably borrowing a list of elements in tuples

task::spawn(async move {
    let mut channels: ShortVector<(Option<&mut Transit>, BindPattern, Hash)> =
        ShortVector::with_capacity(tuples.len());
    let mut pairs: ShortVector<(Transit, oneshot::Sender<Transit>)> =
        ShortVector::with_capacity(tuples.len());

    for (hash, bind_pattern, rx, tx) in tuples {
        match rx.await {
            Err(e) => {
                warn!("Error in oneshot::Receiver<Transit>. {} - {:?}", &e, &e);
                return;
            },
            Ok(transit) => {
                pairs.push((transit, tx));
                channels.push((None, bind_pattern, hash));
            }
        }
    }

    // fill in the mutable borrows (this is the loop that fails to compile)
    for (idx, tuple) in channels.iter_mut().enumerate() {
        tuple.0 = Some(&mut pairs[idx].0);
    }

    // now handle it
    let wrappers = {
        let consuming_task = ConsumeTask::<ConsumingChannel> {
            replier: consume.replier,
            continuation: consume.continuation,
            persistent: consume.persistent,
            peek: consume.peek,
            channels,
        };
        Transit::consume(consuming_task)
    };

    // now send the signals
    for (transit, sender) in pairs {
        if sender.send(transit).is_err() {
            warn!("sender.send(transit) failed but it should not!");
        }
    }
});

The input tuples is a vector of tuples with the type (Hash, BindPattern, oneshot::Receiver<Transit>, oneshot::Sender<Transit>).

First, await on the oneshot::Receiver<Transit> to get the Transit.

Then split tuples into two vectors of tuples:

  • pairs of type Vec<(Transit, oneshot::Sender<Transit>)>
  • channels of type Vec<(Option<&mut Transit>, BindPattern, Hash)>

Then call the non-async method Transit::consume with channels as a parameter.

The problem is that I am not allowed to store a list of mutable borrows:

cannot borrow pairs as mutable more than once at a time
pairs was mutably borrowed here in the previous iteration of the loop

Do you have any idea how I can pass mutable borrows of Transit into Transit::consume()?

  1. I tried to change pairs from (Transit, oneshot::Sender<Transit>) to (RefCell<Transit>, oneshot::Sender<Transit>), but the compiler says the RefCell cannot cross an await.
  2. Certainly I could move each Transit into Transit::consume() and have it handed back, but I dislike this approach.
  3. Should I avoid await and switch to combinators from futures-rs, so that I can keep a RefCell across each receive?
  4. BTW, is FuturesOrdered better here for performance if there are only a few awaits?

Thank you in advance.

The following is the error from my attempt to use RefCell instead; it fails to compile because the Cell inside RefCell is not Sync, so the future holding it across an .await cannot be sent between threads:

future cannot be sent between threads safely
the trait Sync is not implemented for Cell<isize>

Try doing:

for (tuple, pairs_i) in channels.iter_mut().zip(&mut pairs) {
    tuple.0 = Some(&mut pairs_i.0);
}

The gist is that &mut borrows are, by definition, exclusive borrows, not just mutable ones. When indexing with "arbitrary indices", such as in your &mut pairs[idx] operation, the problem is that if, in two different iterations of that for loop, you stumbled upon a repeated idx value, that would break the uniqueness / exclusivity contract of &mut.

And if something may go wrong at runtime, Rust won't let that code compile for the sake of safety, unless you:

  • either promise Rust that you know it won't, and that it should blindly trust you (i.e., use unsafe ⚠️),

  • or you come up with a different pattern whereby Rust knows the used indices will all be distinct from each other, which is what I have done: the .enumerate()-and-then-index pattern can be replaced by a .zip() when the two iterators have equal length. If, on the other hand, pairs happened to be shorter than channels, then an .enumerate()-and-then-index approach would lead to a panic (which is usually "good": you want programmer errors to be visible), whereas a .zip() would silently truncate the iteration over channels to match pairs's length. This "silent" behavior is the main caveat of using .zip().

    • Aside: That's why I wish std also featured .zip_eq_count() and .zip_ge_count() adaptors that would panic on a length mismatch / truncation of the lhs iterator (or something fully-fledged such as ::itertools's .zip_longest()); a sketch using ::itertools's existing .zip_eq() follows right after this list.
  • (or you delegate to a runtime check, as you attempted to do with RefCell; RefCell is not Sync, though, which I guess was the reason for your second problem. Should you need to do that in the future, the Sync equivalent of RefCell is RwLock, whereby .borrow_mut() becomes .try_write().unwrap().)
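For reference, here is a minimal, self-contained sketch of that "loud failure" behavior, assuming the ::itertools crate is available and using toy types in place of Transit and friends: its .zip_eq() adaptor works like .zip() but panics if the two iterators have different lengths.

use ::itertools::Itertools as _; // brings `.zip_eq()` into scope

fn main() {
    let mut pairs: Vec<(i32, char)> = vec![(1, 'x'), (2, 'y')];
    let mut channels: Vec<(Option<&mut i32>, &str)> = vec![(None, "a"), (None, "b")];

    // Like `.zip()`, but panics on a length mismatch instead of silently
    // truncating the iteration.
    for (chan, pair) in channels.iter_mut().zip_eq(&mut pairs) {
        chan.0 = Some(&mut pair.0);
    }

    // The stored `&mut`s are provably disjoint, so using them compiles fine.
    for (borrow, _pattern) in &mut channels {
        if let Some(x) = borrow {
            **x += 10;
        }
    }
    assert_eq!(pairs, vec![(11, 'x'), (12, 'y')]);
}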


Thanks @Yandros, .zip() works now, although it loops twice.

I don't want to involve synchronization primitives like RwLock.

If I used futures-rs combinators instead of .await, would that make RefCell work?

It won't loop more than what you attempted with the .enumerate() indexing. Indeed,

for (tuple, pairs_i) in channels.iter_mut().zip(&mut pairs) {
    tuple.0 = Some(&mut pairs_i.0);
}

is equivalent to your .enumerate() solution, but with unsafe used to force the compiler's hand:

let pairs_len = pairs.len();
let pairs_ptr = pairs.as_mut_ptr();
for (idx, tuple) in channels.iter_mut().enumerate() {
    // You can hoist this check outside the loop with an
    // `assert_eq!(pairs.len(), channels.len());`
    if idx >= pairs_len { break; }
    tuple.0 = Some(unsafe { &mut (*pairs_ptr.add(idx)).0 });
}

You can use RefCell in your futures, even when using async / .await; it's just that if a &RefCell reference crosses an .await point, then the obtained Future won't be Send, which is very restrictive in practice (only "current-thread" executors will be able to drive it to completion).
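For instance, here is a minimal sketch of that restriction, assuming a Tokio runtime: the spawned task below is not Send because a RefCell borrow lives across an .await, so it can only go through a current-thread LocalSet / spawn_local rather than an ordinary tokio::spawn.

use ::core::cell::RefCell;
use ::tokio::task::{spawn_local, yield_now, LocalSet};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    LocalSet::new()
        .run_until(async {
            // `tokio::spawn` would reject this future (it is not `Send`),
            // but `spawn_local` on a current-thread executor accepts it.
            spawn_local(async {
                let cell = RefCell::new(0_i32);
                let mut borrow = cell.borrow_mut();
                yield_now().await; // the `RefMut` is held across this await
                *borrow += 1;
            })
            .await
            .unwrap();
        })
        .await;
}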

Most of RwLock's synchronization cost comes from its blocking methods, such as .write(). That's why I suggested using .try_write() instead: at that point, the only overhead you'll have to pay is for the reads & writes on the lock's guard flag being atomic / multi-thread resilient rather than non-atomic / thread-unsafe. (A small sketch of this follows after the next bullet.)

  • FWIW, another option, depending on the use case, is to use async-friendly locks, since those don't really "block", they just cause the future to yield execution when busy (thus cooperating and letting other tasks run).
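And for completeness, a minimal, self-contained sketch of the RwLock + .try_write() idea (with toy stand-ins for Transit and Transit::consume, not your actual API): pairs owns the values inside RwLocks, channels only stores shared references to those locks, and the consuming side takes the write guard exactly when it needs mutable access.

use ::std::sync::RwLock;

struct Transit(String);

// Stand-in for the non-async `Transit::consume`: it receives `&RwLock<Transit>`s
// instead of `&mut Transit`s and locks each one when needed.
fn consume(channels: &[(&RwLock<Transit>, &'static str)]) {
    for &(lock, bind_pattern) in channels {
        // `try_write` does not block; it only fails if the lock is already
        // held, which cannot happen here since nothing else is holding it.
        let mut transit = lock.try_write().unwrap();
        transit.0.push_str(bind_pattern);
    }
}

fn main() {
    let pairs: Vec<RwLock<Transit>> = vec![
        RwLock::new(Transit("a".into())),
        RwLock::new(Transit("b".into())),
    ];
    // No `&mut` is stored, so there is no exclusivity to prove to the compiler.
    let channels: Vec<(&RwLock<Transit>, &'static str)> =
        pairs.iter().zip(["-x", "-y"]).collect();

    consume(&channels);

    // The `Transit`s can still be taken back out afterwards (e.g. to send
    // them through the oneshot channels).
    for lock in pairs {
        println!("{}", lock.into_inner().unwrap().0); // prints "a-x" and "b-y"
    }
}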

> It won't loop more than what you attempted with the .enumerate() indexing.

I didn't make myself clear enough.

The first loop is for (hash, bind_pattern, rx, tx) in tuples
The second loop is for (pair, chan) in (&mut pairs).iter_mut().zip(&mut channels)

In fact, the second loop would be redundant if its work could be done in the first loop.

> You can use RefCell in your futures, even when using async / .await; it's just that if a &RefCell reference crosses an .await point…

Yep, that is the reason it cannot cross the .await. But in fact the code still runs in sequence, even if multiple threads could execute it one after another.

Thanks for the suggestion. I will check whether there is something like an AsyncCell to avoid the second loop.

Indeed! That's a common "issue", which boils down to the following:

Basically, you'd like to take references to the last element pushed while also extending the Vec. This pattern, in general, can easily lead to dangling pointers (a form of iterator invalidation), since when you .push() onto a Vec, it may reallocate, causing the previous references to become stale / dangling.
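As a tiny, self-contained illustration of that hazard (a toy, not your code), here is what reallocation does to a previously taken pointer; with a reference instead of a raw pointer, rustc rejects the equivalent code at compile time, which is exactly the protection you are hitting:

fn main() {
    let mut v: Vec<i32> = Vec::with_capacity(1);
    v.push(1);
    let p: *const i32 = &v[0]; // address of the first element

    // This push exceeds the capacity, so the buffer has to grow; the elements
    // may be moved to a new allocation, leaving `p` dangling.
    v.push(2);

    // Dereferencing `p` here could be use-after-free UB. With a `&i32` instead
    // of a raw pointer, the borrow checker refuses to compile `v.push(2)`
    // while the reference is still live.
    println!("old address: {:p}, new address: {:p}", p, &v[0]);
}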

Granted, in your case, you do preallocate in advance, so that you know such a realloc won't happen; sadly, the types and functions used do not let Rust / the type-system know it. So, unless you resort to unsafe, you'll need that second iteration.

FWIW, here is what a solution using unsafe would look like. (Note: going the unsafe route does more harm than good unless you benchmark and observe a significant performance difference between the two approaches: if done wrong you'll have UB, which means the program could feature bugs at any point, bugs which may not be caught by CI. It also makes refactoring the code / keeping it up to date that much harder.)

// I assume `ShortVector` is an alias over `::smallvec::SmallVec`;
// the inline capacity here is an arbitrary placeholder.
type ShortVec<T> = ::smallvec::SmallVec<[T; 8]>;

/// Filler of a missing piece of `SmallVec`'s API: push within the already
/// reserved capacity and hand back a `&'vec mut` to the freshly pushed element.
fn small_vec_with_fixed_cap_pushes<'vec, T : 'vec> (
    vec: &'vec mut ShortVec<T>,
) -> impl 'vec + FnMut(T) -> Result<&'vec mut T, OutOfCapacityError<T>>
{
    use ::core::mem::MaybeUninit;

    let cap = vec.capacity();
    let len = vec.len();
    assert_eq!(len, 0, "For the sake of the example");
    // Raw pointer to the pre-allocated buffer; the references handed out
    // below borrow from it for `'vec`.
    let buf_ptr: *mut MaybeUninit<T> = vec.as_mut_ptr().cast();
    // On drop, commit the number of initialized elements back into the vec.
    let mut len = ::scopeguard::guard(len, /* on drop */ move |it| unsafe { vec.set_len(it) });
    move |elem| {
        let i = *len;
        if i < cap {
            *len += 1;
            // SAFETY: `i < cap`, the buffer lives for `'vec`, and each slot is
            // written to (and lent out) at most once, so the `&'vec mut T`s
            // handed out are all disjoint.
            Ok(unsafe { (*buf_ptr.add(i)).write(elem) })
        } else {
            Err(OutOfCapacityError(elem))
        }
    }
}
// where
struct OutOfCapacityError<T>(pub T);
  • Stacked Borrows technicality

    The above relies on vec.set_len(…) not invalidating the provenance of buf_ptr, which was obtained from .as_mut_ptr().

so as to be able to write:

let mut pairs = ShortVec::with_capacity(tuples.len());
let mut push_pair = small_vec_with_fixed_cap_pushes(&mut pairs);
for … in tuples {
    …
    … => {
        // push a new pair and keep a `&mut` to it that outlives the loop body
        let at_last_pair: &mut Pair =
            push_pair((transit, tx))
                .unwrap_or_else(|_| unreachable!())
        ;
        // with this approach the `Option` wrapper in `channels` is no longer needed
        channels.push((&mut at_last_pair.0, bind_pattern, hash));
    },
}
drop(push_pair); // ends the borrow of `pairs` (and commits its final length)
