Cannot split mutable borrows of struct fields in poll_flush implementation

I'm new to Rust and trying my first practice project.

I'm encountering a Rust compiler error (E0499) when trying to split mutable borrows of two different fields in a struct. The error occurs in my async poll_flush implementation where I need to simultaneously access self.pending_send and self.inner.

I don't quite understand why you can't mutably borrow different fields at the same time. Some of the materials I found claim that different fields of the same struct can be borrowed at the same time, while others say they can't, and some say it requires unsafe.

I'm not sure whether this is a problem with my Rust version or something else. I have upgraded to the latest version; the current version is 'rustc 1.85.0 (4d91de4e4 2025-02-17)'.

I tried 'p.remaining().to_vec()' to make a direct copy that worked, but it wasn't performance-friendly.

I'm struggling with Rust lifetimes and borrowing, and I wonder whether I need to redesign the struct to implement this feature.


error[E0499]: cannot borrow `self` as mutable more than once at a time
   --> write_stream.rs:288:30
    |
287 |             let pending_send = &mut self.pending_send;
    |                                     ---- first mutable borrow occurs here
288 |             let inner = &mut self.inner;
    |                              ^^^^ second mutable borrow occurs here
...
292 |             let data = match pending_send {
    |                              ------------ first borrow later used here


    /// Writes whatever is left in `pending_send` to `inner`, then flushes `inner`.
    ///
    /// This is the version that fails to compile: rustc reports E0499 on the
    /// second of the two field borrows at the top of the loop.
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        loop {
            // let (pending_send, inner) = (&mut self.pending_send, &mut self.inner);
            let pending_send = &mut self.pending_send;
            // E0499 is reported here: second mutable borrow of `self`.
            let inner = &mut self.inner;


            // Phase 1: fetch the data still to be written (brief borrow of pending_send).
            // `to_vec()` copies the remaining bytes on every iteration.
            let data = match pending_send {
                Some(p) => p.remaining().to_vec(),
                None => break,
            };

            if data.is_empty() {
                break;
            }

            // Phase 2: perform the write (borrows only inner).
            let write_result = {
                // let inner = &mut self.inner;
                Pin::new(inner).poll_write(cx, data.as_ref())
            };

            // Phase 3: handle the write result (pending_send is used again here).
            match write_result {
                Poll::Ready(Ok(n)) => {
                    if let Some(p) = pending_send {
                        if p.consume(n) == 0 {
                            //      self.pending_send = None;
                        }
                    }
                }
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Err(e)) => {
                    self.send_has_error = true;
                    return Poll::Ready(Err(e));
                }
            }
        }
        Pin::new(&mut self.inner).poll_flush(cx)
}

完整代码:

use aes::Aes256;
use ctr::cipher::{KeyIvInit, StreamCipher};
use ctr::Ctr64BE;
use rand::RngCore;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncWrite;
use zeroize::Zeroize;

type Aes256Ctr = Ctr64BE<Aes256>;

/// A byte buffer with a read cursor.
///
/// Bytes before `index` have already been consumed; bytes from `index` to the
/// end of `data` are still waiting to be used.
#[derive(Debug, Default)]
struct PartialBuffer {
    /// Backing storage: consumed prefix followed by the unread bytes.
    data: Vec<u8>,
    /// Offset in `data` of the first unread byte.
    index: usize,
}

impl PartialBuffer {
    /// Wraps `data` with nothing consumed yet.
    fn new(data: Vec<u8>) -> PartialBuffer {
        Self { data, index: 0 }
    }

    /// Returns the bytes that have not been consumed yet.
    fn remaining(&self) -> &[u8] {
        &self.data[self.index..]
    }

    /// Marks `amt` more bytes as consumed.
    ///
    /// Call this once that many bytes of the buffer have been used
    /// successfully. Returns the number of bytes still unread.
    fn consume(&mut self, amt: usize) -> usize {
        self.index += amt;
        let total = self.data.len();
        total - self.index
    }

    /// Reserves `s` zero-filled bytes at the tail and returns them.
    ///
    /// Only the space is provided; the caller is responsible for writing the
    /// actual data into the returned slice.
    fn pre_write(&mut self, s: usize) -> &mut [u8] {
        self.make_room(s);

        let tail_start = self.data.len();
        self.data.resize(tail_start + s, 0);
        &mut self.data[tail_start..]
    }

    /// Appends `data` to the tail, making room first when necessary.
    fn append(&mut self, data: &[u8]) {
        self.make_room(data.len());
        self.data.extend_from_slice(data);
    }

