@SNCPlay42 Looking at the source code of `Shared`, can a `send` followed by two `recv` calls result in a panic?
/// Sends `t` on the shared channel.
///
/// Returns `Err(t)`, handing the value back to the caller, when
/// `port_dropped` is set or when `cnt` is already below
/// `DISCONNECTED + FUDGE`. Otherwise the value is pushed onto the queue,
/// `cnt` is incremented, and `Ok(())` is returned.
pub fn send(&self, t: T) -> Result<(), T> {
// See Port::drop for what's going on
if self.port_dropped.load(Ordering::SeqCst) {
return Err(t);
}
// A count this low is rejected up front, before anything is queued.
if self.cnt.load(Ordering::SeqCst) < DISCONNECTED + FUDGE {
return Err(t);
}
self.queue.push(t);
// `fetch_add` returns the count as it was *before* this increment, so the
// arms below match on the previous value.
match self.cnt.fetch_add(1, Ordering::SeqCst) {
// NOTE(review): -1 looks like the state left behind by a receiver that
// installed its wake token in `decrement` on an empty channel;
// `take_to_wake` is defined elsewhere -- confirm.
-1 => {
self.take_to_wake().signal();
}
n if n < DISCONNECTED + FUDGE => {
// see the comment in 'try' for a shared channel for why this
// window of "not disconnected" is ok.
self.cnt.store(DISCONNECTED, Ordering::SeqCst);
// Only the caller that moves `sender_drain` from 0 to 1 runs the
// drain loop; the others just record their presence via the increment.
if self.sender_drain.fetch_add(1, Ordering::SeqCst) == 0 {
loop {
// drain the queue, for info on the thread yield see the
// discussion in try_recv
loop {
match self.queue.pop() {
mpsc::Data(..) => {}
mpsc::Empty => break,
mpsc::Inconsistent => thread::yield_now(),
}
}
// maybe we're done, if we're not the last ones
// here, then we need to go try again.
if self.sender_drain.fetch_sub(1, Ordering::SeqCst) == 1 {
break;
}
}
// At this point, there may still be data on the queue,
// but only if the count hasn't been incremented and
// some other sender hasn't finished pushing data just
// yet. That sender in question will drain its own data.
}
}
// Can't make any assumptions about this case like in the SPSC case.
_ => {}
}
// Note that `Ok(())` is returned on every path that reached the push,
// including the arm above that stores `DISCONNECTED` and drains the queue.
Ok(())
}
/// Receives a value, blocking if nothing is immediately available.
///
/// `deadline` bounds the wait: `None` waits with no time limit, `Some(d)`
/// waits at most until `d`. The result is whatever `try_recv` reports,
/// either from the initial attempt or from the attempt made after the
/// `decrement` / wait step.
pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
// This code is essentially the exact same as that found in the stream
// case (see stream.rs)
// Fast path: anything other than `Err(Empty)` is returned as-is. A
// successful `try_recv` has already added 1 to `steals` for this item.
match self.try_recv() {
Err(Empty) => {}
data => return data,
}
let (wait_token, signal_token) = blocking::tokens();
// `decrement` answers `Installed` only when it decided the channel has no
// data; only then do we actually wait on the token.
if self.decrement(signal_token) == Installed {
if let Some(deadline) = deadline {
let timed_out = !wait_token.wait_max_until(deadline);
if timed_out {
self.abort_selection(false);
}
} else {
wait_token.wait();
}
}
match self.try_recv() {
data @ Ok(..) => unsafe {
// `try_recv` added 1 to `steals` for this item on success; take that
// back here, leaving `steals` unchanged by this receive.
// NOTE(review): presumably because the `fetch_sub` in `decrement`
// already accounted for this receive in `cnt` -- confirm against
// the stream implementation.
*self.steals.get() -= 1;
data
},
data => data,
}
}
// Essentially the exact same thing as the stream decrement function.
// Returns `Installed` if blocking should proceed (the token stays stored in
// `to_wake`), or `Abort` if the caller should not block (the token is removed
// from `to_wake` and dropped).
fn decrement(&self, token: SignalToken) -> StartResult {
unsafe {
// No other wake token may be installed when we get here.
assert_eq!(
self.to_wake.load(Ordering::SeqCst),
EMPTY,
"This is a known bug in the Rust standard library. See https://github.com/rust-lang/rust/issues/39364"
);
let ptr = token.to_raw();
self.to_wake.store(ptr, Ordering::SeqCst);
// Take the accumulated steal count and reset the stored value to 0.
let steals = ptr::replace(self.steals.get(), 0);
// Subtract this receive (1) plus the steals in one step; the value
// matched below is the count *before* the subtraction.
match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
// The subtraction moved `cnt` off `DISCONNECTED`; put it back.
DISCONNECTED => {
self.cnt.store(DISCONNECTED, Ordering::SeqCst);
}
// If we factor in our steals and notice that the channel has no
// data, we successfully sleep
n => {
assert!(n >= 0);
if n - steals <= 0 {
return Installed;
}
}
}
// Not blocking: clear the slot and rebuild the token from the raw pointer
// so that it is dropped.
self.to_wake.store(EMPTY, Ordering::SeqCst);
drop(SignalToken::from_raw(ptr));
Abort
}
}
/// Attempts to take one value from the queue without blocking.
///
/// Returns `Ok(value)` when an item was popped, `Err(Empty)` when the queue
/// is empty and `cnt` is not `DISCONNECTED`, and `Err(Disconnected)` when
/// `cnt` is `DISCONNECTED` and a final pop also finds nothing.
///
/// Every `Ok` produced by the first `match` below adds 1 to `steals`; the
/// `Ok` produced on the disconnected path does not.
///
/// # Panics
///
/// Panics if the queue reports `Empty` right after reporting `Inconsistent`.
pub fn try_recv(&self) -> Result<T, Failure> {
let ret = match self.queue.pop() {
mpsc::Data(t) => Some(t),
mpsc::Empty => None,
// Keep yielding and retrying until the pop yields data.
mpsc::Inconsistent => {
let data;
loop {
thread::yield_now();
match self.queue.pop() {
mpsc::Data(t) => {
data = t;
break;
}
mpsc::Empty => panic!("inconsistent => empty"),
mpsc::Inconsistent => {}
}
}
Some(data)
}
};
match ret {
// See the discussion in the stream implementation for why we
// might decrement steals.
Some(data) => unsafe {
// Once `steals` exceeds `MAX_STEALS`, fold it back into `cnt`: swap
// `cnt` to 0, cancel as many steals as the old count covers, and hand
// the remainder to `bump` (defined elsewhere).
if *self.steals.get() > MAX_STEALS {
match self.cnt.swap(0, Ordering::SeqCst) {
// The swap overwrote `DISCONNECTED`; restore it.
DISCONNECTED => {
self.cnt.store(DISCONNECTED, Ordering::SeqCst);
}
n => {
let m = cmp::min(n, *self.steals.get());
*self.steals.get() -= m;
self.bump(n - m);
}
}
assert!(*self.steals.get() >= 0);
}
// Count this successful pop as a steal.
*self.steals.get() += 1;
Ok(data)
},
// See the discussion in the stream implementation for why we try
// again.
None => {
match self.cnt.load(Ordering::SeqCst) {
n if n != DISCONNECTED => Err(Empty),
_ => {
match self.queue.pop() {
mpsc::Data(t) => Ok(t),
mpsc::Empty => Err(Disconnected),
// with no senders, an inconsistency is impossible.
mpsc::Inconsistent => unreachable!(),
}
}
}
}
}
}
The call to `send` invokes `cnt.fetch_add(1)`, so that `cnt == 1`. Assuming the first `recv` goes on to invoke `self.decrement`, that call to `decrement` will produce:
steals == 0
self.steals == 0
self.cnt == 0
n = 1
n - steals <=0 is false
return Abort
*self.steals.get() -= 1; // is invoked
return data
The second `recv` will do something like this:
// self.decrement is invoked
steals == -1
self.steals == 0
self.cnt == 0 - (1+-1) = 0
n = 0
n - steals == 1 <=0 is false
and this time
match self.try_recv() {
data @ Ok(..) => unsafe {
*self.steals.get() -= 1;
data
},
data => data,
}
returns `Err(Empty)`; however, rust/library/std/src/sync/mpsc/mod.rs at 34115d040b43d9a0dcc313c6282520a86d1e6f61 · rust-lang/rust · GitHub shows that `Err(Empty)` will cause a panic:
/// Blocking receive on the public receiver, dispatching on the channel flavor.
///
/// Returns `Ok(t)` when the underlying flavor yields a value and
/// `Err(RecvError)` when it reports disconnection (for `Sync`, on any error).
/// When a `Oneshot` or `Stream` flavor reports `Upgraded(rx)`, the inner port
/// is swapped with `rx`'s and the loop tries again.
///
/// # Panics
///
/// Each flavor's `Empty` result is mapped to `unreachable!()`, so the call
/// panics if a flavor's `recv(None)` ever returns `Empty`.
pub fn recv(&self) -> Result<T, RecvError> {
loop {
// Only the `Upgraded` arms produce a value for `new_port`; every other
// arm returns from the function or hits `unreachable!()`.
let new_port = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => match p.recv(None) {
Ok(t) => return Ok(t),
Err(oneshot::Disconnected) => return Err(RecvError),
Err(oneshot::Upgraded(rx)) => rx,
Err(oneshot::Empty) => unreachable!(),
},
Flavor::Stream(ref p) => match p.recv(None) {
Ok(t) => return Ok(t),
Err(stream::Disconnected) => return Err(RecvError),
Err(stream::Upgraded(rx)) => rx,
Err(stream::Empty) => unreachable!(),
},
Flavor::Shared(ref p) => match p.recv(None) {
Ok(t) => return Ok(t),
Err(shared::Disconnected) => return Err(RecvError),
Err(shared::Empty) => unreachable!(), // panic
},
Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
};
// Adopt the upgraded port in place of the current one, then retry.
unsafe {
mem::swap(self.inner_mut(), new_port.inner_mut());
}
}
}
Did I miss something?