Is tokio codec Framed cancel safe?

Hi,

Tokio codec Framed docs does not talk about cancel safety. I can 'guess' that FramedRead and FramedWrite both are cancel safe since it's basically a buffer on top of the underlying AsyncRead and AsyncWrite and both of which are cancel safe, FramedRead/Write should be cancel safe I guess ?

That is, if I call FramedWrite on an item and it's cancelled, I can assume that the item was not written ? I was hoping to get an 'official' answer to this.

Rgds,
Gopa.

It doesn't look like it. Once the internal buffer of the FramedWrite is full, it will attempt to flush it, even if the frame isn't finished yet. In addition it won't send a frame until the internal buffer is filled or the end of the stream is signaled.

Thx for the response @bjorn3, that would be a bummer, I would imagine it is not that hard to implement cancel safety in the codec.

From what I see in the code, the start_send implementation for Sink just calls encode which just grows the buffer by as much as required to write an item, so the encode is always successful, there is no 'cancellation point' in start send. The flush/poll_ready is what tries to do the asyncWrite and that can write partial bytes (i.e not entire frame) yes, but that should be ok as long as the asyncWrite guarantees that cancel means no bytes written (which is does) ?

Can you pls point me a link (GitHub?) to where in the code you see a partial flush of the buffers etc.. that you are referring to being an issue so I can get a head start into the exact area in the code that is not cancel safe ?

Rgds,
Gopa.

It seems weird to want to talk about cancellation safety for something that is not a Future and usable in a tokio::select! branch. AsyncWrite is not cancel safe, some functions of AsyncWriteExt returning a Future are.

Cancellation safety is not really about not having done anything if cancelled, but that you are able to continue as if this Future has never been cancelled. That's why AsyncWriteExt::write_all is not cancel safe. You have no way to determine how many bytes were written when canceled, and the same call afterwards (with same buffer as input) may resend some bytes. But you can make a struct that will keep a reference to that buffer and an offset of what is already sent using AsyncWriteExt::write successively. This struct may provide a function similar to write_all (but without the buffer as argument) which will complete when all is written but will actually only starts to where it left off. This function would be cancel safe.

I do not see a way to write a future sending a whole frame which can ensure nothing is sent when canceled. But you shouldn't look that way. Even the cancel unsafety is not a problem, because this select! would stop using that FramedWrite if the future is cancelled, or you can make a cancel safe function that can progress on the sending of the same frame while having been canceled.

Assuming you mean "the send method from SinkExt", yes. Similar to how all streams have a cancel-safe next method, all sinks have a cancel-safe send method. (with cancel safe having exactly the meaning that you mentioned here)

Other methods on SinkExt may or may not be cancel safe, but generally, they're cancel safe whenever they send at most a single message.

I'm a maintainer of Tokio and tokio-util.

I am trying to figure out how something based on write syscall (so I am talking about any kind of helper above AsyncWrite) may guarantee that a full set of n bytes is either fully sent or no bytes is sent at all.

If a call to write returns that m (m < n ) bytes were written, how can you make sure no other branches of select! will complete and cancel this write ? It seems unlikely possible to me but I am interested to know the method if that is the case for SinkExt::send.

This simple code make sure to actually make exactly one poll of SinkExt::send by using the biased mode of tokio::select! and a directly completed future in the second branch. By using a large enough buffer, we end up by sending a partial part of the whole buffer we want to send.

use futures::SinkExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let stream = tokio::net::TcpStream::connect("127.0.0.1:8080").await?;

    let mut buffer = Vec::new();
    buffer.resize(10000000, b'H');

    let mut stream = tokio_util::codec::FramedWrite::new(stream, tokio_util::codec::BytesCodec::new());
    let buffer = bytes::Bytes::copy_from_slice(&buffer);

    tokio::select! {
        biased;
        _ = stream.send(buffer) => {
            println!("write_all completed");
        },
        _ = futures::future::ok::<(),  Box<dyn std::error::Error>>(()) => {
            println!("other branch completed");
        }
    }

    Ok(())
}

Sure, let me elaborate. The send call on FramedWrite has the following guarantee:

If send is used as the branch in a select! and some other branch completes first, then the message passed to send has not been appended to the buffer inside the FramedWrite.

Once its written to the buffer, its considered "sent", even though the actual write syscall happens later during a future call to flush or send.

To elaborate, when send doesn't complete immediately, its because it is busy flushing things already written to the buffer. Now, the send operation remembers how much has been sent from call to call, so even if you cancel the call to send and then make another one later, it will just continue writing from the spot it got to during the previous call. This works using the same mechanism as write_all_buf.

I don't really get your point because SinkExt::send documentation says that it complete after flushing the item into the sink and my example confirm this.

The select! complete in the second branch while we actually sent some bytes already.

Note that I am not saying that this is not cancel safe. I think it is cancel safe because, as your last answer implies, you can continue sending your frames without issues even if a send is canceled.

I am stressing a point of confusion here where the OP wants to know if canceling that send would actually be guaranteed to have sent no bytes at all for that frame (not that the frame is guaranteed to be not completely sent) and it is not the case. I guess it is a confusion about what "cancellation safety" mean here.

Oh, my, you're right. The sink trait shows its flaws yet again.

The method that has the guarantee I mentioned is feed. A call to send is like a call to feed followed by a call to flush. Replace all mentions of send with feed in my previous post.

1 Like

Ha ok, it is clear now. Thanks for the clarification :slight_smile:

Thanks for all the responses @ndusart @alice .. what I meant by cancel safety is that if I call send on an item, it's either accepted into the buffers or it's not as one unit - I was not really concerned about whether it's sent on wire or not. So looks like that is satisfied - the item is either totally in the buffers or it's not. Thanks again for confirming!

That is not, in fact, the guarantee that @alice made, which was:

That is, you are obligated to call it with the same data later, if you cancel (or close it, I suppose)

I can't see how it could work otherwise, given that your item could be larger than the buffer (or it could have an unbounded buffer, but that's generally a bad idea)

No, if a single item doesn't fit in the buffer, then the buffer is resized so that it fits. You are not obligated to call it with the same item later.

1 Like

Thanks for the correction, but now I don't understand your previous comment: how is this like the cancellation of write_all_buf then?

Edit: ah you meant it internally uses an equivalent to write_all_buf?

Yeah, we have to deal with two separate things:

  1. Serializing items into the buffer.
  2. Writing the buffer into the IO resource.

The first step happens in a cancel-safe manner so that the entire item is either fully serialized or not at all. The second step happens using a similar mechanism to write_all_buf.


A call to feed generally works in the following way:

if buffer_too_large {
    io_resource.write_all_buf(&mut buffer).await?;
}
buffer.extend(encode_to_bytes(next_item));

In particular, feed will not attempt to write the current item to the IO resource at all. That happens during a future call to feed or flush.

1 Like