    /// Ensures `extra` more bytes fit behind the current tail without the
    /// vector having to grow on its own.
    fn make_room(&mut self, extra: usize) {
        let unread = self.data.len() - self.index;
        let needed = unread + extra;

        if needed > self.data.capacity() {
            // Even without the consumed prefix the allocation is too small:
            // move the unread bytes into a new vector of the needed size.
            let mut grown = Vec::<u8>::with_capacity(needed);
            grown.extend_from_slice(&self.data[self.index..]);
            self.data = grown;
            self.index = 0;
        } else if self.data.len() + extra > self.data.capacity() {
            // The allocation is big enough once the consumed prefix is
            // dropped, so shift the unread bytes to the front.
            self.data.drain(..self.index);
            self.index = 0;
        }
    }
}


/// Writer adapter that encrypts data with AES-256-CTR before handing it to `inner`.
///
/// The IV is queued ahead of the first encrypted bytes, so it comes first in
/// the output.
pub struct AesEncryptedWriter<W> {
    /// The wrapped writer that receives the IV and the encrypted bytes.
    inner: W,
    /// Stream cipher; `None` until the first `poll_write` creates it.
    cipher: Option<Aes256Ctr>,
    /// Key material; zeroized once the cipher has been created.
    key: Vec<u8>,
    /// IV; zeroized after it has been queued in `pending_send`.
    iv: Vec<u8>,
    /// Bytes (IV and ciphertext) that have not been written to `inner` yet.
    pending_send: Option<PartialBuffer>,
    /// Set when a write to `inner` failed or returned 0; checked at the start of `poll_write`.
    send_has_error: bool,
    /// Upper bound on the number of unsent bytes kept in `pending_send`.
    max_buf_size: usize,
}

impl<W> AesEncryptedWriter<W> {
    /// Queues the IV at the tail of `pending_send`, then zeroizes `self.iv`.
    pub(crate) fn encrypted_append_iv(&mut self) {
        match &mut self.pending_send {
            Some(p) => p.append(&self.iv),
            None => self.pending_send = Some(PartialBuffer::new(self.iv.to_vec())),
        }

        self.iv.zeroize();
    }

    /// Encrypts as much of `data` as fits into the send buffer and queues it.
    ///
    /// Returns the number of plaintext bytes taken from the front of `data`.
    /// This is 0 when `data` is empty or the buffer already holds
    /// `max_buf_size` (or more) unsent bytes.
    ///
    /// # Panics
    ///
    /// Panics if the cipher has not been created yet; `poll_write` creates it
    /// before calling this method.
    fn encrypted_encrypt_and_append(&mut self, data: &[u8]) -> usize {
        let queued = self.pending_send.as_ref().map_or(0, |p| p.remaining().len());

        // Saturate instead of subtracting: the queue can exceed `max_buf_size`
        // (the IV is appended without a size check), and a plain `-` would
        // underflow and panic in that case.
        let available_space = self.max_buf_size.saturating_sub(queued);
        let write_size = data.len().min(available_space);

        if write_size == 0 {
            return 0;
        }

        let plaintext = &data[..write_size];
        let cipher = self
            .cipher
            .as_mut()
            .expect("cipher must be initialised before data is encrypted");

        match &mut self.pending_send {
            // Encrypt straight into freshly reserved space at the buffer's tail.
            Some(p) => cipher
                .apply_keystream_b2b(plaintext, p.pre_write(write_size))
                .unwrap(),
            // Nothing queued: encrypt a copy in place and make it the buffer.
            None => {
                let mut encrypted = plaintext.to_vec();
                cipher.apply_keystream(&mut encrypted);
                self.pending_send = Some(PartialBuffer::new(encrypted));
            }
        }

        write_size
    }
}

impl<W: AsyncWrite + Unpin> AesEncryptedWriter<W> {
    /// Creates a writer that encrypts everything written to it with `key`
    /// and forwards the result to `inner`.
    ///
    /// A fresh 16-byte IV is drawn from `rand::thread_rng()`. The cipher
    /// itself is not created here; the field starts out as `None`. The send
    /// buffer limit starts at 10 KiB.
    pub fn new(inner: W, key: [u8; 32]) -> Self {
        let mut iv = vec![0u8; 16];
        rand::thread_rng().fill_bytes(&mut iv);

        Self {
            inner,
            cipher: None,
            key: Vec::from(key),
            iv,
            pending_send: None,
            send_has_error: false,
            max_buf_size: 10 * 1024,
        }
    }
}

