Hi.
There is a thing called "packet mode" for pipes in Linux; see pipe(2).
TL;DR: when a pipe is opened with O_DIRECT, each write is a packet. A packet is no larger than PIPE_BUF (4096 bytes on Linux), and larger writes are split into several packets.
Each read reads one packet; if the buffer is too small, the remaining bytes of that packet are discarded.
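To make these rules concrete, here is a minimal in-memory sketch of how I understand the packet semantics from pipe(2). PacketPipe and its methods are hypothetical names invented for this illustration, not a real API, and the model ignores pipe capacity and blocking entirely.

```rust
use std::collections::VecDeque;

/// PIPE_BUF on Linux: the maximum size of one packet in packet mode.
const PIPE_BUF: usize = 4096;

/// Toy model of a packet-mode pipe (hypothetical type, for illustration only).
struct PacketPipe {
    packets: VecDeque<Vec<u8>>,
}

impl PacketPipe {
    fn new() -> Self {
        PacketPipe { packets: VecDeque::new() }
    }

    /// A write larger than PIPE_BUF is split into several packets.
    fn write(&mut self, data: &[u8]) {
        for chunk in data.chunks(PIPE_BUF) {
            self.packets.push_back(chunk.to_vec());
        }
    }

    /// Each read consumes exactly one packet. Bytes that do not fit
    /// into `buf` are dropped. Returns 0 when nothing is queued
    /// (standing in for end of file in this model).
    fn read(&mut self, buf: &mut [u8]) -> usize {
        match self.packets.pop_front() {
            Some(packet) => {
                let n = packet.len().min(buf.len());
                buf[..n].copy_from_slice(&packet[..n]);
                n
            }
            None => 0,
        }
    }

    /// Number of packets still queued.
    fn pending(&self) -> usize {
        self.packets.len()
    }
}

fn main() {
    let mut pipe = PacketPipe::new();
    // One 128 KiB write, as dd does with bs=131072 count=1.
    pipe.write(&vec![0u8; 65536 * 2]);
    println!("packets queued: {}", pipe.pending());

    // A 64 KiB buffer still receives only one packet per read.
    let mut big = vec![0u8; 65536];
    println!("read with 64 KiB buffer: {}", pipe.read(&mut big));

    // A 1 KiB buffer gets 1 KiB; the rest of that packet is lost.
    let mut small = [0u8; 1024];
    println!("read with 1 KiB buffer: {}", pipe.read(&mut small));
    println!("packets left: {}", pipe.pending());
}
```

In this model a reader with a 64 KiB buffer never gets more than 4096 bytes from a single read, no matter how much the writer has queued.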
Here is a small tool that runs dd(1) in "packet" mode:
use std::process::Stdio;
use tokio::io::AsyncReadExt;
use tokio::process::Command;

const READ_BLOCK_SIZE: usize = 65536;
const BYTES_TO_WRITE: usize = 65536 * 2;

#[tokio::main]
async fn main() {
    let process = Command::new("/bin/dd")
        .arg("if=/dev/zero")
        // important: sets `fcntl` F_SETFL O_DIRECT
        // enables so-called "packet mode", see `pipe(2)` `O_DIRECT` option
        .arg("oflag=direct")
        .arg(format!("bs={}", BYTES_TO_WRITE))
        .arg("count=1")
        .stdout(Stdio::piped())
        .spawn()
        .unwrap();
    let mut stdout = process.stdout.unwrap();
    let mut buffer = [0u8; READ_BLOCK_SIZE];
    let mut bytes_read = 0;
    loop {
        let i = stdout.read(&mut buffer).await.unwrap();
        println!("I read {}", i);
        bytes_read += i;
        if i == 0 {
            break;
        }
    }
    if bytes_read != BYTES_TO_WRITE {
        panic!("Wrong number of bytes read: {bytes_read}");
    }
}
...and it gets stuck. Here is the strace output:
// dd enables packet mode
[pid 20030] fcntl(1, F_SETFL, O_WRONLY|O_DIRECT) = 0
// reads and writes zeros
[pid 20030] read(0, "\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"..., 131072) = 131072
[pid 20030] write(1, "\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"..., 131072 <unfinished ...>
// epoll reports the pipe as readable, and the reading thread is woken via futex
[pid 20017] <... epoll_wait resumed>[{events=EPOLLIN, data={u32=3533706496, u64=94346785330432}}], 1024, -1) = 1
[pid 20017] futex(0x55ced29ecd70, FUTEX_WAKE_PRIVATE, 1) = 1
[pid 20013] <... futex resumed>) = 0
[pid 20017] epoll_wait(3, <unfinished ...>
// Tokio tries to read 64K, but reads only 4K (due to packet mode)
[pid 20013] read(9, "\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"..., 65536) = 4096
[pid 20013] write(1, "I read 4096\n", 12I read 4096
) = 12
[pid 20013] futex(0x55ced29ecd70, FUTEX_WAIT_BITSET_PRIVATE, 1, NULL, FUTEX_BITSET_MATCH_ANY
// everything is frozen here forever
Now let's try the blocking API:
-use tokio::process::Command;
+use std::process::Command;
and remove .await from the read call (the blocking read also needs std::io::Read in scope instead of tokio::io::AsyncReadExt):
It works: it reads 4096-byte blocks until the end, just as pipe(2) suggests.
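For reference, the blocking variant might look roughly like the sketch below. The dd invocation and the constants are copied from the tool above. The helper name drain is hypothetical and only there so that the read loop can be exercised without spawning a process. Spawn failures are reported rather than unwrapped.

```rust
use std::io::{self, Read};
use std::process::{Command, Stdio};

const READ_BLOCK_SIZE: usize = 65536;
const BYTES_TO_WRITE: usize = 65536 * 2;

/// Reads `reader` until EOF using a buffer of `buf_size` bytes.
/// Returns (total bytes read, number of non-empty reads).
/// Hypothetical helper, not part of any library.
fn drain<R: Read>(mut reader: R, buf_size: usize) -> io::Result<(usize, usize)> {
    let mut buffer = vec![0u8; buf_size];
    let mut total = 0;
    let mut reads = 0;
    loop {
        let n = reader.read(&mut buffer)?;
        if n == 0 {
            return Ok((total, reads));
        }
        total += n;
        reads += 1;
    }
}

fn main() -> io::Result<()> {
    let spawned = Command::new("/bin/dd")
        .arg("if=/dev/zero")
        // Same flag as in the async version: puts the pipe into packet mode.
        .arg("oflag=direct")
        .arg(format!("bs={}", BYTES_TO_WRITE))
        .arg("count=1")
        .stdout(Stdio::piped())
        .spawn();

    let mut child = match spawned {
        Ok(child) => child,
        Err(e) => {
            eprintln!("could not start dd: {e}");
            return Ok(());
        }
    };

    let stdout = child.stdout.take().expect("stdout was requested as piped");
    let (total, reads) = drain(stdout, READ_BLOCK_SIZE)?;
    child.wait()?;
    println!("read {total} bytes in {reads} reads");
    Ok(())
}
```

Since each blocking read returns as soon as one packet is available, the loop simply keeps going until dd closes its end of the pipe.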
Workaround: setting the buffer size to 4096 helps. It seems that Tokio waits for more data (to fill the buffer), but a "packet" pipe never delivers more than 4096 bytes per read.
So, the question is: should I create an issue? From my point of view, if something works in blocking mode, it should behave the same way in async mode. But I am not sure whether a buffer larger than PIPE_BUF is a valid thing for "packet mode".
Thanks in advance.