Writing a Binance tick stream to InfluxDB2

Title: Error in Rust code while parsing JSON data and storing it in InfluxDB2

Description: I have written a Rust program that is supposed to receive a stream of data and store it in an InfluxDB2 database. However, it panics at runtime. The panic occurs on the line `let price: f64 = f64::from_str(pr).unwrap();`.

use websocket::client::ClientBuilder;
use websocket::OwnedMessage;
use influxdb2::models::FieldValue;

use std::time::Instant;
use std::str::FromStr;

use futures::prelude::*;
use influxdb2::models::DataPoint;
use influxdb2::Client;


/// Subscribes to the Binance futures `!ticker@arr` WebSocket stream and writes
/// one InfluxDB2 point per ticker (measurement `Binance`, tag `Symbol`,
/// field `Price`) into the `Ticks` bucket.
///
/// # Errors
///
/// Returns an error if building a `DataPoint` or writing to InfluxDB fails
/// (both are propagated with `?`).
///
/// # Panics
///
/// Panics if the WebSocket URL cannot be parsed, the connection or the
/// subscription message fails, a message cannot be received, a text frame is
/// not valid JSON, or the `"c"` value cannot be parsed as `f64`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // NOTE(review): this looks like the blocking client of the `websocket`
    // crate being driven from inside an async fn — confirm this is intended.
    let mut client = ClientBuilder::new("wss://fstream.binance.com/stream")
        .unwrap()
        .connect(None)
        .expect("Failed to connect");
    
    // InfluxDB2 connection settings.
    // NOTE(review): the API token is hard-coded in the source; it should be
    // loaded from the environment or a config file instead.
    let host = "http://localhost:8086";
    let org = "wannafly";
    let token = "QPjlMBmeh-fANqZpkC_sScPbW8weLxsw9yl8Q8s7PsUiATo43_plZSathKcRHJ6NOBx9UB3Uh05RixzG4ZT7yA==";
    let bucket = "Ticks";
        
    let clientdb = Client::new(host, org, token);

    // Timestamp of the previously processed batch, used to report the
    // interval between batches.
    let mut prev_time = Instant::now();

    
    // Never set to `true` anywhere below, so the `if done { break; }` check
    // inside the ticker loop never fires.
    let done = false;

    // Ask the server to start sending the all-market ticker array.
    client
        .send_message(&OwnedMessage::Text(
            r#"{"method": "SUBSCRIBE", "params": ["!ticker@arr"], "id": 1}"#.to_string(),
        ))
        .expect("Failed to send message");

    for message in client.incoming_messages() {
        let message = message.expect("Failed to receive message");
        match message {
            OwnedMessage::Text(text) => {
                let tick: serde_json::Value =
                    serde_json::from_str(&text).expect("Failed to parse JSON");
                // Only messages carrying a `data` array are processed; anything
                // else is ignored.
                if let Some(data) = tick.get("data") {
                    if let Some(tickers) = data.as_array() {
                        println!("------------------------");
                        // Number of tickers seen in this batch.
                        let mut n = 0;
                        for ticker in tickers {
                            n = n + 1;
                            let symbol: String;
                            // This outer `price` is the one printed and written
                            // below; nothing in this loop ever assigns to it,
                            // so it stays 0.0.
                            let mut price: f64 = 0.0;
                            // Declared but never assigned: the `let pr` inside
                            // the `if let` below is a different variable.
                            let pr: &str;
                            // `Value::to_string()` renders JSON text, so a JSON
                            // string keeps its surrounding double quotes.
                            if let Some(s) = ticker.get("s") {
                                symbol = s.to_string();
                            } else {
                                symbol = "none".to_string();
                            }
                            if let Some(p) = ticker.get("c") {
                                // New `pr`, shadowing the outer declaration.
                                let pr = &p.to_string();
                                //println!("{}", pr);
                                // NOTE(review): `let` creates a new `price`
                                // scoped to this block instead of assigning to
                                // the outer one. `pr` is JSON text (quotes
                                // included for string values), so `from_str`
                                // fails on it and `unwrap()` panics.
                                let price: f64 = f64::from_str(pr).unwrap();
                            } else {
                                // No `"c"` key: skip this ticker without writing.
                                continue;
                            }
                            println!("symbol: {}:{:?}", symbol, price);
                            let points = vec![
                                DataPoint::builder("Binance")
                                    .tag("Symbol", symbol)
                                    .field("Price", Into::<FieldValue>::into(price))
                                    .build()?,
                            ];
    
                            // One write call is awaited per ticker.
                            clientdb.write(&bucket, stream::iter(points)).await?;
                            if done {    
                                break;    
                            }
                        }
                        // Report the batch size and the time since the previous batch.
                        let current_time = Instant::now();
                        let time_diff = current_time.duration_since(prev_time);
                        prev_time = current_time;
                        println!("count: {}", n);
                        println!("Time interval: {:?}", time_diff);

                    }
                }
            }
            OwnedMessage::Close(_) => {
                println!("Connection closed");
                break;
            }
            // All other message kinds are ignored.
            _ => {}
        }
    }
    Ok(())
}

Error:

[svelle@lenovo trade]$ cargo run
   Compiling trade v0.1.0 (/home/svelle/Рабочий стол/Rust/trade)
warning: unused variable: `pr`
  --> src/main.rs:53:33
   |
53 | ...                   let pr: &str;
   |                           ^^ help: if this is intentional, prefix it with an underscore: `_pr`
   |
   = note: `#[warn(unused_variables)]` on by default

warning: unused variable: `price`
  --> src/main.rs:62:37
   |
62 | ...                   let price: f64 = f64::from_str(pr).unwrap();
   |                           ^^^^^ help: if this is intentional, prefix it with an underscore: `_price`

warning: variable does not need to be mutable
  --> src/main.rs:52:33
   |
52 | ...                   let mut price: f64 = 0.0;
   |                           ----^^^^^
   |                           |
   |                           help: remove this `mut`
   |
   = note: `#[warn(unused_mut)]` on by default

warning: `trade` (bin "trade") generated 3 warnings (run `cargo fix --bin "trade"` to apply 3 suggestions)
    Finished dev [unoptimized + debuginfo] target(s) in 59.54s
warning: the following packages contain code that will be rejected by a future version of Rust: traitobject v0.1.0
note: to see what the problems were, use the option `--future-incompat-report`, or run `cargo report future-incompatibilities --id 1`
     Running `target/debug/trade`
------------------------
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: ParseFloatError { kind: Invalid }', src/main.rs:62:68
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
[svelle@lenovo trade]$ 

In general, it's a bad idea to have unwrap() in your code. You should replace

let price: f64 = f64::from_str(pr).unwrap();

with a match statement:

match f64::from_str(pr) {
    Ok(price) => {
        // Successfully unwrapped the float response
    }
    Err(err) => {
        // log the error message
        // API returned a non-float response
    }
}

Thank you! The code runs without any errors now, but I'm facing an issue where the price variable remains 0, even though the first print statement shows that there is a value present. Here is the console output:
"9.059"
symbol: "NEOUSDT":0.0
"0.1151"
symbol: "ALGOUSDT":0.0
"0.072370"
symbol: "DOGEUSDT":0.0
"0.9031"
symbol: "KAVAUSDT":0.0
"2.897"
symbol: "SNXUSDT":0.0
"0.756"
symbol: "CRVUSDT":0.0

In the code, I have two print statements:
println!("{}", pr);
println!("symbol: {}:{:?}", symbol, price);

The first print statement correctly displays the value, but somehow it gets lost along the way, resulting in price being 0.0.

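You are re-declaring price instead of assigning to it: the `let price` inside the `if let` block creates a new variable that shadows the outer one and disappears at the end of that block, so the outer `price` keeps its initial value of 0.0. (That's why you shouldn't ignore warnings: the compiler reported the inner `price` as unused.)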

I no longer re-declare it, but now every parse fails:

use websocket::client::ClientBuilder;
use websocket::OwnedMessage;
use influxdb2::models::FieldValue;

use std::time::Instant;
use std::str::FromStr;

use futures::prelude::*;
use influxdb2::models::DataPoint;
use influxdb2::Client;


/// Subscribes to the Binance futures `!ticker@arr` WebSocket stream and writes
/// one InfluxDB2 point per ticker (measurement `Binance`, tag `Symbol`,
/// field `Price`) into the `Ticks` bucket.
///
/// In this version `price` is a `String`; the result of parsing it as `f64`
/// is only printed, and the `String` itself is what is stored in the field.
///
/// # Errors
///
/// Returns an error if building a `DataPoint` or writing to InfluxDB fails
/// (both are propagated with `?`).
///
/// # Panics
///
/// Panics if the WebSocket URL cannot be parsed, the connection or the
/// subscription message fails, a message cannot be received, or a text frame
/// is not valid JSON.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // NOTE(review): this looks like the blocking client of the `websocket`
    // crate being driven from inside an async fn — confirm this is intended.
    let mut client = ClientBuilder::new("wss://fstream.binance.com/stream")
        .unwrap()
        .connect(None)
        .expect("Failed to connect");
    
    // InfluxDB2 connection settings.
    // NOTE(review): the API token is hard-coded in the source; it should be
    // loaded from the environment or a config file instead.
    let host = "http://localhost:8086";
    let org = "wannafly";
    let token = "QPjlMBmeh-fANqZpkC_sScPbW8weLxsw9yl8Q8s7PsUiATo43_plZSathKcRHJ6NOBx9UB3Uh05RixzG4ZT7yA==";
    let bucket = "Ticks";
        
    let clientdb = Client::new(host, org, token);

    // Timestamp of the previously processed batch, used to report the
    // interval between batches.
    let mut prev_time = Instant::now();

    
    // Never set to `true` anywhere below, so the `if done { break; }` check
    // inside the ticker loop never fires.
    let done = false;

    // Ask the server to start sending the all-market ticker array.
    client
        .send_message(&OwnedMessage::Text(
            r#"{"method": "SUBSCRIBE", "params": ["!ticker@arr"], "id": 1}"#.to_string(),
        ))
        .expect("Failed to send message");

    for message in client.incoming_messages() {
        let message = message.expect("Failed to receive message");
        match message {
            OwnedMessage::Text(text) => {
                let tick: serde_json::Value =
                    serde_json::from_str(&text).expect("Failed to parse JSON");
                // Only messages carrying a `data` array are processed; anything
                // else is ignored.
                if let Some(data) = tick.get("data") {
                    if let Some(tickers) = data.as_array() {
                        println!("------------------------");
                        // Number of tickers seen in this batch.
                        let mut n = 0;
                        for ticker in tickers {
                            n = n + 1;
                            let symbol: String;
                            let price: String;
                            // `Value::to_string()` renders JSON text, so a JSON
                            // string keeps its surrounding double quotes.
                            if let Some(s) = ticker.get("s") {
                                symbol = s.to_string();
                            } else {
                                symbol = "none".to_string();
                            }
                            if let Some(p) = ticker.get("c") {
                                price = p.to_string();
                                println!("{}", price);
                                // NOTE(review): `price` is JSON text here, with
                                // quotes for string values, so this parse fails
                                // on it. Reading the value with `as_str()`
                                // would give the bare text.
                                match f64::from_str(&price) {
                                    // This `price` is a new f64 binding that
                                    // exists only inside this arm; it is
                                    // printed and then dropped.
                                    Ok(price) => {
                                        println!("symbol: {}:{:?}", symbol, price);
                                        // Successfully unwrapped the float response
                                    }
                                    // `err` is bound but never used; the
                                    // failure reason is not reported.
                                    Err(err) => {
                                        println!("Error");
                                        // log the error message
                                        // API returned a non-float response
                                    }
                                } 
                            } else {
                                // No `"c"` key: skip this ticker without writing.
                                continue;
                            }
                            //println!("symbol: {}:{:?}", symbol, price);
                            // NOTE(review): `price` here is the outer `String`,
                            // so the field is written as text, whether or not
                            // the parse above succeeded.
                            let points = vec![
                                DataPoint::builder("Binance")
                                    .tag("Symbol", symbol)
                                    .field("Price", Into::<FieldValue>::into(price))
                                    .build()?,
                            ];
    
                            // One write call is awaited per ticker.
                            clientdb.write(&bucket, stream::iter(points)).await?;
                            if done {    
                                break;    
                            }
                        }
                        // Report the batch size and the time since the previous batch.
                        let current_time = Instant::now();
                        let time_diff = current_time.duration_since(prev_time);
                        prev_time = current_time;
                        println!("count: {}", n);
                        println!("Time interval: {:?}", time_diff);

                    }
                }
            }
            OwnedMessage::Close(_) => {
                println!("Connection closed");
                break;
            }
            // All other message kinds are ignored.
            _ => {}
        }
    }
    Ok(())
}

console:

"29900.40"
Error
"1873.26"
Error
"250.32"
Error
"0.7395"
Error
"0.778"
Error
"0.08313"
Error
"0.31440"
Error
"1.2522"
Error
"2.892"
Error

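There are quotation marks, i.e. "", around your float, which is causing the parsing to fail. They are there because calling `to_string()` on a `serde_json::Value` produces JSON text, and a JSON string is always written with its surrounding quotes.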

You can replace

match f64::from_str(&price) {

with

match f64::from_str(&price.replace('"', "")) {

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.