impl<W: AsyncWrite + Unpin> AsyncWrite for AesEncryptedWriter<W> {
    /// Encrypts `buf` into the send buffer and tries to push the buffer to `inner`.
    ///
    /// Strategy:
    /// - Normal case: encrypt the new data, append it to the send buffer, then
    ///   write the send buffer to `inner`.
    /// - A previous write failed: retry the buffered data first; continue only
    ///   if that succeeds, otherwise return the error.
    /// - Send buffer (nearly) full: accept only as many bytes as still fit.
    /// - `inner` is pending: if some bytes of `buf` were buffered, report them
    ///   as written; if none were, return `Poll::Pending`.
    ///
    /// The returned count is the number of bytes of `buf` that were accepted
    /// into the send buffer, not the number that reached `inner`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        let this = self.get_mut();

        // Create the cipher on first use, then wipe the key material.
        if this.cipher.is_none() {
            let cipher = Aes256Ctr::new(this.key.as_slice().into(), this.iv.as_slice().into());
            this.cipher = Some(cipher);
            this.key.zeroize();
        }

        // Queue the IV ahead of any ciphertext. The buffer limit has to be
        // larger than the IV for anything else to fit afterwards.
        if !this.iv.is_empty() {
            this.encrypted_append_iv();
        }

        // A previous write to `inner` failed: check whether it has recovered
        // before accepting any new data.
        if this.send_has_error {
            match &mut this.pending_send {
                Some(p) if !p.remaining().is_empty() => {
                    match Pin::new(&mut this.inner).poll_write(cx, p.remaining()) {
                        // Data was offered but nothing was taken: treat `inner` as closed.
                        Poll::Ready(Ok(0)) => return Poll::Ready(Ok(0)),
                        Poll::Ready(Ok(n)) => {
                            // Clear the flag so buffered and new data go out together again.
                            this.send_has_error = false;
                            if p.consume(n) == 0 {
                                this.pending_send = None;
                            }
                        }
                        Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                        Poll::Pending => return Poll::Pending,
                    }
                }
                // Nothing is buffered, so there is nothing to retry.
                _ => {}
            }
        }

        // Encrypt into the send buffer; this yields 0 when the buffer is full.
        let buffered = this.encrypted_encrypt_and_append(buf);

        // The caller offered data but none of it could be buffered. Reporting
        // `Ok(0)` here would be read by callers such as `write_all` as a
        // write-zero failure, so these cases are handled separately below.
        let stalled = buffered == 0 && !buf.is_empty();

        let Some(p) = this.pending_send.as_mut().filter(|p| !p.remaining().is_empty()) else {
            return Poll::Ready(Ok(buffered));
        };

