Hi
I am looking for a safe way to remove control characters from a string by replacing them with the replacement character (U+FFFD).
My implementation should mimic the following Python code.
View Python code
def replace_control_chars(untrusted_str: str, keep_newlines: bool = False) -> str:
    """Remove control characters from string. Protects a terminal emulator
    from obscure control characters.

    Control characters are replaced by � U+FFFD Replacement Character.

    If a user wants to keep the newline character (e.g., because they are
    sanitizing a multi-line text), they must pass `keep_newlines=True`.

    Args:
        untrusted_str: Text that may contain unsafe characters.
        keep_newlines: When true, "\\n" is passed through unchanged instead of
            being replaced.

    Returns:
        A string with the same number of characters as the input, in which
        every unsafe character has been replaced by U+FFFD.
    """

    def is_safe(ch: str) -> bool:
        """Return whether Unicode character is safe to print in a terminal
        emulator, based on its General Category.

        The following General Category values are considered unsafe:

        * C* - all control character categories (Cc, Cf, Cs, Co, Cn)
        * Zl - U+2028 LINE SEPARATOR only
        * Zp - U+2029 PARAGRAPH SEPARATOR only
        """
        categ = unicodedata.category(ch)
        return not (categ.startswith("C") or categ in ("Zl", "Zp"))

    # Build the result with a single join instead of repeated `+=`, which can
    # degrade to quadratic time on long inputs.
    return "".join(
        char if (keep_newlines and char == "\n") or is_safe(char) else "\ufffd"
        for char in untrusted_str
    )
- I use the unicode-general-category crate using the Unicode v16.0 database.
- Is my detection mechanism in the filter method safe?
- Is my thread handling okay?
- On L39 I use
`buf = [0u8; 1024]`. If an adversary used a multi-byte UTF-8 character in the payload, wouldn't they be able to bypass my filter, since a checked chunk might not contain the escape code?
use anyhow::{Context, Result};
use std::io::{Read, Write};
use std::process::{Command, Stdio};
/// Returns a copy of `s` in which every character that is unsafe to print in a
/// terminal has been replaced by U+FFFD REPLACEMENT CHARACTER.
///
/// A character counts as unsafe when its Unicode General Category is one of
/// Control, Format, Private Use, Unassigned, Line Separator or Paragraph
/// Separator. Every other character is copied through unchanged, so the result
/// has the same number of characters as the input.
///
/// The escape character U+001B is in the Control category, which is how ANSI
/// escape sequences are defused: the introducer is replaced while the printable
/// remainder of the sequence stays visible.
fn filter_ansi_escape_codes(s: &str) -> String {
    let mut sanitized = String::with_capacity(s.len());
    for ch in s.chars() {
        let category = unicode_general_category::get_general_category(ch);
        let unsafe_for_terminal = matches!(
            category,
            unicode_general_category::GeneralCategory::Control
                | unicode_general_category::GeneralCategory::Format
                | unicode_general_category::GeneralCategory::PrivateUse
                | unicode_general_category::GeneralCategory::Unassigned
                | unicode_general_category::GeneralCategory::LineSeparator
                | unicode_general_category::GeneralCategory::ParagraphSeparator
        );
        if unsafe_for_terminal {
            sanitized.push('\u{FFFD}');
        } else {
            sanitized.push(ch);
        }
    }
    sanitized
}
/// Spawns a demo child process, sanitizes everything it writes to stderr and
/// forwards the sanitized text to this process's stderr.
///
/// The child's stderr is drained on a dedicated thread so the child can never
/// block on a full pipe while the main thread waits for it to exit.
///
/// # Errors
///
/// Returns an error if the child cannot be spawned or waited for, if reading
/// from or writing to the pipes fails, or if the child emits bytes that are not
/// valid UTF-8 (including a multi-byte sequence that is cut off at end of
/// stream).
fn main() -> Result<()> {
    let mut child = Command::new("sh")
        .arg("-c")
        .arg(
            "printf '\\033[31mRED\\033[0m plain\\n' >&2; \\
printf '\\033[32mGREEN\\033[0m plain\\n' >&2",
        )
        .stdout(Stdio::null())
        .stderr(Stdio::piped())
        .spawn()
        .context("Failed to spawn child process")?;
    let mut stderr = child.stderr.take().context("Failed to open child stderr")?;

    let stderr_thread = std::thread::spawn(move || -> Result<()> {
        let mut stderr_out = std::io::stderr().lock();
        let mut buf = [0u8; 1024];
        // Bytes that have been read but not yet decoded. After each iteration
        // this holds at most the leading bytes of one UTF-8 sequence that was
        // split by a read boundary.
        let mut pending: Vec<u8> = Vec::with_capacity(buf.len() + 4);
        loop {
            let n = stderr.read(&mut buf).context("Failed to read child stderr")?;
            if n == 0 {
                break;
            }
            pending.extend_from_slice(&buf[..n]);

            // A read may end in the middle of a multi-byte character. Decoding
            // each chunk on its own would reject such valid input, so only the
            // complete prefix is decoded and the unfinished tail is kept for
            // the next read. The filter itself works per character, so chunk
            // boundaries cannot hide a control character from it.
            let s = match std::str::from_utf8(&pending) {
                Ok(text) => text,
                // `error_len() == None` means the input ended unexpectedly
                // inside a sequence; everything before `valid_up_to()` is
                // well-formed.
                Err(err) if err.error_len().is_none() => {
                    std::str::from_utf8(&pending[..err.valid_up_to()])
                        .context("Failed to decode UTF-8")?
                }
                // Genuinely invalid bytes: more input will not repair them.
                Err(err) => return Err(err).context("Failed to decode UTF-8"),
            };
            let consumed = s.len();
            let filtered = filter_ansi_escape_codes(s);
            stderr_out
                .write_all(filtered.as_bytes())
                .context("Failed to write filtered stderr")?;
            stderr_out.flush().context("Failed to flush stderr")?;
            pending.drain(..consumed);
        }
        // Anything left over at end of stream is a truncated sequence; decoding
        // it reports the problem instead of silently dropping the bytes.
        if !pending.is_empty() {
            std::str::from_utf8(&pending).context("Failed to decode UTF-8")?;
        }
        Ok(())
    });

    let status = child.wait().context("Failed to wait for child")?;
    stderr_thread
        .join()
        .expect("stderr thread panicked")
        .context("stderr thread failed")?;
    eprintln!("child exited with: {status}");
    Ok(())
}
[dependencies]
anyhow = "1.0.102"
unicode-general-category = "1.1.0"
Thanks!