Memory leak using ManuallyDrop

I'm using actix_web as my backend framework,
and for uploading files I have this code:

#[derive(Serialize, Deserialize, Clone)]
struct FileData {
    ptr: u64,
    len: usize,
    cap: usize
}

impl FileData {
    pub fn from_vec(vec: Vec<u8>) -> Self {
        let mut me = ManuallyDrop::new(vec);

        Self {
            ptr: me.as_mut_ptr() as u64,
            len: me.len(),
            cap: me.capacity()
        }
    }

    #[inline]
    pub fn into_vec(self) -> Vec<u8> {
        unsafe {
            Vec::from_raw_parts(self.ptr as *mut u8, self.len, self.cap)
        }
    }
}

#[derive(Serialize, Deserialize)]
pub struct File {
    name: String,
    data: FileData
}

impl File {
    #[inline]
    pub fn new(name: String, data: FileData) -> Self {
        Self { name, data }
    }

    #[inline]
    pub fn name(&self) -> &String {
        &self.name
    }

    #[inline]
    pub fn into_data(self)-> FileData {
        self.data
    }
}

#[repr(transparent)]
pub struct Multipart<T> {
    data: T
}

impl<T> Multipart<T> {
    #[inline]
    pub fn new(data: T) -> Self {
        Self { data }
    }

    #[inline]
    pub fn into_inner(self) -> T {
        self.data
    }
}

pub fn params_insert(
    params: &mut Map<String, Value>,
    field_name: &str,
    field_name_formatted: &String,
    element: Value,
) {
    if params.contains_key(field_name_formatted) {
        if let Value::Array(val) = params.get_mut(field_name_formatted).unwrap() {
            val.push(element);
        }
    } else if field_name.ends_with("[]") {
        params.insert(field_name_formatted.to_owned(), Value::Array(vec![element]));
    } else {
        params.insert(field_name_formatted.to_owned(), element);
    }
}

impl<T> Multipart<T>
where
    T: DeserializeOwned
{
    pub async fn extract_multipart(
        mut payload: actix_multipart::Multipart,
        send_progress: SendProgress,
        headers: HeaderMap
        ) -> Result<Self, MultipartError>
    {
        fn cleanup(filedatas: Vec<FileData>) {
            for filedata in filedatas {
                let _ = filedata.into_vec();
            }
        }

        let mut this: Map<String, Value> = Map::new();
        let mut filedatas: Vec<FileData> = Vec::new();

        'mainWhile: while let Ok(Some(mut field)) = payload.try_next().await {
            if let Some(filename) = field.content_disposition().get_filename() {
                /* we got a file */
                let field_name: String = field.name().into();
                let field_name_formatted: String = field_name.replace("[]", "");
                let filename: String = filename.replace(" ", "");
                let mut data: Vec<u8>;
                let mut bytes_read = 0;
                let file_len: Option<usize>;

                /* TODO: check for file extension */

                /* see if file len is in headers to preallocate data and check for size earlier */
                match headers.get(format!("x-file-len-{}", filename)) {
                    Some(__file_len) => {
                        /* TODO: handle errors on these unwraps (remove tmp files in ram) */
                        file_len = __file_len.to_str().unwrap().parse::<usize>().unwrap().into();
                        /* TODO: check for size earlier */
                        data = Vec::with_capacity(file_len.clone().unwrap());
                    },
                    None => {
                        file_len = None;
                        data = Vec::new();
                    }
                }

                while let Some(chunk) = field.next().await {
                    match chunk {
                        Ok(bytes) => {
                            let bytes: Vec<u8> = bytes.to_vec();

                            if file_len.is_none() {
                                data.reserve_exact(bytes.len());
                            }

                            bytes_read += bytes.len();
                            /* TODO: check for max file size */

                            data.extend_from_slice(&bytes);

                            /* TODO: send progress info */
                        },
                        Err(e) => {
                            continue 'mainWhile;
                        }
                    }
                }

                if data.is_empty() {
                    continue 'mainWhile;
                }

                /* data is ready to be inserted */
                let filedata = FileData::from_vec(data);
                filedatas.push(filedata.clone());
                let file = File::new(filename, filedata);

                params_insert(&mut this, &field_name, &field_name_formatted, file.into());
            } else {
                /* we got a normal field */
            }
        }
        match serde_json::from_value::<T>(Value::Object(this)) {
            Ok(d) => Ok(Self::new(d)),
            Err(e) => {
                cleanup(filedatas);
                return Err(e.into());
            }
        }
    }
}

FromRequest is implemented for Multipart<T>,
and in a route I use it like this:

#[derive(Deserialize)]
struct Data {
    file: File
}

pub async fn route_handler(data: Multipart<Data>)  -> HttpResponse {
    let data = data.into_inner();

    write_data_into_disk(data); /* this calls FileData::into_vec and drops it */

    /* send response here */
}

It works, but it leaks memory, though not always.
Any help would be appreciated.

Could you explain what FileData is for? Why are you using it instead of a simple Vec<u8>?

It's not surprising that it leaks since it doesn't implement Drop, but it's also entirely unsound because its Deserialize implementation can be used to read an arbitrary address as if it were a Vec’s data. Most likely, it will need to be replaced with something entirely different, but it's not clear what.
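
To make the "no Drop" point concrete, here is a minimal std-only sketch. `Tracked`, `RawParts` and `dropped_elements` are hypothetical names made up for illustration; `RawParts` only mimics the ptr/len/cap idea of `FileData` and is not the original code. It counts how many elements get dropped when the raw parts are discarded versus rebuilt into a `Vec`:

```rust
use std::mem::ManuallyDrop;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Element type whose Drop bumps a shared counter, so we can observe whether
/// the Vec that owned it was ever dropped.
struct Tracked(Arc<AtomicUsize>);

impl Drop for Tracked {
    fn drop(&mut self) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for the thread's FileData: raw parts, no Drop impl.
struct RawParts {
    ptr: *mut Tracked,
    len: usize,
    cap: usize,
}

impl RawParts {
    fn from_vec(vec: Vec<Tracked>) -> Self {
        let mut me = ManuallyDrop::new(vec);
        Self { ptr: me.as_mut_ptr(), len: me.len(), cap: me.capacity() }
    }

    fn into_vec(self) -> Vec<Tracked> {
        // SAFETY: the parts come from `from_vec` and are consumed exactly once.
        unsafe { Vec::from_raw_parts(self.ptr, self.len, self.cap) }
    }
}

/// Returns how many elements were dropped when the raw parts are discarded
/// (`rebuild == false`) or turned back into a Vec first (`rebuild == true`).
fn dropped_elements(rebuild: bool) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let vec: Vec<Tracked> = (0..3).map(|_| Tracked(counter.clone())).collect();
    let parts = RawParts::from_vec(vec);
    if rebuild {
        drop(parts.into_vec()); // the rebuilt Vec frees its elements and buffer
    } else {
        drop(parts); // nothing frees the buffer: this path leaks
    }
    counter.load(Ordering::SeqCst)
}
```

Here `dropped_elements(true)` reports 3 and `dropped_elements(false)` reports 0: any path on which the raw parts are discarded without being rebuilt leaks the buffer silently. In the posted params_insert, an element whose key already exists but whose stored value is not a Value::Array is dropped without being stored, which looks like one such path; whether that is what happens in practice is a guess. Conversely, because FileData derives Clone, calling into_vec on two copies of the same parts would free the buffer twice, which is undefined behavior.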

Unfortunately, your code snippet is missing the Clone implementation for FileData. I suspect that's the root cause of your issue.

Anyway, I assume you're trying to achieve some kind of data sharing. No need to reinvent the wheel—the brilliant authors of the Rust standard library have already done this for you. Just use std::sync::Arc:

use std::sync::Arc;

pub struct File {
    name: String,
    data: Arc<Vec<u8>>, // "Pointer" to a shared, readonly Vec
}

impl File {
    pub fn new(name: String, data: Arc<Vec<u8>>) -> Self {
        Self { name, data }
    }
}

// ...

let mut filedatas: Vec<Arc<Vec<u8>>> = Vec::new();

// ...

let filedata = Arc::new(data);
filedatas.push(filedata.clone());
let file = File::new(filename, filedata);

Update: I revisited your code snippet and started wondering... It seems like the whole filedatas Vec and the cleanup() function might not even be necessary. You could probably just use an Arc to send your data across threads, share it if needed, and let Arc handle dropping the contents once there are no longer any references.
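
A small std-only sketch of that behavior (the function names `shared_buffer_counts` and `take_bytes` are made up for illustration): the buffer lives as long as any handle does, and the last handle can give the Vec back without copying.

```rust
use std::sync::Arc;

/// Returns the strong count while two handles exist and after one is dropped.
fn shared_buffer_counts() -> (usize, usize) {
    let data: Arc<Vec<u8>> = Arc::new(vec![1, 2, 3]);
    let for_file = Arc::clone(&data); // second handle, same allocation

    let while_shared = Arc::strong_count(&data);
    drop(for_file);
    let after_drop = Arc::strong_count(&data);

    // `data` goes out of scope when this function returns; the count then
    // reaches 0 and the Vec is freed automatically.
    (while_shared, after_drop)
}

/// Takes the Vec back out without copying when this is the only handle left;
/// falls back to cloning the bytes if other handles still exist.
fn take_bytes(data: Arc<Vec<u8>>) -> Vec<u8> {
    Arc::try_unwrap(data).unwrap_or_else(|shared| (*shared).clone())
}
```

`shared_buffer_counts()` returns `(2, 1)`. No cleanup function is needed, since every handle that goes out of scope decrements the count on its own.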

I want to add that data to a Map<String, Value> and convert it to a struct later.
If I use Vec<u8>, I have to insert it into the Map as a Vec<Value> (because Value::Array accepts Vec<Value> and nothing else).
The size of Value is 72 bytes, so for each u8 I have to allocate one Value::Number, which is not optimal in my opinion (correct me if I'm wrong or if there is another way of using Vec<u8>).
So FileData takes the Vec<u8> and tells the compiler not to drop it. Whenever I want to write the file to disk or do anything else with the data, it rebuilds a Vec<u8> from those parts, and that Vec frees the data when it is dropped.
It just somehow leaks memory.
The same thing happens on the same execution path: sometimes it leaks, sometimes it's fine.
I'm confused...

Yeah, but Arc does not implement Serialize,
and I want to add my data to a Map<String, Value> and then convert it to a struct using serde_json::from_value::<MyStruct>(map).
Update: FileData does derive Clone; I forgot to write it there XD

It does, only behind the feature flag rc of serde.
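
For reference, enabling that feature in Cargo.toml would look roughly like this (the version requirement and the `derive` feature are assumptions about the project's setup, not taken from the thread):

```toml
[dependencies]
serde = { version = "1", features = ["derive", "rc"] }
```

With `rc` enabled, serde provides Serialize and Deserialize for Rc<T> and Arc<T>.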

Oh, thanks,
but it doesn't solve my problem.
When an Arc<Vec<u8>> gets converted to a serde_json::Value, it ends up like this:

Value::Array(Vec<Value::Number>)

which uses a lot more RAM than just a Vec<u8>.

It seems to me that what you want is a custom serde deserializer for your format. Using Value will be suboptimal as you noticed, since it wasn't made to handle some variants like byte buffers.
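
The following is not a serde Deserializer; it is only a std-only sketch of the kind of intermediate representation such a deserializer could read from instead of serde_json::Value. `Field` and `insert_field` are hypothetical names, and unlike the posted params_insert this version promotes a repeated key to a list instead of discarding the new element:

```rust
use std::collections::HashMap;

/// Hypothetical intermediate value: file bytes stay in an owned Vec<u8>,
/// one byte per byte, and are freed by ordinary Drop.
#[derive(Debug, PartialEq)]
enum Field {
    Text(String),
    File { name: String, data: Vec<u8> },
    List(Vec<Field>),
}

/// Stores `element` under `raw_name`, treating a trailing "[]" as "this is a list".
fn insert_field(params: &mut HashMap<String, Field>, raw_name: &str, element: Field) {
    let is_list = raw_name.ends_with("[]");
    let key = raw_name.trim_end_matches("[]").to_owned();
    match params.remove(&key) {
        // Key already holds a list: append to it.
        Some(Field::List(mut items)) => {
            items.push(element);
            params.insert(key, Field::List(items));
        }
        // Key holds a single value: keep both rather than dropping one.
        Some(previous) => {
            params.insert(key, Field::List(vec![previous, element]));
        }
        None if is_list => {
            params.insert(key, Field::List(vec![element]));
        }
        None => {
            params.insert(key, element);
        }
    }
}
```

Taking the bytes back out is then a plain move, e.g. matching on `Some(Field::File { data, .. })` after `params.remove("pfp")`, with no unsafe code and nothing to clean up by hand.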

With some print statements I realized that for every ManuallyDrop::new call there is a Vec::from_raw_parts call that takes ownership of the leaked data and drops it after use.
However, the data still leaks in some situations (even though Vec::from_raw_parts gets called and the Vec is dropped).
So in this snippet there is either a copy that I can't see
or some other weird thing that I'm not aware of...
I wonder if I can just use Bytes instead of Vec.

In this case, it’s a recipe for disaster.

Even though it may sound a bit direct, I strongly recommend removing all of your unsafe code. This includes ManuallyDrop, as it won't be necessary after that.

What does this MyStruct look like? I’m still not completely sure what problem you’re trying to solve. Could you provide a more concrete example of your use case?

I am trying to build a generic way of uploading files to a web server.
By that I mean I want to be able to define a struct like this:

/* this can be any other struct as long as it impls DeserializeOwned */
#[derive(Deserialize)]
struct Data {
    name: String,
    age: i32,
    pfp: File
}

Then I have an extractor called Multipart that takes a type like this Data and tries to build it from HTTP multipart data:

pub struct Multipart<T> {
    data: T
}

impl<T> Multipart<T> {
    #[inline]
    pub fn new(data: T) -> Self {
        Self { data }
    }

    #[inline]
    pub fn into_inner(self) -> T {
        self.data
    }
}

impl<T> FromRequest for Multipart<T>
where
    T: DeserializeOwned
{
    type Error = Error;
    type Future = Pin<Box<dyn Future<Output=Result<Self, Self::Error>>>>;

    fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future {
        let multipart = actix_multipart::Multipart::new(req.headers(), payload.take());

        Box::pin(async move {
            match Self::extract_multipart(multipart).await {
                Ok(data) => Ok(Self::new(data)),
                Err(e) => Err(ErrorBadRequest(format!("{:?}", e)))
            }
        })
    }
}

And the extract_multipart function looks like this:

impl<T: DeserializeOwned> Multipart<T> {
    async fn extract_multipart(
        mut payload: actix_multipart::Multipart
    ) -> Result<T, serde_json::Error>
    {
        let mut this: Map<String, Value> = Map::new(); /* this will have our data until we load every field */

        'mainWhile: while let Ok(Some(mut field)) = payload.try_next().await {
            if let Some(filename) = field.content_disposition().get_filename() {
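                /* we got a file: handle uploading it */
                let field_name: String = field.name().into();
                let field_name_formatted: String = field_name.replace("[]", ""); /* in case this field is an array */
                let filename: String = filename.to_owned(); /* own the name so `field` can be borrowed mutably below */
                let mut data: Vec<u8> = Vec::new();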

                while let Some(chunk) = field.next().await {
                    match chunk {
                        Ok(chunk_data) => {
                            /* append the whole chunk in one call */
                            data.extend_from_slice(&chunk_data);
                        },
                        Err(_) => {
                            continue 'mainWhile; /* just go next field */
                        }
                    }
                }

                if data.is_empty() {
                    continue; /* data is empty go next field */
                }

                let filedata = FileData::from_vec(data);
                let file = File::new(filename, filedata);

                params_insert(&mut this, &field_name, &field_name_formatted, file.into());
            } else {
                /* we got a normal field like String or i32 or ... */
                /* omitted */
            }
        }

        /* tries to build T which in this case is struct Data from Map this */
        Ok(serde_json::from_value::<T>(Value::Object(this))?)
    }
}

The params_insert function:

fn params_insert(
    params: &mut Map<String, Value>,
    field_name: &String,
    field_name_formatted: &String,
    element: Value,
) {
    if params.contains_key(field_name_formatted) {
        if let Value::Array(val) = params.get_mut(field_name_formatted).unwrap() {
            val.push(element);
        }
    } else if field_name.ends_with("[]") {
        params.insert(field_name_formatted.to_owned(), Value::Array(vec![element]));
    } else {
        params.insert(field_name_formatted.to_owned(), element);
    }
}

And now for FileData and File:

FileData tells the compiler not to drop the heap allocation behind our data: Vec<u8> (the uploaded file, held in RAM)
and keeps the information needed to rebuild it later (into a Vec<u8>).

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct FileData {
    ptr: u64,
    len: usize,
    cap: usize
}

impl FileData {
    /* dont drop vec */
    pub fn from_vec(vec: Vec<u8>) -> Self {
        let mut me = ManuallyDrop::new(vec);

        println!("ManuallyDrop: {:?} - {} - {}", me.as_mut_ptr(), me.len(), me.capacity());

        Self {
            ptr: me.as_mut_ptr() as u64,
            len: me.len(),
            cap: me.capacity()
        }
    }

    /* build vec so it gets cleaned */
    pub fn into_vec(self) -> Vec<u8> {
        println!("Vec::from_raw_parts: {:?} - {} - {}", self.ptr as *mut u8, self.len, self.cap);
        println!("------------------------");
        unsafe { Vec::from_raw_parts(self.ptr as *mut u8, self.len, self.cap) }
    }
}

impl Into<Value> for FileData {
    fn into(self) -> Value {
        serde_json::to_value(self).unwrap()
    }
}

And File holds the uploaded file's name and this FileData:

#[derive(Debug, Serialize, Deserialize)]
pub struct File {
    name: String,
    data: FileData
}

impl File {
    #[inline]
    pub fn new(name: String, data: FileData) -> Self {
        Self { name, data }
    }

    #[inline]
    pub fn name(&self) -> &String {
        &self.name
    }

    #[inline]
    pub fn data(&self)-> &FileData {
        &self.data
    }
}

impl Into<FileData> for File {
    fn into(self) -> FileData {
        self.data
    }
}

impl Into<Value> for File {
    fn into(self) -> Value {
        serde_json::to_value(self).unwrap()
    }
}

I guess that's all.
Remind me if I'm missing something...

And yes, let's assume the incoming data is fine and has all the fields of our Data struct,
because even when every FileData::from_vec call has a matching FileData::into_vec call, it still leaks something,
and what leaks is exactly one file's data.

You have to keep FileData around so you can rebuild the Vec later. So why would FileData be dropped? And if FileData cannot be dropped, why can't it just contain the Vec?

If FileData contains the Vec, then when serde serializes it, the Vec<u8> is converted to the Value::Array variant.
Value::Array holds its data as a Vec<Value>.
In this conversion each u8 becomes a Value::Number,
and the size of a Value::Number is a lot more than a byte.
So uploading 20MB consumes around 1.4GB of RAM.
So that's not an option.
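
The arithmetic behind that estimate can be sketched as follows. `JsonLike` is a hypothetical, simplified stand-in for a JSON value enum (not serde_json's actual definition), and the 72-byte size is the figure quoted in this thread rather than something measured here:

```rust
/// Hypothetical, simplified stand-in for a JSON value enum. Every element of
/// `Array` is a full `JsonLike`, no matter how small the number it holds.
#[allow(dead_code)]
enum JsonLike {
    Null,
    Number(f64),
    Text(String),
    Array(Vec<JsonLike>),
}

/// Element storage needed when each input byte becomes one enum value.
fn per_byte_value_storage(file_len: u64, value_size: u64) -> u64 {
    file_len * value_size
}
```

With the thread's figures, `per_byte_value_storage(20_000_000, 72)` is 1_440_000_000 bytes, i.e. about 1.4 GB for a 20 MB upload, versus 20 MB for a plain Vec<u8>. The real size of serde_json::Value depends on the target and enabled features and can be checked with `std::mem::size_of`.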

So here FileData gets moved into the File struct, which then gets serialized and goes into a Map.
When I want to read the data, I take the FileData out of the File struct and rebuild the Vec with Vec::from_raw_parts.

It ends up like this:

struct Data {
    // ...
    pfp: File
}

pub async fn route_handler(data: Multipart<Data>) -> HttpResponse {
    let data: Data = data.into_inner();
    let filedata: FileData = data.pfp.into(); /* Into<FileData> is implemented for File, not Data */

    let vec: Vec<u8> = filedata.into_vec();

    // do stuff
}

Using unsafe code and ManuallyDrop to change the serialization format should not be necessary. The suggestion from @SkiFire13 would be a much better solution. I see you didn't reply to that, so perhaps you missed it.

Ah, I see. Thanks for clarifying! I have a quick question:

When I look at the example in the Actix Multipart documentation, it shows how to upload a file using TempFile.

Maybe using a temporary file isn’t suitable for your use case. According to the documentation for the Bytes struct, it seems like you could load the data into memory instead.

Is there a reason why this solution wouldn’t work for you?

OK, I'm a little bit outdated on the actix-multipart crate:
I'm using version 0.4.0
and the latest is 0.7.2.
Thank you for mentioning it.

This has nothing to do with anything you described so far, really. If you want to serialize a Vec<u8> as a binary blob in some efficient binary format, put it in a newtype wrapper and call it a day.

There's absolutely no reason to resort to unsafe code here. Zero.

Here's an example of using base64:

though you can define that several ways (newtype wrapper that manually implements serde, for example)
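
A rough, std-only sketch of the newtype idea (this is not the example the post referred to). `Blob`, `to_base64` and `from_base64` are made-up names, and the codec is hand-written only to keep the sketch self-contained; in real code the assumption is that the `base64` crate plus serde `Serialize`/`Deserialize` impls on the newtype would take its place:

```rust
const ALPHABET: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Hypothetical newtype that owns the file bytes; dropping it frees them.
#[derive(Debug, PartialEq)]
struct Blob(Vec<u8>);

impl Blob {
    /// Encodes the bytes as one base64 string (standard alphabet, `=` padding).
    fn to_base64(&self) -> String {
        let mut out = String::with_capacity((self.0.len() + 2) / 3 * 4);
        for chunk in self.0.chunks(3) {
            let b1 = chunk.get(1).copied().unwrap_or(0);
            let b2 = chunk.get(2).copied().unwrap_or(0);
            let n = ((chunk[0] as u32) << 16) | ((b1 as u32) << 8) | (b2 as u32);
            out.push(ALPHABET[((n >> 18) & 63) as usize] as char);
            out.push(ALPHABET[((n >> 12) & 63) as usize] as char);
            out.push(if chunk.len() > 1 { ALPHABET[((n >> 6) & 63) as usize] as char } else { '=' });
            out.push(if chunk.len() > 2 { ALPHABET[(n & 63) as usize] as char } else { '=' });
        }
        out
    }

    /// Decodes a string produced by `to_base64`; returns `None` on malformed input.
    fn from_base64(text: &str) -> Option<Self> {
        let bytes = text.as_bytes();
        if bytes.len() % 4 != 0 {
            return None;
        }
        let quads = bytes.len() / 4;
        let mut out = Vec::with_capacity(quads * 3);
        for (i, quad) in bytes.chunks(4).enumerate() {
            // Padding may only appear at the end of the final group.
            let pad = quad.iter().rev().take_while(|&&c| c == b'=').count();
            if pad > 2 || (pad > 0 && i + 1 != quads) {
                return None;
            }
            let mut n = 0u32;
            for &c in &quad[..4 - pad] {
                let v = ALPHABET.iter().position(|&a| a == c)?;
                n = (n << 6) | (v as u32);
            }
            n <<= 6 * (pad as u32);
            out.push((n >> 16) as u8);
            if pad < 2 {
                out.push((n >> 8) as u8);
            }
            if pad < 1 {
                out.push(n as u8);
            }
        }
        Some(Blob(out))
    }
}
```

Stored this way, the file becomes a single string in the Map at roughly 4/3 of its original size instead of one Value per byte, and the bytes are owned by an ordinary Vec the whole time.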