        // Bind the result first so the borrows taken for the call end here.
        let outcome = Pin::new(&mut this.inner).poll_write(cx, p.remaining());
        match outcome {
            // `inner` took nothing although data was offered: it may be closed.
            Poll::Ready(Ok(0)) => {
                this.send_has_error = true;
                Poll::Ready(Ok(buffered))
            }
            Poll::Ready(Ok(n)) => {
                if p.consume(n) == 0 {
                    this.pending_send = None;
                }
                // `inner` just freed `n` bytes of buffer space; use it so a
                // full buffer does not turn into a spurious `Ok(0)`.
                let accepted = if stalled {
                    this.encrypted_encrypt_and_append(buf)
                } else {
                    buffered
                };
                Poll::Ready(Ok(accepted))
            }
            // Nothing was accepted and `inner` cannot take data right now.
            Poll::Pending if stalled => Poll::Pending,
            Poll::Pending => Poll::Ready(Ok(buffered)),
            Poll::Ready(Err(e)) => {
                this.send_has_error = true;
                if stalled {
                    // No byte of `buf` was accepted, so the error can be reported directly.
                    Poll::Ready(Err(e))
                } else {
                    // The bytes are buffered; the failed write is retried on the next call.
                    Poll::Ready(Ok(buffered))
                }
            }
        }
    }

    /// Writes whatever is left in `pending_send` to `inner`, then flushes `inner`.
    ///
    /// This is the version the question is about: it does not compile, because
    /// rustc reports E0499 on the second field borrow at the top of the loop.
    ///
    /// NOTE(review): besides the borrow error, three things look unintended:
    /// - the unsent bytes are copied with `to_vec()` on every iteration;
    /// - `pending_send` is never reset to `None` (the assignment is commented
    ///   out), so it stays `Some` with an empty remainder after a full drain;
    /// - `Poll::Ready(Ok(0))` from `inner` consumes nothing and the loop
    ///   retries the same write immediately.
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        loop {
            // let (pending_send, inner) = (&mut self.pending_send, &mut self.inner);
            let pending_send = &mut self.pending_send;
            // E0499 is reported here: second mutable borrow of `self`.
            let inner = &mut self.inner;


            // Phase 1: fetch the data still to be written (brief borrow of pending_send).
            let data = match pending_send {
                Some(p) => p.remaining().to_vec(),
                None => break,
            };

            if data.is_empty() {
                break;
            }

            // Phase 2: perform the write (borrows only inner).
            let write_result = {
                // let inner = &mut self.inner;
                Pin::new(inner).poll_write(cx, data.as_ref())
            };

            // Phase 3: handle the write result (pending_send is used again here).
            match write_result {
                Poll::Ready(Ok(n)) => {
                    if let Some(p) = pending_send {
                        if p.consume(n) == 0 {
                            //      self.pending_send = None;
                        }
                    }
                }
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Err(e)) => {
                    self.send_has_error = true;
                    return Poll::Ready(Err(e));
                }
            }
        }
        Pin::new(&mut self.inner).poll_flush(cx)


        // Earlier attempt, kept for reference:
        //
        // if let Some(p) = &mut self.pending_send {
        //     while !p.remaining().is_empty() {
        //         match Pin::new(&mut self.inner).poll_write(cx, p.remaining()) {
        //             // A return of 0 means the inner writer has a problem, possibly closed
        //             Poll::Ready(Ok(0)) => {
        //                 self.send_has_error = true;
        //                 return Poll::Ready(Err(Error::from(std::io::ErrorKind::WriteZero)));
        //             }
        //             Poll::Ready(Ok(n)) => {
        //                 if p.consume(n) == 0 {
        //                     self.pending_send = None;
        //                     break;
        //                 }
        //             }
        //             Poll::Pending => return Poll::Pending,
        //             Poll::Ready(Err(e)) => {
        //                 self.send_has_error = true;
        //                 return Poll::Ready(Err(e));
        //             }
        //         }
        //     }
        // }
        // Pin::new(&mut self.inner).poll_flush(cx)
    }

    /// Flushes this writer and, once that has completed successfully, shuts
    /// down `inner`.
    ///
    /// A pending or failed flush is returned to the caller unchanged.
    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        let flushed = self.as_mut().poll_flush(cx);
        if !matches!(flushed, Poll::Ready(Ok(()))) {
            return flushed;
        }

        Pin::new(&mut self.inner).poll_shutdown(cx)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tokio::io::AsyncWriteExt;
    use tokio::time;

    #[tokio::test]
    async fn basic_encryption_decryption() {
        let key = [0x42u8; 32];
        let data = b"Hello, AES world!";

        let inner = Vec::new();
        let mut writer = AesEncryptedWriter::new(inner, key);
        writer.write_all(data).await.unwrap();
        writer.flush().await.unwrap();

        let encrypted = writer.inner;
        assert_eq!(encrypted.len(), 16 + data.len());

        let iv = &encrypted[0..16];
        let ciphertext = &encrypted[16..];

        let mut decryptor = Aes256Ctr::new(&key.into(), iv.into());
        let mut decrypted = ciphertext.to_vec();
        decryptor.apply_keystream(&mut decrypted);
        assert_eq!(&decrypted, data);
    }

    #[tokio::test]
    async fn iv_uniqueness() {
        let key = [0x55u8; 32];

        let mut writer1 = AesEncryptedWriter::new(Vec::new(), key);
        writer1.write_all(b"data1").await.unwrap();

        let mut writer2 = AesEncryptedWriter::new(Vec::new(), key);
        writer2.write_all(b"data2").await.unwrap();

        let iv1 = &writer1.inner[0..16];
        let iv2 = &writer2.inner[0..16];
        assert_ne!(iv1, iv2);
    }

    #[tokio::test]
    async fn chunked_writes() {
        let key = [0xAAu8; 32];
        let chunks = vec![b"CHUNK1", b"CHUNK2", b"CHUNK3"];

        let mut writer = AesEncryptedWriter::new(Vec::new(), key);
        for chunk in &chunks {
            writer.write_all(*chunk).await.unwrap();
        }
        writer.flush().await.unwrap();

        let encrypted = writer.inner;
        let iv = &encrypted[0..16];
        let ciphertext = &encrypted[16..];

        let mut decryptor = Aes256Ctr::new(&key.into(), iv.into());
        let mut decrypted = ciphertext.to_vec();
        decryptor.apply_keystream(&mut decrypted);

        let expected: Vec<u8> = chunks.iter().flat_map(|&&chunk| chunk.to_vec()).collect();
        assert_eq!(decrypted, expected);
    }

    #[tokio::test]
    async fn empty_write() {
        let key = [0x00u8; 32];
        let mut writer = AesEncryptedWriter::new(Vec::new(), key);
        writer.write(b"").await.unwrap();
        writer.flush().await.unwrap();
        assert_eq!(writer.inner.len(), 16);
    }

    #[tokio::test]
    async fn partial_write_handling() {
        let key = [0xCCu8; 32];
        let data = vec![0xDDu8; 1024];

        let inner = SlowWriter { buffer: Vec::new() };
        let mut writer = AesEncryptedWriter::new(inner, key);

        // 添加超时机制
        let write_result = time::timeout(Duration::from_secs(10), writer.write_all(&data)).await;
        assert!(write_result.is_ok(), "Write operation timed out");

        let _ = time::timeout(Duration::from_secs(10), writer.flush()).await.unwrap();

        // 断言缓冲区已清空
        assert!(writer.pending_send.is_none(), "Flush did not clear pending send");

        // 其余断言...
        let encrypted = writer.inner.buffer;
        assert_eq!(encrypted.len(), 16 + data.len());

        let iv = &encrypted[0..16];
        let ciphertext = &encrypted[16..];

        let mut decryptor = Aes256Ctr::new(&key.into(), iv.into());
        let mut decrypted = ciphertext.to_vec();
        decryptor.apply_keystream(&mut decrypted);

        println!("Decrypted data: {:?}", decrypted);

        assert_eq!(decrypted, data);
    }

    struct SlowWriter {
        buffer: Vec<u8>,
    }

    impl AsyncWrite for SlowWriter {
        fn poll_write(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<Result<usize, io::Error>> {
            if rand::random::<u32>() % 10 == 0 {
                // 唤醒 cx
                _cx.waker().wake_by_ref();
                return Poll::Pending;
            }

            let n = 1.min(buf.len());
            self.buffer.extend(&buf[..n]);
            Poll::Ready(Ok(n))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
            Poll::Ready(Ok(()))
        }

        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
            Poll::Ready(Ok(()))
        }
    }

    #[tokio::test]
    async fn error_propagation() {
        let key = [0xEEu8; 32];
        let mut writer = AesEncryptedWriter::new(ErrorWriter, key);

        let result = writer.write_all(b"test").await;

        // 第一遍写入不会出错,数据写入了缓冲区
        assert!(!result.is_err());


        // 第二遍会返回错误
        let result = writer.write_all(b"a").await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::Other);

        // flush 会返回错误
        let result = writer.flush().await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::Other);
    }

    struct ErrorWriter;

    impl AsyncWrite for ErrorWriter {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &[u8],
        ) -> Poll<Result<usize, io::Error>> {
            Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "mock error")))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
            Poll::Ready(Ok(()))
        }

        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
            Poll::Ready(Ok(()))
        }
    }

    #[tokio::test]
    async fn large_data_handling() {
        let key = [0xFFu8; 32];
        let data = vec![0x00u8; 1024 * 1024];

        let mut writer = AesEncryptedWriter::new(Vec::new(), key);
        writer.write_all(&data).await.unwrap();
        writer.flush().await.unwrap();

        let encrypted = writer.inner;
        assert_eq!(encrypted.len(), 16 + data.len());

        let iv = &encrypted[0..16];
        let ciphertext = &encrypted[16..];

        let mut decryptor = Aes256Ctr::new(&key.into(), iv.into());
        let mut decrypted = ciphertext.to_vec();
        decryptor.apply_keystream(&mut decrypted);
        assert_eq!(decrypted, data);
    }


    // 测试 PartialBuffer 的 pre_write 方法
    #[test]
    fn test_partial_buffer_pre_write() {
        let mut pb = PartialBuffer::new(vec![1, 2, 3]);
        pb.index = 1; // 当前数据为 [2, 3]
        let slice = pb.pre_write(3);
        assert_eq!(slice.len(), 3);
        assert_eq!(pb.data.len(), 5); // 原始长度3,pre_write添加3,变为5
        assert_eq!(pb.index, 0);
    }

    // 测试密钥和IV清零
    #[tokio::test]
    async fn test_key_iv_zeroize_after_init() {
        let key = [0x42u8; 32];
        let inner = Vec::new();
        let mut writer = AesEncryptedWriter::new(inner, key);

        writer.write_all(b"test").await.unwrap();

        assert!(writer.key.iter().all(|&x| x == 0));
        assert!(writer.iv.iter().all(|&x| x == 0));
    }

    // 测试 flush 是否清空缓冲区
    #[tokio::test]
    async fn test_flush_empties_buffer() {
        let key = [0x42u8; 32];
        let data = b"test data";
        let mut writer = AesEncryptedWriter::new(Vec::new(), key);
        writer.write_all(data).await.unwrap();
        writer.flush().await.unwrap();
        assert!(writer.pending_send.is_none());
    }

    // 测试错误恢复逻辑
    #[tokio::test]
    async fn test_recover_after_error() {
        struct RecoverWriter {
            fail_first: bool,
            buffer: Vec<u8>,
        }

        impl AsyncWrite for RecoverWriter {
            fn poll_write(
                mut self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
                buf: &[u8],
            ) -> Poll<Result<usize, io::Error>> {
                if self.fail_first {
                    self.fail_first = false;
                    Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "mock error")))
                } else {
                    self.buffer.extend_from_slice(buf);
                    Poll::Ready(Ok(buf.len()))
                }
            }

            fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
                Poll::Ready(Ok(()))
            }

            fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
                Poll::Ready(Ok(()))
            }
        }

        let key = [0xEEu8; 32];
        let data = b"test data";
        let inner = RecoverWriter {
            fail_first: true,
            buffer: Vec::new(),
        };
        let mut writer = AesEncryptedWriter::new(inner, key);

        writer.write(data).await.expect("First write failed");
        writer.write(data).await.expect("Second write failed");
        writer.flush().await.expect("Flush failed");

        let encrypted = writer.inner.buffer;
        let iv = &encrypted[0..16];
        let ciphertext = &encrypted[16..];

        let mut decryptor = Aes256Ctr::new(&key.into(), iv.into());
        let mut decrypted = ciphertext.to_vec();
        decryptor.apply_keystream(&mut decrypted);

        // 修复后代码
        assert_eq!(
            decrypted,
            [data.as_slice(), data.as_slice()].concat() // 显式转换为切片的切片
        );
    }

    // 测试缓冲区大小限制
    #[tokio::test]
    async fn test_max_buf_size() {
        let key = [0xAAu8; 32];
        let mut writer = AesEncryptedWriter::new(Vec::new(), key);
        writer.max_buf_size = 32;

        let data = vec![0xBB; 64];
        let written = writer.write(&data).await.unwrap();
        assert_eq!(written, 32 - 16); // 确保写入量不超过缓冲区大小

        writer.flush().await.unwrap();
        let encrypted = writer.inner;
        let iv = &encrypted[0..16];
        let ciphertext = &encrypted[16..];

        let mut decryptor = Aes256Ctr::new(&key.into(), iv.into());
        let mut decrypted = ciphertext.to_vec();
        decryptor.apply_keystream(&mut decrypted);
        assert_eq!(decrypted, data[..written]);
    }

    // 测试多次写入IV只添加一次
    #[tokio::test]
    async fn test_iv_only_once() {
        let key = [0xCCu8; 32];
        let mut writer = AesEncryptedWriter::new(Vec::new(), key);

        writer.write_all(b"first").await.unwrap();
        writer.write_all(b"second").await.unwrap();
        writer.flush().await.unwrap();

        let encrypted = writer.inner;
        assert_eq!(encrypted.len(), 16 + "firstsecond".len());
        assert_ne!(&encrypted[0..8], &encrypted[16..27]); // 验证IV不同块
    }
}


