Program Structure for Real Time / Parallel Data Processing

Hey everyone!

I've been working on a project that uses threads heavily, which is starting to become problematic. I'm wondering if anyone has any insight on better ways to structure the program, perhaps with an async executor. I know the conventional wisdom is to use async for IO-bound tasks, but I'm not quite sure if that's the case here.

The short summary is that this program processes real-time data from a radio telescope. I'm capturing packets at 10 Gbps using libpcap, decoding them into raw complex spectrum data, reordering to fix out-of-order packets, averaging in time, and doing other middleware-esque processing before sending it out to an ML classifier via IPC, eventually at a nominal 2 Gbps data rate on the other side. All these tasks are currently threads connected with crossbeam channels. In addition, I'm collecting statistics on channel backpressure, raw data distribution, and timing, and holding on to a large ring buffer that is dumped to disk on a UDP trigger. The statistics are shipped to a Prometheus database with a hyper webserver. Lots going on!

To be more concrete, I'm receiving 8 kB of data every 8us, and the total runtime of all the processing is more than 8us, implying something has to be pipelined and processed in parallel. Things would be easier to put into something like Rayon if these tasks weren't stateful. However, tasks like downsampling must have state in the sense that I only produce data on the outgoing channel for every N input chunks. I have some tasks that are pure functions, some that are downsampling, and some that produce more data than comes in. To make things even more complicated, I have to control thread affinity and pin tasks to cores that share a NUMA node, otherwise I drop packets.
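To make the "stateful task" shape concrete, here is a minimal sketch of a downsampling stage that only emits on every N-th input chunk. It uses std's bounded `sync_channel` as a stand-in for crossbeam so it stays self-contained, and `Downsampler`, `run_downsampler`, and `demo_downsample` are made-up names for illustration, not anything from the actual project.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Hypothetical stateful stage: averages every `n` input chunks into one output.
struct Downsampler {
    n: usize,
    count: usize,
    acc: Vec<f32>,
}

impl Downsampler {
    fn new(n: usize, len: usize) -> Self {
        assert!(n > 0);
        Self { n, count: 0, acc: vec![0.0; len] }
    }

    /// Returns `Some(average)` on every n-th chunk and `None` otherwise.
    fn push(&mut self, chunk: &[f32]) -> Option<Vec<f32>> {
        assert_eq!(chunk.len(), self.acc.len());
        for (a, x) in self.acc.iter_mut().zip(chunk) {
            *a += *x;
        }
        self.count += 1;
        if self.count < self.n {
            return None; // still accumulating, nothing goes downstream
        }
        let scale = 1.0 / self.n as f32;
        let out: Vec<f32> = self.acc.iter().map(|a| a * scale).collect();
        // Reset the state for the next group of n chunks.
        self.acc.iter_mut().for_each(|a| *a = 0.0);
        self.count = 0;
        Some(out)
    }
}

/// Runs the stage between two bounded channels until the input side closes.
fn run_downsampler(rx: Receiver<Vec<f32>>, tx: SyncSender<Vec<f32>>, n: usize, len: usize) {
    let mut ds = Downsampler::new(n, len);
    for chunk in rx {
        if let Some(avg) = ds.push(&chunk) {
            if tx.send(avg).is_err() {
                break; // downstream hung up
            }
        }
    }
}

/// Demo wiring: a source thread, the stage thread, and the caller as the sink.
fn demo_downsample(inputs: Vec<Vec<f32>>, n: usize) -> Vec<Vec<f32>> {
    let len = inputs.first().map_or(0, |c| c.len());
    let (in_tx, in_rx) = sync_channel::<Vec<f32>>(8);
    let (out_tx, out_rx) = sync_channel::<Vec<f32>>(8);
    let stage = thread::spawn(move || run_downsampler(in_rx, out_tx, n, len));
    let source = thread::spawn(move || {
        for chunk in inputs {
            if in_tx.send(chunk).is_err() {
                break;
            }
        }
        // in_tx is dropped here, which ends the stage's input loop.
    });
    let out: Vec<Vec<f32>> = out_rx.iter().collect();
    source.join().unwrap();
    stage.join().unwrap();
    out
}
```

The state lives entirely inside `Downsampler`, so the same `push` call could in principle be driven from a thread, a pool worker, or an async task without changing the stage itself.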

This is all well and good, and honestly is running fine. However, I'm underutilizing the cores and using too many. I look at htop and each thread is only at ~70% load. When benchmarking, my tasks are, on average, 2-3us. I would put multiple together, but that makes reasoning about the program structure harder. As in, if I want to add an additional processing step in the middle of a merged task, that's harder than if they were two separate tasks with channels connecting them.

So, my question is: is it possible to build out this kind of stateful-pipelined program with async rust? I thought that work-stealing helps in the sense that I can utilize the cores better, perhaps fewer of them, while still maintaining this chained-together model. I have difficulty finding references to something like this with infinite-runtime processing loops chained together in an async context.

If you want to see what I have so far - I'm working on it here.

Thanks for your help!


Very detailed description and very cool project! I quickly skimmed over the main.rs and your description, so here is what I could deduce:

  1. The simplest way to reduce the threads of your app is to replace #[tokio::main] with #[tokio::main(flavor = "current_thread")]. This will make your web-server single-threaded and execute only on the main thread, which seems to be what you actually want. See documentation here.
  2. Tasks as threads seems wasteful to me and I never encountered such a pattern in HPC. If I were you I'd probably try to use a thread-pool implementation such as Rayon, even if this requires sharing state across threads and memory regions. You seem to worry very much about NUMA regions, but I think (I hope, rather. I have no way of knowing. But I'd give it a good old try :slightly_smiling_face:) utilizing your threads more will mitigate longer memory access to potentially non-local memory.
  3. Why do you split your computations into different tasks on different threads in the first place? I'd assume each packet is processed the same and in order, like (I) receive input from telescope, (II) decode, (III) reorder, (IV) average, (V) send to ML classifier. If this is the case I'm sure there are better ways for parallelization than your approach (also one that requires less message passing between threads).
  4. IO in your case seems tricky indeed. Where are your FPGAs used in the pipeline? Do you need to wait for a response from your ML model? How often are your metrics dumped to disk? I'd first try and find the right way to parallelize processing, before I'd try and make it asynchronous.

Thanks for your response!

  1. Awesome note of running tokio single-threaded on the main task. That's exactly what I want!

  2. I'm using tasks like this because, as I mentioned, some aren't functions in the strict sense. As in, for a given input - they may or may not directly produce a result. For example, the reordering task has to wait until we encounter the next payload we expect (because we're getting input data slightly out of order), otherwise, we store the input in a buffer and produce nothing on the output. I benchmarked the message passing itself, and the throughput of the crossbeam channels seems quite good. I can send my 8k payload at upwards of 30 Gbps.
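To illustrate what "may or may not produce a result" means for the reordering step, here is a rough sketch. It assumes each payload carries a monotonically increasing sequence number, which is a guess about the packet format, and `Reorderer` is a hypothetical name. It also ignores the case where a packet never arrives, which a real version would need to bound with a timeout or a maximum buffer size.

```rust
use std::collections::BTreeMap;

/// Hypothetical reorder buffer keyed by an assumed per-packet sequence number.
struct Reorderer<T> {
    next: u64,
    pending: BTreeMap<u64, T>,
}

impl<T> Reorderer<T> {
    fn new(first: u64) -> Self {
        Self { next: first, pending: BTreeMap::new() }
    }

    /// Accepts one packet and returns every packet that is now deliverable
    /// in order. The result is empty while the expected packet is missing.
    fn push(&mut self, seq: u64, payload: T) -> Vec<T> {
        let mut ready = Vec::new();
        if seq < self.next {
            return ready; // already delivered or skipped: drop it
        }
        self.pending.insert(seq, payload);
        // Drain the contiguous run that starts at the expected sequence number.
        while let Some(p) = self.pending.remove(&self.next) {
            ready.push(p);
            self.next += 1;
        }
        ready
    }
}
```

One input can therefore yield zero, one, or several outputs, which is the part that does not map cleanly onto a plain `map`-style parallel iterator.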

I'm sure there are better ways for parallelization than your approach

Indeed! That's what I'm trying to ascertain with this post :slight_smile:

  1. The FPGA is sitting in the radio telescope and producing UDP payloads on a 10 GbE line. Standard tokio sockets and std sockets were not fast enough to keep up, which is why I'm using libpcap. I tried to directly work with AF_PACKET and MMAP regions, but got in way over my head - and my understanding is that is what pcap is doing anyway. I do need to wait for a response from the classifier; that's what's used to trigger a dump of the unprocessed data. As such, I have a ~30s ringbuffer that is populated in a task. The IO for dumping that kinda sucks too. I'm writing the data to an HDF5 file, which takes quite a while to serialize on our non-SSDs (the total file size is around 30GB). The metrics are populated every 10s or so, and that whole side of things seems to be working fine.

I should also mention that I almost require message passing with bounded channels to deal with uneven latency. As each thread blocks, I need some way to store the backlog while the thread is yielded.
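As a sketch of what I mean by bounded channels absorbing uneven latency (and how a backpressure statistic could be counted), something like the following works with std's `sync_channel`. The helper name `send_counting_backpressure` is made up for this example; crossbeam's bounded channel has an analogous `try_send`, but this snippet sticks to std so it is self-contained.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{SyncSender, TrySendError};

/// Tries to enqueue without blocking. If the bounded queue is full, the event
/// is counted and the call falls back to a blocking send, so the backlog sits
/// in the channel while the downstream stage catches up.
/// Returns `false` only if the receiver is gone.
fn send_counting_backpressure<T>(
    tx: &SyncSender<T>,
    item: T,
    full_events: &AtomicU64,
) -> bool {
    match tx.try_send(item) {
        Ok(()) => true,
        Err(TrySendError::Full(item)) => {
            // Queue is at capacity: record backpressure, then wait for room.
            full_events.fetch_add(1, Ordering::Relaxed);
            tx.send(item).is_ok()
        }
        Err(TrySendError::Disconnected(_)) => false,
    }
}
```

The counter is an atomic so a separate metrics thread could read it without touching the data path.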

Just looking at the flow of data through your tasks, it looks like you could pretty easily split the incoming payload stream into chunks and assign them to a set of pooled worker threads, where each worker would run the rest of the processing for the chunk they receive internally rather than using a dedicated thread for each task.

The reordering might be fine to just do on the thread you're receiving the packets on?

The downsampling appears to happen right after that, and looks like it's always a fixed number of payloads, which is why I think you might be able to just take a fixed chunk of the incoming payloads and send them off to a worker thread for the rest of the processing.
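Roughly what I have in mind, as a sketch rather than a drop-in design: the receiving thread groups every N in-order payloads and hands each group to one of a few workers, and each worker runs the remaining steps for its group. `Payload`, `process_group`, and `run_pool` are placeholder names, the averaging stands in for whatever the real per-group processing is, and the group index is carried along so the output order can be restored.

```rust
use std::sync::mpsc::{channel, sync_channel};
use std::thread;

type Payload = Vec<f32>; // stand-in for the real payload type

/// Placeholder for "the rest of the processing": average one group of payloads.
fn process_group(group: &[Payload]) -> Payload {
    let mut acc = vec![0.0f32; group[0].len()];
    for p in group {
        for (a, x) in acc.iter_mut().zip(p) {
            *a += *x;
        }
    }
    let scale = 1.0 / group.len() as f32;
    acc.iter_mut().for_each(|a| *a *= scale);
    acc
}

/// Splits `payloads` into groups of `n`, sends them round-robin to `workers`
/// threads, and returns the results in input order. A trailing partial group
/// is dropped by `chunks_exact`.
fn run_pool(payloads: Vec<Payload>, n: usize, workers: usize) -> Vec<Payload> {
    assert!(n > 0 && workers > 0);
    // Unbounded result channel so workers never block on the collector.
    let (res_tx, res_rx) = channel::<(usize, Payload)>();
    let mut senders = Vec::new();
    let mut handles = Vec::new();
    for _ in 0..workers {
        // Bounded per-worker queue: the dispatcher blocks if a worker lags.
        let (tx, rx) = sync_channel::<(usize, Vec<Payload>)>(4);
        let res_tx = res_tx.clone();
        senders.push(tx);
        handles.push(thread::spawn(move || {
            for (idx, group) in rx {
                let _ = res_tx.send((idx, process_group(&group)));
            }
        }));
    }
    drop(res_tx); // only the workers hold result senders now

    for (idx, group) in payloads.chunks_exact(n).enumerate() {
        // Copies the group for simplicity; a real pipeline would move buffers.
        senders[idx % workers].send((idx, group.to_vec())).unwrap();
    }
    drop(senders); // closing the queues lets the workers exit

    let mut results: Vec<(usize, Payload)> = res_rx.iter().collect();
    for h in handles {
        h.join().unwrap();
    }
    results.sort_by_key(|(idx, _)| *idx);
    results.into_iter().map(|(_, p)| p).collect()
}
```

With this shape each payload crosses a channel once on the way in, instead of once per processing step, and each worker thread could still be pinned to a core on the same NUMA node as the capture thread.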

I suspect the NUMA issue had more to do with "ping pong"-ing your very large Payload struct across NUMA nodes than anything else[1]. If you ensure the larger payloads don't get sent across multiple channels, that might not be an issue anymore. Condensing those tasks so they aren't all connected with channels would help with that.


  1. e.g. your capture task might be on node A, reorder task on node B, split task on node A, etc., so you could end up bouncing the payload data back and forth across the node boundary multiple times for every single payload and saturate the available bandwidth
