How to create a loop with futures-rs?


#1

I need to create a loop with futures-rs, and I’ve been struggling to get it working.

I need to write something like this:

fn write_loop(write: TaskIoWrite<TcpStream>, queue: Receiver<Event>) -> BoxFuture<()> {
    queue.and_then(|event| {
        let bytes = serialize_event(event);
        futures_io::copy(bytes, write) // doesn't work
    }).fold(..).map(..).boxed()
}

But I cannot do it, because write cannot be moved into the and_then callback: the callback runs once per event, so it cannot give away ownership of write on every call.

I’ve found a solution, but I don’t like it.

I can store write in TaskData<RefCell<Option<..>>>, and then wrap it in an iterator:

let writes = stream::iter(repeat(TaskData::new(RefCell::new(Some(write)))));

and after that do something like this:

queue.zip(writes).and_then(|(event, write_task_data)| {
    // take `write` out of the shared slot for this iteration
    let write = write_task_data.with(|d| d.borrow_mut().take().unwrap());
    let bytes = serialize_event(event);
    futures_io::copy(bytes, write).map(|write| {
        // store `write` back in TaskData so it can be taken in the next iteration
        write_task_data.with(|d| *d.borrow_mut() = Some(write));
        ()
    })
});

This is somewhat weird.

Is there any simpler way to implement this loop with futures-rs?

cc @alexcrichton


#2

Yeah, loops are currently best done with the various combinators on streams (e.g. fold). With fold you can persist the state needed on each iteration of the loop by moving it into the future itself, like:

fn write_loop(write: TaskIoWrite<TcpStream>, queue: Receiver<Event>) -> BoxFuture<()> {
    queue.map(serialize_event)
         .fold(write, |write, event| write_all(write, event).map(|(w, _)| w))
         .map(...)
         .boxed()
}

perhaps?
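
To show the ownership flow in isolation, here is a minimal synchronous sketch using only the standard library. It is an analogy, not futures-rs code: Iterator::try_fold stands in for the stream fold, any std::io::Write stands in for TaskIoWrite<TcpStream>, and a Vec of byte buffers stands in for the already-serialized queue (all of these are assumptions made for illustration). The writer is the accumulator, so each step receives it by value and hands it back for the next step.

```rust
use std::io::{self, Write};

/// Synchronous stand-in for the stream fold: the writer is the accumulator,
/// so every step takes ownership of it and returns it for the next step.
fn write_loop<W: Write>(write: W, queue: Vec<Vec<u8>>) -> io::Result<W> {
    queue
        .into_iter()
        .try_fold(write, |mut w, bytes| -> io::Result<W> {
            w.write_all(&bytes)?; // use the writer for this iteration
            Ok(w) // give it back so the next iteration can use it
        })
}
```

Nothing is shared here, so no RefCell, Option or TaskData is needed; the first write error stops the loop.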

We’re still working a bit on the specifics of the TaskData story, but for now you could also just store a TaskData<RefCell<W>>, clone it wherever it needs to be referenced, and borrow it when you need to.


#3

Yes, thanks! That’s exactly what I need. I don’t know how I overlooked fold.

BTW, what’s the best place to get help with futures-rs? I’m struggling with it at the moment.

For example, I now have the following problem:

TaskData.with(..) (for some reason I don’t understand) does not provide mutable access to data.

I tried to wrap the data as TaskData<RefCell<T>>. And after that my TaskData is no longer Sync. Because of that, I cannot call boxed() on my stream, since boxed requires self to be Sync.

The obvious solution is to wrap the data in a Mutex (TaskData<Mutex<T>>), but is something better possible?
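
For reference, the RefCell vs. Mutex difference can be checked with the standard library alone. This is only a sketch of the general Sync rule and says nothing about how TaskData is implemented; assert_sync and count_across_threads are hypothetical helper names.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Compiles only when `T` is `Sync`.
fn assert_sync<T: Sync>() {}

/// Increments one shared counter from `n` threads and returns the total.
fn count_across_threads(n: usize) -> usize {
    assert_sync::<Mutex<usize>>();
    // assert_sync::<std::cell::RefCell<usize>>(); // rejected: RefCell<T> is never Sync

    let shared = Arc::new(Mutex::new(0usize));
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                *shared.lock().unwrap() += 1;
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let total = *shared.lock().unwrap();
    total
}
```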


#4

Either here, IRC, or an issue on the repo is good for now. Just be sure to cc me so I see any questions!

does not provide mutable access to data

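Ah, this is basically the same story as thread-local data: with can only hand out a shared reference, because two handles to the same data can be borrowed at the same time. For example, you can do: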

let a = TaskData::new(..);
let b = a.clone();

a.with(|a| {
    b.with(|b| {
        // a and b alias
    })
})
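
The same aliasing can be reproduced with std thread-locals, which is a rough sketch of why with only yields a shared reference. This uses thread_local! rather than TaskData, and SLOT and both function names are made up for the example. Mutation goes through the RefCell, which refuses a second mutable borrow at runtime instead of allowing two live &mut references.

```rust
use std::cell::RefCell;

thread_local! {
    // Hypothetical per-thread slot, standing in for task-local data.
    static SLOT: RefCell<Vec<u32>> = RefCell::new(Vec::new());
}

/// Returns true if two nested `with` calls see the very same value.
fn nested_with_aliases() -> bool {
    SLOT.with(|a| SLOT.with(|b| std::ptr::eq(a, b)))
}

/// Pushes through the RefCell, then reports the new length and whether a
/// second mutable borrow was refused while the first was still alive.
fn push_and_check_conflict(value: u32) -> (usize, bool) {
    SLOT.with(|slot| {
        let mut first = slot.borrow_mut();
        first.push(value);
        let conflict = slot.try_borrow_mut().is_err();
        (first.len(), conflict)
    })
}
```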

And after that my TaskData is no longer Sync

Oh, I think that’s a bug: TaskData<T> should be Sync regardless of the underlying type, I believe.