"borrowed data escapes outside of method"?

impl Connection for TcpConnection {
	type Error = tokio::io::Error;


	fn new(config: ConnectionConfiguration) -> Self {
		return Self {
			listener: None,
			task_aborters: Vec::new(),
			config: config,
		};
	}

	async fn bind(&mut self) -> Result<(), Self::Error> {
		// Bind a socket and set the field with it:
		self.listener = Some(TcpListener::bind((self.config.addr, self.config.port)).await?);
		return Ok(());
	}

	async fn accept(&mut self) -> Result<(Sender<Vec<u8>>, Receiver<Vec<u8>>), Self::Error> {
		
		// Check if we're supposed to be listening:
		if let Some(listener) = &self.listener {

			// If we are, accept one connection:
			let (stream, address) = listener.accept().await?;
			eprintln!("New connection from {}.", &address);

			// And split the stream into a sender and a receiver:
			let (mut rx, mut tx) = stream.into_split();

			// Now we need to make two key exchange objects:
			let mut i_kex = self.config.kex.generate();
			let mut o_kex = self.config.kex.generate();

			// We'll send `i_kex`'s public key first, then `o_kex`'s:
			tx.write_all(&i_kex.get_local_pubkey()).await?;
			tx.write_all(&o_kex.get_local_pubkey()).await?;

			// Then, we read out the remote public keys:
			let mut i_pubkey_buf: Vec<u8> = vec![0_u8; i_kex.get_public_key_length()];
			let mut o_pubkey_buf: Vec<u8> = vec![0_u8; o_kex.get_public_key_length()];
			rx.read_exact(&mut o_pubkey_buf).await?;	// The other's output should be my input,
			rx.read_exact(&mut i_pubkey_buf).await?;	// and vis-versa.

			// And set the remote public key on the key exchangers:
			if let Err(e) = i_kex.set_remote_pubkey(&i_pubkey_buf) {
				return Err(Error::other(e));
			}
			if let Err(e) = o_kex.set_remote_pubkey(&o_pubkey_buf) {
				return Err(Error::other(e));
			}

			// Now we need to actually initiate the key exchange, starting with the client init step:
			tx.write_all(&o_kex.client_init().map_err(|e| { Error::other(e.to_string()) })?).await?;	// Send a client init.
			let mut i_remote_client_init_buf: Vec<u8> = vec![0_u8; i_kex.get_client_init_length()];	// For holding the client's client init.
			rx.read_exact(&mut i_remote_client_init_buf).await?;	// Receive it.

			// Do the server init step:
			tx.write_all(&i_kex.server_init(&i_remote_client_init_buf).map_err(|e| { Error::other(e.to_string()) })?).await?;	// Send a server init.
			let mut o_remote_server_init_buf: Vec<u8> = vec![0_u8; o_kex.get_server_init_length()];	// For holding the client's server init.
			rx.read_exact(&mut o_remote_server_init_buf).await?;

			// Do the client confirm step:
			o_kex.client_confirm(&o_remote_server_init_buf).map_err(|e| { Error::other(e.to_string()) })?;	// Done with key exchange!

			// Make the keys/crypto thingies:
			let (encryptor, decryptor) = self.config.crypto.generate(i_kex, o_kex);

			// Make the channels:
			let (send_sender, mut send_receiver) = mpsc::channel::<Vec<u8>>(Self::CHANNEL_BUFFER_SIZE);
			let (mut recv_sender, recv_receiver) = mpsc::channel::<Vec<u8>>(Self::CHANNEL_BUFFER_SIZE);

			// Spawn the tasks, and add their abort handles to the internal vector:
			self.task_aborters.push(task::spawn(async move {
				// Read byte vectors out of the channel, until the other end is dropped:
				while let Some(mut byte_vec) = send_receiver.recv().await {
					// Attempt to encrypt the data:
					if let Err(e) = encryptor.encrypt(&mut byte_vec, b"") {
						// If unsuccessful:
						eprintln!("error in sender task on connection to {}: {:?}", &address, e);
						break;
					} else {
						// If successful, try to send it:
						let mut message: Vec<u8> = byte_vec.len().to_le_bytes().to_vec();	// First, get the length.
						message.append(&mut byte_vec);	// Now append the rest of the message.
						if let Err(e) = tx.write_all(&byte_vec).await {
							// If sending fails:
							eprintln!("error in sender task on connection to {}: {}", &address, e);
							break;
						}
					}
				}
			}).abort_handle());	// Send task.
			self.task_aborters.push(task::spawn(async move {}).abort_handle());	// Receive task.

			// Return the channels:
			return Ok((send_sender, recv_receiver));
		} else {
			// If this `struct` _shouldn't_ be listening:
			return Err(Error::other("this `TcpConnection` should not be listening!"));
		}
	}

