Can someone please help explain why the rest of my `main` function never runs after I spawn the gRPC server task?
/// Starts the gRPC server and serves `FirmwareTask` requests until the server stops.
///
/// `tx` is stored in the `Task` service as `queue_addr`, which the service uses to forward
/// submitted tasks. The listen address is read from the `GRPC_SERVER_ADDR` environment
/// variable.
///
/// This future only completes when the server stops, so callers that need to keep doing
/// other work should spawn it rather than await it inline.
///
/// # Panics
///
/// Panics if `GRPC_SERVER_ADDR` is unset, if it cannot be parsed as a listen address, or if
/// the server terminates with an error.
#[tracing::instrument(name = "gprc_server", skip(tx))]
async fn gRPC_server(tx: Sender<UpgradeTask>) {
    // The target type of `parse` is inferred from the `serve` call below.
    let grpc_address = env::var("GRPC_SERVER_ADDR")
        .expect("GRPC_SERVER_ADDR environment variable must be set")
        .parse()
        .expect("GRPC_SERVER_ADDR must be a valid listen address (e.g. 127.0.0.1:8090)");

    // The service hands every submitted task to the queue through `tx`.
    let manager_service = Task { queue_addr: tx };

    tracing::info!("Server listening {:?}", grpc_address);

    Server::builder()
        .add_service(FirmwareTaskServer::new(manager_service))
        .serve(grpc_address)
        .await
        .expect("gRPC server terminated with an error");
}
gRPC service:
#[tonic::async_trait]
impl FirmwareTask for Task {
#[tracing::instrument(skip(self, request))]
async fn submit(&self, request: Request<UpgradeTask>) -> Result<Response<TaskInfo>, Status> {
let task = request.into_inner();
info!(nav_code = task.navision_code.as_str());
info!(nav_code = task.navision_code.as_str());
// crate response
let response = TaskInfo {
id: 0,
name: "Test Test".to_string(),
status: 2,
};
let output = self.queue_addr.send(task).await;
// will delete, quick way to see it tx was failing to send
if output.is_err() {
info!("Failed to send");
}
Ok(Response::new(response))
}
}
Now the main function
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenv::dotenv().ok();
let subscriber = tracing_subscriber::FmtSubscriber::builder()
// all spans/events with a level higher than TRACE (e.g, debug, info, warn, etc.)
// will be written to stdout.
.with_max_level(Level::DEBUG)
.with_level(true)
// builds the subscriber.
.finish();
// set up our global subscriber
tracing::subscriber::set_global_default(subscriber);
// create a task Queue
let mut queue = Queue::<UpgradeTask>::new(10);
// lets make it and
let sharedQueue = Arc::new(Mutex::new(queue));
let (tx, mut rx) = tokio::sync::mpsc::channel(100);
// spawn a task listening green thread
let mut _grpc_handle = tokio::spawn(async move { gRPC_server(tx).await }).await;
tracing::info!("After above line is call, it never gets here"); // <== IT NEVER GETS HERE
// duplicate value
let h = sharedQueue.clone();
// spawn another green thread to continuously read data
let mut f_handle = tokio::spawn(async move {
let mut sharedQueue = h;
while true {
{
// get a lock to the mutex
let mut shareddataLock = sharedQueue.lock().unwrap();
// We shouldn't hold the lock accross uploading the firmware
if let Some(task) = shareddataLock.dequeue() {
let instance = firmware_manager::FirmwareManager { service: task };
instance.upload();
}
}
// we want to sleep so as to reduce resources
// TODO: better waiting implementation
tokio::time::sleep(Duration::new(30, 0)).await;
}
});
let cloneSharedQueue = sharedQueue.clone();
while let Some(task) = rx.recv().await {
// get a lock on the data
let mut data_guard = cloneSharedQueue.lock().unwrap(); // TODO: better error handling
data_guard.enqueue(task);
}
let handles = vec![f_handle];
for handle in handles {
handle.await.expect("Panic in task");
}
Ok(())
}
output
2021-11-20T22:47:36.227639Z INFO gprc_server: yeti: Server listening 127.0.0.1:8090
2021-11-20T22:48:04.089865Z DEBUG h2::codec::framed_write: send frame=Settings { flags: (0x0), initial_window_size: 1048576, max_frame_size: 16384 }
2021-11-20T22:48:04.090792Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=Settings { flags: (0x0), enable_push: 0, max_concurrent_streams: 0, initial_window_size: 4194304, max_frame_size: 4194304, max_header_list_size: 8192 }
... (more of the same DEBUG output, skipped)
2021-11-20T22:48:04.093525Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=WindowUpdate { stream_id: StreamId(0), size_increment: 983041 }
2021-11-20T22:48:04.094590Z INFO submit: yeti::task: nav_code="Hello"
2021-11-20T22:48:04.095817Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Headers { stream_id: StreamId(1), flags: (0x4: END_HEADERS) }
2021-11-20T22:48:04.096216Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(1) }
2021-11-20T22:48:04.096477Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Headers { stream_id: StreamId(1), flags: (0x5: END_HEADERS | END_STREAM) }