Hyper client, tokio pipe not closed

#1

HI,

I have updated my project's crate dependencies with cargo update, and now I get a “Too many open files (os error 24)” error.
It seems that the created pipe is not closed after I get the result. Do I have to close it manually?

I use the helpers below to make my requests by calling send_request_1().

[dependencies]
libc = "0.2"
serde = "1"
serde_derive = "1"
serde_json = "1"
futures = "0.1"
hyper = "0.12"
tokio-core = "0.1"
tokio-timer = "0.2"
hyper-tls = "0.3"
chrono = "0.4"
getopts = "0.2"
postgres = {version = "0.15", features = ["with-chrono","with-serde_json"] }
quick-xml = "0.12"
crossbeam-utils = "0.5"
regex = "0.2"
uuid = "0.6"
base64 = "0.6"
libloading = "0.5"
mp4parse = { path = "../mp4parse-rust/mp4parse" }
galil-seiferas = "0.1"
rand = "0.5"

[dev-dependencies]
criterion = "0.2"

/// Data collected from one completed HTTP exchange.
#[derive(Clone)]
pub struct HttpResponse {
    /// Status code of the response.
    pub status: StatusCode,
    /// Raw bytes of the response body.
    pub body: Vec<u8>,
    /// Headers of the response.
    pub headers: HeaderMap,
    /// URI recorded for this response; `None` until `set_uri` is called.
    pub uri: Option<Uri>,
}

impl HttpResponse {
    /// Stores `uri` on this response, replacing any previously stored value.
    pub fn set_uri(&mut self, uri: Uri) {
        self.uri = Option::from(uri);
    }
}

/// Returns `url` with `query` appended as a query string.
///
/// A missing or empty `query` leaves `url` unchanged. Otherwise the query is
/// joined with `?`, or with `&` when `url` already contains a `?`; no separator
/// is added when `url` already ends in `?` or `&`.
fn url_with_query(url: &str, query: Option<&str>) -> String {
    let mut full = String::from(url);
    let query = match query {
        Some(q) if !q.is_empty() => q,
        _ => return full,
    };
    if !(full.ends_with('?') || full.ends_with('&')) {
        let separator = if full.contains('?') { '&' } else { '?' };
        full.push(separator);
    }
    full.push_str(query);
    full
}

/// Builds an HTTP(S) request and runs it through `send_request`, following
/// redirects.
///
/// * `url` - target of the request.
/// * `method` - HTTP method to use.
/// * `headers` - optional `(name, value)` pairs added to the request; the same
///   list is handed to `send_request` so it can be re-applied on redirects.
/// * `data` - optional payload. For `GET`, `data.str` is appended to `url` as
///   the query string. For `POST`, `data.str` is used as the body, falling
///   back to `data.bytes`, and to an empty body when both are absent. Other
///   methods ignore it.
/// * `timeout` - timeout in seconds; defaults to 30 when `None`.
/// * `debug` - currently not read by this function.
///
/// # Errors
///
/// Returns an error when the connector cannot be created, when the request
/// cannot be built (for example an invalid URL or header), or when
/// `send_request` fails.
///
/// NOTE(review): a new `Client` and `HttpsConnector::new(2)` are constructed on
/// every call. If callers issue many requests, sharing one client may reduce
/// the number of OS resources in use -- confirm against the "Too many open
/// files" report.
pub fn send_request_1(
    url: &str,
    method: Method,
    headers: Option<&Vec<(String, String)>>,
    data: Option<PostData>,
    timeout: Option<u32>,
    debug: HttpDebug,
) -> Result<HttpResponse, Box<Error>> {
    // The returned handle is unused; the call is kept in case obtaining the
    // singleton has side effects (not visible from here).
    let _logger = logger::singleton(LOG_LEVEL::SILENT);
    let timeout = timeout.map(u64::from).unwrap_or(30);
    let client = Client::builder().build(HttpsConnector::new(2)?);

    let mut req_builder = Request::builder();
    req_builder.method(method.as_str());
    if let Some(headers) = headers {
        for pair in headers {
            req_builder.header(pair.0.as_str(), pair.1.as_str());
        }
    }

    let req = match data {
        Some(payload) => match method {
            Method::GET => {
                // Build the final URL first and hand *that* to the builder, so
                // the query string actually reaches the request.
                let query = payload.str.as_ref().map(|s| s.as_str());
                let target = url_with_query(url, query);
                req_builder.uri(target.as_str()).body(Body::empty())?
            }
            Method::POST => {
                // Prefer the textual payload, then the binary one; send an
                // empty body instead of panicking when neither is set.
                let body = match (payload.str, payload.bytes) {
                    (Some(text), _) => Body::from(text),
                    (None, Some(raw)) => Body::from(raw),
                    (None, None) => Body::empty(),
                };
                req_builder.uri(url).body(body)?
            }
            _ => req_builder.uri(url).body(Body::empty())?,
        },
        None => req_builder.uri(url).body(Body::empty())?,
    };

    send_request(&client, timeout, req, headers, true)
}

