Use an external process as an iterator


#1

I am trying to set up a chain of iterators and to be able to insert a running process into that chain.
The process would consume a Read as its stdin and produce an Iterator over its stdout.
Note that I can convert between Read and Iterator (itertools has the Iterator -> Read conversion).

I am stuck when trying to add proper error handling and could use some suggestions.

The only way I know how to stream a process is to return a BufReader of stdout and to spawn a thread that feeds stdin to the process and also watches stderr as needed.
This all works, but problems arise when I try to detect errors. I also need to wait for the command to finish so I can inspect its exit code. The problem is that I don’t think the Child interface is amenable to this: calling process.wait() on another thread results in borrowing violations.

The code is on github here:


Uncommenting the code in question gives borrowing errors.

error[E0382]: capture of partially moved value: `process`
  --> src/main.rs:78:30
   |
66 |         let stdout = process.stdout.expect("impossible! no stdout");
   |                      -------------- value moved here
...
78 |                 let result = process.wait()
   |                              ^^^^^^^ value captured here after move

Note that I have not yet decided how to send this error back to the caller; it should be possible with a channel or a global panic handler.

A perfectly fine solution for me would be a pointer to an alternative crate that implements this idea. I have not been able to find one so far.


#2

The process can be used from one thread only, and your code moves the process into the newly spawned thread, so from then on it lives in that thread only. process.stdout and process.stderr are still part of the same object, so Rust complains that you’re trying to use part of it in one thread and the rest of it in the other.

I’m not sure how to solve that, though. Usually you’d use Arc and Mutex to share an object between threads, but process.wait() would need to hold the lock for the entire duration. It looks like a limitation of the Child API that makes it inherently single-threaded.

So wait_with_output seems like the only viable option if you want both stdout and stderr.
If the command produces no stderr, you could block on reading stdout before calling .wait() (wait guarantees this works for stdout, but not for stderr, so blocking on reading stderr could deadlock if the child is stuck waiting to flush stdout).


#3

Thanks, that helps to confirm my problem and that I am not missing something simple.
I need to stream output (and input), which wait_with_output does not do.


#4

Oh, maybe there is a way! There’s Option::take(), which can be used to move stdin/stdout/stderr out of the Child, so you can send them to separate threads.


#5

That fixes it, thank you!
This change is committed to master now.

Now I need to figure out the best way to propagate errors that occur in these threads during streaming back to the caller.


#6

The value returned by the thread’s closure can be read from thread.join():

fn main() {
    let t = std::thread::spawn(|| "hello");
    let result = t.join().unwrap();
    println!("{}", result);
}

#7

That will be helpful, but there is a more fundamental problem: streaming starts immediately (I return an iterator right away), and an error can occur at any time during the streaming, and potentially even shortly after it (though the "after" case is more of a weird edge case).