I need to create a loop with futures-rs. I've broken my head trying to do it.
I need to write something like this:
fn write_loop(write: TaskIoWrite<TcpStream>, queue: Receiver<Event>) -> BoxFuture<()> {
    queue.and_then(|event| {
        let bytes = serialize_event(event);
        futures_io::copy(bytes, write) // doesn't work
    }).fold(..).map(..).boxed()
}
But I cannot do it, because `write` cannot be moved into the `and_then` callback.
I've found a solution, and I don't like it.
I can store `write` in a `TaskData<RefCell<Option<..>>>`, and then wrap it in an iterator:
let writes = stream::iter(repeat(TaskData::new(RefCell::new(Some(write)))));
and after that do something like this:
queue.zip(writes).and_then(|(event, write_task_data)| {
    let write = write_task_data.with(|d| d.borrow_mut().take().unwrap());
    let bytes = serialize_event(event);
    futures_io::copy(bytes, write).map(|write| {
        // store `write` back in TaskData so it can be taken in the next iteration
        write_task_data.with(|d| *d.borrow_mut() = Some(write));
        ()
    })
});
This is somewhat weird.
Is there any simpler way to implement this loop with futures-rs?
cc @alexcrichton
Yeah, loops are currently best done with the various combinators on streams (e.g. `fold`). With `fold` you can persist the state needed on each iteration of the loop by moving it into the future itself, like:
fn write_loop(write: TaskIoWrite<TcpStream>, queue: Receiver<Event>) -> BoxFuture<()> {
    queue.map(serialize_event)
        // the closure receives the accumulated state (`write`) and the next item (`event`)
        .fold(write, |write, event| write_all(write, event).map(|(w, _)| w))
        .map(...)
        .boxed()
}
perhaps?
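To make the ownership hand-off concrete, here is a minimal sketch of the same pattern using only the standard library's synchronous `Iterator::try_fold`, not futures-rs. The writer is passed in as the initial accumulator, moved into each step, and returned for the next one. `Event`, `serialize_event`, and `write_all_owned` are hypothetical stand-ins invented for this illustration; the stream version of `fold` in futures-rs is assumed to follow the same shape, with a future in place of the `Result`.

```rust
use std::io::{self, Write};

// Hypothetical stand-in for the `Event` type from the question.
struct Event(u32);

// Hypothetical stand-in for `serialize_event`.
fn serialize_event(event: Event) -> Vec<u8> {
    format!("event {}\n", event.0).into_bytes()
}

// Takes the writer by value and hands it back on success, so the
// caller can pass it on to the next iteration.
fn write_all_owned<W: Write>(mut write: W, bytes: Vec<u8>) -> io::Result<W> {
    write.write_all(&bytes)?;
    Ok(write)
}

// The writer is the fold accumulator: each step consumes it and
// returns it, so no shared or interior-mutable state is needed.
fn write_loop<W: Write>(write: W, queue: Vec<Event>) -> io::Result<W> {
    queue
        .into_iter()
        .map(serialize_event)
        .try_fold(write, |write, bytes| write_all_owned(write, bytes))
}
```

Calling `write_loop(Vec::new(), vec![Event(1), Event(2)])` returns the `Vec<u8>` writer with both serialized events appended in order.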
We're still working a bit on the specifics of the `TaskData` story, but for now you could also just store a `TaskData<RefCell<W>>`, clone it wherever it needs to be referenced, and borrow it when you need to.
Yes, thanks! That's exactly what I need. I don't know how I overlooked `fold`.
BTW, what's the best place to get help with futures-rs? I'm struggling with it at the moment.
For example, I now have the following problem:
`TaskData.with(..)` does not provide mutable access to the data (for a reason I don't understand).
I tried to wrap the data as `TaskData<RefCell<T>>`. After that, my `TaskData` is no longer `Sync`, and because of that I cannot convert my stream with `boxed()`, because `boxed` requires `self` to be `Sync`.
The obvious solution is to wrap the data in a `Mutex` (`TaskData<Mutex<T>>`), but I think something better is possible?
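For reference, the difference between the two wrappers can be checked with the standard library alone: `RefCell<T>` is never `Sync`, while `Mutex<T>` is `Sync` whenever `T: Send`. The sketch below is not futures-rs code; `require_sync` and `shared_append` are hypothetical helpers written for this illustration.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Compiles only when `T` is `Sync`; used purely as a compile-time check.
fn require_sync<T: Sync>(_: &T) {}

// Appends to one buffer from several threads through a `Mutex`.
fn shared_append() -> Vec<u8> {
    let buf = Arc::new(Mutex::new(Vec::<u8>::new()));

    // `Mutex<Vec<u8>>` is `Sync`, so this line compiles.
    require_sync(&*buf);
    // The equivalent check on a `RefCell` is rejected by the compiler:
    // require_sync(&std::cell::RefCell::new(Vec::<u8>::new()));

    let handles: Vec<_> = (0..4u8)
        .map(|i| {
            let buf = Arc::clone(&buf);
            // Locking yields mutable access through a shared handle.
            thread::spawn(move || buf.lock().unwrap().push(i))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }

    // Thread scheduling decides the push order, so sort before returning.
    let mut out = buf.lock().unwrap().clone();
    out.sort();
    out
}
```

The cost of the `Mutex` is a lock on every access, which is why it feels heavier than necessary for data that only one task ever touches.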
Either here, IRC, or an issue on the repo is good for now. Just be sure to cc me so I see any questions!
> does not provide mutable access to data
Ah this is basically the same story as thread-local data. For example you can do:
let a = TaskData::new(..);
let b = a.clone();
a.with(|a| {
    b.with(|b| {
        // a and b alias the same data, so `with` can only hand out shared references
    })
})
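The thread-local comparison can be tried directly with the standard library. This is a minimal sketch, not futures-rs code: `COUNTER` and `nested_access` are hypothetical names. Nested `with` calls on the same key yield two references to the same value, which is why `with` can only offer `&T` and mutation has to go through something like `RefCell`.

```rust
use std::cell::RefCell;

thread_local! {
    // Hypothetical per-thread state, standing in for task-local data.
    static COUNTER: RefCell<u32> = RefCell::new(0);
}

// Returns the counter value after two increments, plus whether the two
// references obtained from the nested `with` calls point at the same cell.
fn nested_access() -> (u32, bool) {
    COUNTER.with(|a| {
        COUNTER.with(|b| {
            // `a` and `b` are both `&RefCell<u32>` to the same value.
            let same = std::ptr::eq(a, b);
            // Each mutable borrow ends at the end of its statement,
            // so the two never overlap and neither panics.
            *a.borrow_mut() += 1;
            *b.borrow_mut() += 1;
            let value = *b.borrow();
            (value, same)
        })
    })
}
```

If `with` handed out `&mut T` instead, the nested call would produce two live mutable references to one value, which Rust's aliasing rules forbid.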
> And after that my `TaskData` is no longer `Sync`
Oh, I think that's a bug. `TaskData<T>` should be `Sync` regardless of the underlying type, I believe.