#[cfg(test)]
mod partial_buffer_tests {
    use super::PartialBuffer;

    /// Builds a buffer whose backing vector has room for `capacity` bytes and
    /// holds `bytes`, with the first `consumed` of them marked as used.
    fn with_spare_capacity(capacity: usize, bytes: &[u8], consumed: usize) -> PartialBuffer {
        let mut storage = Vec::with_capacity(capacity);
        storage.extend_from_slice(bytes);
        PartialBuffer {
            data: storage,
            index: consumed,
        }
    }

    #[test]
    fn test_remaining() {
        let bytes = vec![1, 2, 3, 4, 5];

        let untouched = PartialBuffer::new(bytes.clone());
        assert_eq!(untouched.remaining(), &bytes[..]);

        let mut advanced = PartialBuffer::new(bytes);
        advanced.consume(3);
        assert_eq!(advanced.remaining(), &[4, 5]);
    }

    #[test]
    fn test_consume() {
        let mut buf = PartialBuffer::new(vec![1, 2, 3, 4, 5]);

        // Consuming nothing leaves the cursor where it was.
        assert_eq!(buf.consume(0), 5);
        assert_eq!(buf.index, 0);

        assert_eq!(buf.consume(3), 2);
        assert_eq!(buf.index, 3);
        assert_eq!(buf.remaining(), &[4, 5]);
    }

