Work with postgresql in different threads

I have file structure:
file_

Cargo.toml
[package]
name = "hsql"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1", features = ["full"] }
chrono = "0.4.24"
serde = { version = "1.0.159", features = ["derive"] }
serde_json = "1.0"
tokio-postgres = { version="0.7.8", features=["with-chrono-0_4"] }
main.rs
mod ths;
use tokio_postgres::{NoTls};
use chrono::prelude::*;

#[derive(Debug, Default)]
pub struct Tst {
    id: i32,
    name: String,  
    start: chrono::DateTime<chrono::Local>,  // DateTime<Local>,
	stop:  chrono::DateTime<chrono::Local>,  // DateTime<Local>,
	dtl: i32,
	hst: String, 
}


#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    //  postgresql:
   let (client, connection) = tokio_postgres::connect("host=localhost user=*** port=5432 password=*** dbname=***", NoTls).await?; 
   tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
       }
    });
   // - - - - - - - - - - - - - - - - -
      let handle01 = ths::t01::f(&tst);
   // let handle02 = ths::t02::f(&tst);
   //     ...           ::t03
   // - - - - - - - - - - - - - - - - -   
   let mut tst = Tst::default();
   for row in &client.query("select ... from ... where ... order by ... asc", &[]).await? {
       tst.id = row.get(0);
       tst.name = row.get(1);
       tst.start = row.get(2);
       tst.stop = row.get(3);
       tst.dtl = row.get(4);
       tst.hst = String::from("example.com");
    }
    // ths::t01::f(&tst);
    handle01.join().unwrap();
    //
    println!("-\n1/. tst.id = {:?}", tst.id);
    println!("2/. tst.name = {:?}", tst.name);
    println!("3/. tst.start = {:?}", tst.start);
    println!("4/. tst.stop =  {:?}", tst.stop);
    println!("5/. tst.dtl = {:?}", tst.dtl);
    println!("6/. tst.hst = {:?}\n-", tst.hst);
    //
   let t_curr: chrono::DateTime<Local> = Local::now();
   println!("t_curr: {:?} - {:?}\n-", t_curr, t_curr.timestamp());
    //
Ok(())
}
t01.rs
   use crate::Tst;
// for `pub fn f(tst: &Tst) -> JoinHandle<()> {` :
   use std::thread;
   use std::thread::JoinHandle;

// pub async fn f(tst: &Tst) -> Result<(), Box<dyn std::error::Error>> {
// pub async fn f(tst: &Tst) -> JoinHandle<()> {
   pub fn f(tst: &Tst) -> JoinHandle<()> {
// pub fn f(tst: &Tst) {   
      /*  postgresql: 
      let (client, connection) = tokio_postgres::connect("host=localhost user=*** port=5432 password=*** dbname=***", NoTls).await?; 
      tokio::spawn(async move {
          if let Err(e) = connection.await {
              eprintln!("connection error: {}", e);
          }
      });
      */
      // -
      thread::spawn(move || {
       println!("-\n1. tst.id = {:?}", tst.id);     // 165 
       println!("2. tst.name = {:?}", tst.name);    // "text" 
       println!("3. tst.start = {:?}", tst.start);  // 2023-07-05T06:54:00+03:00
       println!("4. tst.stop =  {:?}", tst.stop);   // 2023-07-05T08:16:00+03:00         
       println!("5. tst.dtl = {:?}", tst.dtl);      // 1 
       println!("6. tst.hst = {:?}\n-", tst.hst);   // "example.com" 
      })// `;`
      //
    // Ok(())
   }

I want to work with postgres in the main thread and in separate ones.
Does not work. How to fix my code? Thanks.

update _01.

Sending data as function arguments from main thread to t01 thread:

main.rs
mod ths;
use tokio_postgres::{NoTls, Error};
use chrono::prelude::*;

#[derive(Debug, Default)]
pub struct Tst {
    pub id: i32,
    pub name: String,  
    pub start: chrono::DateTime<chrono::Local>,  // DateTime<Local>,
	pub stop: chrono::DateTime<chrono::Local>,   // DateTime<Local>,
	pub dtl: i32,
	pub hst: String, 
}

#[tokio::main]
async fn main() -> Result<(), Error> {   
    //  postgresql:
   let (client, connection) = tokio_postgres::connect("host=localhost user=*** port=5432 password=*** dbname=***", NoTls).await?; 
   tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
       }
    });
    //
   let mut tst = Tst::default();
    // - - - - - - - - - - - - - - - - -
       let handle01 = ths::t01::f(165, "text".to_string(), DateTime::parse_from_rfc3339("2023-07-05T06:54:00+03:00").unwrap().into(), DateTime::parse_from_rfc3339("2023-07-05T08:16:00+03:00").unwrap().into(), 1, "example.com".to_string());
    // let handle02 = ths::t02::f(&tst);
    //     ...           ::t03
    // - - - - - - - - - - - - - - - - -
   for row in &client.query("select ... from ... where ... order by ... asc", &[]).await? {
       tst.id = row.get(0);
       tst.name = row.get(1);
       tst.start = row.get(2);
       tst.stop = row.get(3);
       tst.dtl = row.get(4);
       tst.hst = String::from("example.com");
    }
     //
    println!("-\n1/. tst.id = {:?}", tst.id);
    println!("2/. tst.name = {:?}", tst.name);
    println!("3/. tst.start = {:?}", tst.start);
    println!("4/. tst.stop =  {:?}", tst.stop);
    println!("5/. tst.dtl = {:?}", tst.dtl);
    println!("6/. tst.hst = {:?}", tst.hst);
    //
    // ths::t01::f(&tst);
    // handle01.join().unwrap();
    handle01.await.join().unwrap();
    //
    let t_curr: chrono::DateTime<Local> = Local::now();
    println!("t_curr: {:?} - {:?}\n-", t_curr, t_curr.timestamp());
    //
