I would like to call std::mem::take on the value returned by tokio::sync::watch::Receiver::borrow_and_update. In other words, I want to std::mem::take the last value that was put onto the watch. Is this possible somehow?
If this is not possible, I will have to implement this using a Mutex, which can be taken from, and a Notify. I would like to avoid this if possible.
not in general, since borrow_and_update returns a shared reference, but mem::take() needs an exclusive reference (and the type must implement Default)
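To make the shared-versus-exclusive point concrete, here is a minimal sketch using only the standard library (no tokio). The helper name `take_from_cell` is made up for illustration; the point is only that `mem::take` wants `&mut T` with `T: Default`, and that a Mutex is one way to obtain a `&mut T` from something that is itself only shared.

```rust
use std::mem;
use std::sync::Mutex;

/// Hypothetical helper: move the value out of a shared Mutex,
/// leaving `String::default()` (the empty string) behind.
fn take_from_cell(cell: &Mutex<String>) -> String {
    // The guard dereferences to `&mut String`, which is what mem::take needs.
    let mut guard = cell.lock().unwrap();
    mem::take(&mut *guard)
}

fn main() {
    // With an exclusive reference, mem::take works directly.
    let mut owned = String::from("latest");
    let taken = mem::take(&mut owned);
    assert_eq!(taken, "latest");
    assert_eq!(owned, "");

    // With only a shared reference the value can be read or cloned,
    // but `mem::take(shared)` would be rejected by the compiler.
    let shared: &String = &taken;
    let copy = shared.clone();
    assert_eq!(copy, "latest");

    // Going through a Mutex gives exclusive access again.
    let cell = Mutex::new(String::from("latest"));
    assert_eq!(take_from_cell(&cell), "latest");
    assert!(cell.lock().unwrap().is_empty());
}
```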
in theory, it might be possible, depending on what type T is, but even if it were, it would not be much different from using a Mutex; there must be some synchronization somewhere after all.
but I feel like this may be an xy problem. maybe watch is not your best choice for the problem to begin with.
what's your actual use case? why did you choose watch in the first place? are there alternatives? can you implement it with regular channels? it would be great if the problem were described in more detail.
I would like to have the equivalent of a watch, where the value is not only readable, but also modifiable (thus can call take on it).
In essence, one process will be writing values, and the other process will be notified each time the value is updated, with the possibility of modifying/taking that value.
One imaginary implementation could be a channel of size 1, where the value in the channel, if it exists, is overwritten every time a value is put on the channel. The reading process can take from the channel if there is a value in the channel. Otherwise it waits.
One easy implementation of this is to use a Notify and a Mutex. I am wondering whether an implementation already exists.
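For reference, the "channel of size 1 that overwrites" idea could look roughly like the sketch below. It is a blocking stand-in written with std::sync::Mutex and std::sync::Condvar so that it runs without any dependencies; in async code the Condvar would presumably be replaced by tokio::sync::Notify, as suggested above. The `Slot` type and its method names are invented for this example.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical single-value mailbox: `put` overwrites, `take` waits.
struct Slot<T> {
    value: Mutex<Option<T>>,
    ready: Condvar,
}

impl<T> Slot<T> {
    fn new() -> Self {
        Slot { value: Mutex::new(None), ready: Condvar::new() }
    }

    /// Store `v`, returning the unread value it replaced (if any).
    fn put(&self, v: T) -> Option<T> {
        let old = self.value.lock().unwrap().replace(v);
        self.ready.notify_one();
        old
    }

    /// Block until a value is present, then move it out,
    /// leaving the slot empty.
    fn take(&self) -> T {
        let mut guard = self.value.lock().unwrap();
        loop {
            if let Some(v) = guard.take() {
                return v;
            }
            // Releases the lock while waiting and re-acquires it on wake-up.
            guard = self.ready.wait(guard).unwrap();
        }
    }
}

fn main() {
    let slot = Arc::new(Slot::new());
    let writer = Arc::clone(&slot);
    let producer = thread::spawn(move || {
        for i in 1..=3 {
            writer.put(i);
        }
    });
    producer.join().unwrap();
    // Only the most recent value survives the overwrites.
    assert_eq!(slot.take(), 3);
}
```

Unlike a watch, a value taken this way is gone for everybody else, so the sketch only fits the single-reader case.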
that makes little sense to me. if you want the receiver to take the value, then it is by no means "equivalent" to watch, since watch is designed for possibly multiple receivers to observe the latest value, hence the name watch.
if a receiver were allowed to update the value (owned by the channel) in-place, how would it affect other receivers, which might be concurrently borrowing the value at the same time? also, if it were allowed to update the value, was it a receiver or sender?
of course, any receiver could clone the "observed" value if it needed it and the type implements Clone, but that's all you can do.
it seems you can implement a "fan-in,fan-out" shaped channel for it.
for example, you can use a Mutex in the middle to store the latest value. all the senders share the same storage cell (e.g. through an Arc) and put values directly into it. the receivers also share the storage cell, so they can access the value directly or update it as needed. as for the notification part, anything compatible with tokio would do.
in fact, you don't really need to split the sender and receiver type, you just use different methods on the same type:
let the_value = Arc::new(MyFancySync::new(42));
// in a producer task: store a new value and wake the listeners
the_value.put_and_notify(666);
// in a consumer task: listen for update events
let new_value = the_value.get_next().await;
// another consumer: "take" the value without notifying listeners
let snapshot = the_value.swap(Default::default());
but Mutex prevents multiple consumers from accessing the value concurrently. a variant of this design is to replace the Mutex with an upgradable RwLock, but I don't have experience with that.
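MyFancySync is not an existing type. One possible shape for it is sketched below, again as a blocking std-only approximation: a Mutex holds the value plus a version counter, and a Condvar stands in for whatever tokio-compatible notification would be used in async code. Unlike the snippet above, `get_next` here is not async and takes the last version the caller has seen; that parameter, the `State` struct, and the `T: Clone` bound are assumptions of this sketch.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

struct State<T> {
    value: T,
    version: u64, // bumped on every notifying write
}

/// Hypothetical shared cell with watch-like notification plus `swap`.
struct MyFancySync<T> {
    state: Mutex<State<T>>,
    changed: Condvar,
}

impl<T: Clone> MyFancySync<T> {
    fn new(value: T) -> Self {
        MyFancySync {
            state: Mutex::new(State { value, version: 0 }),
            changed: Condvar::new(),
        }
    }

    /// Overwrite the value and wake every waiting consumer.
    fn put_and_notify(&self, value: T) {
        let mut s = self.state.lock().unwrap();
        s.value = value;
        s.version += 1;
        self.changed.notify_all();
    }

    /// Block until the version differs from `seen`, then return the
    /// current version together with a clone of the value.
    fn get_next(&self, seen: u64) -> (u64, T) {
        let s = self.state.lock().unwrap();
        let s = self.changed.wait_while(s, |s| s.version == seen).unwrap();
        (s.version, s.value.clone())
    }

    /// Exchange the stored value without notifying anyone.
    fn swap(&self, new: T) -> T {
        let mut s = self.state.lock().unwrap();
        std::mem::replace(&mut s.value, new)
    }
}

fn main() {
    let the_value = Arc::new(MyFancySync::new(42));
    let consumer = {
        let v = Arc::clone(&the_value);
        thread::spawn(move || v.get_next(0))
    };
    the_value.put_and_notify(666);
    assert_eq!(consumer.join().unwrap(), (1, 666));
    // "take" the value, leaving the default (0 for an integer) behind.
    let snapshot = the_value.swap(Default::default());
    assert_eq!(snapshot, 666);
}
```

Because `swap` does not bump the version, a consumer that already saw version 1 will not be woken by it, which matches the "without notify" behaviour described above.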
this is what I just described.
and in fact, if you replace the Mutex with a RwLock (non-upgradable), then you essentially get the watch channel (minus some bookkeeping states).
not as far as I know. in fact, there are not that many high-performance concurrent data structures that support multiple consumers.
so I think the overhead of Mutex[1] is hardly avoidable for your use case, EXCEPT in special cases where the type is small enough that you can use atomic instructions to read and write it.
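As an illustration of that special case, a value that fits in a machine word can be "taken" with a single atomic swap. The sketch below assumes the payload is a non-zero u64 and reserves 0 as the "empty" marker; `AtomicSlot` is an invented name, and the notification side is left out entirely.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical lock-free cell for a small value; 0 means "empty".
struct AtomicSlot(AtomicU64);

impl AtomicSlot {
    const EMPTY: u64 = 0;

    fn new() -> Self {
        AtomicSlot(AtomicU64::new(Self::EMPTY))
    }

    /// Overwrite whatever is stored. The value must be non-zero,
    /// because 0 is reserved as the empty marker in this sketch.
    fn put(&self, value: u64) {
        assert_ne!(value, Self::EMPTY, "0 is reserved as the empty marker");
        self.0.store(value, Ordering::Release);
    }

    /// Atomically move the value out, leaving the slot empty.
    fn take(&self) -> Option<u64> {
        let v = self.0.swap(Self::EMPTY, Ordering::AcqRel);
        if v == Self::EMPTY { None } else { Some(v) }
    }
}

fn main() {
    let slot = AtomicSlot::new();
    assert_eq!(slot.take(), None);
    slot.put(7);
    slot.put(9); // overwrites the unread 7
    assert_eq!(slot.take(), Some(9));
    assert_eq!(slot.take(), None);
}
```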
[1] or an upgradable RwLock, which typically has more overhead than a non-upgradable one; a non-upgradable RwLock, compared to Mutex, typically trades slightly worse write overhead for better read performance