    #[test]
    #[should_panic(expected = "attempt to subtract with overflow")]
    fn test_consume_overflow() {
        let mut buf = PartialBuffer::new(vec![1, 2, 3]);
        buf.consume(3); // everything consumed, index = 3
        // index becomes 4 while data.len() is 3: the subtraction inside
        // `consume` is what produces the expected panic message.
        buf.consume(1);
        let _ = buf.remaining();
    }

    #[test]
    fn test_pre_write_without_reallocation() {
        // unread: [3, 4, 5]
        let mut buf = with_spare_capacity(10, &[1, 2, 3, 4, 5], 2);

        let reserved = buf.pre_write(3);
        assert_eq!(reserved.len(), 3);
        assert_eq!(buf.data.len(), 8); // 5 + 3
        assert_eq!(buf.index, 2); // spare capacity sufficed, nothing was moved

        // The unread bytes are followed by the zero-filled reservation.
        assert_eq!(buf.remaining(), [3, 4, 5, 0, 0, 0]);
    }

    #[test]
    fn test_pre_write_with_reallocation() {
        let mut buf = PartialBuffer::new(vec![1, 2, 3]);
        buf.consume(1); // unread: [2, 3]

        let reserved = buf.pre_write(4);
        assert_eq!(reserved.len(), 4);
        assert_eq!(buf.data.len(), 6); // 2 unread + 4 reserved
        assert_eq!(buf.data.capacity(), 6); // reallocated to the exact size
        assert_eq!(buf.index, 0);

        // The unread bytes survive the move.
        assert_eq!(buf.data[..2], [2, 3]);
    }

