How can values be moved to a function in a spawned thread and still be used in another thread?

I am trying to walk through the last web server project in the book in this part.

I did some organization and modification on top of what's mentioned in the book.

cargo.toml
[package]
name = "web_server"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
colored = "2.1.0"
main.rs
use web_server::start_web_server;

fn main() {
    start_web_server();
}
lib.rs
use colored::{ColoredString, Colorize};

use std::{
    fs,
    io::{BufRead, BufReader, Write},
    net::{TcpListener, TcpStream},
    process, thread,
    time::Duration,
};

fn info_text() -> ColoredString {
    "INFO:".bold().green()
}

fn error_text() -> ColoredString {
    "ERROR:".bold().red()
}

fn processing_text() -> ColoredString {
    "PROCESSING:".bold().blue()
}

fn request_text() -> ColoredString {
    "REQUEST:".bold().magenta()
}

fn response_text() -> ColoredString {
    "RESPONSE:".bold().cyan()
}

pub fn start_web_server() {
    println!("{} Starting web server", info_text());

    let listener = create_listener("127.0.0.1:7878");

    start_listening(listener);
}

pub fn create_listener(socket: &str) -> TcpListener {
    match TcpListener::bind(socket) {
        Ok(listener) => {
            println!("{} Socket created - {}", info_text(), socket);
            listener
        }

        Err(e) => {
            eprintln!(
                "{} Failed to create socket {} - {}",
                error_text(),
                socket,
                e
            );

            process::exit(1);
        }
    }
}

pub fn start_listening(listener: TcpListener) {
    for stream in listener.incoming() {
        let stream = match stream {
            Ok(stream) => {
                println!("{} Incoming stream received - {:?}", request_text(), stream);

                stream
            }

            Err(e) => {
                eprintln!("{} Failed to receive stream - {}", error_text(), e);
                process::exit(1);
            }
        };

        handle_connection(stream);
    }
}

pub fn handle_connection(stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);

    let http_request = extract_http_request(buf_reader);

    println!("{} HTTP Request - {:#?}", info_text(), http_request);

    thread::spawn(|| {
        send_response(stream, http_request);
    });
}

pub fn extract_http_request(buf_reader: BufReader<&TcpStream>) -> Vec<String> {
    buf_reader
        .lines()
        .map(|line| match line {
            Ok(line) => {
                println!(
                    "{} Line extracted from request - {:?}",
                    processing_text(),
                    line
                );

                line
            }

            Err(e) => {
                eprintln!("{} Failed to extract line - {}", error_text(), e);
                process::exit(1);
            }
        })
        .take_while(|line| !(line.is_empty()))
        .collect::<Vec<_>>()
}

pub fn send_response(mut stream: TcpStream, http_request: Vec<String>) {
    let response = create_response(http_request);

    match stream.write_all(response.as_bytes()) {
        Ok(_) => {
            println!("{} Response sent", response_text());
        }

        Err(e) => {
            eprintln!("{} Failed to send response - {}", error_text(), e);
            process::exit(1);
        }
    }
}

pub fn create_response(http_request: Vec<String>) -> String {
    let (status_line, file_to_send) = match http_request[0].as_str() {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),

        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "sleep.html")
        }

        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = match fs::read_to_string(file_to_send) {
        Ok(file_contents) => file_contents,

        Err(e) => {
            eprintln!("{} Failed to read file contents - {}", error_text(), e);

            process::exit(1);
        }
    };

    let length = contents.len();

    format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}")
}

As mentioned in the book, when i navigate to
http://127.0.0.1:7878/sleep
in my web browser, a sleep for 5 seconds should take place. When i visit
http://127.0.0.1:7878/
or
http://127.0.0.1:7878/other
during the sleep, the response have to wait until the sleep finishes.

I fixed this (before reading the book's solution) by changing

send_response(stream, http_request);

to

thread::spawn(|| {
        send_response(stream, http_request);
    });

Now my question is, How can the stream and http_request be moved to send_response() and can still be used in other threads ?

When i hit http://127.0.0.1:7878/ or other endpoint while the 5 sec sleep is taking place, the other response is sent fine and the server is not waiting for http://127.0.0.1:7878/sleep to finish . How can a new request such as http://127.0.0.1:7878/ occur when stream and http_request have been moved to send_response(stream, http_request) previously while hitting http://127.0.0.1:7878/sleep ?

i hope that made sense and i managed to deliver my point.

Once a value has been moved, it is moved. No way for the original thread you moved the value out of to continue using it. What you can do is to either clone the value and move the clone so both threads see the same value or share data between the two threads. The latter you can do either by message passing (i.e. using something like a mpsc queue) or by sharing memory (i.e. with Arc<Mutex<T>>).

Why do you need access to the stream and your headers beyond spawning a new thread that handles the connection?

2 Likes

i don't need access to stream and http_request, i just wanna know what happened here:

 thread::spawn(|| {
        send_response(stream, http_request);
    });

How can another thread (invoked by http://127.0.0.1:7878/) use stream and http_request although they have been moved in a previous thread (http://127.0.0.1:7878/sleep) ?!


While http://127.0.0.1:7878/sleep is waiting for its 5 seconds, i can still hit http://127.0.0.1:7878/ and get a response, this means that i am using send_response(stream, http_request); 2 times in 2 different threads !

Each time handle_connection() is called, it is given a different stream, and it creates a new http_request. Those particular values are moved only once and used by only one thread at a time.

1 Like

:person_facepalming:t2::sweat_smile: it was really stupid of me i didn't notice each connection had its own stream and request. Thanks.