Deno_runtime running multiple invokes on single worker concurrently

I'm trying to run multiple invocation of the same script on a single deno MainWorker concurrently, and waiting for their results (since the scripts can be async). Conceptually, I want something like the loop in run_worker below.

type Tx = Sender<(String, Sender<String>)>;
type Rx = Receiver<(String, Sender<String>)>;

struct Runner {
    worker: MainWorker,
    futures: FuturesUnordered<Pin<Box<dyn Future<Output=(String, Result<Global<Value>, Error>)>>>>,
    response_futures: FuturesUnordered<Pin<Box<dyn Future<Output=(String, Result<(), SendError<String>>)>>>>,
    result_senders: HashMap<String, Sender<String>>,
}

impl Runner {

    fn new() ...
    
    async fn run_worker(&mut self, rx: &mut Rx, main_module: ModuleSpecifier, user_module: ModuleSpecifier) {
        self.worker.execute_main_module(&main_module).await.unwrap();
        self.worker.preload_side_module(&user_module).await.unwrap();

        loop {
            tokio::select! {
                msg = rx.recv() => {
                    if let Some((id, sender)) = msg {
                        let global = self.worker.js_runtime.execute_script("test", "mod.entry()").unwrap();
                        self.result_senders.insert(id, sender);
                        self.futures.push(Box::pin(async {
                            let resolved = self.worker.js_runtime.resolve_value(global).await;
                            return (id, resolved);
                        }));
                    }
                },
                script_result = self.futures.next() => {
                    if let Some((id, out)) = script_result {
                        self.response_futures.push(Box::pin(async {
                            let value = deserialize_value(out.unwrap(), &mut self.worker);
                            let res = self.result_senders.remove(&id).unwrap().send(value).await;
                            return (id.clone(), res);
                        }));
                    }
                },
                // also handle response_futures here
                else => break,
            }
        }
    }
}

The worker can't be borrowed as mutable multiple times, so this won't work. So the worker has to be a RefCell, and I've created a BorrowingFuture:

struct BorrowingFuture {
    worker: RefCell<MainWorker>,
    global: Global<Value>,
    id: String
}

And its poll implementation:

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    match Pin::new(&mut Box::pin(self.worker.borrow_mut().js_runtime.resolve_value(self.global.clone()))).poll(cx) {
        Poll::Ready(result) => Poll::Ready((self.id.clone(), result)),
        Poll::Pending => {
            cx.waker().clone().wake_by_ref();
            Poll::Pending
        }
    }
}

So the above

self.futures.push(Box::pin(async {
    let resolved = self.worker.js_runtime.resolve_value(global).await;
    return (id, resolved);
}));

would become

self.futures.push(Box::pin(BorrowingFuture{worker: self.worker, global: global.clone(), id: id.clone()}));

and this would have to be done for the response_futures above as well.

But I see a few issues with this.

  1. Creating a new future on every poll and then polling that seems wrong, but it does work.
    It probably has a performance impact because new objects are created constantly.
  2. The same issue would happen for the response futures, which would call send on each poll, which seems completely wrong.
  3. The waker.wake_by_ref is called on every poll, because there is no way to know when a script result will resolve. This results in the future being polled thousands (and more) times per second (always creating a new object), which could be the same as checking it in a loop, I guess.

Note My current setup doesn't use select!, but an enum as Output from multiple Future implementations, pushed into a single FuturesUnordered, and then matched to handle the correct type (script, send, receive). I used select here because it's far less verbose, and gets the point across.

Is there a way to do this better/more efficiently? Or is it just not the way MainWorker was meant to be used?

main for completeness:

#[tokio::main]
async fn main() {

    let main_module = deno_runtime::deno_core::resolve_url(MAIN_MODULE_SPECIFIER).unwrap();
    let user_module = deno_runtime::deno_core::resolve_url(USER_MODULE_SPECIFIER).unwrap();

    let (tx, mut rx) = channel(1);
    let (result_tx, mut result_rx) = channel(1);

    let handle = thread::spawn(move || {

        let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();

        let mut runner = Runner::new();
        runtime.block_on(runner.run_worker(&mut rx, main_module, user_module));
    });

    tx.send(("test input".to_string(), result_tx)).await.unwrap();
    let result = result_rx.recv().await.unwrap();
    println!("result from worker {}", result);

    handle.join().unwrap();
}

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.