    #[test]
    fn test_append_without_reallocation() {
        // unread: [2, 3]
        let mut buf = with_spare_capacity(10, &[1, 2, 3], 1);

        buf.append(&[4, 5]);
        assert_eq!(buf.remaining(), vec![2, 3, 4, 5]);
        assert_eq!(buf.index, 1);
    }

    #[test]
    fn test_append_with_reallocation() {
        let mut buf = PartialBuffer::new(vec![1, 2, 3]);
        buf.consume(2); // unread: [3]

        buf.append(&[4, 5, 6, 7]);
        assert_eq!(buf.data, vec![3, 4, 5, 6, 7]);
        assert_eq!(buf.index, 0);
        assert_eq!(buf.data.capacity(), 5);
    }

    #[test]
    fn test_multiple_operations() {
        let mut buf = PartialBuffer::new(vec![1, 2, 3, 4, 5]);

        buf.consume(2);
        assert_eq!(buf.remaining(), &[3, 4, 5]);

        let reserved = buf.pre_write(2);
        reserved.copy_from_slice(&[6, 7]);
        assert_eq!(buf.data, vec![3, 4, 5, 6, 7]);

        buf.append(&[8, 9]);
        assert_eq!(buf.data, vec![3, 4, 5, 6, 7, 8, 9]);
    }
}

// Empty entry point; nothing in this listing is called from here, the code
// above is exercised only by its `#[cfg(test)]` modules.
fn main() {}

Your `self` is behind a `Pin`, which means the field access `self.inner` goes through the `DerefMut` trait. The compiler cannot see that the two borrows are disjoint when they are obtained through a function call.

1 Like

The issue is that self is a Pin (or more generally, something that implements DerefMut to your type) rather than a reference. When you try to mutably borrow a field on such type, the compiler implicitly performs the operation DerefMut::deref_mut(&mut self).pending_send, and as you can see this borrows the whole self because the deref_mut method is opaque to the borrow checker. Then when you try to access inner the same happens and you get a borrowing error on self. The solution is to first perform the deref_mut manually, store the result in a variable, and then access the fields you need on that variable. This way the field accesses won't go through deref_mut and the normal field-disjoint logic of the borrow checker will be able to apply.

let this = &mut *self;
let (pending_send, inner) = (&mut this.pending_send, &mut this.inner); //you can also directly use this.pending_send and this.inner when you need instead of doing this
// ...
5 Likes

I think this should work:

    // manually `DerefMut` as a whole, then split borrow:
    let this = &mut *self;
    let pending_send = &mut this.pending_send;
    let inner = &mut this.inner;
2 Likes

Thank you very much @nerditation @SkiFire13, indeed it is a Pin issue.
I previously did not understand, was not clear that it actually executes DerefMut::deref_mut(&mut self).pending_send.
Thank you again, I am more confident in Rust.

I wrote code to test this feature, and now I understand this problem.
Leave this example, it may be helpful for future visitors.



use std::pin::Pin;

/// Two-field struct used to demonstrate which borrows can be split.
struct Point {
    x: f64,
    y: f64,
}

