I have a Rust application that receives a lot of data. I want the programmer to be able to test different algorithms on that same data in a modular way. The programmer can create different Python files, each containing an algorithm, and run the Rust application. Rust executes the Python code using PyO3.
sum.py
# Algorithm to sum our data.
# This placeholder gets replaced with a Python list
# containing our data from Rust.
{DATA}
def func(*args, **kwargs):
    return sum(data)
avg.py
# Algorithm to get average of our data.
# This placeholder gets replaced with a Python list
# containing our data from Rust.
{DATA}
def func(*args, **kwargs):
    return sum(data) / len(data)
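To make the placeholder mechanism concrete, here is a minimal standard-library sketch of the substitution step on its own. The helper name fill_placeholder and the sample values are hypothetical and only for illustration; the template is inlined here, whereas the real application reads it from the file given on the command line.

```rust
/// Replace the `{DATA}` placeholder in a Python template with an
/// assignment such as `data = [1,2,3]`.
/// `fill_placeholder` is a hypothetical helper name used for illustration.
fn fill_placeholder(template: &str, data_assignment: &str) -> String {
    template.replace("{DATA}", data_assignment)
}

fn main() {
    // Same shape as sum.py, inlined so the sketch is self-contained.
    let template = "{DATA}\ndef func(*args, **kwargs):\n    return sum(data)\n";
    let code = fill_placeholder(template, "data = [1,2,3]");
    // The resulting source defines `data` at module level,
    // so `func` can read it when it is called.
    print!("{}", code);
}
```

After substitution the Python source starts with the line data = [1,2,3], followed by the unchanged function definition.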
In Rust there is a struct Algorithm, which the programmer can start() by passing the data stream to it. A new Tokio task is spawned, and each time new data comes in, the Algorithm executes the function in the Python file.
Note that the Python function takes the whole data stream as input, so not only the latest incoming data.
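The pattern described above (keep the full history, re-run the algorithm on every arrival) can be sketched without Tokio or Python. This is a simplified stand-in under stated assumptions, not the application code: it uses std::sync::mpsc instead of Tokio's channel, and the Rust function sum_all plays the role of the Python func from sum.py. The names run and sum_all are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

/// Stand-in for the Python `func` in sum.py: it sees the whole history.
fn sum_all(data: &[i64]) -> i64 {
    data.iter().sum()
}

/// Receive values one by one, append each to the history, and re-run
/// the algorithm on the full history after every arrival.
fn run(rx: mpsc::Receiver<i64>) -> Vec<i64> {
    let mut history = Vec::new();
    let mut results = Vec::new();
    for n in rx {
        history.push(n);
        results.push(sum_all(&history));
    }
    results
}

fn main() {
    let (tx, rx) = mpsc::channel::<i64>();
    let producer = thread::spawn(move || {
        for n in [5, 10, 20] {
            tx.send(n).expect("receiver dropped");
        }
        // `tx` is dropped here, which ends the receiving loop in `run`.
    });
    let results = run(rx);
    producer.join().expect("producer panicked");
    println!("{:?}", results); // prints [5, 15, 35]
}
```

Because every call receives the entire history, the work per incoming value grows with the length of the stream.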
To pass the data to Python I simply convert the vector in Rust to a string in a Python-list format.
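For illustration, the conversion boils down to something like the sketch below. It produces the same text format as the put_data_in_python method in the full listing, but builds the string with join; the function name to_python_list is hypothetical.

```rust
/// Render a slice of integers as a Python assignment, e.g. `data = [1,2,3]`.
/// `to_python_list` is a hypothetical name used for illustration.
fn to_python_list(data: &[i64]) -> String {
    // Convert each number to text once, then join with commas.
    let items: Vec<String> = data.iter().map(|d| d.to_string()).collect();
    format!("data = [{}]", items.join(","))
}

fn main() {
    println!("{}", to_python_list(&[])); // prints data = []
    println!("{}", to_python_list(&[3, 14, 15])); // prints data = [3,14,15]
}
```

Using join handles the empty case without a special branch and avoids allocating a new string for every element.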
It works. But how could I make this implementation more performant?
use pyo3::types::PyTuple;
use pyo3::types::PyAny;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use pyo3::prelude::*;
use std::sync::Arc;
use rand::Rng;
#[tokio::main]
async fn main() {
    // Command argument: Give the name of the Python-file containing the algorithm.
    let args: std::vec::Vec<String> = std::env::args().collect();
    let py_file = &args[1];
    // Create sender and receiver.
    let (tx, rx) = mpsc::channel::<i64>(10);
    let rx = Arc::new(Mutex::new(rx));
    // We just simulate an incoming data stream with this block.
    // In practice the data will come from an API.
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
            let r = rand::thread_rng().gen_range(0..100);
            tx.send(r).await.expect("Send error");
        }
    });
    // The programmer can create different Python files with a different algorithm.
    // That way the programmer can test different algorithms with the same data.
    let algo = Algorithm::new(py_file);
    algo.start(rx.clone());
    // Keep program running for 20 seconds.
    tokio::time::sleep(tokio::time::Duration::from_millis(20000)).await;
}
pub struct Algorithm {
    py_file: String,
}
impl Algorithm {
    pub fn new(py_file: &str) -> Self {
        Algorithm { py_file: py_file.into() }
    }
    // Function to start the algorithm.
    pub fn start(self, datastream: Arc<Mutex<mpsc::Receiver<i64>>>) -> tokio::task::JoinHandle<()> {
        // Save the incoming stream so we can pass the whole stream to the python function.
        let mut data : std::vec::Vec<i64> = std::vec::Vec::new();
        // When a new data comes in from the stream we execute the function.
        tokio::spawn(async move {
            while let Some(n) = datastream.lock().await.recv().await {
                data.push(n);
                self.execute(&data).await;
            }
        })
    }
    // Function to actually call Python code.
    async fn execute(&self, data: &std::vec::Vec<i64>) {
        pyo3::prepare_freethreaded_python();
        // Get python code from file.
        let mut python_code = std::fs::read_to_string(format!("{}", self.py_file)).expect("Failed to read file...");
        // The Python-file has a placeholder {DATA}. We replace {DATA} with
        // data = [<content>] so the Python function has access to the data
        // passed on from Rust.
        python_code = python_code.replace("{DATA}", &*self.put_data_in_python(&data));
        let result = Python::with_gil(|py| {
            let fun: Py<PyAny> = PyModule::from_code(
                py,
                &*python_code,
                "",
                "",
            ).unwrap()
            .getattr("func").unwrap()
            .into();
            fun.call(py, PyTuple::empty(py), None)
        });
        println!("[{}] Result: {}", self.py_file, result.unwrap());
    }
    // Replace {DATA} placeholder in Python file with a Python list
    // containing the data from our Rust vector.
    fn put_data_in_python(&self, data: &std::vec::Vec<i64>) -> String {
        if data.len() == 0 {
            return "data = []".to_string();
        }
        let mut py_str = "data = [".to_string();
        for d in data {
            py_str = format!("{}{},", py_str, d);
        }
        py_str.pop();
        py_str = format!("{}]", py_str);
        py_str
    }
}