Why does my TcpStream block?

Hi, I recently started learning Rust and I am trying to connect to FreeSWITCH's mod_event_socket. The code authenticates correctly, but when I write to the stream and then read the response, the read fails with:

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Os { code: 11, kind: WouldBlock, message: "Resource temporarily unavailable" }', src/main.rs:74:45
use std::{io,str};
use std::io::{BufRead, Read, Write};
use std::net::TcpStream;
use std::env;
use std::collections::HashMap;
use std::time::Duration;

#[derive(Debug)]
struct Pdu {
    header: HashMap<String, String>,
    content: Vec<u8>
}

impl Pdu {
    fn new() -> Self {
        Self {
            header: HashMap::new(),
            content: Vec::new()
        }
    }
    
    fn get_header_content(&mut self, stream: &mut impl Read) -> Vec<u8> {
        let mut raw: Vec<u8> = Vec::new();
        let mut reader = io::BufReader::new(stream);
        let mut buf: Vec<u8> = Vec::new();
        loop {
            buf.clear();
            let readed_bytes = reader.read_until(b'\n', &mut buf).unwrap();
            if readed_bytes == 1 && buf[0] == b'\n' {
                break;
            } else {
                raw.append(&mut buf);
            }
        }

        raw
    }

    fn parse_header(&mut self, stream: &mut impl Read) {
        let raw = self.get_header_content(stream);
        let raw_str = str::from_utf8(&raw).unwrap();
        let parts = raw_str
            .split("\n")
            .filter(|line| {
                line.bytes().count() > 0
            })
            .map(|line| {
                let mut item = line.splitn(2, ":");
                let key = item.next().unwrap();
                let value = item.next().unwrap();

                (key.trim(), value.trim()) 
            });
        
        for (key, value) in parts {
            self.header.insert(key.to_string(), value.to_string());
        }
    }

    fn parse_content(&mut self, stream: &mut TcpStream) {
        if let Some(length) = self.header.get("Content-Length") {
            let length: usize = length.parse().unwrap();
            println!("{:?}", self.header);
            println!("{:?}", stream);
            println!("content size: {}", length);

            let mut content = vec![0u8; length];
            // 2022-09-30: I can't figure out why this blocks.
            // I have tried:
            // - stream.try_clone()
            // - stream.read()
            // - stream.read_exact()
            // - reading 1 byte
            stream.read_exact(&mut content).unwrap();

            self.content.append(&mut content);
        }
    }
    
    fn parse(&mut self, stream: &mut TcpStream) {
        self.parse_header(stream);
        self.parse_content(stream);
    }
}

fn main() -> io::Result<()> {
    let args: Vec<String> = env::args().collect();
    let host = &args[1];
    let mut stream = TcpStream::connect(host)?;
    stream.set_read_timeout(Some(Duration::from_millis(1000))).unwrap();

    let pdu = read_pdu(&mut stream).unwrap();
    if pdu.header["Content-Type"] == "auth/request" {
        send_command(&mut stream, "auth cloudpbx").unwrap();
        let pdu = read_response(&mut stream).unwrap();
        println!("auth pdu: {:?}", pdu);
    } else {
        panic!("fails to read auth response");
    }

    send_command(&mut stream, "api uptime").unwrap();
    // the program blocks when this is called
    read_response(&mut stream).unwrap();
    println!("{:?}", pdu);
    Ok(())
}

fn read_response(stream: &mut TcpStream) -> Result<(), &'static str> {
    // mutability must be obtained from the last owner
    let pdu = read_pdu(stream)?;

    if let Some(content_type) = pdu.header.get("Content-Type") {
        if content_type == "command/reply" {
                if pdu.header["Reply-Text"].starts_with("+OK") {
                    Ok(())
                } else {
                    println!("{:?}", pdu.header);
                    Err("can't auth")
                }
        } else {
            println!("response: {:?}", pdu);
            Ok(())
        }
    } else {
        Ok(())
    }
}

fn send_command(stream: &mut TcpStream, cmd: &str) -> Result<(), &'static str> {
    stream.write_fmt(format_args!("{}\n\n", cmd)).unwrap();
    stream.flush().unwrap();
    Ok(())
}

fn read_pdu(stream: &mut TcpStream) -> Result<Pdu, &'static str> {
    let mut pdu = Pdu::new();

    // read header
    pdu.parse(stream);
    
    Ok(pdu)
}

example protocol

Content-Type: auth/request

auth ClueCon

Content-Type: command/reply
Reply-Text: +OK accepted

api uptime

Content-Type: api/response
Content-Length: 6

28473
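
To make the framing in the example above concrete, here is a minimal sketch of reading one message: header lines up to a blank line, then exactly Content-Length bytes of body if that header is present. The function name read_one_pdu is hypothetical, and the sketch assumes plain "\n" line endings as in the example; it is not the FreeSWITCH client API.

```rust
use std::collections::HashMap;
use std::io::{self, BufRead, Read};

/// Hypothetical helper: reads one header block and its optional body.
fn read_one_pdu(reader: &mut impl BufRead) -> io::Result<(HashMap<String, String>, Vec<u8>)> {
    let mut header = HashMap::new();
    let mut line = String::new();
    loop {
        line.clear();
        // read_line returns Ok(0) at end of stream.
        if reader.read_line(&mut line)? == 0 {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "stream ended inside a header",
            ));
        }
        let trimmed = line.trim_end();
        if trimmed.is_empty() {
            break; // a blank line ends the header block
        }
        // Lines without a ':' are skipped instead of panicking.
        if let Some((key, value)) = trimmed.split_once(':') {
            header.insert(key.trim().to_string(), value.trim().to_string());
        }
    }

    let mut content = Vec::new();
    if let Some(length) = header.get("Content-Length") {
        let length: usize = length
            .parse()
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        content.resize(length, 0);
        // The body must be read through the same buffered reader as the header.
        reader.read_exact(&mut content)?;
    }
    Ok((header, content))
}
```

Note that the body in the example is "28473" plus a newline, which is where the Content-Length of 6 comes from.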

Thanks for any help.

WouldBlock in this case means that the read hit the 1 second timeout you set with set_read_timeout.
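
As a small sketch of how a timed-out read could be told apart from other I/O errors: the standard library documents that a read timeout typically surfaces as WouldBlock on Unix and may surface as TimedOut on Windows, so the check below accepts both. The helper names is_read_timeout and read_or_timeout are made up for illustration.

```rust
use std::io::{self, Read};

/// True if the error looks like an expired read timeout.
fn is_read_timeout(err: &io::Error) -> bool {
    matches!(
        err.kind(),
        io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
    )
}

/// Reads once; returns Ok(None) on a timeout instead of an error.
fn read_or_timeout(stream: &mut impl Read, buf: &mut [u8]) -> io::Result<Option<usize>> {
    match stream.read(buf) {
        Ok(n) => Ok(Some(n)),
        Err(e) if is_read_timeout(&e) => Ok(None),
        Err(e) => Err(e),
    }
}
```

With a TcpStream that has a read timeout set, read_or_timeout(&mut stream, &mut buf) would return Ok(None) when no data arrived in time, instead of panicking on unwrap().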

Hi, even if I set it to 10 seconds, it doesn't work.

I can't speak to why you're getting a timeout with only the context provided (possibly the server device is ignoring your connections?); I was just trying to clarify what the error meant so you could investigate further.

Hi, I run a local FreeSWITCH, and I can connect and reproduce the protocol with telnet. With Rust, it just blocks on the second read. The protocol is very simple, as you can see; I really don't understand what is happening.

The empty buf Vec is not the problem: read_until appends to the Vec it is given and grows it as needed, so it does not need any preallocated space (a fixed array such as [0; 1024] would not be accepted, because read_until takes a &mut Vec<u8>). One thing the loop does miss is end of stream: read_until returns 0 when the connection is closed, and since the loop only breaks on a lone b'\n', it would then spin forever. Break out of the loop, or return an error, when readed_bytes == 0.
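
For reference, a minimal sketch of the header loop with the end-of-stream case handled. BufRead::read_until appends to the Vec it is given and grows it as needed, so an empty Vec is a valid starting buffer, and it returns 0 once the stream is closed. The function name read_header_block is hypothetical, and accepting "\r\n" as a blank line is an extra assumption not taken from the thread.

```rust
use std::io::{self, BufRead};

/// Collects raw header bytes up to (not including) the blank line.
fn read_header_block(reader: &mut impl BufRead) -> io::Result<Vec<u8>> {
    let mut raw = Vec::new();
    let mut line = Vec::new(); // starts empty; read_until grows it
    loop {
        line.clear();
        let n = reader.read_until(b'\n', &mut line)?;
        if n == 0 {
            // End of stream: the peer closed the connection mid-header.
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "connection closed inside a header",
            ));
        }
        if line == b"\n" || line == b"\r\n" {
            return Ok(raw); // blank line terminates the header block
        }
        raw.extend_from_slice(&line);
    }
}
```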

Thanks for your time. The problem was io::BufReader: I moved it to the parent scope and now it works well. I suspect that io::BufReader reads more data from the stream than the read_xxx call asked for, and when it goes out of scope that buffered data is discarded, so the next read blocks waiting for data that was already consumed.
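
The read-ahead effect can be reproduced without a network connection. The sketch below uses an in-memory Cursor as a stand-in for the TcpStream (an assumption made for illustration; the function names are hypothetical). A BufReader that is dropped after one line takes the rest of the input with it, while a reader that stays alive still has it.

```rust
use std::io::{BufRead, BufReader, Cursor, Read};

/// Reads one line through a BufReader that is dropped on return,
/// like the original get_header_content did.
fn read_line_with_temporary_reader(stream: &mut impl Read) -> String {
    let mut reader = BufReader::new(stream);
    let mut line = String::new();
    reader.read_line(&mut line).unwrap();
    line
    // `reader` is dropped here, together with any bytes it read ahead.
}

/// What is still readable from the stream after the temporary reader is gone.
fn leftover_after_temporary_reader(input: &[u8]) -> Vec<u8> {
    let mut stream = Cursor::new(input.to_vec());
    let _first_line = read_line_with_temporary_reader(&mut stream);
    let mut rest = Vec::new();
    stream.read_to_end(&mut rest).unwrap();
    rest
}

/// Same input, but the BufReader stays alive for the second read.
fn leftover_with_shared_reader(input: &[u8]) -> Vec<u8> {
    let mut reader = BufReader::new(Cursor::new(input.to_vec()));
    let mut first_line = String::new();
    reader.read_line(&mut first_line).unwrap();
    let mut rest = Vec::new();
    reader.read_to_end(&mut rest).unwrap();
    rest
}
```

For a short input such as b"Content-Length: 6\n\n28473\n", the first function leaves nothing to read, because the temporary BufReader already pulled the whole input into its internal buffer. On a real socket that shows up as a read that waits until the timeout.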

use std::{io,str};
use std::io::{BufRead, Read, Write};
use std::net::TcpStream;
use std::env;
use std::collections::HashMap;
use std::time::Duration;

#[derive(Debug)]
struct Pdu {
    header: HashMap<String, String>,
    content: Vec<u8>
}

impl Pdu {
    fn new() -> Self {
        Self {
            header: HashMap::new(),
            content: Vec::new()
        }
    }
    
    fn get_header_content(&self, reader: &mut impl io::BufRead) -> Vec<u8> {
        let mut raw: Vec<u8> = Vec::new();
        let mut buf: Vec<u8> = Vec::new();
        loop {
            buf.clear();
            let readed_bytes = reader.read_until(b'\n', &mut buf).unwrap();
            if readed_bytes == 1 && buf[0] == b'\n' {
                break;
            } else {
                raw.append(&mut buf);
            }
        }

        raw
    }

    fn parse_header(&mut self, reader: &mut impl io::BufRead) {
        let raw = self.get_header_content(reader);
        let raw_str = String::from_utf8(raw).unwrap();
        let parts = raw_str
            .split("\n")
            .filter(|line| {
                line.bytes().count() > 0
            })
            .map(|line| {
                let mut item = line.splitn(2, ":");
                let key = item.next().unwrap();
                let value = item.next().unwrap();

                (key.trim(), value.trim()) 
            });
        
        for (key, value) in parts {
            self.header.insert(key.to_string(), value.to_string());
        }
    }

    fn parse_content(&mut self, reader: &mut impl io::BufRead) {
        if let Some(length) = self.header.get("Content-Length") {
            let length: usize = length.parse().unwrap();
            let mut content = vec![0u8; length];

            reader.read_exact(&mut content).unwrap();

            self.content.append(&mut content);
        }
    }
    
    fn parse(&mut self, stream: &mut TcpStream) {
        let mut reader = io::BufReader::new(stream);
        self.parse_header(&mut reader);
        self.parse_content(&mut reader);
    }
}

fn main() -> io::Result<()> {
    let args: Vec<String> = env::args().collect();
    let host = &args[1];
    let mut stream = TcpStream::connect(host)?;
    stream.set_read_timeout(Some(Duration::from_millis(1000))).unwrap();

    let pdu = read_pdu(&mut stream).unwrap();
    if pdu.header["Content-Type"] == "auth/request" {
        send_command(&mut stream, "auth cloudpbx").unwrap();
        let pdu = read_response(&mut stream).unwrap();
        println!("auth pdu: {:?}", pdu);
    } else {
        panic!("fails to read auth response");
    }

    for _n in 1..1000 {
        send_command(&mut stream, "api uptime").unwrap();       
        read_response(&mut stream).unwrap();
    }

    Ok(())
}

fn read_response(stream: &mut TcpStream) -> Result<(), &'static str> {
    // mutability must be obtained from the last owner
    let pdu = read_pdu(stream)?;

    if let Some(content_type) = pdu.header.get("Content-Type") {
        if content_type == "command/reply" {
                if pdu.header["Reply-Text"].starts_with("+OK") {
                    Ok(())
                } else {
                    println!("{:?}", pdu.header);
                    Err("can't auth")
                }
        } else {
            println!("response: {:?}", pdu);
            Ok(())
        }
    } else {
        Ok(())
    }
}

fn send_command(stream: &mut TcpStream, cmd: &str) -> Result<(), &'static str> {
    stream.write_fmt(format_args!("{}\n\n", cmd)).unwrap();
    stream.flush().unwrap();
    Ok(())
}

fn read_pdu(stream: &mut TcpStream) -> Result<Pdu, &'static str> {
    let mut pdu = Pdu::new();

    // read header
    pdu.parse(stream);
    
    Ok(pdu)
}
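
One caveat about the code above: parse still builds a new BufReader for every PDU. That works for a strict request/response exchange, but any bytes read ahead past the end of one PDU would still be dropped. A possible alternative, sketched here with a hypothetical EslConnection type, is to keep one BufReader for the whole connection and write through get_mut(). The type is generic over Read + Write only so the sketch is not tied to TcpStream.

```rust
use std::io::{self, BufRead, BufReader, Read, Write};

/// Hypothetical wrapper that owns a single BufReader for the connection.
struct EslConnection<S: Read + Write> {
    reader: BufReader<S>,
}

impl<S: Read + Write> EslConnection<S> {
    fn new(stream: S) -> Self {
        Self {
            reader: BufReader::new(stream),
        }
    }

    /// Writes go straight to the underlying stream; the read buffer is untouched.
    fn send_command(&mut self, cmd: &str) -> io::Result<()> {
        let stream = self.reader.get_mut();
        write!(stream, "{}\n\n", cmd)?;
        stream.flush()
    }

    /// Reads always go through the same buffer, so read-ahead data is kept.
    fn read_line(&mut self) -> io::Result<String> {
        let mut line = String::new();
        self.reader.read_line(&mut line)?;
        Ok(line)
    }
}
```

With a real socket this would be created as EslConnection::new(TcpStream::connect(host)?), and the header and content parsing would then read from conn.reader instead of from a fresh BufReader.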
