Borrow of moved value using Arc and Mutex

I just started with Rust and I am running into some borrowing/ownership issues.

I need to edit markets_data every X seconds, so I am doing this:
let markets_data = Arc::new(Mutex::new(HashMap::new()));

Inside tokio::spawn I have a loop, and inside this loop I call the function that should edit markets_data every X seconds, using tokio::time::interval:

edit_markets_data(ids.clone(), Arc::clone(&markets_data)).await

The issue is that I need to use the updated version of markets_data each time an event triggers. So inside the infinite loop where I read the event stream, I tried this:

let market_infos = Arc::clone(&markets_data);

but I am getting a "borrow of moved value: markets_data" error.

Please show a reproducible example.

The compiler should tell you where the value was moved. Do you have a minimal example?

Here is a reproducible example:

use ethers::prelude::*;
use eyre::Result;
use std::sync::{Arc, Mutex};
use std::collections::HashMap;
use reqwest;
use tokio::time::{sleep, Duration};

#[derive(Debug, serde::Deserialize)]
struct PriceFeedResponse {
    pub vaa: String,
}

#[derive(Debug, Clone)]
struct MarketInfo {
    price_feed: Bytes,
    pyth_feed: String,
}


async fn get_price_feeds_update_data(ids: &str, market_infos: Arc<Mutex<HashMap<String, MarketInfo>>>) -> Result<Vec<Bytes>, Box<dyn std::error::Error>> {
    let url = format!("https://xc-mainnet.pyth.network/api/latest_price_feeds?{}&binary=true", ids);

    let res = reqwest::get(&url)
        .await?
        .json::<Vec<PriceFeedResponse>>()
        .await?;

    let mut price_feeds: Vec<Bytes> = Vec::new();
    // read the base64-encoded vaa of each element in the response and decode it to bytes
    // priceFeeds.push("0x" + Buffer.from(el.vaa, "base64").toString("hex"));
    for el in res {
        let vaa_bytes = base64::decode(el.vaa)?;

        let price_feed = Bytes::from(vaa_bytes);
        price_feeds.push(price_feed);
    }

    // update price_feeds in market_infos
    let mut market_infos = market_infos.lock().unwrap();
    for (i, market_info) in market_infos.iter_mut().enumerate() {
        market_info.1.price_feed = price_feeds[i].clone();
    }

    println!("market_infos: {:?}", market_infos);

    Ok(price_feeds)
}

// create a function to populate the market_infos HashMap
async fn populate_market_infos(market_infos: Arc<Mutex<HashMap<String, MarketInfo>>>) -> Result<(), Box<dyn std::error::Error>> {

    let mut market_infos = market_infos.lock().unwrap();

    // populate market_infos with 4 placeholder market infos

    // let mut market_infos: HashMap<String, MarketInfo> = HashMap::new();
    let mut market_info: MarketInfo;
    let mut price_feed: Bytes;
    let mut pyth_feed: String;

    for i in 0..4 {
        price_feed = Bytes::from(vec![i; 32]);
        pyth_feed = format!("pyth_feed_{}", i);
        market_info = MarketInfo {
            price_feed,
            pyth_feed,
        };
        market_infos.insert(format!("market_{}", i), market_info);
    }

    Ok(())
}


#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let markets_data: Arc<Mutex<HashMap<String, MarketInfo>>> = Arc::new(Mutex::new(HashMap::new()));

    populate_market_infos(Arc::clone(&markets_data)).await?;

    let ids: &str = "ids[]=0x2b9ab1e972a281585084148ba1389800799bd4be63b957507db1349314e47445&ids[]=0x2a01deaec9e51a579277b34b122399984d0bbf57e2458a7e42fecd2829867a0d&ids[]=0x2f95862b045670cd22bee3114c39763a4a08beeb663b145d283c31d7d1101c4f&ids[]=0x5de33a9112c2b700b8d30b8a3402c103578ccfa2765696471cc672bd5cf6ac52";

    tokio::task::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(9));
        
        loop {
            interval.tick().await;
            if let Err(err) = get_price_feeds_update_data(ids.clone(), Arc::clone(&markets_data)).await {
                eprintln!("Error: {}", err);
            }
        }
    });


    // this is where the error is triggered.
    let market_infos = markets_data.clone();
    let market_infos = market_infos.lock().unwrap();
    
    // added only to see if elements get updated
    loop {
        sleep(Duration::from_secs(1)).await;
    }
    
    Ok(())
}

Clone your Arc before the spawn so that you move the clone into the async block and not the original.

    let md = markets_data.clone();
//  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    tokio::task::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(9));
        
        loop {
            interval.tick().await;
            if let Err(err) = get_price_feeds_update_data(ids.clone(), md.clone()).await {
//                                                                     ^^^^^^^^^^
                eprintln!("Error: {}", err);
            }
        }
    });
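
To see the same ownership pattern in isolation, here is a minimal sketch that uses only the standard library. It assumes std::thread::spawn with a move closure as a stand-in for tokio::task::spawn with an async move block (both take ownership of whatever they capture), and the spawn_writer helper and the u64 value type are made up for illustration.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical simplified type: the real code stores MarketInfo values.
type Shared = Arc<Mutex<HashMap<String, u64>>>;

/// Spawns a worker that owns its own Arc handle and writes one entry.
fn spawn_writer(data: &Shared) -> thread::JoinHandle<()> {
    // Clone *before* the `move` closure so that only the clone is moved.
    let for_worker = Arc::clone(data);
    thread::spawn(move || {
        for_worker
            .lock()
            .unwrap()
            .insert("market_0".to_string(), 42);
    })
}

fn main() {
    let markets_data: Shared = Arc::new(Mutex::new(HashMap::new()));

    let handle = spawn_writer(&markets_data);
    handle.join().unwrap();

    // The original handle was never moved, so it is still usable here.
    let value = markets_data.lock().unwrap().get("market_0").copied();
    println!("{:?}", value);
}
```

If the closure captured markets_data directly instead of for_worker, the later markets_data.lock() call would fail to compile with the same "borrow of moved value" error.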

OK, thank you. Should this line stay as markets_data.clone()?

let market_infos = markets_data.clone();

This way, market_infos will always reflect the updated data, correct?

You can just write:

    let market_infos = markets_data.lock().unwrap();

I didn't really do a review of everything, I just fixed the ownership problem.

The Arc<_> enables shared ownership, so everything is sharing the same HashMap. I think that is what you mean.
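
A small sketch of what "sharing the same HashMap" means, using only the standard library. The write_then_read function name and the u64 values are placeholders for illustration, not part of the original code.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Writes through one Arc handle and reads the value back through another.
fn write_then_read() -> Option<u64> {
    let markets_data = Arc::new(Mutex::new(HashMap::new()));

    // Arc::clone copies the pointer, not the HashMap behind it.
    let writer = Arc::clone(&markets_data);
    writer.lock().unwrap().insert("market_0".to_string(), 7u64);

    // Both handles point at the same allocation.
    assert!(Arc::ptr_eq(&markets_data, &writer));

    // The write made through `writer` is visible through `markets_data`.
    let seen = markets_data.lock().unwrap().get("market_0").copied();
    seen
}

fn main() {
    println!("{:?}", write_then_read());
}
```

So an extra clone is only needed when a handle has to be moved somewhere else, such as into a spawned task. Reading through the original handle sees the same data.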

You don't want to obtain the mutex lock and then go into a loop like your example shows -- everything else trying to get the lock will block forever once you're in the loop. Only hold the lock for as long as needed.
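
One way to keep the lock short, sketched with the standard library only: copy out what you need while holding the guard, drop the guard, and then do the slow work on the copy. The snapshot helper and the u64 value type are assumptions made for this example.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical simplified type: the real code stores MarketInfo values.
type Shared = Arc<Mutex<HashMap<String, u64>>>;

/// Copies the current contents out while holding the lock only briefly.
fn snapshot(data: &Shared) -> HashMap<String, u64> {
    let guard = data.lock().unwrap();
    let copy = (*guard).clone();
    // Release the lock explicitly before returning the copy.
    drop(guard);
    copy
}

fn main() {
    let markets_data: Shared = Arc::new(Mutex::new(HashMap::new()));
    markets_data.lock().unwrap().insert("market_0".to_string(), 1);

    let copy = snapshot(&markets_data);

    // The lock is free again, so other tasks can update the map
    // while this code keeps working with `copy`.
    assert!(markets_data.try_lock().is_ok());
    println!("{:?}", copy);
}
```

Cloning the whole map is only reasonable while it is small. For larger data you would copy just the entries you need inside the locked section.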

Thanks. Yes, the loop was just for testing purposes.
