Must be possible to iterate over the elements in the queue. Actually, the application doesn't really need to iterate over the elements; it just needs a "snapshot" of the current queue state so that the queued elements can be written to persistent storage without affecting the contents of the queue.
Must be able to send/receive across thread/task (read: non-async to/from async)
SPSC (single producer, single consumer) is OK
Once an element has been received, it must automatically be re-added to the channel when it is dropped, unless it has been marked as "done":
let n = rx.recv().unwrap();
drop(n); // returns n to the queue/channel; it will be the next one received
let n = rx.recv().unwrap();
n.finished(); // drops n without returning it to the channel/queue
When you drop something it is no longer usable, so you can't add it back to a channel or do anything else with it. Or do you mean it's not really dropped?
Also, I don't know of any channels that allow you to insert things at the front of the queue.
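On the drop question: Drop::drop only receives &mut self, so a wrapper can keep its payload in an Option<T> and move it out with Option::take. The wrapper is what gets dropped; the message itself survives and can be re-sent. A minimal sketch, assuming a plain std::sync::mpsc channel; Requeue, finished and demo are hypothetical names. Consistent with the point above, re-sending this way puts the value at the back of the queue, not the front.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical wrapper: owns a received value and re-sends it on drop,
/// unless `finished()` was called first.
pub struct Requeue<T> {
    value: Option<T>,
    requeue: Sender<T>,
}

impl<T> Requeue<T> {
    pub fn new(value: T, requeue: Sender<T>) -> Self {
        Requeue { value: Some(value), requeue }
    }

    /// Consumes the wrapper and hands out the payload; `Drop` then sees `None`.
    pub fn finished(mut self) -> T {
        self.value.take().expect("present until finished or dropped")
    }
}

impl<T> Drop for Requeue<T> {
    fn drop(&mut self) {
        // `drop` only gets `&mut self`, but `take` moves the payload out.
        if let Some(v) = self.value.take() {
            // Ignore the error if the receiving side is already gone.
            let _ = self.requeue.send(v);
        }
    }
}

/// Returns the order in which values end up being consumed.
pub fn demo() -> Vec<i32> {
    let (tx, rx): (Sender<i32>, Receiver<i32>) = mpsc::channel();
    for i in 1..=3 {
        tx.send(i).unwrap();
    }
    let mut order = Vec::new();

    // Take 1 and drop the wrapper: 1 is re-sent to the *back* of the queue.
    let first = Requeue::new(rx.recv().unwrap(), tx.clone());
    drop(first);

    // Take 2 and mark it as done: it is not re-sent.
    let second = Requeue::new(rx.recv().unwrap(), tx.clone());
    order.push(second.finished());

    // Whatever is still queued, in channel order.
    order.extend(rx.try_iter());
    order
}
```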
If you don't want to really drop it, perhaps you could save it in an Option variable that is local to the function iterating over the channel. At the top of the loop, take it out of the Option and process it before you ask the channel for the next queued entry.
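A rough sketch of that local Option slot, assuming a std::sync::mpsc receiver; deliver stands in for "send to the client and wait for the ack" and, like process, is a hypothetical name. A failed message is parked in the slot and retried before anything new is taken from the channel.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical stand-in for "send to client and wait for ack":
/// fails the first time it sees `fail_on`, succeeds otherwise.
fn deliver(msg: u32, fail_on: &mut Option<u32>) -> Result<(), u32> {
    if *fail_on == Some(msg) {
        *fail_on = None; // only fail once
        Err(msg)
    } else {
        Ok(())
    }
}

/// Delivers messages in order, retrying a failed one before taking new ones.
pub fn process(rx: &Receiver<u32>, mut fail_on: Option<u32>) -> Vec<u32> {
    let mut pending: Option<u32> = None;
    let mut delivered = Vec::new();
    loop {
        // Prefer the parked message; only then ask the channel.
        let msg = match pending.take() {
            Some(m) => m,
            None => match rx.recv() {
                Ok(m) => m,
                Err(_) => break, // all senders gone and the queue is empty
            },
        };
        match deliver(msg, &mut fail_on) {
            Ok(()) => delivered.push(msg),
            Err(m) => pending = Some(m), // keep it for the next iteration
        }
    }
    delivered
}
```

Note that the slot is local to the function: if the function returns, whatever is parked in it is lost.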
This sounds like more of an API convenience thing, except that the queue must effectively support putting back an element.
Now, you should probably specify the precise kind of API you’re looking for; since obviously this sort of “guard object” for auto placing back an element needs to reference the queue, are you looking for something with a lifetime that keeps a borrow of rx? Like impl Receiver<T> { fn recv(&mut self) -> Result<AutoPutBackGuard<'_, T>> }? Or do you want shared ownership of sorts? (The latter option poses quite a few follow-up questions though…[1])
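For the borrowing variant, a minimal single-threaded sketch of the API shape: the queue is a plain VecDeque, recv returns None instead of waiting, and Receiver, AutoPutBackGuard and finish are hypothetical names, not an existing crate's API. Dropping the guard pushes the value back to the front; finish hands it out for good.

```rust
use std::collections::VecDeque;
use std::ops::Deref;

/// Hypothetical single-threaded receiver, only to show the API shape.
pub struct Receiver<T> {
    queue: VecDeque<T>,
}

/// Guard that borrows the receiver; dropping it returns the value to the front.
pub struct AutoPutBackGuard<'a, T> {
    rx: &'a mut Receiver<T>,
    value: Option<T>,
}

impl<T> Receiver<T> {
    pub fn new(items: impl IntoIterator<Item = T>) -> Self {
        Receiver { queue: items.into_iter().collect() }
    }

    /// Returns `None` when the queue is empty (a real channel would wait).
    pub fn recv(&mut self) -> Option<AutoPutBackGuard<'_, T>> {
        let value = self.queue.pop_front()?;
        Some(AutoPutBackGuard { rx: self, value: Some(value) })
    }
}

impl<T> AutoPutBackGuard<'_, T> {
    /// Marks the value as done: it is handed out and not put back.
    pub fn finish(mut self) -> T {
        self.value.take().expect("present until finish or drop")
    }
}

impl<T> Deref for AutoPutBackGuard<'_, T> {
    type Target = T;
    fn deref(&self) -> &T {
        self.value.as_ref().expect("present until finish or drop")
    }
}

impl<T> Drop for AutoPutBackGuard<'_, T> {
    fn drop(&mut self) {
        if let Some(v) = self.value.take() {
            self.rx.queue.push_front(v); // the next recv() yields it again
        }
    }
}
```

Because the guard holds &mut Receiver, only one message can be out at a time, and the guard cannot outlive the receiver or be moved into a spawned 'static task.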
Another question about the interaction with “spsc”: does this mean that the “iterate over elements”/“get snapshot” functionality works through either the sender or the receiver, or are you only interested in supporting it through one of them, e.g., only the sender or only the receiver? Also, can the queue ever get so large that it’s important that the sender not be blocked during such an iteration/snapshot?
The actual use case is: we have a queue of messages to be sent to a connected client over the network. The connection task takes a message off the queue and sends it to the client. The client sends an ack, after which the message can be dropped (as in released), and the connection handler task can process the next message in the queue. If an error occurs, or the client sends a nack, the message needs to be put back onto the queue so that it can be delivered once the client has reconnected. (The connection is dropped on errors or nacks.)
I.e., if an ack was received from the client, I want to be able to mark the message as "completed", in which case it will not be put back onto the queue. In all other cases, dropping the message should put it back onto the queue.
We have a solution that is way too complex, and we've already encountered a bug in it. I want it to be more robust, and I realized that in this case, if I could simply return from the function whenever something fails, it would be very robust; but that requires that messages don't get lost on return.
I have an implementation idea in mind which does not use lifetimes (the guard simply holds an Arc<Shared<T>>, same as the channel end-points). However, the reason I'm asking is that I don't want to maintain Yet Another Crate, and I'm hoping that this thing already exists. If there is a solution that does what I need but binds the element to the receiver's lifetime, that would not be a problem for this application.
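A rough sketch of that lifetime-free variant, assuming the shared state is just an Arc<Mutex<VecDeque<T>>> (a real channel would also need wakers and a capacity); Shared, Guard, try_recv and demo are hypothetical names. Because the guard owns its own Arc, it is 'static and can be moved into another thread or task while still returning the value on drop.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical shared state; wakers and capacity are left out.
pub type Shared<T> = Arc<Mutex<VecDeque<T>>>;

/// Owned guard (no lifetime parameter): keeps its own handle to the queue.
pub struct Guard<T> {
    shared: Shared<T>,
    value: Option<T>,
}

impl<T> Guard<T> {
    pub fn get(&self) -> &T {
        self.value.as_ref().expect("present until finished or dropped")
    }

    /// Consumes the guard without returning the value to the queue.
    pub fn finished(mut self) -> T {
        self.value.take().expect("present until finished or dropped")
    }
}

impl<T> Drop for Guard<T> {
    fn drop(&mut self) {
        if let Some(v) = self.value.take() {
            // Recover from a poisoned lock rather than losing the message.
            let mut q = self.shared.lock().unwrap_or_else(|e| e.into_inner());
            q.push_front(v);
        }
    }
}

/// Non-blocking receive: pops the front element and wraps it in a guard.
pub fn try_recv<T>(shared: &Shared<T>) -> Option<Guard<T>> {
    let value = shared.lock().unwrap().pop_front()?;
    Some(Guard { shared: Arc::clone(shared), value: Some(value) })
}

/// A worker thread takes "a" and bails out without `finished()`;
/// returns the queue contents afterwards.
pub fn demo() -> Vec<&'static str> {
    let shared: Shared<&'static str> = Arc::new(Mutex::new(VecDeque::from(["a", "b"])));
    let guard = try_recv(&shared).unwrap();
    thread::spawn(move || {
        let _msg = guard.get();
        // simulated error: the guard is dropped here, putting "a" back
    })
    .join()
    .unwrap();
    let contents: Vec<_> = shared.lock().unwrap().iter().copied().collect();
    contents
}
```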
In a sense, I'm looking for a channel-like object that can be accessed as a VecDeque on shutdown. I described the basic application flow with regard to its channels in the reply to @jumpnbrownweasel, but there's another twist: when the application is shut down, all the client connections are shut down, meaning no more nodes will be taken off the queue. But I still have the transmitting end-point. I need to be able to, via the Sender, save all the messages in the queue to a database (so they can be loaded when the application is restarted).
As mentioned above, the shutdown process guarantees that all the clients are disconnected, which drops the channel's receiver end-points; only the sender remains, and during shutdown no new nodes will be added to the channel/queue.
In a very hand-wavy way, a channel that is implemented using a VecDeque could have a Sender::into_inner() that returns the internal VecDeque, so that the application can iterate over the messages and save them to persistent storage. But that bit isn't super important; it might as well be a channel that supports running a closure over all elements currently in its queue, or something similar.
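A minimal sketch of such sender-side access, assuming the channel's shared state is an Arc<Mutex<VecDeque<T>>>; Sender, from_shared, for_each, snapshot and into_inner are hypothetical names, and the blocking/waking parts of a real channel are left out. for_each holds the lock for the whole walk (blocking the other side meanwhile), snapshot only holds it while cloning, and into_inner empties the queue for the shutdown path.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Hypothetical sender half over a shared `VecDeque`.
pub struct Sender<T> {
    shared: Arc<Mutex<VecDeque<T>>>,
}

impl<T> Sender<T> {
    /// In a real channel, the receiver would hold a clone of the same `Arc`.
    pub fn from_shared(shared: Arc<Mutex<VecDeque<T>>>) -> Self {
        Sender { shared }
    }

    /// Unbounded push; capacity and waking the receiver are omitted.
    pub fn send(&self, value: T) {
        self.shared.lock().unwrap().push_back(value);
    }

    /// Runs `f` over every queued element without removing anything.
    pub fn for_each(&self, mut f: impl FnMut(&T)) {
        let queue = self.shared.lock().unwrap();
        for v in queue.iter() {
            f(v);
        }
    }

    /// Non-destructive copy of the current queue state.
    pub fn snapshot(&self) -> Vec<T>
    where
        T: Clone,
    {
        self.shared.lock().unwrap().iter().cloned().collect()
    }

    /// Shutdown path: take the whole queue, leaving an empty one behind.
    pub fn into_inner(self) -> VecDeque<T> {
        let mut queue = self.shared.lock().unwrap();
        std::mem::take(&mut *queue)
    }
}
```

Using std::mem::take means into_inner works even if other handles to the shared state still exist; Arc::try_unwrap would be the alternative, but it only succeeds once every other handle is gone.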
In very simplified code, what I want to be able to do is this (startup):
let (tx, rx) = channel(4096); // maximum 4K messages in queue
// spawn thread that generates new messages
thread::spawn(move || generator(tx));
// spawn task that waits for a client to connect, then starts sending messages
// to the client
task::spawn(wait_for_client(rx));
The transmitting end-point's life-cycle:
fn generator(tx: Sender<Msg>) {
// .. load messages from database and add them to the queue via tx.send() ..
loop {
// .. keep gathering data ..
// .. and send it as messages..
tx.send(msg);
if do_shutdown {
// at this point, it is guaranteed that the receiver end-point has been dropped
break;
}
}
// conceptually:
let msgs_to_store = tx.to_vec();
save_to_database(msgs_to_store);
}
The receiver end-point's life-cycle:
async fn wait_for_client(rx: Receiver<Msg>) {
loop {
// .. do all the TcpListener things ..
// When a client finally connects:
conn_handler(conn, &rx).await;
}
}
async fn conn_handler(mut conn: TcpStream, rx: &Receiver<Msg>) {
loop {
// In reality there's a select and a cancel token here as well ..
let msg = rx.recv().await;
if let Err(_) = send_msg_to_client(&mut conn, &msg).await {
// msg will be dropped here, and put back onto the queue
return;
}
let reply = recv_reply_from_client(&mut conn).await;
if let Reply::Ack = reply {
msg.finalize(); // dropping msg will _not_ return it to queue
} else {
// msg will be dropped here, and put back into the queue
return;
}
}
}
Hm.. I hadn't outlined it like this before, and it made me realize how gross our current implementation is.
Basically, I want a "regular" bounded channel that can communicate across tasks/threads, but with the ability to return elements that are not marked as "completed" to the channel, and I also need a way to iterate over the elements in the channel given the Sender end-point.
The more I write about this, the more niche I realize that it is, so I'm feeling less and less optimistic about finding an already existing crate for this.
So, writing channels isn’t that complicated, and since it seems I’m already in the business of writing them, here’s a small example that should fit your requirements:
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::sync::{Arc, Mutex};
use std::task::{Poll, Waker};
use std::thread::{self, Thread};
#[derive(Debug)]
pub struct SimpleChannelClosed;
struct SimpleChannel<T> {
capacity: usize,
queue: VecDeque<T>,
waker: Option<Waker>,
thread: Option<Thread>,
closed: bool,
}
impl<T> SimpleChannel<T> {
fn new(capacity: usize) -> Self {
SimpleChannel {
capacity,
queue: VecDeque::new(),
waker: None,
thread: None,
closed: false,
}
}
fn wake_receiver(&mut self) {
if let Some(waker) = self.waker.take() {
waker.wake();
}
}
fn unpark_sender(&mut self) {
if let Some(thread) = self.thread.take() {
thread.unpark();
}
}
}
pub struct SimpleSender<T>(Arc<Mutex<SimpleChannel<T>>>);
impl<T> SimpleSender<T> {
pub fn send(&mut self, value: T) -> Result<(), SimpleChannelClosed> {
loop {
let mut channel = self.0.lock().unwrap();
if channel.closed {
return Err(SimpleChannelClosed);
} else if channel.queue.len() < channel.capacity {
channel.queue.push_back(value);
channel.wake_receiver();
return Ok(());
} else {
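// Queue is full: register this thread so the receiver can unpark it,
// release the lock, and park. The loop re-checks the state afterwards,
// which also covers spurious wake-ups.
channel.thread = Some(thread::current());
drop(channel);
thread::park();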
}
}
}
pub fn into_vec(self) -> Vec<T> {
let mut channel = self.0.lock().unwrap();
let result = channel.queue.drain(..).collect();
channel.thread = None;
channel.closed = true;
channel.wake_receiver();
result
}
}
impl<T> Drop for SimpleSender<T> {
fn drop(&mut self) {
let mut channel = self.0.lock().unwrap();
channel.thread = None;
channel.closed = true;
channel.wake_receiver();
}
}
pub struct SimpleReceiver<T>(Arc<Mutex<SimpleChannel<T>>>);
impl<T> SimpleReceiver<T> {
pub fn recv(&mut self) -> impl Future<Output = Result<T, SimpleChannelClosed>> + '_ {
poll_fn(|cx| {
let mut channel = self.0.lock().unwrap();
if let Some(value) = channel.queue.pop_front() {
channel.unpark_sender();
Poll::Ready(Ok(value))
} else if channel.closed {
Poll::Ready(Err(SimpleChannelClosed))
} else {
channel.waker = Some(cx.waker().clone());
Poll::Pending
}
})
}
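// Pushes the value to the front so it is the next one received. Capacity is
// not checked here, so the queue may temporarily exceed it.
pub fn put_back(&mut self, value: T) {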
let mut channel = self.0.lock().unwrap();
channel.queue.push_front(value);
}
}
impl<T> Drop for SimpleReceiver<T> {
fn drop(&mut self) {
let mut channel = self.0.lock().unwrap();
channel.closed = true;
channel.unpark_sender();
}
}
pub fn simple_channel<T>(capacity: usize) -> (SimpleSender<T>, SimpleReceiver<T>) {
let channel = Arc::new(Mutex::new(SimpleChannel::new(capacity)));
let sender = SimpleSender(channel.clone());
let receiver = SimpleReceiver(channel);
(sender, receiver)
}
// -----------------------------
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = simple_channel(3);
let handle = std::thread::spawn(move || {
println!("sender started");
thread::sleep(std::time::Duration::from_millis(200));
for i in 0..10 {
tx.send(i).unwrap();
println!("sent: {}", i);
thread::sleep(std::time::Duration::from_millis(100));
}
// Simulate waiting for the shutdown signal
std::thread::sleep(std::time::Duration::from_secs(1));
// fetch the unprocessed messages
let msgs = tx.into_vec();
println!("sender stopped. Unprocessed messages: {msgs:?}");
});
println!("receiver started");
while let Ok(msg) = rx.recv().await {
println!("received: {}", msg);
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
if msg == 6 {
println!("putting back {msg} and stop receiving");
rx.put_back(msg);
break;
}
}
handle.join().unwrap();
}
As for putting items back into the queue, relying on Drop doesn’t seem worth the effort. A simple put_back() should do the trick.
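With the explicit put_back() approach, the ack/nack handling from the question might look roughly like this synchronous mock. MockRx (standing in for SimpleReceiver), Reply and conn_handler are simplified, hypothetical stand-ins, and the client is modeled as a closure returning the reply or a transport error.

```rust
use std::collections::VecDeque;

/// Hypothetical client replies, mirroring `Reply::Ack` in the question.
#[derive(Debug, Clone, Copy)]
pub enum Reply {
    Ack,
    Nack,
}

/// Hypothetical synchronous stand-in for `SimpleReceiver`.
pub struct MockRx<T>(pub VecDeque<T>);

impl<T> MockRx<T> {
    pub fn recv(&mut self) -> Option<T> {
        self.0.pop_front()
    }
    pub fn put_back(&mut self, value: T) {
        self.0.push_front(value);
    }
}

/// One connection: delivers messages until a nack or a transport error.
/// Returns the acked messages; the failed one goes back to the front.
pub fn conn_handler(
    rx: &mut MockRx<u32>,
    mut client: impl FnMut(u32) -> Result<Reply, String>,
) -> Vec<u32> {
    let mut acked = Vec::new();
    while let Some(msg) = rx.recv() {
        match client(msg) {
            Ok(Reply::Ack) => acked.push(msg), // done: not returned to the queue
            Ok(Reply::Nack) | Err(_) => {
                rx.put_back(msg); // every exit path holding `msg` must do this
                break;
            }
        }
    }
    acked
}
```

The trade-off compared to a Drop guard is that every path that leaves the handler while still owning msg has to call put_back explicitly; a ? or a plain return that skips it would lose the message.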