pub fn send_request(
    client: &Client<HttpsConnector<HttpConnector>>,
    timeout: u64,
    req: Request<Body>,
    cur_headers: Option<&Vec<(String, String)>>,
    follow_redir: bool,
) -> Result<HttpResponse, Box<Error>> {
    let uri = req.uri().clone();
    let fut = client.request(req).and_then(|res| {
        let response = (res.status(), res.headers().clone());
        res.into_body()
            .fold(Vec::new(), |mut v, chunk| {
                v.extend(&chunk[..]);
                futures::future::ok::<_, HyperError>(v)
            })
            .and_then(|chunks| {
                futures::future::ok::<_, HyperError>(Ok(HttpResponse {
                    status: response.0,
                    body: chunks.to_vec(),
                    headers: response.1,
                    uri: None,
                }))
            })
    });
    let work = Timeout::new(fut, Duration::from_secs(timeout));
    let mut core = Core::new()?;
    let mut res = core.run(work)?;
    if let Ok(ref mut http_res) = res {
        http_res.set_uri(uri);
        if follow_redir && http_res.status.is_redirection() {
            return to_redirect(http_res, client, timeout, cur_headers, 1);
        }
    }
    res
}

/// Follows the redirect described by `response`, recursing until a
/// non-redirect response is received or `followed` reaches 5.
///
/// Each hop is issued as a `GET` to the value of the `Location` header, with
/// `cur_headers` re-applied. When `response` is not a redirection, or the hop
/// limit has been reached, a clone of `response` is returned.
///
/// # Errors
///
/// Returns `MyError` with status `-1` when a redirection response carries no
/// `Location` header, and propagates failures from decoding the header value,
/// building the request, or sending it.
///
/// NOTE(review): the `Location` value is passed to the request builder
/// unchanged; whether relative locations are handled depends on the builder
/// and client -- confirm.
fn to_redirect(
    response: &HttpResponse,
    client: &Client<HttpsConnector<HttpConnector>>,
    timeout: u64,
    cur_headers: Option<&Vec<(String, String)>>,
    followed: u32,
) -> Result<HttpResponse, Box<Error>> {
    // Nothing (more) to follow: hand back the response we already have.
    if !response.status.is_redirection() || followed >= 5 {
        return Ok(response.clone());
    }

    let target = match response.headers.get(LOCATION) {
        Some(value) => value,
        None => {
            return Err(Box::new(MyError {
                status: -1,
                message: format!("redirect without location {:?}", response.headers),
            }));
        }
    };

    let mut req_builder = Request::builder();
    req_builder.method(Method::GET.as_str());
    if let Some(headers) = cur_headers {
        for pair in headers {
            req_builder.header(pair.0.as_str(), pair.1.as_str());
        }
    }

    let location = target.to_str()?;
    let req = req_builder.uri(location).body(Body::empty())?;
    // Redirect handling stays in this function, so the nested call must not
    // follow redirects itself.
    let next = send_request(client, timeout, req, cur_headers, false)?;
    to_redirect(&next, client, timeout, cur_headers, followed + 1)
}
2 Likes
#2

Same here. Something has started leaking pipes.

Have you got your previous Cargo.lock? I now regret not versioning my copy :slight_smile:

#3

I have a deduplicated backup; I will check later whether I can find which version is the troublesome one.

#4

The issue is caused by the tokio update to version 0.1.13.
I no longer have the issue when I force the previous version: tokio = "=0.1.11"

3 Likes