I'm trying to run multiple invocations of the same script concurrently on a single Deno MainWorker and wait for their results (since the scripts can be async). Conceptually, I want something like the loop in run_worker below.
type Tx = Sender<(String, Sender<String>)>;
type Rx = Receiver<(String, Sender<String>)>;

struct Runner {
    worker: MainWorker,
    futures: FuturesUnordered<Pin<Box<dyn Future<Output=(String, Result<Global<Value>, Error>)>>>>,
    response_futures: FuturesUnordered<Pin<Box<dyn Future<Output=(String, Result<(), SendError<String>>)>>>>,
    result_senders: HashMap<String, Sender<String>>,
}

impl Runner {
    fn new() ...

    async fn run_worker(&mut self, rx: &mut Rx, main_module: ModuleSpecifier, user_module: ModuleSpecifier) {
        self.worker.execute_main_module(&main_module).await.unwrap();
        self.worker.preload_side_module(&user_module).await.unwrap();
        loop {
            tokio::select! {
                msg = rx.recv() => {
                    if let Some((id, sender)) = msg {
                        let global = self.worker.js_runtime.execute_script("test", "mod.entry()").unwrap();
                        self.result_senders.insert(id, sender);
                        self.futures.push(Box::pin(async {
                            let resolved = self.worker.js_runtime.resolve_value(global).await;
                            return (id, resolved);
                        }));
                    }
                },
                script_result = self.futures.next() => {
                    if let Some((id, out)) = script_result {
                        self.response_futures.push(Box::pin(async {
                            let value = deserialize_value(out.unwrap(), &mut self.worker);
                            let res = self.result_senders.remove(&id).unwrap().send(value).await;
                            return (id.clone(), res);
                        }));
                    }
                },
                // also handle response_futures here
                else => break,
            }
        }
    }
}
The worker can't be mutably borrowed more than once at a time, so this won't compile. The worker therefore has to be wrapped in a RefCell, and I've created a BorrowingFuture:
struct BorrowingFuture {
    worker: RefCell<MainWorker>,
    global: Global<Value>,
    id: String
}
And its poll implementation:
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    match Pin::new(&mut Box::pin(self.worker.borrow_mut().js_runtime.resolve_value(self.global.clone()))).poll(cx) {
        Poll::Ready(result) => Poll::Ready((self.id.clone(), result)),
        Poll::Pending => {
            cx.waker().clone().wake_by_ref();
            Poll::Pending
        }
    }
}
So the above
self.futures.push(Box::pin(async {
    let resolved = self.worker.js_runtime.resolve_value(global).await;
    return (id, resolved);
}));
would become
self.futures.push(Box::pin(BorrowingFuture{worker: self.worker, global: global.clone(), id: id.clone()}));
and this would have to be done for the response_futures above as well.
But I see a few issues with this.
- Creating a new future on every poll and then polling that seems wrong, but it does work. It probably has a performance impact because new objects are created constantly.
- The same issue would happen for the response futures, which would call send on each poll, which seems completely wrong.
- The waker.wake_by_ref is called on every poll, because there is no way to know when a script result will resolve. This results in the future being polled thousands (and more) times per second (always creating a new object), which could be the same as checking it in a loop, I guess.
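To make the third point concrete, here is a minimal sketch using only the standard library. `SelfWaking` mimics what my `poll` does (wake on every poll), while `SlotFuture` stores the waker and lets the completing side wake it once. `Slot`, `SlotFuture`, `SelfWaking` and `CountingWaker` are hypothetical names made up for this illustration; whether `JsRuntime` offers a comparable completion hook is exactly what I don't know, so nothing here assumes one.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that only counts how often it is invoked.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Never ready, and asks to be polled again on every poll (busy polling).
struct SelfWaking;

impl Future for SelfWaking {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// A result slot that some event source fills in later, plus the waker
/// of whoever is waiting for it.
#[derive(Default)]
struct Slot {
    value: Option<String>,
    waker: Option<Waker>,
}

/// Parks its waker in the slot instead of waking itself.
struct SlotFuture(Rc<RefCell<Slot>>);

impl Future for SlotFuture {
    type Output = String;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
        let mut slot = self.0.borrow_mut();
        match slot.value.take() {
            Some(v) => Poll::Ready(v),
            None => {
                // Remember who to notify; do not wake here.
                slot.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Polls `SelfWaking` the given number of times and returns the wake-up count.
fn demo_self_waking(polls: usize) -> usize {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(SelfWaking);
    for _ in 0..polls {
        let _ = fut.as_mut().poll(&mut cx);
    }
    counter.0.load(Ordering::SeqCst)
}

/// Polls once while pending, completes the slot, wakes the stored waker,
/// then polls again. Returns (wake-up count, output).
fn demo_stored_waker() -> (usize, String) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let slot = Rc::new(RefCell::new(Slot::default()));
    let mut fut = Box::pin(SlotFuture(slot.clone()));
    assert!(fut.as_mut().poll(&mut cx).is_pending());
    // The "event source" stores the result and takes the parked waker.
    let stored = {
        let mut s = slot.borrow_mut();
        s.value = Some("done".to_string());
        s.waker.take()
    };
    if let Some(w) = stored {
        w.wake();
    }
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => (counter.0.load(Ordering::SeqCst), v),
        Poll::Pending => unreachable!("the slot was filled before this poll"),
    }
}
```

With the self-waking future every poll triggers another wake-up, so the executor never gets to sleep. With the stored waker there is exactly one wake-up per completed result, but it needs something that knows when the result is ready.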
Note: My current setup doesn't use select!, but an enum as Output from multiple Future implementations, pushed into a single FuturesUnordered, and then matched to handle the correct type (script, send, receive). I used select! here because it's far less verbose, and gets the point across.
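Roughly, the enum-based shape looks like the sketch below. It is reduced to the standard library, so a plain `Vec` and a single polling pass stand in for `FuturesUnordered`. The names `Event`, `EventFuture`, `NoopWaker` and `poll_once` are made up for this illustration and are not my actual types.

```rust
use std::future::{self, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// One output type shared by every kind of future in the collection.
enum Event {
    Received { id: String },
    Script { id: String, value: String },
    Sent { id: String },
}

type EventFuture = Pin<Box<dyn Future<Output = Event>>>;

/// Waker that does nothing; enough for a single manual polling pass.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls every future once and dispatches on the event kind.
/// Futures that are still pending are simply dropped in this sketch;
/// a real executor would keep them and poll again when woken.
fn poll_once(pending: Vec<EventFuture>) -> Vec<String> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut log = Vec::new();
    for mut fut in pending {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(Event::Received { id }) => log.push(format!("received {}", id)),
            Poll::Ready(Event::Script { id, value }) => {
                log.push(format!("script {} -> {}", id, value))
            }
            Poll::Ready(Event::Sent { id }) => log.push(format!("sent {}", id)),
            Poll::Pending => {}
        }
    }
    log
}

/// Three different futures, one collection, one match.
fn demo_events() -> Vec<String> {
    let mut pending: Vec<EventFuture> = Vec::new();
    pending.push(Box::pin(future::ready(Event::Received { id: "1".to_string() })));
    pending.push(Box::pin(async {
        Event::Script { id: "1".to_string(), value: "42".to_string() }
    }));
    pending.push(Box::pin(future::ready(Event::Sent { id: "1".to_string() })));
    poll_once(pending)
}
```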
Is there a way to do this better/more efficiently? Or is it just not the way MainWorker was meant to be used?
main for completeness:
#[tokio::main]
async fn main() {
    let main_module = deno_runtime::deno_core::resolve_url(MAIN_MODULE_SPECIFIER).unwrap();
    let user_module = deno_runtime::deno_core::resolve_url(USER_MODULE_SPECIFIER).unwrap();
    let (tx, mut rx) = channel(1);
    let (result_tx, mut result_rx) = channel(1);
    let handle = thread::spawn(move || {
        let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
        let mut runner = Runner::new();
        runtime.block_on(runner.run_worker(&mut rx, main_module, user_module));
    });
    tx.send(("test input".to_string(), result_tx)).await.unwrap();
    let result = result_rx.recv().await.unwrap();
    println!("result from worker {}", result);
    handle.join().unwrap();
}