let processor = reader
.and_then(|frame| frame.and_then(|id| Worker(id, 5).select(Worker(id - 1, 3))
.map_err(|(err1, _err2) err1)
.map(|(winner, _outsider)| format!("Worker with id {} wins", winner.0))
.forward(writer)
.and_then( ... )
Back to the error you have. Sink::send() consumes self and returns a future that signals completion of the send operation. The key bit is it consumes self. for_each invokes the provided closure for each element, and as such it expects an FnMut closure - one that can be called multiple times (with mutation allowed, but that's not interesting here). Since the closure you wrote consumes writer, it's an FnOnce - can be called only once, and so you get the compiler error.
The approach I suggested above re-arranges the problem by creating a pipeline that funnels into Stream::forward(Sink). The idea is you set up a Stream chain that ends up producing a value that the Sink accepts (a String in your case). This Stream chain is then wired up with the Sink using forward - this combinator, as the name implies, sends all values produced by the Stream to the Sink. This avoids consuming the Sink at an individual element level.
You may need to tweak the snippet I gave above (to align the Error type, for example), but that's the gist of how to typically address these cases.