diff --git a/Cargo.lock b/Cargo.lock index b8dbaec8..0c21409d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,6 +118,7 @@ dependencies = [ "agent_store", "agent_verification", "axum 0.7.5", + "axum-macros", "tokio", "tracing", "tracing-subscriber", @@ -324,6 +325,7 @@ dependencies = [ "serde_json", "sqlx", "tokio", + "tracing", ] [[package]] diff --git a/agent_api_rest/postman/ssi-agent.postman_collection.json b/agent_api_rest/postman/ssi-agent.postman_collection.json index 948a0da9..edf6b0f2 100644 --- a/agent_api_rest/postman/ssi-agent.postman_collection.json +++ b/agent_api_rest/postman/ssi-agent.postman_collection.json @@ -1,9 +1,9 @@ { "info": { - "_postman_id": "3b1e6396-3bf7-43aa-bc3b-21056fb21dfa", + "_postman_id": "d910f16f-e653-4b7d-949e-255b2a7d4834", "name": "ssi-agent", "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json", - "_exporter_id": "24972330" + "_exporter_id": "30650915" }, "item": [ { @@ -97,7 +97,7 @@ "header": [], "body": { "mode": "raw", - "raw": "{\n \"offerId\":\"{{OFFER_ID}}\",\n \"credentialConfigurationId\": \"w3c_vc_credential\",\n \"credential\": {\n \"credentialSubject\": {\n \"first_name\": \"Ferris\",\n \"last_name\": \"Crabman\",\n \"dob\": \"1982-01-01\"\n }\n }\n}", + "raw": "{\n \"offerId\":\"{{OFFER_ID}}\",\n \"credentialConfigurationId\": \"w3c_vc_credential\",\n \"credential\": {\n \"credentialSubject\": {\n \"first_name\": \"Ferris\",\n \"last_name\": \"Crabman\",\n \"dob\": \"1982-01-01\"\n }\n },\n \"expires\": \"P2D\"\n}", "options": { "raw": { "language": "json" @@ -151,7 +151,7 @@ "header": [], "body": { "mode": "raw", - "raw": "{\n \"offerId\":\"{{OFFER_ID}}\",\n \"credentialConfigurationId\": \"w3c_vc_credential\",\n \"credential\": {\n \"credentialSubject\": {\n \"id\": \"https://ecommerce.impierce.com/\",\n \"image\": \"https://static.wikia.nocookie.net/fictionalcompanies/images/c/c2/ACME_Corporation.png\",\n \"name\": \"VirtualVendors\",\n \"certificaat\": {\n \"type\": \"ACMECorpCredential\",\n \"certificeringsDatum\": \"2024-06-26\",\n \"geldigheidsPeriode\": \"1 jaar\",\n \"garanties\": [\n \"Het bedrijf is echt en bereikbaar.\",\n \"Voldoet aan de Thuiswinkel Algemene Voorwaarden.\",\n \"14 dagen bedenktijd.\",\n \"Veilige betaalmethoden.\",\n \"Duidelijke product/servicebeschrijvingen.\",\n \"Transparant bestelproces.\",\n \"Duidelijke prijzen.\",\n \"Veilige betaalomgeving.\",\n \"Veilige omgang met persoonlijke gegevens.\",\n \"Effectieve klachtenafhandeling en onafhankelijke geschillenbemiddeling.\"\n ]\n }\n }\n }\n}", + "raw": "{\n \"offerId\":\"{{OFFER_ID}}\",\n \"credentialConfigurationId\": \"w3c_vc_credential\",\n \"credential\": {\n \"id\": \"https://acme.example.org/1a2b3c4d5e6f\",\n \"credentialSubject\": {\n \"id\": \"https://ecommerce.impierce.com/\",\n \"image\": \"https://static.wikia.nocookie.net/fictionalcompanies/images/c/c2/ACME_Corporation.png\",\n \"name\": \"VirtualVendors\",\n \"certificaat\": {\n \"type\": \"ACMECorpCredential\",\n \"certificeringsDatum\": \"2024-06-26\",\n \"geldigheidsPeriode\": \"1 jaar\",\n \"garanties\": [\n \"Het bedrijf is echt en bereikbaar.\",\n \"Voldoet aan de Thuiswinkel Algemene Voorwaarden.\",\n \"14 dagen bedenktijd.\",\n \"Veilige betaalmethoden.\",\n \"Duidelijke product/servicebeschrijvingen.\",\n \"Transparant bestelproces.\",\n \"Duidelijke prijzen.\",\n \"Veilige betaalomgeving.\",\n \"Veilige omgang met persoonlijke gegevens.\",\n \"Effectieve klachtenafhandeling en onafhankelijke geschillenbemiddeling.\"\n ]\n }\n }\n }\n}", "options": { "raw": { "language": "json" @@ -629,11 +629,26 @@ } ], "request": { + "auth": { + "type": "apikey", + "apikey": [ + { + "key": "value", + "value": "E8C5AEF8F5954FCA9F543D4913569C98", + "type": "string" + }, + { + "key": "key", + "value": "X-API-KEY", + "type": "string" + } + ] + }, "method": "POST", "header": [], "body": { "mode": "raw", - "raw": "{\n \"nonce\": \"this is a nonce\",\n \"presentation_definition\": {\n \"id\":\"Verifiable Presentation request for sign-on\",\n \"input_descriptors\":[\n {\n \"id\":\"Request for Verifiable Credential\",\n \"constraints\":{\n \"fields\":[\n {\n \"path\":[\n \"$.vc.type\"\n ],\n \"filter\":{\n \"type\":\"array\",\n \"contains\":{\n \"const\":\"VerifiableCredential\"\n }\n }\n }\n ]\n }\n }\n ]\n }\n}", + "raw": "{\n \"nonce\": \"this is a nonce\",\n \"presentation_definition\": {\n \"id\":\"Verifiable Presentation request for sign-on\",\n \"input_descriptors\":[\n {\n \"id\":\"Request for Verifiable Credential\",\n \"constraints\":{\n \"fields\":[\n {\n \"path\":[\n \"$.vc.type\"\n ],\n \"filter\":{\n \"type\":\"array\",\n \"contains\":{\n \"const\":\"IdentificationCardCredential\"\n }\n }\n }\n ]\n }\n }\n ]\n }\n}", "options": { "raw": { "language": "json" @@ -641,9 +656,13 @@ } }, "url": { - "raw": "{{HOST}}/v0/authorization_requests", + "raw": "https://zkteco.dev.impierce.com/v0/authorization_requests", + "protocol": "https", "host": [ - "{{HOST}}" + "zkteco", + "dev", + "impierce", + "com" ], "path": [ "v0", @@ -1521,8 +1540,45 @@ "response": [] } ] + }, + { + "name": "_monitoring", + "item": [ + { + "name": "Liveness probe", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{HOST}}/healthz", + "host": [ + "{{HOST}}" + ], + "path": [ + "healthz" + ] + } + }, + "response": [] + } + ] } ], + "auth": { + "type": "apikey", + "apikey": [ + { + "key": "value", + "value": "{{API_KEY}}", + "type": "string" + }, + { + "key": "key", + "value": "X-API-KEY", + "type": "string" + } + ] + }, "event": [ { "listen": "prerequest", @@ -1603,6 +1659,10 @@ "key": "CONNECTION_ID", "value": "INITIAL_VALUE", "type": "string" + }, + { + "key": "SERVICE_ID", + "value": "" } ] } \ No newline at end of file diff --git a/agent_application/Cargo.toml b/agent_application/Cargo.toml index 078f67cd..1dbfd486 100644 --- a/agent_application/Cargo.toml +++ b/agent_application/Cargo.toml @@ -16,6 +16,7 @@ agent_store = { path = "../agent_store" } agent_verification = { path = "../agent_verification" } axum.workspace = true +axum-macros = "0.4" tokio.workspace = true tracing.workspace = true tracing-subscriber.workspace = true diff --git a/agent_application/src/main.rs b/agent_application/src/main.rs index 61932d1d..8d5388eb 100644 --- a/agent_application/src/main.rs +++ b/agent_application/src/main.rs @@ -1,5 +1,7 @@ #![allow(clippy::await_holding_lock)] +mod probes; + use agent_api_rest::{app, ApplicationState}; use agent_event_publisher_http::EventPublisherHttp; use agent_holder::services::HolderServices; @@ -9,8 +11,9 @@ use agent_secret_manager::{secret_manager, service::Service as _, subject::Subje use agent_shared::config::{config, LogFormat}; use agent_store::{in_memory, postgres, EventPublisher}; use agent_verification::services::VerificationServices; +use probes::{liveness::healthz, readiness::readyz_handler}; use std::sync::Arc; -use tokio::{fs, io}; +use tokio::io; use tracing::info; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -68,6 +71,10 @@ async fn main() -> io::Result<()> { agent_identity::state::initialize(&identity_state).await; agent_issuance::state::initialize(&issuance_state, startup_commands(url.clone())).await; + let health_router = axum::Router::new() + .route("/healthz", axum::routing::get(healthz)) + .route("/readyz", axum::routing::get(readyz_handler)); + let app = app(ApplicationState { identity_state: Some(identity_state), issuance_state: Some(issuance_state), @@ -75,11 +82,7 @@ async fn main() -> io::Result<()> { verification_state: Some(verification_state), }); - // This is used to indicate that the server accepts requests. - // In a docker container this file can be searched to see if its ready. - // A better solution can be made later (needed for impierce-demo) - fs::create_dir_all("/tmp/unicore/").await?; - fs::write("/tmp/unicore/accept_requests", []).await?; + let app = health_router.merge(app); let listener = tokio::net::TcpListener::bind("0.0.0.0:3033").await?; info!("listening on {}", listener.local_addr()?); diff --git a/agent_application/src/probes/liveness.rs b/agent_application/src/probes/liveness.rs new file mode 100644 index 00000000..8644bbb4 --- /dev/null +++ b/agent_application/src/probes/liveness.rs @@ -0,0 +1,7 @@ +use axum::http::StatusCode; +use axum::response::IntoResponse; + +/// A simple liveness probe following application monitoring conventions. +pub async fn healthz() -> impl IntoResponse { + StatusCode::OK +} diff --git a/agent_application/src/probes/mod.rs b/agent_application/src/probes/mod.rs new file mode 100644 index 00000000..89c08bc0 --- /dev/null +++ b/agent_application/src/probes/mod.rs @@ -0,0 +1,2 @@ +pub mod liveness; +pub mod readiness; diff --git a/agent_application/src/probes/readiness.rs b/agent_application/src/probes/readiness.rs new file mode 100644 index 00000000..c9c2608c --- /dev/null +++ b/agent_application/src/probes/readiness.rs @@ -0,0 +1,46 @@ +use agent_shared::config::config; +use agent_shared::config::EventStoreType; +use agent_store::postgres::check_connection; +use axum::http::StatusCode; +use axum::response::IntoResponse; + +#[axum_macros::debug_handler] +pub async fn readyz_handler() -> impl IntoResponse { + // check database connection + // check message queue connection + + let event_store_type = config().event_store.type_.clone(); + + // write code: if config is postgres, then call the postgres check_connection function + let status_code = match event_store_type { + EventStoreType::InMemory => { + println!("Checking Postgres connection ..."); + + // check_connection().await; + + if check_connection().await { + StatusCode::OK + } else { + StatusCode::SERVICE_UNAVAILABLE + } + // StatusCode::OK + + // if postgres::check_connection().await { + // return axum::http::StatusCode::OK; + // } else { + // return axum::http::StatusCode::SERVICE_UNAVAILABLE; + // } + // if let Err(e) = postgres::check_connection().await { + // return axum::http::StatusCode::SERVICE_UNAVAILABLE; + // } + } + EventStoreType::Postgres => { + // do nothing + StatusCode::OK + } + }; + + // Response::new("foobar".into()) + + status_code +} diff --git a/agent_store/Cargo.toml b/agent_store/Cargo.toml index b48261ae..97f44c08 100644 --- a/agent_store/Cargo.toml +++ b/agent_store/Cargo.toml @@ -21,3 +21,4 @@ sqlx = { version = "0.7", features = [ "json", ] } tokio.workspace = true +tracing.workspace = true diff --git a/agent_store/src/postgres.rs b/agent_store/src/postgres.rs index 0dbfe145..66b15ded 100644 --- a/agent_store/src/postgres.rs +++ b/agent_store/src/postgres.rs @@ -1,6 +1,6 @@ use crate::{partition_event_publishers, EventPublisher, Partitions}; use agent_holder::{services::HolderServices, state::HolderState}; -use agent_identity::{services::IdentityServices, state::IdentityState}; +use agent_identity::{connection, services::IdentityServices, state::IdentityState}; use agent_issuance::{ offer::queries::{access_token::AccessTokenQuery, pre_authorized_code::PreAuthorizedCodeQuery}, services::IssuanceServices, @@ -13,8 +13,8 @@ use agent_shared::{ use agent_verification::{services::VerificationServices, state::VerificationState}; use async_trait::async_trait; use cqrs_es::{Aggregate, Query}; -use postgres_es::{default_postgress_pool, PostgresCqrs, PostgresViewRepository}; -use sqlx::{Pool, Postgres}; +use postgres_es::{PostgresCqrs, PostgresViewRepository}; +use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; use std::{collections::HashMap, sync::Arc}; struct AggregateHandler @@ -323,3 +323,36 @@ pub async fn verification_state( }, } } + +/// Replacement for `postgres_es::default_postgress_pool`, but returns an error instead of panicking. +pub async fn default_postgress_pool(connection_string: &str) -> Result, sqlx::Error> { + PgPoolOptions::new() + .max_connections(10) + .connect(connection_string) + .await + // .expect("unable to connect to database") +} + +pub async fn check_connection() -> bool { + let connection_string = config().event_store.connection_string.clone().expect( + "Missing config parameter `event_store.connection_string` or `UNICORE__EVENT_STORE__CONNECTION_STRING`", + ); + let connection_string = "foobar".to_string(); + let pool = if let Ok(pool) = default_postgress_pool(&connection_string) + .await + .inspect_err(|e| tracing::debug!("Database connectivity check failed: {}", e)) + { + pool + } else { + return false; + }; + + // sqlx::query("SELECT 1").fetch_one(&pool).await.is_ok() + match sqlx::query("SELECT 1").execute(&pool).await { + Ok(_) => true, + Err(err) => { + tracing::debug!("Database connectivity check failed: {}", err); + false + } + } +}