Ok(())
}
t01.rs
   use tokio_postgres::{NoTls, Error};
   use chrono::prelude::*;
   use std::thread;
   use std::thread::JoinHandle;

   pub async fn f(t1: u8, t2: String, t3: DateTime<Local>, t4: DateTime<Local>, t5: u8, t6: String) -> JoinHandle<()> {   
      //  postgresql: 
      /*
      let (client, connection) = tokio_postgres::connect("host=localhost user=*** port=5432 password=*** dbname=***", NoTls).await?;
      tokio::spawn(async move {
          if let Err(e) = connection.await {
              eprintln!("connection error: {}", e);
          }
      });
      */
      // -
      thread::spawn(move || {
       println!("-\n1. tst.id = {:?}", t1);           // 165 
       println!("2. tst.name = {:?}", t2);            // "text" 
       println!("3. tst.start = {:?}", t3);           // 2023-07-05T06:54:00+03:00
       println!("4. tst.stop =  {:?}", t4);           // 2023-07-05T08:16:00+03:00         
       println!("5. tst.dtl = {:?}", t5);             // 1 
       println!("6. tst.hst = {:?}\n-", t6);          // "example.com" 
      })// `;`
      //
    // Ok(())
   }
> cargo run
-
1/. tst.id       = 165
2/. tst.name     = "text"
3/. tst.start    = 2023-07-05T08:04:00+03:00
4/. tst.stop     = 2023-07-05T08:16:00+03:00
5/. tst.dtl      = 1
6/. tst.hst      = "example.com"
-
1. tst.id        = 165
2. tst.name      = "text"
3. tst.start     = 2023-07-05T06:54:00+03:00
4. tst.stop      = 2023-07-05T08:16:00+03:00
5. tst.dtl       = 1
6. tst.hst       = "example.com"
-
t_curr: 2023-07-08T12:39:55.739395200+03:00 - 1688809195
-

How to work with postgres (create connection) from thread t01?

Please elaborate on "does not work". It's impossible to tell what you want the code to do, what it does, and how it's not doing what you want.

1 Like

The code works if pass data to the function

// from main.rs  - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
ths::t01::f(&tst);

// t01.ts  - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
pub fn f(tst: &Tst) {
       println!("-\n1. tst.id = {:?}", tst.id);     // 165 
       println!("2. tst.name = {:?}", tst.name);    // "text" 
       println!("3. tst.start = {:?}", tst.start);  // 2023-07-05T06:54:00+03:00
       println!("4. tst.stop =  {:?}", tst.stop);   // 2023-07-05T08:16:00+03:00         
       println!("5. tst.dtl = {:?}", tst.dtl);      // 1 
       println!("6. tst.hst = {:?}\n-", tst.hst);   // "example.com"
}

Works with postgres from main.rs. I receive data and send to t01.rs.
I also want to work with postgres from t01.rs. Separate stream.

I'm trying to send data from main.rs when t01.rs in a separate thread:

// from main.rs   - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
let handle01 = ths::t01::f(&tst);
//   ...
//   ths::t01::f(&tst);
//   ...
handle01.join().unwrap();

// t01.rs  - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
   use std::thread;
   use std::thread::JoinHandle;

// pub fn f(tst: &Tst) -> JoinHandle<()> {
   pub fn f<'a>(tst: &'a Tst) -> JoinHandle<()> {

   thread::spawn(move || {
       println!("-\n1. tst.id = {:?}", tst.id);     // 165 
       println!("2. tst.name = {:?}", tst.name);    // "text" 
       println!("3. tst.start = {:?}", tst.start);  // 2023-07-05T06:54:00+03:00
       println!("4. tst.stop =  {:?}", tst.stop);   // 2023-07-05T08:16:00+03:00         
       println!("5. tst.dtl = {:?}", tst.dtl);      // 1 
       println!("6. tst.hst = {:?}\n-", tst.hst);   // "example.com" 
    })

To get work done in a separate thread. While without a connection to postgres from t01.rs.

Get error:
error[E0521]: borrowed data escapes outside of function
  --> src\ths\t01.rs:20:7
   |
9  |      pub fn f<'a>(tst: &'a Tst) -> JoinHandle<()> {
   |               --  --- `tst` is a reference that is only valid in the function body
   |               |
   |               lifetime `'a` defined here
...
20 | /       thread::spawn(move || {
21 | |        println!("-\n1. tst.id = {:?}", tst.id);     // 165
22 | |        println!("2. tst.name = {:?}", tst.name);    // "text"
23 | |        println!("3. tst.start = {:?}", tst.start);  // 2023-07-05T06:54:00+03:00
...  |
26 | |        println!("6. tst.hst = {:?}\n-", tst.hst);   // "example.com"
27 | |       })// `;`
   | |        ^
   | |        |
   | |________`tst` escapes the function body here
   |          argument requires that `'a` must outlive `'static`

For more information about this error, try `rustc --explain E0521`.
error: could not compile `hsql` due to previous error

This has absolutely nothing to do with postgres, and everything to do with the fact that you are moving a non-static reference into a non-scoped thread.

How to correctly moving reference into new thread in my code? Thanks.

With exception for scoped threads API, you can't pass any T that isn't T: 'static into another thread (or async task for that matter).

I added info, look please.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.