Make implementation of executing Python in Rust faster

I have a Rust application that receives a lot of data. I want the programmer to be able to test different algorithms on that same data in a modular way. The programmer can create different Python files, each containing an algorithm, and run the Rust application. Rust executes the Python code using PyO3.

sum.py

# Algorithm to sum our data.

# This placeholder gets replaced with a Python list
# containing our data from Rust.
{DATA}

def func(*args, **kwargs):
    return sum(data)

avg.py

# Algorithm to get average of our data.

# This placeholder gets replaced with a Python list
# containing our data from Rust.
{DATA}

def func(*args, **kwargs):
    return sum(data) / len(data)

In Rust there is a struct Algorithm, which the programmer can start() by passing the data stream to it. A new Tokio task is spawned, and each time new data comes in, the Algorithm executes the function in the Python file.

Note that the Python function takes the whole data stream as input, not just the latest incoming value.

To pass the data to Python, I simply convert the Rust vector to a string in Python list format.

It works. But how could I make this implementation more performant?

use pyo3::types::PyTuple;
use pyo3::types::PyAny;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use pyo3::prelude::*;
use std::sync::Arc;
use rand::Rng;

#[tokio::main]
async fn main() {
    // Command argument: Give the name of the Python-file containing the algorithm.
    let args: std::vec::Vec<String> = std::env::args().collect();
    let py_file = &args[1];

    // Create sender and receiver.
    let (tx, rx) = mpsc::channel::<i64>(10);
    let rx = Arc::new(Mutex::new(rx));
    
    // We just simulate an incoming data stream with this block.
    // Actually it will be data coming from an API.
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
            
            let r = rand::thread_rng().gen_range(0..100);
            tx.send(r).await.expect("Send error");
        }
    });

    // The programmer can create different Python files with a different algorithm.
    // That way the programmer can test different algorithms with the same data. 
    let algo = Algorithm::new(py_file);
    algo.start(rx.clone());

    // Keep program running for 20 seconds.
    tokio::time::sleep(tokio::time::Duration::from_millis(20000)).await;
}

pub struct Algorithm {
    py_file: String,
}
impl Algorithm {
    pub fn new(py_file: &str) -> Self {
        Algorithm { py_file: py_file.into()  }
    }

    // Function to start the algorithm.
    pub fn start(self, datastream: Arc<Mutex<mpsc::Receiver<i64>>>) -> tokio::task::JoinHandle<()> { 
        
        // Save the incoming stream so we can pass the whole stream to the python function.
        let mut data : std::vec::Vec<i64> = std::vec::Vec::new();

        // When a new data comes in from the stream we execute the function.
        tokio::spawn(async move {
            while let Some(n) = datastream.lock().await.recv().await {    
                data.push(n);
                self.execute(&data).await;
            }
        })
    }
    
    // Function to actually call Python code.
    async fn execute(&self, data: &std::vec::Vec<i64>) {
        pyo3::prepare_freethreaded_python();

        // Get python code from file.
        let mut python_code = std::fs::read_to_string(format!("{}", self.py_file)).expect("Failed to read file...");
       
        // The Python-file has a placeholder {DATA}. We replace {DATA} with
        // data = [<content>] so the Python function has access to the data
        // passed on from Rust.
        python_code = python_code.replace("{DATA}", &*self.put_data_in_python(&data));

        let result = Python::with_gil(|py| {
            let fun: Py<PyAny> = PyModule::from_code(
                py,
                &*python_code,
                "",
                "",
            ).unwrap()
            .getattr("func").unwrap()
            .into();


            fun.call(py, PyTuple::empty(py), None)
        });

        println!("[{}] Result: {}", self.py_file, result.unwrap());
    }

    // Replace {DATA} placeholder in Python file with a Python list
    // containing the data from our Rust vector.
    fn put_data_in_python(&self, data: &std::vec::Vec<i64>) -> String {
        if data.len() == 0 {
            return "data = []".to_string();
        }

        let mut py_str = "data = [".to_string();
        for d in data {
            py_str = format!("{}{},", py_str, d);
        }
        
        py_str.pop();
        py_str = format!("{}]", py_str);
    
        py_str
    }
}

Which part is slow? Are you running in release mode?

Some things I noticed:

This

format!("{}", self.py_file)

is better written as

self.py_file.clone()

but you don't even need to do that for read_to_string. Just make it a reference.

&self.py_file
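To illustrate the point about references, here is a minimal, std-only sketch. std::fs::read_to_string accepts anything that implements AsRef<Path>, so a &String works without cloning or formatting. The trimmed-down Algorithm struct, the source() method, and the temp-file name are assumptions made up for this example, not part of the original program.

```rust
use std::fs;

// Trimmed-down stand-in for the Algorithm struct from the question.
struct Algorithm {
    py_file: String,
}

impl Algorithm {
    // Hypothetical helper: reads the Python source by borrowing the path.
    // No clone() and no format! are needed, because &String implements
    // AsRef<Path>.
    fn source(&self) -> std::io::Result<String> {
        fs::read_to_string(&self.py_file)
    }
}

fn main() -> std::io::Result<()> {
    // Write a small Python file to the temp directory so the example
    // is self-contained (the file name is arbitrary).
    let path = std::env::temp_dir().join("algo_demo_sum.py");
    fs::write(&path, "def func(data):\n    return sum(data)\n")?;

    let algo = Algorithm {
        py_file: path.to_string_lossy().into_owned(),
    };
    println!("{}", algo.source()?);

    fs::remove_file(&path)?;
    Ok(())
}
```

Returning io::Result instead of calling expect() also lets the caller decide what to do when the file is missing.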

Your put_data_in_python is pretty suboptimal: every format! call in the loop allocates a new String and copies everything built so far. You also don't need to write std::vec::Vec. Vec is in the standard library prelude, so you can just write Vec. When you take &Vec<T> as an argument, you should take &[T] instead, since it is one less indirection to get to the data. To concatenate strings, use +=/push_str or write!.

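fn put_data_in_python(&self, data: &[i64]) -> String {
    use std::fmt::Write;

    let mut py_str = "data = [".to_string();
    for d in data {
        // Appends in place; writing to a String cannot fail.
        write!(py_str, "{},", d).unwrap();
    }

    // Drop the trailing comma. An empty slice has none, and popping
    // anyway would remove the opening bracket.
    if !data.is_empty() {
        py_str.pop();
    }
    py_str += "]";

    py_str
}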
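A variation on the function above, as a minimal sketch: joining the formatted items sidesteps the trailing-comma bookkeeping, and taking &[i64] means the same function accepts a Vec, an array, or an empty slice. The free function name to_python_list is made up for this example.

```rust
// Hypothetical helper: renders a slice as a Python list assignment,
// e.g. `data = [1,2,3]`.
fn to_python_list(data: &[i64]) -> String {
    // Format each number once, then join with commas. join() never
    // emits a trailing separator, so the empty case needs no special
    // handling.
    let items: Vec<String> = data.iter().map(|d| d.to_string()).collect();
    format!("data = [{}]", items.join(","))
}

fn main() {
    let from_vec: Vec<i64> = vec![3, 1, 4];
    let from_array: [i64; 2] = [1, 5];

    // &Vec<i64> and &[i64; 2] both coerce to &[i64].
    println!("{}", to_python_list(&from_vec));   // data = [3,1,4]
    println!("{}", to_python_list(&from_array)); // data = [1,5]
    println!("{}", to_python_list(&[]));         // data = []
}
```

This still builds one intermediate String per element, so for very large inputs the write!-based loop is likely the cheaper of the two.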

Consider using Clippy (see the introduction in the Clippy documentation).

For the Python side, it would be better to pass the data in directly instead of pasting it into the source file. I think that would look something like this:

def func(data):
    return sum(data) / len(data)

// skip the {DATA} stuff
fun.call1(py, data.clone())

Thanks for the answer. It doesn't seem slow per se; it actually runs fast as hell. I was just wondering if it could be optimized even more.

fun.call1(py, data.clone()) doesn't work though. My understanding is that it only works for basic arguments and not complex data-types.

How about

fun.call1(py, (data.clone(),))

Found this example that should match yours.
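Why the extra parentheses and comma matter: as far as I understand it, call1 expects its arguments packed as a tuple of positional arguments, and (data.clone(),) is a one-element tuple, whereas a bare Vec is not a tuple at all. The sketch below mimics that shape with plain Rust and no PyO3. The PositionalArgs trait and call1_like function are hypothetical stand-ins invented for this illustration, not PyO3's real API.

```rust
// Hypothetical stand-in for "a tuple of positional arguments".
// This is NOT PyO3's real trait; it only mimics the idea.
trait PositionalArgs {
    fn arg_count(&self) -> usize;
}

// A one-element tuple carries exactly one argument.
impl<A> PositionalArgs for (A,) {
    fn arg_count(&self) -> usize {
        1
    }
}

// A two-element tuple carries two arguments.
impl<A, B> PositionalArgs for (A, B) {
    fn arg_count(&self) -> usize {
        2
    }
}

// Hypothetical stand-in for call1: it only accepts tuples and reports
// how many positional arguments it was given.
fn call1_like<T: PositionalArgs>(args: T) -> usize {
    args.arg_count()
}

fn main() {
    let data: Vec<i64> = vec![1, 2, 3];

    // call1_like(data.clone()); // would not compile: Vec<i64> is not a tuple

    // One argument (the whole list), wrapped in a one-element tuple.
    println!("{}", call1_like((data.clone(),))); // 1

    // Two arguments: the list and a second value.
    println!("{}", call1_like((data.clone(), 10))); // 2
}
```

So the failure is about the shape of the argument pack rather than about the data type being too complex.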


Full working example of how to pass a Vec of structs from Rust to Python, for the next person searching for it:

use pyo3::types::PyAny;
use pyo3::prelude::*;

#[pyclass]
struct SomeItem {
    #[pyo3(get, set)]
    a: String,
    #[pyo3(get, set)]
    b: f64,
}

fn main() {
    let mut data = std::vec::Vec::<SomeItem>::new();

    for i in 0..2500 {
        data.push(SomeItem {
            a: format!("str {}", i),
            b: i as f64,
        });
    }

    pyo3::prepare_freethreaded_python();
    let python_code = std::fs::read_to_string("test.py").expect("Failed to read file...");
    Python::with_gil(|py| {
        let fun: Py<PyAny> = PyModule::from_code(
            py,
            &*python_code,
            "",
            "",
        ).unwrap()
        .getattr("func").unwrap().into();

        // Unwrap so a Python exception is not silently discarded.
        fun.call1(py, (data,)).unwrap();
    });
}

test.py

def func(data):
    for d in data:
        print(str(d.a) + " " + str(d.b))