	async fn connect(&mut self, addr: Ipv6Addr, port: u16) -> Result<(Sender<Vec<u8>>, Receiver<Vec<u8>>), Self::Error> {
		todo!()
	}

}

I'm getting this error message from the first call to task::spawn(), and from what I've seen online, it's not that uncommon an error, but every time it pops up on these forums it's quite different, so finding a solution has been difficult for me. Note that I will be cleaning up this messy code soon. Here is the full error message:

error[E0521]: borrowed data escapes outside of method
   --> src/qshd/connection/qsh_tcp.rs:105:28
    |
50  |       async fn accept(&mut self) -> Result<(Sender<Vec<u8>>, Receiver<Vec<u8>>), Self::Error> {
    |                       ---------
    |                       |
    |                       `self` is a reference that is only valid in the method body
    |                       let's call the lifetime of this reference `'1`
...
105 |               self.task_aborters.push(task::spawn(async move {
    |  _____________________________________^
106 | |                 // Read byte vectors out of the channel, until the other end is dropped:
107 | |                 while let Some(mut byte_vec) = send_receiver.recv().await {
...   |
124 | |             }).abort_handle());    // Send task.
    | |              ^
    | |              |
    | |______________`self` escapes the method body here
    |                argument requires that `'1` must outlive `'static`

The error message means that one of the variables captured by your spawned async move { block is a borrow of self. Unfortunately, it doesn’t tell you which one, so we have to think about all of them.

The only captured variable that seems like a candidate is encryptor. What is its type, exactly? What is the signature of the function self.config.crypto.generate.

pub fn generate<T: KeyExchanger>(&self, i: T, o: T) -> (impl Encryptor, impl Decryptor) {
	return match self {
		Self::AesGcm => (AesGcmEncryptor::new(o), AesGcmDecryptor::new(i)),
	};
}

When you write -> impl Encryptor, the default is to act as if the return value captures everything in the function’s inputs. (This is so that you can, as a non-breaking change, make the actual implementing type use or stop using any of those inputs.) You should specify an explicit capture list that doesn't include the self lifetime, instead:

pub fn generate<T: KeyExchanger>(&self, i: T, o: T) -> (impl Encryptor + use<T>, impl Decryptor + use<T>) {

Reference documentation.

2 Likes

Got it, thanks!
I'll give it a shot, and see how it goes...

Works great, thanks! I didn't know about this at all until now...

It worked great, for a bit. Then I made a bunch of changes, and now it's back:

impl Connection for TcpConnection {
	type Error = tokio::io::Error;


	fn new(config: ConnectionConfiguration) -> Self {
		return Self {
			listener: None,
			task_aborters: Vec::new(),
			config: config,
		};
	}

	async fn bind(&mut self) -> Result<(), Self::Error> {
		// Bind a socket and set the field with it:
		self.listener = Some(TcpListener::bind((self.config.addr, self.config.port)).await?);
		return Ok(());
	}

	async fn accept(&mut self) -> Result<(Sender<Vec<u8>>, Receiver<Vec<u8>>), Self::Error> {
		
		// Check if we're supposed to be listening:
		if let Some(listener) = &self.listener {

			// If we are, accept one connection:
			let (stream, address) = listener.accept().await?;
			eprintln!("New connection from {}.", &address);

			// And split the stream into a sender and a receiver:
			let (rx_u, tx_u) = stream.into_split();
			let mut tx = BufWriter::new(tx_u);
			let mut rx = BufReader::new(rx_u);

			// Now we need to make two key exchange objects:
			let mut i_kex = self.config.kex.generate();
			let mut o_kex = self.config.kex.generate();

			// We'll send `i_kex`'s public key first, then `o_kex`'s:
			tx.write(&i_kex.get_local_pubkey()).await?;
			tx.write(&o_kex.get_local_pubkey()).await?;
			tx.flush().await?;

			// Then, we read out the remote public keys:
			let mut i_pubkey_buf: Vec<u8> = vec![0_u8; i_kex.get_public_key_length()];
			let mut o_pubkey_buf: Vec<u8> = vec![0_u8; o_kex.get_public_key_length()];
			rx.read_exact(&mut o_pubkey_buf).await?;	// The other's output should be my input,
			rx.read_exact(&mut i_pubkey_buf).await?;	// and vis-versa.

			// And set the remote public key on the key exchangers:
			if let Err(e) = i_kex.set_remote_pubkey(&i_pubkey_buf) {
				return Err(Error::other(e));
			}
			if let Err(e) = o_kex.set_remote_pubkey(&o_pubkey_buf) {
				return Err(Error::other(e));
			}

			// Now we need to actually initiate the key exchange, starting with the client init step:
			tx.write(&o_kex.client_init().map_err(|e| { Error::other(e.to_string()) })?).await?;	// Send a client init.
			tx.flush().await?;
			let mut i_remote_client_init_buf: Vec<u8> = vec![0_u8; i_kex.get_client_init_length()];	// For holding the client's client init.
			rx.read_exact(&mut i_remote_client_init_buf).await?;	// Receive it.

			// Do the server init step:
			tx.write(&i_kex.server_init(&i_remote_client_init_buf).map_err(|e| { Error::other(e.to_string()) })?).await?;	// Send a server init.
			tx.flush().await?;
			let mut o_remote_server_init_buf: Vec<u8> = vec![0_u8; o_kex.get_server_init_length()];	// For holding the client's server init.
			rx.read_exact(&mut o_remote_server_init_buf).await?;

			// Do the client confirm step:
			o_kex.client_confirm(&o_remote_server_init_buf).map_err(|e| { Error::other(e.to_string()) })?;	// Done with key exchange!

			// Make the keys/crypto thingies:
			let (encryptor, decryptor) = self.config.crypto.generate(i_kex, o_kex);

			// Make the channels:
			let (send_sender, mut send_receiver) = mpsc::channel::<Vec<u8>>(Self::CHANNEL_BUFFER_SIZE);
			let (mut recv_sender, recv_receiver) = mpsc::channel::<Vec<u8>>(Self::CHANNEL_BUFFER_SIZE);

			// Spawn the tasks, and add their abort handles to the internal vector:
			self.task_aborters.push(task::spawn(async move {
				// Read byte vectors out of the channel, until the other end is dropped:
				while let Some(mut byte_vec) = send_receiver.recv().await {
					// Attempt to encrypt the data:
					if let Err(e) = encryptor.encrypt(&mut byte_vec, b"") {
						// If unsuccessful:
						eprintln!("error in sender task on connection to {}: {:?}", &address, e);
						break;
					} else {
						// If successful, try to send the length, followed by the data:
						if let Err(e) = tx.write_u64_le(byte_vec.len().try_into().expect("`u64` _should_ be the same as `usize`, but it isn't...")).await {
							eprintln!("error in sender task on connection to {}: {}", &address, e);
							break;
						}
						if let Err(e) = tx.write(&byte_vec).await {
							// If sending fails:
							eprintln!("error in sender task on connection to {}: {}", &address, e);
							break;
						}
					}
					if let Err(e) = tx.flush().await {
						eprintln!("error in sender task on connection to {}: {}", &address, e);
						break;
					}
				}
			}).abort_handle());	// Send task.
			self.task_aborters.push(task::spawn(async move {}).abort_handle());	// Receive task.

			// Return the channels:
			return Ok((send_sender, recv_receiver));
		} else {
			// If this `struct` _shouldn't_ be listening:
			return Err(Error::other("this `TcpConnection` should not be listening!"));
		}
	}

	async fn connect(&mut self, addr: Ipv6Addr, port: u16) -> Result<(Sender<Vec<u8>>, Receiver<Vec<u8>>), Self::Error> {
		todo!()
	}

}

I can't figure out what might be capturing self... any ideas?

The primary thing I changed was that I added buffering, but that shouldn't cause issues, right?

Posting a diff showing the changes you made would be helpful.

Might as well post the new error as well, in case there is a small difference.

I can't get you a diff, as I didn't commit the changes seperately. Here's the error though:

error[E0521]: borrowed data escapes outside of method
   --> src/qshd/connection/qsh_tcp.rs:110:28
    |
50  |       async fn accept(&mut self) -> Result<(Sender<Vec<u8>>, Receiver<Vec<u8>>), Self::Error> {
    |                       ---------
    |                       |
    |                       `self` is a reference that is only valid in the method body
    |                       let's call the lifetime of this reference `'1`
...
110 |               self.task_aborters.push(task::spawn(async move {
    |  _____________________________________^
111 | |                 // Read byte vectors out of the channel, until the other end is dropped:
112 | |                 while let Some(mut byte_vec) = send_receiver.recv().await {
...   |
135 | |             }).abort_handle());    // Send task.
    | |              ^
    | |              |
    | |______________`self` escapes the method body here
    |                argument requires that `'1` must outlive `'static`

Ok. And you're absolutely sure the change that @kpreid gave you is still applied?

Here's a diff of the two code blocks you posted.

30c30,32
< 			let (mut rx, mut tx) = stream.into_split();
---
> 			let (rx_u, tx_u) = stream.into_split();
> 			let mut tx = BufWriter::new(tx_u);
> 			let mut rx = BufReader::new(rx_u);
37,38c39,41
< 			tx.write_all(&i_kex.get_local_pubkey()).await?;
< 			tx.write_all(&o_kex.get_local_pubkey()).await?;
---
> 			tx.write(&i_kex.get_local_pubkey()).await?;
> 			tx.write(&o_kex.get_local_pubkey()).await?;
> 			tx.flush().await?;
55c58,59
< 			tx.write_all(&o_kex.client_init().map_err(|e| { Error::other(e.to_string()) })?).await?;	// Send a client init.
---
> 			tx.write(&o_kex.client_init().map_err(|e| { Error::other(e.to_string()) })?).await?;	// Send a client init.
> 			tx.flush().await?;
60c64,65
< 			tx.write_all(&i_kex.server_init(&i_remote_client_init_buf).map_err(|e| { Error::other(e.to_string()) })?).await?;	// Send a server init.
---
> 			tx.write(&i_kex.server_init(&i_remote_client_init_buf).map_err(|e| { Error::other(e.to_string()) })?).await?;	// Send a server init.
> 			tx.flush().await?;
84,87c89,94
< 						// If successful, try to send it:
< 						let mut message: Vec<u8> = byte_vec.len().to_le_bytes().to_vec();	// First, get the length.
< 						message.append(&mut byte_vec);	// Now append the rest of the message.
< 						if let Err(e) = tx.write_all(&byte_vec).await {
---
> 						// If successful, try to send the length, followed by the data:
> 						if let Err(e) = tx.write_u64_le(byte_vec.len().try_into().expect("`u64` _should_ be the same as `usize`, but it isn't...")).await {
> 							eprintln!("error in sender task on connection to {}: {}", &address, e);
> 							break;
> 						}
> 						if let Err(e) = tx.write(&byte_vec).await {
91a99,102
> 					}
> 					if let Err(e) = tx.flush().await {
> 						eprintln!("error in sender task on connection to {}: {}", &address, e);
>

Yeah, here:

#[derive(Deserialize)]
pub enum Implementation {
	AesGcm,
} impl Implementation {
	pub fn generate<T: KeyExchanger>(&self, i: T, o: T) -> (impl Encryptor + use<T>, impl Decryptor + use<T>) {
		return match self {
			Self::AesGcm => (AesGcmEncryptor::new(o), AesGcmDecryptor::new(i)),
		};
	}
}

(The use<T> is what's relevant.)

Is it maybe:

// Now we need to make two key exchange objects:
let mut i_kex = self.config.kex.generate();
let mut o_kex = self.config.kex.generate();

The signature of this generate() is as follows:

pub fn generate(&self) -> impl KeyExchanger {
	return match self {
		Self::Kyberlib => KyberlibKeyExchanger::new().expect("failed to generate new keypair"),
	};
}

There are no generic parameters though.

Maybe it has something to do with the TcpListener, or the TcpStream or SocketAddr from .accept()? Also, what about the first line of TcpConnection::accept()?

Yes, there are; there is one elided lifetime parameter. For that case you would write impl KeyExchanger + use<> to exclude it.

But that should not make any difference because i_kex and o_kex are not captured by the async block.

Actually, now that I think about it, maybe you need more guarantees on the first one, adding T: 'static:

pub fn generate<T: KeyExchanger + 'static>(&self, i: T, o: T) -> (impl Encryptor + use<T>, impl Decryptor + use<T>) {

That idea doesn't match the error message, though.

1 Like

Huh... believe it or not, adding use<> to generate() like so:

pub fn generate(&self) -> impl KeyExchanger + use<> {
	return match self {
		Self::Kyberlib => KyberlibKeyExchanger::new().expect("failed to generate new keypair"),
	};
}

actually did fix the issue...

@LeyviRose, denying elided_lifetimes_in_paths forces you to declare lifetimes in paths where they can be elided, like in the two cases that you've encountered, making it much easier to see where lifetimes are used and solve problems like these. Unfortunately this isn't the default.

In Cargo.toml:

[lints.rust]
elided_lifetimes_in_paths = "deny"

Or at the top of lib.rs or main.rs:

#![deny(elided_lifetimes_in_paths)]

While I agree that elided_lifetimes_in_paths is useful, it does not apply to this situation; captured lifetimes are not part of a path. and so the lint doesn’t detect it. Demonstration:

#![deny(elided_lifetimes_in_paths)]

// This captures s's lifetime but doesn't need to
fn not_borrowed(s: &str) -> impl std::fmt::Display {
    s.to_owned()
}

fn example() {
    let string = String::from("hello"); 
    let mysterious = not_borrowed(&string);
    std::thread::spawn(|| {
        println!("{mysterious}");
    });
}

This program fails to compile but (correctly) doesn’t report the lint.

1 Like

Thanks for the correction. I guess I'll just need to be very careful with impl Trait return values, and perhaps get in the habit of always adding a use.

1 Like