Detect ANSI control character?

Hi

I am looking for a safe way to remove control characters from a string by replacing them with the replacement character (U+FFFD).

My implementation should mimic the following Python code.

import unicodedata


def replace_control_chars(untrusted_str: str, keep_newlines: bool = False) -> str:
    """Remove control characters from string. Protects a terminal emulator
    from obscure control characters.

    Control characters are replaced by � U+FFFD Replacement Character.

    If a user wants to keep the newline character (e.g., because they are sanitizing a
    multi-line text), they must pass `keep_newlines=True`.
    """

    def is_safe(chr: str) -> bool:
        """Return whether Unicode character is safe to print in a terminal
        emulator, based on its General Category.

        The following General Category values are considered unsafe:

        * C* - all control character categories (Cc, Cf, Cs, Co, Cn)
        * Zl - U+2028 LINE SEPARATOR only
        * Zp - U+2029 PARAGRAPH SEPARATOR only
        """
        categ = unicodedata.category(chr)
        if categ.startswith("C") or categ in ("Zl", "Zp"):
            return False
        return True

    sanitized_str = ""
    for char in untrusted_str:
        if (keep_newlines and char == "\n") or is_safe(char):
            sanitized_str += char
        else:
            sanitized_str += "�"
    return sanitized_str
  • I use the unicode-general-category crate using the Unicode v16.0 database.
  • Is my detection mechanism in the filter method safe?
  • Is my thread handling okay?
  • On L39 of the Rust code below I use buf = [0u8; 1024]. If an adversary used a multi-byte UTF-8 character in the payload, couldn't they bypass my filter, since a checked chunk might not contain the complete escape code?
use anyhow::{Context, Result};
use std::io::{Read, Write};
use std::process::{Command, Stdio};

fn filter_ansi_escape_codes(s: &str) -> String {
    fn is_safe(c: char) -> bool {
        !matches!(
            unicode_general_category::get_general_category(c),
            unicode_general_category::GeneralCategory::Control
                | unicode_general_category::GeneralCategory::Format
                | unicode_general_category::GeneralCategory::PrivateUse
                | unicode_general_category::GeneralCategory::Unassigned
                | unicode_general_category::GeneralCategory::LineSeparator
                | unicode_general_category::GeneralCategory::ParagraphSeparator
        )
    }

    s.chars()
        .map(|c| if !is_safe(c) { '\u{FFFD}' } else { c })
        .collect()
}

fn main() -> Result<()> {
    let mut child = Command::new("sh")
        .arg("-c")
        .arg(
            "printf '\\033[31mRED\\033[0m plain\\n' >&2; \\
             printf '\\033[32mGREEN\\033[0m plain\\n' >&2",
        )
        .stdout(Stdio::null())
        .stderr(Stdio::piped())
        .spawn()
        .context("Failed to spawn child process")?;

    let mut stderr = child.stderr.take().context("Failed to open child stderr")?;

    let stderr_thread = std::thread::spawn(move || -> Result<()> {
        let mut stderr_out = std::io::stderr().lock();
        let mut buf = [0u8; 1024];

        loop {
            let n = stderr.read(&mut buf).context("Failed to read child stderr")?;
            if n == 0 {
                break;
            }

            let s = std::str::from_utf8(&buf[..n]).context("Failed to decode UTF-8")?;
            let filtered = filter_ansi_escape_codes(s);

            stderr_out
                .write_all(filtered.as_bytes())
                .context("Failed to write filtered stderr")?;
            stderr_out.flush().context("Failed to flush stderr")?;
        }

        Ok(())
    });

    let status = child.wait().context("Failed to wait for child")?;
    stderr_thread
        .join()
        .expect("stderr thread panicked")
        .context("stderr thread failed")?;

    eprintln!("child exited with: {status}");
    Ok(())
}

[dependencies]
anyhow = "1.0.102"
unicode-general-category = "1.1.0"

Thanks!

&str must always be valid UTF-8, so it's never going to contain any partial characters. What might happen instead is that std::str::from_utf8 fails if a multi-byte character spans a chunk boundary.
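To make the chunk-boundary problem concrete, here is a minimal sketch of one way to handle it without switching to line-based reading: carry the incomplete trailing bytes over to the next read. The helper name `decode_chunk` and the `pending` buffer are made up for illustration; only `std::str::from_utf8`, `Utf8Error::valid_up_to` and `Utf8Error::error_len` come from the standard library.

```rust
use std::str;

/// Appends `chunk` to `pending`, decodes as much as possible, and leaves an
/// incomplete trailing character in `pending` for the next call.
/// Bytes that can never be valid UTF-8 are replaced with U+FFFD.
fn decode_chunk(pending: &mut Vec<u8>, chunk: &[u8]) -> String {
    pending.extend_from_slice(chunk);
    let mut out = String::new();
    let mut start = 0;

    loop {
        match str::from_utf8(&pending[start..]) {
            Ok(s) => {
                // Everything that was left is valid.
                out.push_str(s);
                start = pending.len();
                break;
            }
            Err(e) => {
                let valid_end = start + e.valid_up_to();
                // This prefix was just validated, so the unwrap cannot fail.
                out.push_str(str::from_utf8(&pending[start..valid_end]).unwrap());
                match e.error_len() {
                    // Definitely invalid bytes: replace them and keep going.
                    Some(len) => {
                        out.push('\u{FFFD}');
                        start = valid_end + len;
                    }
                    // Incomplete character at the end of the input:
                    // keep it until more bytes arrive.
                    None => {
                        start = valid_end;
                        break;
                    }
                }
            }
        }
    }

    pending.drain(..start);
    out
}
```

The returned string can then be passed through the control-character filter as before. If `pending` is still non-empty once the child closes its stderr, the leftover bytes are a truncated character and should probably be emitted as a single U+FFFD.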

One thing to consider might be using BufReader to wrap your reader and perform filtering line-by-line, which would always be output as valid strings.

Other than that, so long as your code doesn't contain any unsafe, you can be confident that your thread handling is sound if it compiles successfully.


This is not true. Rust will protect you from data races (two threads accessing the same memory location simultaneously where at least one of them is a write). However, Rust won't protect you from race conditions in general, although it does give you some tools to make it harder to cause race conditions.
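A small sketch of the difference, using made-up function names: both functions below are safe Rust and free of data races, yet the first one still has a race condition because the load and the store are two separate steps, so updates from other threads can be lost in between.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

const THREADS: usize = 4;
const ITERATIONS: usize = 10_000;

/// Runs `step` ITERATIONS times on each of THREADS threads and returns the
/// final counter value.
fn run(step: fn(&AtomicUsize)) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..THREADS)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..ITERATIONS {
                    step(&counter);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    counter.load(Ordering::SeqCst)
}

/// Race condition (but no data race): another thread may increment the
/// counter between the load and the store, and that increment is lost.
fn racy_total() -> usize {
    run(|counter| {
        let seen = counter.load(Ordering::SeqCst);
        counter.store(seen + 1, Ordering::SeqCst);
    })
}

/// Correct: the read-modify-write happens as one atomic operation.
fn atomic_total() -> usize {
    run(|counter| {
        counter.fetch_add(1, Ordering::SeqCst);
    })
}
```

`atomic_total()` always returns `THREADS * ITERATIONS`, while `racy_total()` may return anything up to that value depending on scheduling. The compiler accepts both, which is why "it compiles without unsafe" only rules out data races.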


Thanks for your reply. I updated my code to use BufReader, and I now also use from_utf8_lossy to avoid crashing on invalid UTF-8. Is this better?

    let stderr = child.stderr.take().context("Failed to open stderr.")?;
    let stderr_thread = std::thread::spawn(move || -> Result<()> {
        let mut stderr_reader = BufReader::new(stderr);
        let mut stderr_out = std::io::stderr().lock();
        let mut line_buf = Vec::new();

        loop {
            line_buf.clear();
            let n = stderr_reader
                .read_until(b'\n', &mut line_buf)
                .context("Failed to read container stderr.")?;
            if n == 0 {
                break;
            }

            let s = String::from_utf8_lossy(&line_buf);
            let filtered: String = filter_ansi_escape_codes(&s);

            stderr_out
                .write_all(filtered.as_bytes())
                .context("Failed to write to stderr_out.")?;
            stderr_out.flush().context("Failed to flush stderr_out.")?;
        }

        Ok(())
    });
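
One thing worth checking in the loop above: each line read by read_until still ends with '\n', and U+000A is itself in the Control (Cc) category, so as far as I can tell the filter would replace every line break with U+FFFD. The Python version avoids this with keep_newlines=True. A minimal sketch of the same idea follows. It uses only the standard library, so `char::is_control` (category Cc only) plus an explicit check for U+2028 and U+2029 stands in for the full General Category lookup; the function name and the `keep_newlines` parameter simply mirror the Python code and are not part of any crate.

```rust
/// Replaces unsafe characters with U+FFFD, optionally keeping '\n'.
///
/// Simplification: only category Cc (via `char::is_control`) and the two
/// separators U+2028 / U+2029 are treated as unsafe here. The full check
/// from the original post also covers Cf, Co and Cn.
fn replace_control_chars(untrusted: &str, keep_newlines: bool) -> String {
    untrusted
        .chars()
        .map(|c| {
            let is_unsafe = c.is_control() || matches!(c, '\u{2028}' | '\u{2029}');
            if (keep_newlines && c == '\n') || !is_unsafe {
                c
            } else {
                '\u{FFFD}'
            }
        })
        .collect()
}
```

With keep_newlines set to true, the input "\x1b[31mRED\n" keeps its trailing newline and only the ESC byte is replaced. With false, both the ESC and the newline become U+FFFD.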