```rust
async fn process(url: Url) -> Result<()> {
    let bytes = fetch(url.clone()).await?;
    let page = String::from_utf8(bytes.clone())?;
    let page = kuchiki::parse_html().one(page);
    //let links = get_links(&page, url);
    drop(page);
    let links: Vec<Url> = vec![];
    for url in &links {
        add_url(&url).await.unwrap();
    }
    Ok(())
}
```
This gives the following error. What confuses me is that I explicitly drop `page` before the `add_url` line, so I'm not sure what the issue is.
```
error: future cannot be sent between threads safely
   --> src/main.rs:321:22
    |
321 |         let handle = tokio::spawn(async move {
    |                      ^^^^^^^^^^^^ future created by async block is not `Send`
    |
   ::: /home/vedantroy/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/task/spawn.rs:129:21
    |
129 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::spawn`
    |
    = help: within `impl futures::Future`, the trait `std::marker::Send` is not implemented for `Rc<Node>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:264:9
    |
256 |     let page = kuchiki::parse_html().one(page);
    |         ---- has type `NodeRef` which is not `Send`
...
264 |         add_url(&url).await.unwrap();
    |         ^^^^^^^^^^^^^^^^^^^ await occurs here, with `page` maybe used later
...
267 | }
    | - `page` is later dropped here
```
OK, I fixed the error by restructuring the function as follows, but I don't see why this works, since I explicitly dropped `page` in the original example. Here the block drops `page` implicitly at its closing brace, and I don't see the fundamental difference. Is this a bug in the Rust compiler?
```rust
// all non-fatal errors bubble up to this function
async fn process(url: Url) -> Result<()> {
    let links = {
        let bytes = fetch(url.clone()).await?;
        // TODO: Is there a way to do this w/o clone?
        let page = String::from_utf8(bytes.clone())?;
        let page = kuchiki::parse_html().one(page);
        let input = get_training_input(&page).ok_or(anyhow!("No training input for: {:?}", url))?;
        let output = get_training_output(&page, &url);
        let json = json!({
            "raw": bytes,
            "input": input,
            "labels": output,
        });
        // Add JSON to the saver
        SAVER.add(json);
        get_links(&page, url)
    }; // `page` (a non-Send `NodeRef`) goes out of scope here, before the `.await` in the loop
    for url in &links {
        add_url(url).await.unwrap();
    }
    Ok(())
}
```
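The compiler output above hints at the difference: it reports `page` as "later dropped" at the closing brace (line 267), which suggests that the compiler used here tracked `page` by its lexical scope and did not treat `drop(page)` as ending it. A block that ends before the first `.await` sidesteps that question entirely. Below is a minimal, std-only sketch of the idea. Everything in it is a hypothetical stand-in rather than the code above: `Rc<String>` plays the role of kuchiki's `NodeRef`, `add_url` and `process` are simplified, `YieldOnce` is a future that suspends once, `block_on` is a toy busy-polling executor, and `std::thread::spawn` substitutes for `tokio::spawn`, since both require a `Send + 'static` value.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough for the busy-polling executor below.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor (stand-in for a real runtime): polls the future until it is Ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Future that returns Pending once, so awaiting it is a real suspension point.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Hypothetical stand-in for `add_url`.
async fn add_url(url: &str) -> usize {
    YieldOnce(false).await;
    url.len()
}

// Hypothetical stand-in for `process`; the `Rc` plays the role of `NodeRef`.
async fn process(html: String) -> usize {
    let links: Vec<String> = {
        let page = Rc::new(html); // not Send
        page.split_whitespace().map(str::to_owned).collect()
    }; // `page` goes out of scope here, before any `.await`
    let mut total = 0;
    for url in &links {
        total += add_url(url).await;
    }
    total
}

fn main() {
    let fut = process("a.com b.org".to_string());
    // `thread::spawn`, like `tokio::spawn`, needs a `Send + 'static` value,
    // so this compiles only because `page` is confined to the inner block.
    let total = std::thread::spawn(move || block_on(fut)).join().unwrap();
    println!("{total}"); // prints 10
}
```

Moving the `Rc` line out of the block and placing it before the loop is a quick way to see the `Send` error reappear.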
@alice, do you know if there is an active effort to update the compiler so that dropping a lock before an `.await` is enough for the code to compile? When a lock is acquired in an outer branch and the code needs to `.await` four or five levels of nesting deeper, the workaround above leads to poorly legible code. In that situation it makes more sense to add one line of unsafe code (a wrapper around the future) than to rewrite a bunch of code.
It has been this way for a while. I assume it will be fixed eventually, but I don't know when.
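For the lock case raised above, the same scoping idea works without `unsafe`. Lock, take what you need out of the protected data, and let the guard drop at the end of an inner block, so no guard is alive at the `.await`. The sketch below assumes `std::sync::Mutex`, whose `MutexGuard` is not `Send`. `drain`, `send_item`, `YieldOnce`, and the toy `block_on` executor are hypothetical, and `std::thread::spawn` again stands in for `tokio::spawn`.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough for the busy-polling executor below.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor (stand-in for a real runtime): polls the future until it is Ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Future that returns Pending once, so awaiting it is a real suspension point.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Hypothetical async work that must not run while the lock is held.
async fn send_item(item: u32) -> u32 {
    YieldOnce(false).await;
    item * 2
}

// Drains a shared queue. The guard lives only inside the inner block,
// so the lock is released before each `.await` and the future stays Send.
async fn drain(queue: Arc<Mutex<Vec<u32>>>) -> u32 {
    let mut sum = 0;
    loop {
        let next = {
            let mut guard = queue.lock().unwrap();
            guard.pop()
        }; // guard dropped here: lock released
        match next {
            Some(item) => sum += send_item(item).await,
            None => break,
        }
    }
    sum
}

fn main() {
    let queue = Arc::new(Mutex::new(vec![1, 2, 3]));
    let fut = drain(Arc::clone(&queue));
    let sum = std::thread::spawn(move || block_on(fut)).join().unwrap();
    println!("{sum}"); // prints 12
    assert!(queue.lock().unwrap().is_empty());
}
```

When the `.await` sits several levels deep, pulling the locked section into a small synchronous helper that returns owned data usually keeps the nesting flatter than inner blocks do.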
One pattern you may find useful is to encapsulate your non-`Send` type in a non-async function and call that function from async code. Something like this:
```rust
async fn process(url: Url) -> Result<()> {
    // All the non-Send parsing happens inside this synchronous call.
    let links = page_to_links(url.clone(), fetch(url).await?)?;
    for url in &links {
        add_url(url).await.unwrap();
    }
    Ok(())
}

// Not async, so `page` (a non-Send `NodeRef`) can never be held across an `.await`.
fn page_to_links(url: Url, bytes: Vec<u8>) -> Result<Vec<Url>> {
    // Clone, because `bytes` is still needed for the "raw" field below.
    let page = String::from_utf8(bytes.clone())?;
    let page = kuchiki::parse_html().one(page);
    let input = get_training_input(&page).ok_or(anyhow!("No training input for: {:?}", url))?;
    let output = get_training_output(&page, &url);
    let json = json!({
        "raw": bytes,
        "input": input,
        "labels": output,
    });
    SAVER.add(json);
    Ok(get_links(&page, url))
}
```
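A self-contained sketch of this pattern, with kuchiki replaced by a hypothetical stand-in: `page_to_links` here builds a `Vec<Rc<str>>` (not `Send`, like `NodeRef`) and returns plain `String`s. Because the helper is synchronous, every `Rc` is created and dropped before it returns, so the `.await` in `process` cannot observe one, however the compiler tracks drops. `add_url`, `YieldOnce`, and `block_on` are illustrative assumptions, and `std::thread::spawn` stands in for `tokio::spawn`.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough for the busy-polling executor below.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor (stand-in for a real runtime): polls the future until it is Ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Future that returns Pending once, so awaiting it is a real suspension point.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Hypothetical stand-in for `page_to_links`: the `Rc<str>` values stand in for
// the non-Send parse tree and never leave this synchronous function.
fn page_to_links(html: &str) -> Vec<String> {
    let nodes: Vec<Rc<str>> = html.split_whitespace().map(Rc::<str>::from).collect();
    nodes
        .iter()
        .filter(|node| node.starts_with("http"))
        .map(|node| node.to_string())
        .collect()
}

// Hypothetical stand-in for `add_url`.
async fn add_url(url: &str) -> usize {
    YieldOnce(false).await;
    url.len()
}

async fn process(html: String) -> usize {
    let links = page_to_links(&html); // only owned, Send data comes back
    let mut total = 0;
    for url in &links {
        total += add_url(url).await;
    }
    total
}

fn main() {
    let fut = process("http://a.example x http://b.example".to_string());
    let total = std::thread::spawn(move || block_on(fut)).join().unwrap();
    println!("{total}"); // prints 32
}
```

The design choice is that the function boundary, a synchronous function returning owned `Send` data, enforces the rule, instead of relying on the compiler's analysis of where a value is dropped.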
I strongly recommend not using unsafe code to circumvent this.