impl Point {
    /// Mutable access to `x`. From the caller's side this borrows the whole
    /// `Point`, because the borrow goes through a method call.
    pub fn x_mut(&mut self) -> &mut f64 {
        let Self { x, .. } = self;
        x
    }

    /// Mutable access to `y`. From the caller's side this borrows the whole
    /// `Point`, because the borrow goes through a method call.
    pub fn y_mut(&mut self) -> &mut f64 {
        let Self { y, .. } = self;
        y
    }
}


/// Borrowing fields through accessor methods: each call borrows the whole
/// `point`, so the second borrow is rejected while the first is still in use.
fn f0() {
    let mut point = Point { x: 1.0, y: 2.0 };

    let x_ref = point.x_mut();
    // error[E0499]: cannot borrow `point` as mutable more than once at a time
    // let y_ref = point.y_mut();

    *x_ref *= 2.0;
    //    *y_ref *= 2.0;
}

/// Splitting the borrow with a destructuring pattern on `&mut point`:
/// `x` and `y` are mutable references to the two fields, usable together.
fn f1() {
    let mut point = Point { x: 1.0, y: 2.0 };

    {
        let Point { x, y } = &mut point;
        *x *= 2.0;
        *y *= 2.0;
    }

    println!("Point: ({}, {})", point.x, point.y);
}


/// Borrowing the two fields directly on a local variable: both mutable
/// borrows are accepted and used together.
fn f2() {
    let mut point = Point { x: 1.0, y: 2.0 };

    {
        let x_ref = &mut point.x;
        let y_ref = &mut point.y;
        *x_ref *= 2.0;
        *y_ref *= 2.0;
    }

    println!("Point: ({}, {})", point.x, point.y);
}


impl Point {
    /// Hands out mutable references to both fields from a single borrow of
    /// `self`, so callers can modify `x` and `y` at the same time.
    pub fn x_y_mut(&mut self) -> (&mut f64, &mut f64) {
        let Self { x, y } = self;
        (x, y)
    }
}

/// Getting both field references from one method call (`x_y_mut`), which
/// borrows `point` only once.
fn f3() {
    let mut point = Point { x: 1.0, y: 2.0 };

    {
        let (x_ref, y_ref) = point.x_y_mut();
        *x_ref *= 2.0;
        *y_ref *= 2.0;
    }

    println!("Point: ({}, {})", point.x, point.y);
}

/// The same experiments with the `Point` behind a `Pin<&mut Point>`, which is
/// the situation in `poll_flush`.
fn f4() {
    let mut point = Point { x: 1.0, y: 2.0 };

    // `get_mut()` yields a plain `&mut Point`, which can be destructured.
    {
        let p = Pin::new(&mut point);

        let Point { x, y } = p.get_mut();
        *x *= 2.0;
        *y *= 2.0;
    }

    // Field access directly on the `Pin`: the second borrow is rejected.
    {
        let mut p = Pin::new(&mut point);
        let x_ref = &mut p.x;

        // error[E0499]: cannot borrow `p` as mutable more than once at a time
        //        let y_ref = &mut p.y;

        *x_ref *= 2.0;
        //        *y_ref *= 2.0;
    }

    // https://users.rust-lang.org/t/cannot-split-mutable-borrows-of-struct-fields-in-poll-flush-implementation/126455/3?u=wangxinmian
    //
    // The issue is that self is a Pin (or more generally, something that
    // implements DerefMut to your type) rather than a reference. When you try
    // to mutably borrow a field on such a type, the compiler implicitly
    // performs the operation DerefMut::deref_mut(&mut self).pending_send, and
    // this borrows the whole self because the deref_mut method is opaque to
    // the borrow checker. Then when you try to access inner the same happens
    // and you get a borrowing error on self. The solution is to first perform
    // the deref_mut manually, store the result in a variable, and then access
    // the fields you need on that variable. This way the field accesses won't
    // go through deref_mut and the normal field-disjoint logic of the borrow
    // checker will be able to apply.

    // Dereference once, then split the borrow on the resulting `&mut Point`.
    {
        let mut p = Pin::new(&mut point);
        let this = &mut *p;
        let x_ref = &mut this.x;
        let y_ref = &mut this.y;

        *x_ref *= 2.0;
        *y_ref *= 2.0;
    }

    // A single method call that returns both references also works.
    {
        let mut p = Pin::new(&mut point);
        let (x_ref, y_ref) = p.x_y_mut();
        *x_ref *= 2.0;
        *y_ref *= 2.0;
    }

    println!("Point: ({}, {})", point.x, point.y);
}


fn main() {
    f0();
    f1();
    f2();
    f3();
    f4();
}

1 Like