Oneshot channel await takes a lot of time (dozens of ms)

Hi,

I have the following issue. I'm using an mpsc tokio channel to send messages to a worker started with tokio::task::spawn; the messages are processed and the responses are returned to the caller on a oneshot channel.

This is the method that sends the messages to the mpsc tokio channel. These messages are 'Job' structs that carry the oneshot sender ('resp_tx'), which the worker uses to return the response:

async fn read(&self, first_dir: u8, second_dir: u32, filename: u32, length: u32, offset: u32, op: char) -> Result<Option<Bytes>, IOUringError> {
    let (resp_tx, resp_rx) = oneshot::channel::<Option<Bytes>>();

    // Send the 'Job' request to the mpsc tokio channel. The 'Job' struct carries
    // 'resp_tx', the oneshot sender on which the worker will send the response.
    let job = Job::new(first_dir, second_dir, filename, offset, length, 0, 0, 0, Some(resp_tx), op);
    if let Err((err, _entries)) = self.send_job(job, CONFIG.cache.io_uring.read.timeouts.send_timeout_ms).await {
        return Err(err);
    }

    // Await the response on the oneshot channel.
    let start = Instant::now();
    match resp_rx.await {
        Ok(result) => {
            println!("id = {}/{}/{}, now = {:?}, receive found = {}us", first_dir, second_dir, filename, Instant::now(), start.elapsed().as_micros());
            Ok(result)  // This can be Some(document) or None (not found)
        },
        Err(_) => {
            println!("id = {}/{}/{}, receive not found = {}us", first_dir, second_dir, filename, start.elapsed().as_micros());
            Err(IOUringError::ChannelReceiveError("receiver closed".to_string()))
        }
    }
}

Where 'send_job' simply selects an mpsc tokio channel and sends the job:

async fn send_job(&self, job: Job, send_timeout_ms: u16) -> Result<usize, (IOUringError, usize)> {
    let channel_idx = get_channel_idx(job.filename);
    let channel_reader = &self.channel_vec[channel_idx];
    let tx = &channel_reader.channel;

    // First fast path, non-blocking: try to send without waiting.
    // (The 'entries' value used below comes from code not shown here.)
    match tx.try_send(job) {
        Ok(()) => Ok(entries),
        Err(TrySendError::Closed(_job)) => {
            // Channel closed
            Err((IOUringError::ChannelSendError(t!("iouring.channel.msg_closed").to_string()), entries))
        }
        Err(TrySendError::Full(job)) => {
            match timeout(Duration::from_millis(send_timeout_ms as u64), tx.send(job)).await {
                Ok(Ok(())) => Ok(entries),
                Ok(Err(err)) => {
                    Err((IOUringError::ChannelSendError(err.to_string()), entries))
                },
                Err(err) => {   // timeout
                    Err((IOUringError::ChannelSendTimeout(err.to_string()), entries))
                }
            }
        }
    }
}

And this is the worker:

fn start_reader(ring_vec_idx: usize, mut rx: mpsc::Receiver<Job>, ready_tx: std::sync::mpsc::Sender<()>) {
    tokio::task::spawn(async move {
        // ... init variables ...
        while !IO_URING_READ.is_stop() {
            Self::get_packet(&mut ctx, ring_vec_idx, channel_size, max_packet_size, packet_size_timeout_duration, &mut rx).await;   // Try to get a complete batch of 'packet_size' elements

            // Send this packet to 'io_uring' if it's not empty, so all open/read/close_unlink operations are performed, and the responses are directly returned to the oneshot channel.
            if ctx.are_jobs()  {
                perform_open_read_close_operation(ring_vec_idx, &mut ctx, CONFIG.cache.io_uring.read.timeouts.disk_timeout_ms, cqe_max_wait_time_duration).await;
                ctx.clear();
            }
        }
    });
}

Where 'get_packet' reads the jobs received on the '&mut mpsc::Receiver' and passes them to 'perform_open_read_close_operation', which performs some operations and finally writes the response for each job to its oneshot channel in this way:

            if let Some(channel) = job.resp_tx.take() {
                let _ = channel.send(document);
                println!("set_success_result :: id = {}, sent response SOME in {}us", job.filename, now.elapsed().as_micros());
                return;
            } else {
                prometheus::inc_failed_response();
            }

Well, this is what's happening. If I print a timestamp after 'perform_open_read_close_operation', all of this (including the response already sent with "let _ = channel.send(document)") takes just a few microseconds, below 100us.

But the timestamp after "match resp_rx.await" in the "read" method, where the oneshot response is received, shows values at the millisecond level; I have even seen hundreds of milliseconds.

And I'm stuck on this. I replaced the oneshot with an mpsc tokio channel of size 1, with the same result.

I don't understand why it takes so long to wake the task awaiting this oneshot channel. In our tests the CPU is 90% idle, so the server is not stressed at all.
Any clue about what could be happening?

Thanks,
Joan.

You probably have a task that is blocking the thread. It's likely the sender.
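To be concrete about what "blocking the thread" means in practice, here is a minimal, self-contained sketch (nothing from your code, purely illustrative): a synchronous sleep in one task delays the wakeup of a completely unrelated oneshot receiver sharing the same worker thread.

use std::time::{Duration, Instant};
use tokio::sync::oneshot;

#[tokio::main(flavor = "current_thread")] // single worker thread, so the effect is deterministic
async fn main() {
    // A task that blocks the worker thread with a synchronous sleep.
    tokio::spawn(async {
        std::thread::sleep(Duration::from_millis(50)); // blocks the whole thread
    });

    // A task that answers a request over a oneshot channel.
    let (tx, rx) = oneshot::channel();
    tokio::spawn(async move {
        let _ = tx.send(()); // the send itself takes nanoseconds
    });

    let start = Instant::now();
    // Neither the sender nor this receiver can be polled until the blocking
    // task releases the thread.
    let _ = rx.await;
    println!("oneshot latency: {:?}", start.elapsed()); // ~50ms, not microseconds
}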

Hi Alice,

Thanks for your quick response. Yes, I read your article when I started working with Rust, 8 months ago, and I have tried to be very careful with sync code that could block the tokio runtime.

Regarding the read method (which is the sender, as you say), I can't see any blocking call that could block the thread. The flow is simple:

  1. An async axum handler that processes a "get(key) -> bytes" operation and returns the result with "return (StatusCode::OK, document).into_response();"

  2. This axum handler calls the async 'cache.get(key)' method, which only does:

    pub async fn get(&self, method_id: u16, key: i64, s: u32) -> Result<(u32, Option<Bytes>, char), CacheError> {

        let disk_value_opt = {
            // Papaya hashmap that checks if this 'method_id' is in the 'data' map. Read-only map with few entries (< 10).
            let map = self.data.pin();
            let entry = map.get(&method_id).ok_or(CacheError::CacheNotFound(method_id))?;

            // Quick cache that stores the information needed to access the document on disk. Ultra-fast access by key, < 5us.
            if let Some(value) = entry.cache.get(&key) {
                if !value.is_expired() {
                    Some(CacheValue::new(value.first_dir, value.second_dir, value.filename, value.offset, value.length, value.error_id))
                } else {
                    entry.cache.remove(&key);
                    None
                }
            } else {
                None
            }
        };

        // If found, let's get the document from disk.
        if let Some(value) = disk_value_opt {
            if let Ok(Some(document)) = IO_URING_READ.read(value).await {
                return Ok((0, Some(document), constants::DISK_DESTINATION));
            }
        }

        Ok((0, None, constants::NONE_DESTINATION))
    }

And this 'IO_URING_READ.read' is calling the previous method that sends the message to the channel.

I only have two blocking operations in my code, but they are related to the write path, not the read path. No channels there, just:

  1. An io_uring write, synchronous using the 'enter' syscall, but conveniently isolated inside spawn_blocking.
  2. An insert of entries into quick cache, around 30,000 entries inserted in around 3ms, also isolated inside spawn_blocking; I still have to test it with Rayon. (Both follow the spawn_blocking pattern sketched below.)
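Both are isolated the same way, roughly like this (placeholder function, not the real code):

// Placeholder for the real synchronous write (io_uring 'enter' based).
fn write_to_disk_sync(buffer: Vec<u8>) -> std::io::Result<usize> {
    // ... submit + wait on the ring ...
    Ok(buffer.len())
}

async fn flush_buffer(buffer: Vec<u8>) -> std::io::Result<usize> {
    // The blocking syscall runs on tokio's blocking thread pool,
    // so it cannot stall the async worker threads.
    tokio::task::spawn_blocking(move || write_to_disk_sync(buffer))
        .await
        .expect("flush task panicked")
}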

And that's all. Does this shed some more light on the issue?

Anyway, if it really is a blocked thread, what tool should I use to find out where the blocking comes from?

Thanks,
Joan.

How many tasks does your program spawn, and how are your runtimes configured? Do you use a dedicated runtime for the sender?

One tool you can use is Tokio console.

Blocking tasks appear as tasks where the busy duration goes up, but the poll count does not.
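If you haven't wired it up yet, the setup is small: add the console-subscriber crate, call its init before the runtime starts polling tasks, and build with the tokio_unstable cfg. Roughly (check the tokio-console docs for exact versions):

// Cargo.toml needs the `console-subscriber` crate, and the binary must be
// built with RUSTFLAGS="--cfg tokio_unstable" so tokio emits task instrumentation.

#[tokio::main]
async fn main() {
    // Starts the endpoint that the `tokio-console` CLI connects to.
    console_subscriber::init();

    // ... rest of the application ...
}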


I'm using the default parameters of the global tokio runtime; I'm not using a specific runtime. Tokio-console is showing 27 tasks right now. I attach a screenshot, where you will also see the blocking tasks from the write operations, which appear and disappear with the load.

It looks like your tasks are getting polled several hundred thousand times. What are the tasks doing to end up like that?

Because this allows for a different possibility: with a large number of tasks actively running, the receiver could sit in the task queue for a long time before it gets to run, simply because there are so many other things that also need to run.
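You can see that effect in isolation with a toy sketch (nothing from your code): flood the runtime with tasks that are always runnable, and the oneshot wakeup latency grows even though nothing is blocking.

use std::time::Instant;
use tokio::sync::oneshot;

#[tokio::main(flavor = "current_thread")] // single worker thread, so the queue effect is easy to see
async fn main() {
    // Lots of tasks that are always ready to run again.
    for _ in 0..10_000 {
        tokio::spawn(async {
            loop {
                tokio::task::yield_now().await;
            }
        });
    }

    let (tx, rx) = oneshot::channel();
    tokio::spawn(async move {
        let _ = tx.send(());
    });

    let start = Instant::now();
    // The send itself is instant, but both the sender task and this receiver
    // have to wait for their turn in the run queue behind everything else.
    let _ = rx.await;
    println!("oneshot latency with a busy run queue: {:?}", start.elapsed());
}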

OK, I found it: it was this timeout waiting on the mpsc receiver (inside 'get_packet'):
match timeout_at(deadline, rx.recv()).await {
}
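For context, 'get_packet' is roughly this shape (simplified, placeholder names; the real code fills 'ctx' instead of returning a Vec):

struct Job; // placeholder for the real Job struct

async fn get_packet(
    rx: &mut tokio::sync::mpsc::Receiver<Job>,
    max_packet_size: usize,
    packet_size_timeout: std::time::Duration,
) -> Vec<Job> {
    // One deadline for the whole batch: stop collecting when it is hit.
    let deadline = tokio::time::Instant::now() + packet_size_timeout;
    let mut batch = Vec::with_capacity(max_packet_size);

    while batch.len() < max_packet_size {
        match tokio::time::timeout_at(deadline, rx.recv()).await {
            Ok(Some(job)) => batch.push(job),
            Ok(None) => break, // channel closed
            Err(_) => break,   // deadline hit: ship whatever we have so far
        }
    }
    batch
}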

This deadline was too low (< 5ms). So, if I cannot wait long on a channel because of the customer requirements (this comes from a get(key) operation where the client app cannot wait more than a few ms for a result), what is my option? Move to sync, use a crossbeam channel, and simply let the thread block until a result arrives?

I'm not sure why you can't just spawn a task that will wait. Or are you returning an error after 5ms so the client has to make another request? In that case even blocking threads will not help; it will only make things worse. If I understand correctly, the requirements themselves are effectively DoSing your app, so the requirements or the algorithm should change.

In general, the pattern of waiting on something while using a timeout and a loop to repeatedly check a different condition is a bad idea.
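For example, instead of waking up every few milliseconds to re-check a stop flag, the worker can wait on the channel and a shutdown signal at the same time. A sketch using a watch channel rather than your is_stop() flag:

use tokio::sync::{mpsc, watch};

struct Job; // placeholder

async fn reader_loop(mut rx: mpsc::Receiver<Job>, mut shutdown: watch::Receiver<bool>) {
    loop {
        tokio::select! {
            maybe_job = rx.recv() => match maybe_job {
                Some(job) => {
                    // handle the job (or push it into the current batch)
                    let _ = job;
                }
                None => break, // all senders dropped
            },
            // Wakes immediately when shutdown is signalled; no polling timeout needed.
            _ = shutdown.changed() => break,
        }
    }
}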

Also, the tx.try_send is unnecessary. The async tx.send does itself contain a fast path for the case where the message can be sent right away - you do not need to add that yourself.
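With that, send_job collapses to roughly this (placeholder error type, keep your own):

struct Job; // placeholder

enum SendJobError { Closed, Timeout } // placeholder, keep your own error type

async fn send_job(
    tx: &tokio::sync::mpsc::Sender<Job>,
    job: Job,
    send_timeout_ms: u64,
) -> Result<(), SendJobError> {
    // `send` completes immediately when there is capacity, so no separate
    // try_send fast path is needed in front of it.
    match tokio::time::timeout(std::time::Duration::from_millis(send_timeout_ms), tx.send(job)).await {
        Ok(Ok(())) => Ok(()),
        Ok(Err(_closed)) => Err(SendJobError::Closed),
        Err(_elapsed) => Err(SendJobError::Timeout),
    }
}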


Hi Alice,

I think this issue is coming from somewhere else. Let me explain it, and how I plan to solve it.

This is a disk cache, with reads and writes. All the explanation until now is about reads, but I also have writes.

For every request not found in an intermediate in-memory cache, a 'put' is performed. That means appending to a buffer (in fact several buffers) until one of them is full, in which case it is flushed to disk. Since the hit ratios are usually low, this happens 80% of the time.

Right now everything is async because, in the case of a flush, I need to spawn a blocking task and wait for the result, but that only happens 1% of the time. The other 99% of the time we are only appending a byte array to a buffer, which takes less than 1us, and it's also async.

So I have a chain of async functions that, 99% of the time, take less than 1us.

So my idea is to separate these two actions:

  1. Make all these methods sync: if it's just a write to a buffer, None is returned; if it's a flush, the buffer index is returned.

  2. From the axum handler, only if the previous method returned Some(idx), call an async function that will spawn a blocking task to flush to disk.

So 99% of the time the path will be axum (async) -> method1 (sync) -> method1.1 (sync) -> method1.2 (sync), which takes less than 1us.

And 1% of the time it will be axum (async) -> flush (async), which can take around 500us at most.
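In code, the idea is roughly this (simplified placeholder types, not the real ones):

struct WriteCache; // placeholder for the real buffer structure

impl WriteCache {
    // Sync: appends to an in-memory buffer (< 1us). Returns the index of a
    // buffer that just became full and now needs flushing, if any.
    fn put(&self, _key: i64, _value: &[u8]) -> Option<usize> {
        // ... append to the current buffer ...
        None
    }

    // Async: only called for the ~1% of puts that filled a buffer.
    async fn flush(&self, buffer_idx: usize) -> std::io::Result<()> {
        tokio::task::spawn_blocking(move || {
            // ... synchronous io_uring write of buffer `buffer_idx` ...
            let _ = buffer_idx;
            Ok(())
        })
        .await
        .expect("flush task panicked")
    }
}

// In the axum handler:
// if let Some(idx) = cache.put(key, &body) {
//     cache.flush(idx).await?;
// }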

I'm new to Rust (6 months), so I would just like to know if this idea could help; maybe you think it's a waste of time, I don't know. It's true that, if you look at the tokio-console image I pasted, there seems to be a lot of polling in the axum tasks (almost the same amount as iouring_read, my code), but I'm not sure whether this behaviour is normal or not.

Let me know what you think.

Thanks a lot for your time,
Joan.