Doesn't live long enough error


#1

In the following code, the compiler is giving me a few errors and I don’t know what they mean. I am following the example here: https://docs.rs/postgres-binary-copy/0.4.0/postgres_binary_copy/

‘rec’ does not live long enough. I guess I really don’t know what I am doing regarding ownership. I have read the docs, but it hasn’t sunk in yet. How do I make the rec value live longer? Is that what the “'a” stuff I read about in the docs does? If so, how do I use it?

As always, thank you for helping me learn. This experiment is my company’s introduction to Rust.

dan@dan-VirtualBox:~/Downloads/csvreaders/rustcsvreader$ cargo build --release
   Compiling rustcsvreader v0.1.0 (file:///home/dan/Downloads/csvreaders/rustcsvreader)
error: `rec` does not live long enough
  --> src/main.rs:53:32
   |
53 | 				let temp = str::from_utf8(&rec[idx]).unwrap();
   | 				                           ^^^ does not live long enough
...
76 | }
   | -	borrowed value only lives until here
   |
   = note: borrowed value must be valid for the static lifetime...

error[E0382]: use of moved value: `data`
  --> src/main.rs:56:7
   |
56 | 						data.push(Box::new(None::<i32>));
   | 						^^^^ value used here after move
...
66 | 			let data = streaming_iterator::convert(data.into_iter()).map_ref(|v| &**v);
   | 						                                    ---- value moved here
   |
   = note: move occurs because `data` has type `std::vec::Vec<std::boxed::Box<postgres::types::ToSql + 'static>>`, which does not implement the `Copy` trait

error[E0382]: use of moved value: `data`
  --> src/main.rs:58:7
   |
58 | 						data.push(Box::new(Some::<i32>(temp.parse().unwrap())));
   | 						^^^^ value used here after move
...
66 | 			let data = streaming_iterator::convert(data.into_iter()).map_ref(|v| &**v);
   | 						                                    ---- value moved here
   |
   = note: move occurs because `data` has type `std::vec::Vec<std::boxed::Box<postgres::types::ToSql + 'static>>`, which does not implement the `Copy` trait

error[E0382]: use of moved value: `data`
  --> src/main.rs:61:6
   |
61 | 					data.push(Box::new(temp.clone()));
   | 					^^^^ value used here after move
...
66 | 			let data = streaming_iterator::convert(data.into_iter()).map_ref(|v| &**v);
   | 					                                     ---- value moved here
   |
   = note: move occurs because `data` has type `std::vec::Vec<std::boxed::Box<postgres::types::ToSql + 'static>>`, which does not implement the `Copy` trait

error[E0382]: use of moved value: `data`
  --> src/main.rs:66:43
   |
66 | 			let data = streaming_iterator::convert(data.into_iter()).map_ref(|v| &**v);
   | 			                                       ^^^^ value moved here in previous iteration of loop
   |
   = note: move occurs because `data` has type `std::vec::Vec<std::boxed::Box<postgres::types::ToSql + 'static>>`, which does not implement the `Copy` trait

error: aborting due to 5 previous errors

Full code:

extern crate csv;
extern crate postgres;
extern crate postgres_binary_copy;
extern crate chrono;
extern crate streaming_iterator;
use std::env;
use chrono::prelude::*;
use std::str;
use postgres::{Connection, TlsMode};
use postgres::types::{Type, ToSql};
use postgres_binary_copy::BinaryCopyReader;
use streaming_iterator::StreamingIterator;

fn main() {
	let bulk_insert = env::args().count() > 1 && env::args().nth(1).unwrap() == "bulk" ;
	println!("bulk insert is {} ", bulk_insert);

	let insert_sql = "INSERT INTO ontime_performance VALUES (
			$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11,
			$12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22,
			$23, $24, $25, $26, $27, $28, $29
			);";
	let bulk_insert_sql = "COPY ontime_performance  FROM STDIN (FORMAT binary)";
	//let path = Path::new("/home/dan/Downloads/2008.csv");
	//http://stat-computing.org/dataexpo/2009/the-data.html
	//unzipped ~700MB and 7million records.  This is 5.5 min on my VM (15 in python)
	let mut rowcnt = 0u64;
	let mut rdr = csv::Reader::from_reader(std::io::stdin());
	//let mut rdr = csv::Reader::from_path("/home/dan/Downloads/2008.csv").unwrap();
	let mut rec = csv::ByteRecord::new();
	let mut ints : Vec<Option<i32>> = vec![Some(0); rdr.byte_headers().unwrap().len()];
	let conn = Connection::connect("postgres://dan:dan@localhost/rowcounttest", TlsMode::None)
		.expect("Connection failed");
	let tran = conn.transaction().unwrap();
	let stmt = if !bulk_insert {
		 tran.prepare(insert_sql).expect("Prepare failed")
	}
	else {
		conn.prepare(bulk_insert_sql).expect("Prepare failed.")
	};
	let table_types = &[Type::Int4, Type::Int4, Type::Int4, Type::Int4, Type::Int4, Type::Int4, Type::Int4, Type::Int4,
		Type::Varchar, Type::Int4, Type::Varchar, Type::Int4, Type::Int4, Type::Int4, Type::Int4, Type::Int4,
		Type::Varchar, Type::Varchar, Type::Int4, Type::Int4, Type::Int4, Type::Int4, Type::Varchar,
		Type::Int4, Type::Int4, Type::Int4, Type::Int4, Type::Int4, Type::Int4
		];
	let mut data: Vec<Box<ToSql>> = std::vec::Vec::new();
	let mut utc: DateTime<Utc> = Utc::now();
	while rdr.read_byte_record(&mut rec).expect("all done") {
		if !bulk_insert {
			rowcnt += insert_to_pg(&rec, &mut ints, &stmt);
		} else {
			for idx in 0..29 {
				let temp = str::from_utf8(&rec[idx]).unwrap();
				if idx != 8 && idx != 10 && idx != 16  && idx != 17 && idx != 22 {
					if temp == "NA" { 
						data.push(Box::new(None::<i32>));
					} else { 
						data.push(Box::new(Some::<i32>(temp.parse().unwrap())));
					}
				} else {
					data.push(Box::new(temp));
				}
			}
		}
		if rowcnt % 100000 == 0 {
			let data = streaming_iterator::convert(data.into_iter()).map_ref(|v| &**v);
			let mut binreader = BinaryCopyReader::new(table_types, data);
			stmt.copy_in(&[], &mut binreader).unwrap();
			println!("rowcount = {} in {}", rowcnt,  Utc::now().signed_duration_since( utc) );
			utc = Utc::now();
		} 
	}
	let _ = tran.commit();
	let _ = stmt.finish();
	println!("rowcount = {}", rowcnt);
}

#2

You’ll want to change

} else {
	data.push(Box::new(temp));
}

to

} else {
	data.push(Box::new(temp.to_string()));
}
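
temp here is a &str borrowed from rec, and Box<ToSql> is shorthand for Box<ToSql + 'static>, so the boxed value is not allowed to borrow from anything shorter-lived; to_string() gives you an owned String, which satisfies 'static. Here is a minimal, crate-free sketch of the same error, with std’s Display standing in for ToSql (just so it compiles on its own, it is not the postgres trait):

use std::fmt::Display;

fn main() {
	// Box<Display> is shorthand for Box<Display + 'static>: the boxed
	// value may not borrow from a shorter-lived local.
	let mut data: Vec<Box<Display>> = Vec::new();
	let rec = String::from("123");
	let temp: &str = &rec;
	//data.push(Box::new(temp)); // error: `rec` does not live long enough
	data.push(Box::new(temp.to_string())); // owned String is 'static: compiles
	println!("{}", data[0]);
}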

As for the “use of moved value ‘data’” errors, you’ll probably want to use Vec::drain instead of into_iter, since into_iter takes the Vec by value.


#3

Thank you. I couldn’t get drain to work, but recreating the Vec each time did. data.drain(..) still gave the “use of moved value” error. The 5.5 minutes I saw yesterday is down to 1.

For the curious, here is the complete code:

extern crate csv;
extern crate postgres;
extern crate postgres_binary_copy;
extern crate chrono;
extern crate streaming_iterator;
use std::env;
use chrono::prelude::*;
use std::str;
use postgres::{Connection, TlsMode};
use postgres::types::{Type, ToSql};
use postgres_binary_copy::BinaryCopyReader;
use streaming_iterator::StreamingIterator;

fn main() {
	let bulk_insert = env::args().count() > 1 && env::args().nth(1).unwrap() == "bulk" ;
	println!("bulk insert is {} ", bulk_insert);

	let insert_sql = "INSERT INTO ontime_performance VALUES (
			$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11,
			$12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22,
			$23, $24, $25, $26, $27, $28, $29
			);";
	let bulk_insert_sql = "COPY ontime_performance  FROM STDIN (FORMAT binary)";
	//let path = Path::new("/home/dan/Downloads/2008.csv");
	//http://stat-computing.org/dataexpo/2009/the-data.html
	//unzipped ~700MB and 7million records.  This is 5.5 min on my VM (15 in python)
	let mut rowcnt = 0u64;
	let mut rdr = csv::Reader::from_reader(std::io::stdin());
	//let mut rdr = csv::Reader::from_path("/home/dan/Downloads/2008.csv").unwrap();
	let mut rec = csv::ByteRecord::new();
	let mut ints : Vec<Option<i32>> = vec![Some(0); rdr.byte_headers().unwrap().len()];
	let conn = Connection::connect("postgres://dan:dan@localhost/rowcounttest", TlsMode::None)
		.expect("Connection failed");
	let tran = conn.transaction().unwrap();
	let stmt = if !bulk_insert {
		 tran.prepare(insert_sql).expect("Prepare failed")
	}
	else {
		conn.prepare(bulk_insert_sql).expect("Prepare failed.")
	};
	let table_types = &[Type::Int4, Type::Int4, Type::Int4, Type::Int4, Type::Int4, Type::Int4, Type::Int4, Type::Int4,
		Type::Varchar, Type::Int4, Type::Varchar, Type::Int4, Type::Int4, Type::Int4, Type::Int4, Type::Int4,
		Type::Varchar, Type::Varchar, Type::Int4, Type::Int4, Type::Int4, Type::Int4, Type::Varchar,
		Type::Int4, Type::Int4, Type::Int4, Type::Int4, Type::Int4, Type::Int4
		];
	let mut data: Vec<Box<ToSql>> = std::vec::Vec::new();
	let mut utc: DateTime<Utc> = Utc::now();
	while rdr.read_byte_record(&mut rec).expect("all done") {
		if !bulk_insert {
			rowcnt += insert_to_pg(&rec, &mut ints, &stmt);
		} else {
			for idx in 0..29 {
				let temp = str::from_utf8(&rec[idx]).unwrap();
				if idx != 8 && idx != 10 && idx != 16  && idx != 17 && idx != 22 {
					if temp == "NA" { 
						data.push(Box::new(None::<i32>));
					} else { 
						data.push(Box::new(Some::<i32>(temp.parse().unwrap())));
					}
				} else {
					data.push(Box::new(temp.to_string()));
				}
			}
			rowcnt += 1;
		}
		if rowcnt % 100000 == 0 {
			let data1 = streaming_iterator::convert(data.into_iter()).map_ref(|v| &**v);
			let mut binreader = BinaryCopyReader::new(table_types, data1);
			stmt.copy_in(&[], &mut binreader).unwrap();
			//data.drain(0..);
			data = std::vec::Vec::new();
			println!("rowcount = {} in {}", rowcnt,  Utc::now().signed_duration_since( utc) );
			utc = Utc::now();
		} 
	}
	let data1 = streaming_iterator::convert(data.into_iter()).map_ref(|v| &**v);
	let mut binreader = BinaryCopyReader::new(table_types, data1);
	stmt.copy_in(&[], &mut binreader).unwrap();
	let _ = tran.commit();
	let _ = stmt.finish();
	println!("rowcount = {}", rowcnt);
}
fn insert_to_pg(rec : &csv::ByteRecord, ints: &mut Vec<Option<i32>>, stmt: &postgres::stmt::Statement) -> u64 {
	for idx in 0..29 {
		if idx != 8 && idx != 10 && idx != 16  && idx != 17 && idx != 22 {
			let temp = str::from_utf8(&rec[idx]).unwrap();
			if temp == "NA" { 
				ints[idx] = None;
			} else { 
				ints[idx] = Some(temp.parse().unwrap());
			}
		} 
	}
	let f8:&str = str::from_utf8(&rec[8]).unwrap();
	let f10:&str = str::from_utf8(&rec[10]).unwrap();
	let f16:&str = str::from_utf8(&rec[16]).unwrap();
	let f17:&str = str::from_utf8(&rec[17]).unwrap();
	let f22:&str = str::from_utf8(&rec[22]).unwrap();
	let rn = stmt.execute(
			&[
			&ints[0], &ints[1], &ints[2], &ints[3], &ints[4], &ints[5], &ints[6], &ints[7], 
			&f8, &ints[9], &f10, &ints[11], &ints[12], &ints[13], &ints[14], &ints[15],
			&f16, &f17, &ints[18], &ints[19], &ints[20], &ints[21], &f22, &ints[23],  &ints[24], 
			&ints[25], &ints[26], &ints[27], &ints[28]
			]
		).expect("Insert failed");
	rn
}

#4

drain shouldn’t cause any moves of the Vec. I imagine it would look something like this:

if rowcnt % 100000 == 0 {
	let data1 = streaming_iterator::convert(data.drain()).map_ref(|v| &**v);
	let mut binreader = BinaryCopyReader::new(table_types, data1);
	stmt.copy_in(&[], &mut binreader).unwrap();

	println!("rowcount = {} in {}", rowcnt, Utc::now().signed_duration_since(utc));
	utc = Utc::now();
}
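
To see the difference for yourself, here is a tiny crate-free demonstration: into_iter() takes the Vec by value, while drain(..) only borrows it mutably, so the Vec is still usable afterwards:

fn main() {
	let mut data = vec![1, 2, 3];
	// into_iter() takes the Vec by value, so any later use of `data`
	// would be E0382: use of moved value.
	//let moved = data.into_iter();
	// drain(..) only takes &mut self, so the Vec survives the flush:
	let drained: Vec<i32> = data.drain(..).collect();
	assert_eq!(drained, vec![1, 2, 3]);
	data.push(4); // still usable afterwards
	assert_eq!(data, vec![4]);
}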

#5

It became data.drain(..), but otherwise that worked. Thank you. I think I even understand why it works now that I see it.


#6

Yes, sorry - I completely omitted the .. range arg in haste.

This is also intuitively what you want - you accumulate a bunch of ToSql values in the Vec, and then flush (or drain :slight_smile:) them to postgres. Since they’ve been sent to postgres, you also want them removed from the Vec, and that’s what drain does: it leaves the Vec itself alone and just moves the data out of it, so you can continue using the Vec for subsequent rows.
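
If it helps, here is the accumulate-and-flush pattern in miniature, with println! standing in for the COPY to postgres:

fn main() {
	// accumulate rows, flush every BATCH rows, and keep reusing the Vec
	const BATCH: usize = 4;
	let mut batch: Vec<String> = Vec::new();
	for row in 0..10 {
		batch.push(format!("row {}", row));
		if batch.len() == BATCH {
			// "flush": drain moves the rows out; the Vec itself stays put
			for r in batch.drain(..) {
				println!("sending {}", r);
			}
			// batch is now empty, ready for the next rows
		}
	}
	// flush the final partial batch, like the code in #3 does after its loop
	for r in batch.drain(..) {
		println!("sending {}", r);
	}
}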