The rest of my code does not run after calling tokio::spawn

Can someone please help explain why the rest of my code is not running?

/// Starts the gRPC server and serves requests until the server stops.
///
/// The listen address is read from the `GRPC_SERVER_ADDR` environment
/// variable. `tx` is handed to the `Task` service as its `queue_addr`.
///
/// The returned future only resolves once the server has stopped, so a caller
/// that needs to keep running should spawn it instead of awaiting it inline.
///
/// # Panics
///
/// Panics if `GRPC_SERVER_ADDR` is unset, if its value cannot be parsed as a
/// listen address, or if the server terminates with an error.
#[tracing::instrument(name = "gprc_server", skip(tx))]
async fn gRPC_server(tx: Sender<UpgradeTask>) {
    // Fail with a message that names the offending setting instead of a bare unwrap.
    let grpc_address = env::var("GRPC_SERVER_ADDR")
        .expect("GRPC_SERVER_ADDR environment variable must be set")
        .parse()
        .expect("GRPC_SERVER_ADDR must be a valid listen address");

    // The service forwards every submitted task through `tx`.
    let manager_service = Task { queue_addr: tx };

    tracing::info!("Server listening {:?}", grpc_address);

    Server::builder()
        .add_service(FirmwareTaskServer::new(manager_service))
        .serve(grpc_address)
        .await
        .unwrap();
}

gRPC service

#[tonic::async_trait]
impl FirmwareTask for Task {

    #[tracing::instrument(skip(self, request))]
    async fn submit(&self, request: Request<UpgradeTask>) -> Result<Response<TaskInfo>, Status> {
        let task = request.into_inner();

        info!(nav_code = task.navision_code.as_str());
        info!(nav_code = task.navision_code.as_str());

        // crate response
        let response = TaskInfo {
            id: 0,
            name: "Test Test".to_string(),
            status: 2,
        };
    
        let output = self.queue_addr.send(task).await;

        // will delete, quick way to see it tx was failing to send
        if output.is_err() {
            info!("Failed to send");
        }


        Ok(Response::new(response))
    }
}

Now the main function

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    dotenv::dotenv().ok();

    let subscriber = tracing_subscriber::FmtSubscriber::builder()
        // all spans/events with a level higher than TRACE (e.g, debug, info, warn, etc.)
        // will be written to stdout.
        .with_max_level(Level::DEBUG)
        .with_level(true)
        // builds the subscriber.
        .finish();

    // set up our global subscriber
    tracing::subscriber::set_global_default(subscriber);

    // create a task Queue
    let mut queue = Queue::<UpgradeTask>::new(10);

    // lets make it and
    let sharedQueue = Arc::new(Mutex::new(queue));

    let (tx, mut rx) = tokio::sync::mpsc::channel(100);

    // spawn a task listening green thread
    let mut _grpc_handle = tokio::spawn(async move { gRPC_server(tx).await }).await;
    tracing::info!("After above line is call, it never gets here");   // <==  IT NEVER GETS HERE

    // duplicate value
    let h = sharedQueue.clone();

    // spawn another green thread to continuously read data
    let mut f_handle = tokio::spawn(async move {
        let mut sharedQueue = h;

        while true {
            {
                // get a lock to the mutex
                let mut shareddataLock = sharedQueue.lock().unwrap();

                // We shouldn't hold the lock accross uploading the firmware
                if let Some(task) = shareddataLock.dequeue() {
                    let instance = firmware_manager::FirmwareManager { service: task };
                    instance.upload();
                }
            }

            // we want to sleep so as to reduce resources
            // TODO: better waiting implementation
            tokio::time::sleep(Duration::new(30, 0)).await;
        }
    });

    let cloneSharedQueue = sharedQueue.clone();
    while let Some(task) = rx.recv().await {
        // get a lock on the data
        let mut data_guard = cloneSharedQueue.lock().unwrap(); // TODO: better error handling
        data_guard.enqueue(task);
    }

    let handles = vec![f_handle];
    for handle in handles {
        handle.await.expect("Panic in task");
    }

    Ok(())
}

output

2021-11-20T22:47:36.227639Z  INFO gprc_server: yeti: Server listening 127.0.0.1:8090
2021-11-20T22:48:04.089865Z DEBUG h2::codec::framed_write: send frame=Settings { flags: (0x0), initial_window_size: 1048576, max_frame_size: 16384 }
2021-11-20T22:48:04.090792Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=Settings { flags: (0x0), enable_push: 0, max_concurrent_streams: 0, initial_window_size: 4194304, max_frame_size: 4194304, max_header_list_size: 8192 }
... Just the same Debug stuff.. skipped
2021-11-20T22:48:04.093525Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=WindowUpdate { stream_id: StreamId(0), size_increment: 983041 }
2021-11-20T22:48:04.094590Z  INFO submit: yeti::task: nav_code="Hello"
2021-11-20T22:48:04.095817Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Headers { stream_id: StreamId(1), flags: (0x4: END_HEADERS) }
2021-11-20T22:48:04.096216Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(1) }
2021-11-20T22:48:04.096477Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Headers { stream_id: StreamId(1), flags: (0x5: END_HEADERS | END_STREAM) }

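You immediately `.await` the `JoinHandle` returned by `tokio::spawn`, which has the effect of waiting for the spawned task to complete. Because the gRPC server keeps serving until it shuts down, that await never returns and the rest of `main` never runs. Remove the trailing `.await`: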

-   let mut _grpc_handle = tokio::spawn(async move { gRPC_server(tx).await }).await;
+   let mut _grpc_handle = tokio::spawn(async move { gRPC_server(tx).await });
3 Likes

Thanks.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.