From 3c0b082a742e452cf5128d9dc8d9a63877cb8812 Mon Sep 17 00:00:00 2001 From: Artur Jurat Date: Wed, 24 Apr 2024 19:58:21 +0200 Subject: [PATCH] chore: migrate webhook_out to diesel (#190) --- .../crates/diesel-models/src/extend/mod.rs | 1 + .../crates/diesel-models/src/extend/order.rs | 6 + .../crates/diesel-models/src/query/mod.rs | 1 + .../diesel-models/src/query/webhooks.rs | 122 ++++++++++++++++++ .../crates/diesel-models/src/schema.rs | 2 +- .../crates/diesel-models/src/webhooks.rs | 34 ++++- .../meteroid/crates/meteroid-store/Cargo.toml | 3 + .../meteroid-store/src/domain/configs.rs | 14 +- .../crates/meteroid-store/src/domain/misc.rs | 10 ++ .../crates/meteroid-store/src/domain/mod.rs | 1 + .../meteroid-store/src/domain/webhooks.rs | 97 ++++++++++++++ .../src/repositories/configs.rs | 2 +- .../meteroid-store/src/repositories/mod.rs | 1 + .../src/repositories/webhooks.rs | 91 +++++++++++++ .../crates/meteroid-store/src/utils/gen.rs | 16 +++ .../crates/meteroid-store/src/utils/mod.rs | 1 + modules/meteroid/src/api/server.rs | 5 +- modules/meteroid/src/api/utils.rs | 16 --- modules/meteroid/src/api/webhooksout/error.rs | 12 ++ .../meteroid/src/api/webhooksout/mapping.rs | 93 +++++++------ modules/meteroid/src/api/webhooksout/mod.rs | 26 +--- .../meteroid/src/api/webhooksout/service.rs | 120 ++++++----------- .../tests/integration/test_webhooks_out.rs | 20 ++- 23 files changed, 515 insertions(+), 179 deletions(-) create mode 100644 modules/meteroid/crates/diesel-models/src/extend/order.rs create mode 100644 modules/meteroid/crates/diesel-models/src/query/webhooks.rs create mode 100644 modules/meteroid/crates/meteroid-store/src/domain/webhooks.rs create mode 100644 modules/meteroid/crates/meteroid-store/src/repositories/webhooks.rs create mode 100644 modules/meteroid/crates/meteroid-store/src/utils/gen.rs diff --git a/modules/meteroid/crates/diesel-models/src/extend/mod.rs b/modules/meteroid/crates/diesel-models/src/extend/mod.rs index f91b3551..7ff11c0e 100644 --- a/modules/meteroid/crates/diesel-models/src/extend/mod.rs +++ b/modules/meteroid/crates/diesel-models/src/extend/mod.rs @@ -1,2 +1,3 @@ pub mod cursor_pagination; +pub mod order; pub mod pagination; diff --git a/modules/meteroid/crates/diesel-models/src/extend/order.rs b/modules/meteroid/crates/diesel-models/src/extend/order.rs new file mode 100644 index 00000000..e29efc32 --- /dev/null +++ b/modules/meteroid/crates/diesel-models/src/extend/order.rs @@ -0,0 +1,6 @@ +pub enum OrderByRequest { + IdAsc, + IdDesc, + DateAsc, + DateDesc, +} diff --git a/modules/meteroid/crates/diesel-models/src/query/mod.rs b/modules/meteroid/crates/diesel-models/src/query/mod.rs index ea5d4c1e..4721b567 100644 --- a/modules/meteroid/crates/diesel-models/src/query/mod.rs +++ b/modules/meteroid/crates/diesel-models/src/query/mod.rs @@ -18,3 +18,4 @@ pub mod subscription_events; pub mod subscriptions; pub mod tenants; pub mod users; +pub mod webhooks; diff --git a/modules/meteroid/crates/diesel-models/src/query/webhooks.rs b/modules/meteroid/crates/diesel-models/src/query/webhooks.rs new file mode 100644 index 00000000..46606d3a --- /dev/null +++ b/modules/meteroid/crates/diesel-models/src/query/webhooks.rs @@ -0,0 +1,122 @@ +use crate::errors::IntoDbResult; +use crate::extend::order::OrderByRequest; +use crate::extend::pagination::{Paginate, PaginatedVec, PaginationRequest}; +use crate::webhooks::{ + WebhookOutEndpoint, WebhookOutEndpointNew, WebhookOutEvent, WebhookOutEventNew, +}; +use crate::{DbResult, PgConn}; +use diesel::{debug_query, ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper}; +use error_stack::ResultExt; + +impl WebhookOutEndpointNew { + pub async fn insert(&self, conn: &mut PgConn) -> DbResult { + use crate::schema::webhook_out_endpoint::dsl::*; + use diesel_async::RunQueryDsl; + + let query = diesel::insert_into(webhook_out_endpoint).values(self); + log::debug!("{}", debug_query::(&query).to_string()); + + query + .get_result(conn) + .await + .attach_printable("Error while inserting webhook_out_endpoint") + .into_db_result() + } +} + +impl WebhookOutEventNew { + pub async fn insert(&self, conn: &mut PgConn) -> DbResult { + use crate::schema::webhook_out_event::dsl::*; + use diesel_async::RunQueryDsl; + + let query = diesel::insert_into(webhook_out_event).values(self); + log::debug!("{}", debug_query::(&query).to_string()); + + query + .get_result(conn) + .await + .attach_printable("Error while inserting webhook_out_event") + .into_db_result() + } +} + +impl WebhookOutEndpoint { + pub async fn list_by_tenant_id( + conn: &mut PgConn, + tenant_id: uuid::Uuid, + ) -> DbResult> { + use crate::schema::webhook_out_endpoint::dsl; + use diesel_async::RunQueryDsl; + + let query = dsl::webhook_out_endpoint.filter(dsl::tenant_id.eq(tenant_id)); + + log::debug!("{}", debug_query::(&query).to_string()); + + query + .get_results(conn) + .await + .attach_printable("Error while listing webhook_out_endpoints by tenant_id") + .into_db_result() + } + + pub async fn find_by_id_and_tenant_id( + conn: &mut PgConn, + id: uuid::Uuid, + tenant_id: uuid::Uuid, + ) -> DbResult { + use crate::schema::webhook_out_endpoint::dsl; + use diesel_async::RunQueryDsl; + + let query = dsl::webhook_out_endpoint + .filter(dsl::tenant_id.eq(tenant_id)) + .filter(dsl::id.eq(id)); + + log::debug!("{}", debug_query::(&query).to_string()); + + query + .first(conn) + .await + .attach_printable("Error while fetching webhook_out_endpoint by id and tenant_id") + .into_db_result() + } +} + +impl WebhookOutEvent { + pub async fn list_events( + conn: &mut PgConn, + tenant_id: uuid::Uuid, + endpoint_id: uuid::Uuid, + pagination: PaginationRequest, + order_by: OrderByRequest, + ) -> DbResult> { + use crate::schema::webhook_out_endpoint::dsl as end_dsl; + use crate::schema::webhook_out_event::dsl as ev_dsl; + + let mut query = ev_dsl::webhook_out_event + .inner_join(end_dsl::webhook_out_endpoint.on(ev_dsl::endpoint_id.eq(end_dsl::id))) + .filter(ev_dsl::endpoint_id.eq(endpoint_id)) + .filter(end_dsl::tenant_id.eq(tenant_id)) + .select(WebhookOutEvent::as_select()) + .into_boxed(); + + match order_by { + OrderByRequest::IdAsc => query = query.order(ev_dsl::id.asc()), + OrderByRequest::IdDesc => query = query.order(ev_dsl::id.desc()), + OrderByRequest::DateAsc => query = query.order(ev_dsl::created_at.asc()), + OrderByRequest::DateDesc => query = query.order(ev_dsl::created_at.desc()), + } + + let paginated_query = query.paginate(pagination); + + log::debug!( + "{}", + debug_query::(&paginated_query).to_string() + ); + + paginated_query + .load_and_count_pages(conn) + .await + .attach_printable("Error while fetching webhook_out events") + .into_db_result() + } +} diff --git a/modules/meteroid/crates/diesel-models/src/schema.rs b/modules/meteroid/crates/diesel-models/src/schema.rs index 02779b84..910e2327 100644 --- a/modules/meteroid/crates/diesel-models/src/schema.rs +++ b/modules/meteroid/crates/diesel-models/src/schema.rs @@ -570,7 +570,7 @@ diesel::table! { description -> Nullable, secret -> Text, created_at -> Timestamp, - events_to_listen -> Array>, + events_to_listen -> Array, enabled -> Bool, } } diff --git a/modules/meteroid/crates/diesel-models/src/webhooks.rs b/modules/meteroid/crates/diesel-models/src/webhooks.rs index 7d32980e..98e83937 100644 --- a/modules/meteroid/crates/diesel-models/src/webhooks.rs +++ b/modules/meteroid/crates/diesel-models/src/webhooks.rs @@ -3,7 +3,7 @@ use chrono::offset::Utc; use chrono::DateTime; use chrono::NaiveDateTime; -use diesel::Queryable; +use diesel::{Identifiable, Insertable, Queryable, Selectable}; use uuid::Uuid; #[derive(Queryable, Debug)] @@ -20,7 +20,7 @@ pub struct WebhookInEvent { pub provider_config_id: Uuid, } -#[derive(Queryable, Debug)] +#[derive(Queryable, Identifiable, Debug)] #[diesel(table_name = crate::schema::webhook_out_endpoint)] #[diesel(check_for_backend(diesel::pg::Pg))] pub struct WebhookOutEndpoint { @@ -30,11 +30,24 @@ pub struct WebhookOutEndpoint { pub description: Option, pub secret: String, pub created_at: NaiveDateTime, - pub events_to_listen: Vec>, + pub events_to_listen: Vec, pub enabled: bool, } -#[derive(Queryable, Debug)] +#[derive(Insertable, Debug)] +#[diesel(table_name = crate::schema::webhook_out_endpoint)] +#[diesel(check_for_backend(diesel::pg::Pg))] +pub struct WebhookOutEndpointNew { + pub id: Uuid, + pub tenant_id: Uuid, + pub url: String, + pub description: Option, + pub secret: String, + pub events_to_listen: Vec, + pub enabled: bool, +} + +#[derive(Queryable, Identifiable, Debug, Selectable)] #[diesel(table_name = crate::schema::webhook_out_event)] pub struct WebhookOutEvent { pub id: Uuid, @@ -46,3 +59,16 @@ pub struct WebhookOutEvent { pub http_status_code: Option, pub error_message: Option, } + +#[derive(Insertable, Debug)] +#[diesel(table_name = crate::schema::webhook_out_event)] +pub struct WebhookOutEventNew { + pub id: Uuid, + pub endpoint_id: Uuid, + pub created_at: NaiveDateTime, + pub event_type: WebhookOutEventTypeEnum, + pub request_body: String, + pub response_body: Option, + pub http_status_code: Option, + pub error_message: Option, +} diff --git a/modules/meteroid/crates/meteroid-store/Cargo.toml b/modules/meteroid/crates/meteroid-store/Cargo.toml index 2d1ec54e..1237464e 100644 --- a/modules/meteroid/crates/meteroid-store/Cargo.toml +++ b/modules/meteroid/crates/meteroid-store/Cargo.toml @@ -34,6 +34,9 @@ itertools.workspace = true secrecy = { workspace = true, features = ["default", "serde"] } chacha20poly1305 = { workspace = true } hex = { workspace = true } +url = { workspace = true } +base64 = { workspace = true } +fastrand = { workspace = true } [dev-dependencies] rstest = { workspace = true } diff --git a/modules/meteroid/crates/meteroid-store/src/domain/configs.rs b/modules/meteroid/crates/meteroid-store/src/domain/configs.rs index 587035e2..742c2fc3 100644 --- a/modules/meteroid/crates/meteroid-store/src/domain/configs.rs +++ b/modules/meteroid/crates/meteroid-store/src/domain/configs.rs @@ -82,19 +82,19 @@ pub struct ProviderConfigNew { } impl ProviderConfigNew { - pub fn domain_to_row( + pub fn to_row( + &self, key: &SecretString, - domain: &ProviderConfigNew, ) -> StoreResult { let wh_sec_enc = WebhookSecurity { - secret: crate::crypt::encrypt(key, domain.webhook_security.secret.as_str()) + secret: crate::crypt::encrypt(key, self.webhook_security.secret.as_str()) .change_context(StoreError::CryptError( "webhook_security encryption error".into(), ))?, }; let api_sec_enc = ApiSecurity { - api_key: crate::crypt::encrypt(key, domain.api_security.api_key.as_str()) + api_key: crate::crypt::encrypt(key, self.api_security.api_key.as_str()) .change_context(StoreError::CryptError( "api_security encryption error".into(), ))?, @@ -110,9 +110,9 @@ impl ProviderConfigNew { Ok(diesel_models::configs::ProviderConfigNew { id: Uuid::now_v7(), - tenant_id: domain.tenant_id, - invoicing_provider: domain.invoicing_provider.clone().into(), - enabled: domain.enabled, + tenant_id: self.tenant_id, + invoicing_provider: self.invoicing_provider.clone().into(), + enabled: self.enabled, webhook_security: wh_sec, api_security: api_sec, }) diff --git a/modules/meteroid/crates/meteroid-store/src/domain/misc.rs b/modules/meteroid/crates/meteroid-store/src/domain/misc.rs index fbde8a54..7cdcd92a 100644 --- a/modules/meteroid/crates/meteroid-store/src/domain/misc.rs +++ b/modules/meteroid/crates/meteroid-store/src/domain/misc.rs @@ -1,4 +1,5 @@ use chrono::NaiveDate; +use o2o::o2o; use uuid::Uuid; pub struct PaginationRequest { @@ -103,3 +104,12 @@ pub struct ComponentPeriods { pub advance: Option, pub proration_factor: Option, } + +#[derive(Debug, Clone, o2o)] +#[map_owned(diesel_models::extend::order::OrderByRequest)] +pub enum OrderByRequest { + IdAsc, + IdDesc, + DateAsc, + DateDesc, +} diff --git a/modules/meteroid/crates/meteroid-store/src/domain/mod.rs b/modules/meteroid/crates/meteroid-store/src/domain/mod.rs index 127035a3..4ef79899 100644 --- a/modules/meteroid/crates/meteroid-store/src/domain/mod.rs +++ b/modules/meteroid/crates/meteroid-store/src/domain/mod.rs @@ -27,3 +27,4 @@ pub mod product_families; pub mod schedules; pub mod subscription_components; pub mod subscriptions; +pub mod webhooks; diff --git a/modules/meteroid/crates/meteroid-store/src/domain/webhooks.rs b/modules/meteroid/crates/meteroid-store/src/domain/webhooks.rs new file mode 100644 index 00000000..a7ce2138 --- /dev/null +++ b/modules/meteroid/crates/meteroid-store/src/domain/webhooks.rs @@ -0,0 +1,97 @@ +use crate::domain::enums::WebhookOutEventTypeEnum; +use crate::errors::StoreError; +use crate::utils::gen::webhook_security; +use crate::StoreResult; +use chrono::NaiveDateTime; +use error_stack::ResultExt; +use itertools::Itertools; +use o2o::o2o; +use secrecy::{ExposeSecret, SecretString}; +use url::Url; +use uuid::Uuid; + +#[derive(Clone, Debug)] +pub struct WebhookOutEndpoint { + pub id: Uuid, + pub tenant_id: Uuid, + pub url: Url, + pub description: Option, + pub secret: SecretString, + pub created_at: NaiveDateTime, + pub events_to_listen: Vec, + pub enabled: bool, +} + +impl WebhookOutEndpoint { + pub fn from_row( + key: &SecretString, + row: diesel_models::webhooks::WebhookOutEndpoint, + ) -> StoreResult { + let dec_sec = crate::crypt::decrypt(key, row.secret.as_str()) + .change_context(StoreError::CryptError("secret decryption error".into()))?; + + let dec_url = Url::parse(row.url.as_str()) + .change_context(StoreError::InvalidArgument("invalid url value".into()))?; + + Ok(WebhookOutEndpoint { + id: row.id, + tenant_id: row.tenant_id, + url: dec_url, + description: row.description, + secret: dec_sec, + created_at: row.created_at, + events_to_listen: row.events_to_listen.into_iter().map_into().collect(), + enabled: row.enabled, + }) + } +} + +#[derive(Clone, Debug)] +pub struct WebhookOutEndpointNew { + pub tenant_id: Uuid, + pub url: Url, + pub description: Option, + pub events_to_listen: Vec, + pub enabled: bool, +} + +impl WebhookOutEndpointNew { + pub fn to_row( + &self, + key: &SecretString, + ) -> StoreResult { + let enc_secret = + crate::crypt::encrypt(key, webhook_security::gen().expose_secret().as_str()) + .change_context(StoreError::CryptError("secret decryption error".into()))?; + + Ok(diesel_models::webhooks::WebhookOutEndpointNew { + id: Uuid::now_v7(), + tenant_id: self.tenant_id, + url: self.url.to_string(), + description: self.description.clone(), + secret: enc_secret, + events_to_listen: self + .events_to_listen + .clone() + .into_iter() + .map_into() + .collect(), + enabled: self.enabled, + }) + } +} + +#[derive(Clone, Debug, o2o)] +#[from_owned(diesel_models::webhooks::WebhookOutEvent)] +#[owned_into(diesel_models::webhooks::WebhookOutEvent)] +pub struct WebhookOutEvent { + pub id: Uuid, + pub endpoint_id: Uuid, + pub created_at: NaiveDateTime, + #[map(~.into())] + pub event_type: WebhookOutEventTypeEnum, + pub request_body: String, + pub response_body: Option, + pub http_status_code: Option, + pub error_message: Option, +} diff --git a/modules/meteroid/crates/meteroid-store/src/repositories/configs.rs b/modules/meteroid/crates/meteroid-store/src/repositories/configs.rs index e1f49949..790ab3d2 100644 --- a/modules/meteroid/crates/meteroid-store/src/repositories/configs.rs +++ b/modules/meteroid/crates/meteroid-store/src/repositories/configs.rs @@ -26,7 +26,7 @@ impl ConfigsInterface for Store { &self, config: ProviderConfigNew, ) -> StoreResult { - let insertable = ProviderConfigNew::domain_to_row(&self.crypt_key, &config)?; + let insertable = config.to_row(&self.crypt_key)?; let mut conn = self.get_conn().await?; diff --git a/modules/meteroid/crates/meteroid-store/src/repositories/mod.rs b/modules/meteroid/crates/meteroid-store/src/repositories/mod.rs index 9843343e..c01d659b 100644 --- a/modules/meteroid/crates/meteroid-store/src/repositories/mod.rs +++ b/modules/meteroid/crates/meteroid-store/src/repositories/mod.rs @@ -15,3 +15,4 @@ pub mod configs; pub mod price_components; pub mod product_families; pub mod subscriptions; +pub mod webhooks; diff --git a/modules/meteroid/crates/meteroid-store/src/repositories/webhooks.rs b/modules/meteroid/crates/meteroid-store/src/repositories/webhooks.rs new file mode 100644 index 00000000..b07b05ab --- /dev/null +++ b/modules/meteroid/crates/meteroid-store/src/repositories/webhooks.rs @@ -0,0 +1,91 @@ +use crate::domain::webhooks::{WebhookOutEndpoint, WebhookOutEndpointNew, WebhookOutEvent}; +use crate::domain::{OrderByRequest, PaginatedVec, PaginationRequest}; +use crate::errors::StoreError; +use crate::{Store, StoreResult}; +use error_stack::Report; +use uuid::Uuid; + +#[async_trait::async_trait] +pub trait WebhooksInterface { + async fn insert_webhook_out_endpoint( + &self, + endpoint: WebhookOutEndpointNew, + ) -> StoreResult; + + async fn list_webhook_out_endpoints( + &self, + tenant_id: Uuid, + ) -> StoreResult>; + + async fn list_webhook_out_events( + &self, + tenant_id: Uuid, + endpoint_id: Uuid, + pagination: PaginationRequest, + order_by: OrderByRequest, + ) -> StoreResult>; +} + +#[async_trait::async_trait] +impl WebhooksInterface for Store { + async fn insert_webhook_out_endpoint( + &self, + endpoint: WebhookOutEndpointNew, + ) -> StoreResult { + let insertable = endpoint.to_row(&self.crypt_key)?; + + let mut conn = self.get_conn().await?; + + let row = insertable + .insert(&mut conn) + .await + .map_err(Into::>::into)?; + + WebhookOutEndpoint::from_row(&self.crypt_key, row) + } + + async fn list_webhook_out_endpoints( + &self, + tenant_id: Uuid, + ) -> StoreResult> { + let mut conn = self.get_conn().await?; + + let vec_rows = + diesel_models::webhooks::WebhookOutEndpoint::list_by_tenant_id(&mut conn, tenant_id) + .await + .map_err(Into::>::into)?; + + vec_rows + .into_iter() + .map(|row| WebhookOutEndpoint::from_row(&self.crypt_key, row)) + .collect() + } + + async fn list_webhook_out_events( + &self, + tenant_id: Uuid, + endpoint_id: Uuid, + pagination: PaginationRequest, + order_by: OrderByRequest, + ) -> StoreResult> { + let mut conn = self.get_conn().await?; + + let rows = diesel_models::webhooks::WebhookOutEvent::list_events( + &mut conn, + tenant_id, + endpoint_id, + pagination.into(), + order_by.into(), + ) + .await + .map_err(Into::>::into)?; + + let res: PaginatedVec = PaginatedVec { + items: rows.items.into_iter().map(|s| s.into()).collect(), + total_pages: rows.total_pages, + total_results: rows.total_results, + }; + + Ok(res) + } +} diff --git a/modules/meteroid/crates/meteroid-store/src/utils/gen.rs b/modules/meteroid/crates/meteroid-store/src/utils/gen.rs new file mode 100644 index 00000000..f7e42574 --- /dev/null +++ b/modules/meteroid/crates/meteroid-store/src/utils/gen.rs @@ -0,0 +1,16 @@ +pub mod webhook_security { + use base64::Engine; + use secrecy::SecretString; + + const KEY_SIZE: usize = 24; + const PREFIX: &str = "whsec_"; + + pub fn gen() -> SecretString { + let key: Vec = std::iter::repeat_with(|| fastrand::u8(..)) + .take(KEY_SIZE) + .collect(); + let encoded = base64::prelude::BASE64_STANDARD.encode(&key); + + SecretString::new(format!("{}{}", PREFIX, encoded)) + } +} diff --git a/modules/meteroid/crates/meteroid-store/src/utils/mod.rs b/modules/meteroid/crates/meteroid-store/src/utils/mod.rs index 508c82e2..ac851d91 100644 --- a/modules/meteroid/crates/meteroid-store/src/utils/mod.rs +++ b/modules/meteroid/crates/meteroid-store/src/utils/mod.rs @@ -1 +1,2 @@ +pub mod gen; pub mod periods; diff --git a/modules/meteroid/src/api/server.rs b/modules/meteroid/src/api/server.rs index f13014da..2e409548 100644 --- a/modules/meteroid/src/api/server.rs +++ b/modules/meteroid/src/api/server.rs @@ -139,10 +139,7 @@ pub async fn start_api_server( compute_service, eventbus.clone(), )) - .add_service(api::webhooksout::service( - pool.clone(), - config.secrets_crypt_key.clone(), - )) + .add_service(api::webhooksout::service(store.clone())) .add_service(api::internal::service(pool.clone())) .serve(config.listen_addr) .await?; diff --git a/modules/meteroid/src/api/utils.rs b/modules/meteroid/src/api/utils.rs index 26bacdbf..990ca546 100644 --- a/modules/meteroid/src/api/utils.rs +++ b/modules/meteroid/src/api/utils.rs @@ -39,22 +39,6 @@ pub fn parse_uuid_opt( } } -pub mod webhook_security { - use base64::Engine; - - const KEY_SIZE: usize = 24; - const PREFIX: &str = "whsec_"; - - pub fn gen() -> String { - let key: Vec = std::iter::repeat_with(|| fastrand::u8(..)) - .take(KEY_SIZE) - .collect(); - let encoded = base64::prelude::BASE64_STANDARD.encode(&key); - - format!("{}{}", PREFIX, encoded) - } -} - pub trait PaginationExt { fn limit(&self) -> i64; fn limit_or(&self, default: u32) -> i64; diff --git a/modules/meteroid/src/api/webhooksout/error.rs b/modules/meteroid/src/api/webhooksout/error.rs index 72a471e6..c9bd3d6f 100644 --- a/modules/meteroid/src/api/webhooksout/error.rs +++ b/modules/meteroid/src/api/webhooksout/error.rs @@ -1,4 +1,5 @@ use deadpool_postgres::tokio_postgres; +use std::error::Error; use thiserror::Error; use common_grpc_error_as_tonic_macros_impl::ErrorAsTonic; @@ -20,4 +21,15 @@ pub enum WebhookApiError { #[error("Database error: {0}")] #[code(Internal)] DatabaseError(String, #[source] tokio_postgres::Error), + + #[error("Store error: {0}")] + #[code(Internal)] + StoreError(String, #[source] Box), +} + +impl Into for error_stack::Report { + fn into(self) -> WebhookApiError { + let err = Box::new(self.into_error()); + WebhookApiError::StoreError("Error in tenant service".to_string(), err) + } } diff --git a/modules/meteroid/src/api/webhooksout/mapping.rs b/modules/meteroid/src/api/webhooksout/mapping.rs index 1540d23f..58a97eab 100644 --- a/modules/meteroid/src/api/webhooksout/mapping.rs +++ b/modules/meteroid/src/api/webhooksout/mapping.rs @@ -1,74 +1,91 @@ pub mod endpoint { - use crate::api::shared::mapping::datetime::datetime_to_timestamp; + use crate::api::shared::mapping::datetime::chrono_to_timestamp; + use crate::api::webhooksout::error::WebhookApiError; use crate::api::webhooksout::mapping::event_type; - use meteroid_grpc::meteroid::api::webhooks::out::v1::WebhookEndpoint as WebhookEndpointProto; - use meteroid_repository::webhook_out_endpoints::WebhookOutEndpoint as WebhookEndpointDb; - use meteroid_store::crypt; - use secrecy::{ExposeSecret, SecretString}; - use tonic::Status; + use meteroid_grpc::meteroid::api::webhooks::out::v1::{ + CreateWebhookEndpointRequest, WebhookEndpoint as WebhookEndpointProto, + }; + use meteroid_store::domain::enums::WebhookOutEventTypeEnum; + use meteroid_store::domain::webhooks::{WebhookOutEndpoint, WebhookOutEndpointNew}; + use secrecy::ExposeSecret; + use uuid::Uuid; - pub fn to_proto( - endpoint: &WebhookEndpointDb, - crypt_key: &SecretString, - ) -> Result { - let secret = crypt::decrypt(crypt_key, endpoint.secret.as_str()).map_err(|x| { - Status::new( - tonic::Code::Internal, - x.current_context().clone().to_string(), - ) - })?; - - let endpoint = WebhookEndpointProto { + pub fn to_proto(endpoint: WebhookOutEndpoint) -> WebhookEndpointProto { + WebhookEndpointProto { id: endpoint.id.to_string(), - url: endpoint.url.clone(), + url: endpoint.url.to_string(), description: endpoint.description.clone(), - secret: secret.expose_secret().to_string(), + secret: endpoint.secret.expose_secret().to_string(), events_to_listen: endpoint .events_to_listen .iter() .map(|e| event_type::to_proto(&e).into()) .collect(), enabled: endpoint.enabled, - created_at: Some(datetime_to_timestamp(endpoint.created_at)), - }; + created_at: Some(chrono_to_timestamp(endpoint.created_at)), + } + } - Ok(endpoint) + pub fn new_req_to_domain( + tenant_id: Uuid, + req: CreateWebhookEndpointRequest, + ) -> Result { + let url = url::Url::parse(req.url.as_str()) + .map_err(|e| WebhookApiError::InvalidArgument(format!("Invalid URL: {}", e)))?; + + let events_to_listen: Vec = req + .events_to_listen() + .map(|e| event_type::to_domain(&e)) + .collect(); + + Ok(WebhookOutEndpointNew { + tenant_id, + url, + description: req.description, + events_to_listen, + enabled: true, + }) } } pub mod event_type { use meteroid_grpc::meteroid::api::webhooks::out::v1::WebhookEventType as WebhookEventTypeProto; - use meteroid_repository::WebhookOutEventTypeEnum as WebhookEventTypeDb; - pub fn to_db(event_type: &WebhookEventTypeProto) -> WebhookEventTypeDb { + use meteroid_store::domain::enums::WebhookOutEventTypeEnum; + + pub fn to_domain(event_type: &WebhookEventTypeProto) -> WebhookOutEventTypeEnum { match event_type { - WebhookEventTypeProto::CustomerCreated => WebhookEventTypeDb::CUSTOMER_CREATED, - WebhookEventTypeProto::SubscriptionCreated => WebhookEventTypeDb::SUBSCRIPTION_CREATED, - WebhookEventTypeProto::InvoiceCreated => WebhookEventTypeDb::INVOICE_CREATED, - WebhookEventTypeProto::InvoiceFinalized => WebhookEventTypeDb::INVOICE_FINALIZED, + WebhookEventTypeProto::CustomerCreated => WebhookOutEventTypeEnum::CustomerCreated, + WebhookEventTypeProto::SubscriptionCreated => { + WebhookOutEventTypeEnum::SubscriptionCreated + } + WebhookEventTypeProto::InvoiceCreated => WebhookOutEventTypeEnum::InvoiceCreated, + WebhookEventTypeProto::InvoiceFinalized => WebhookOutEventTypeEnum::InvoiceFinalized, } } - pub fn to_proto(event_type: &WebhookEventTypeDb) -> WebhookEventTypeProto { + pub fn to_proto(event_type: &WebhookOutEventTypeEnum) -> WebhookEventTypeProto { match event_type { - WebhookEventTypeDb::CUSTOMER_CREATED => WebhookEventTypeProto::CustomerCreated, - WebhookEventTypeDb::SUBSCRIPTION_CREATED => WebhookEventTypeProto::SubscriptionCreated, - WebhookEventTypeDb::INVOICE_CREATED => WebhookEventTypeProto::InvoiceCreated, - WebhookEventTypeDb::INVOICE_FINALIZED => WebhookEventTypeProto::InvoiceFinalized, + WebhookOutEventTypeEnum::CustomerCreated => WebhookEventTypeProto::CustomerCreated, + WebhookOutEventTypeEnum::SubscriptionCreated => { + WebhookEventTypeProto::SubscriptionCreated + } + WebhookOutEventTypeEnum::InvoiceCreated => WebhookEventTypeProto::InvoiceCreated, + WebhookOutEventTypeEnum::InvoiceFinalized => WebhookEventTypeProto::InvoiceFinalized, } } } pub mod event { - use crate::api::shared::mapping::datetime::datetime_to_timestamp; + use crate::api::shared::mapping::datetime::chrono_to_timestamp; use crate::api::webhooksout::mapping::event_type; use meteroid_grpc::meteroid::api::webhooks::out::v1::WebhookEvent as WebhookEventProto; - use meteroid_repository::webhook_out_events::ListWebhookOutEvent as ListWebhookOutEventDb; + use meteroid_store::domain::webhooks::WebhookOutEvent; - pub fn to_proto(event: &ListWebhookOutEventDb) -> WebhookEventProto { + pub fn to_proto(event: &WebhookOutEvent) -> WebhookEventProto { WebhookEventProto { id: event.id.to_string(), event_type: event_type::to_proto(&event.event_type).into(), - created_at: Some(datetime_to_timestamp(event.created_at)), + created_at: Some(chrono_to_timestamp(event.created_at)), http_status_code: event.http_status_code.map(|x| x as i32), request_body: event.request_body.clone(), response_body: event.response_body.clone(), diff --git a/modules/meteroid/src/api/webhooksout/mod.rs b/modules/meteroid/src/api/webhooksout/mod.rs index eaeb5347..4b46426b 100644 --- a/modules/meteroid/src/api/webhooksout/mod.rs +++ b/modules/meteroid/src/api/webhooksout/mod.rs @@ -1,33 +1,15 @@ -use crate::db::{get_connection, get_transaction}; -use deadpool_postgres::{Object, Pool, Transaction}; use meteroid_grpc::meteroid::api::webhooks::out::v1::webhooks_service_server::WebhooksServiceServer; -use tonic::Status; +use meteroid_store::Store; mod error; mod mapping; mod service; pub struct WebhooksServiceComponents { - pub pool: Pool, - pub crypt_key: secrecy::SecretString, + pub store: Store, } -impl WebhooksServiceComponents { - pub async fn get_connection(&self) -> Result { - get_connection(&self.pool).await - } - pub async fn get_transaction<'a>( - &'a self, - client: &'a mut Object, - ) -> Result, Status> { - get_transaction(client).await - } -} - -pub fn service( - pool: Pool, - crypt_key: secrecy::SecretString, -) -> WebhooksServiceServer { - let inner = WebhooksServiceComponents { pool, crypt_key }; +pub fn service(store: Store) -> WebhooksServiceServer { + let inner = WebhooksServiceComponents { store }; WebhooksServiceServer::new(inner) } diff --git a/modules/meteroid/src/api/webhooksout/service.rs b/modules/meteroid/src/api/webhooksout/service.rs index 041ad833..d09916e4 100644 --- a/modules/meteroid/src/api/webhooksout/service.rs +++ b/modules/meteroid/src/api/webhooksout/service.rs @@ -1,4 +1,3 @@ -use cornucopia_async::Params; use tonic::{Request, Response, Status}; use common_grpc::middleware::server::auth::RequestExt; @@ -8,15 +7,14 @@ use meteroid_grpc::meteroid::api::webhooks::out::v1::{ CreateWebhookEndpointRequest, CreateWebhookEndpointResponse, ListWebhookEndpointsRequest, ListWebhookEndpointsResponse, ListWebhookEventsRequest, ListWebhookEventsResponse, }; -use meteroid_repository::webhook_out_endpoints::CreateEndpointParams; -use meteroid_repository::webhook_out_events::ListEventsParams; -use meteroid_repository::WebhookOutEventTypeEnum; -use meteroid_store::crypt; +use meteroid_store::domain; +use meteroid_store::domain::OrderByRequest; +use meteroid_store::repositories::webhooks::WebhooksInterface; use crate::api::utils::parse_uuid; -use crate::api::utils::{uuid_gen, webhook_security, PaginationExt}; +use crate::api::utils::PaginationExt; use crate::api::webhooksout::error::WebhookApiError; -use crate::api::webhooksout::mapping::{endpoint, event, event_type}; +use crate::api::webhooksout::mapping::{endpoint, event}; use crate::api::webhooksout::WebhooksServiceComponents; #[tonic::async_trait] @@ -30,40 +28,17 @@ impl WebhooksService for WebhooksServiceComponents { let req = request.into_inner(); - let event_types: Vec = req - .events_to_listen() - .map(|e| event_type::to_db(&e)) - .collect(); - - url::Url::parse(req.url.as_str()) - .map_err(|e| WebhookApiError::InvalidArgument(format!("Invalid URL: {}", e)))?; - - let secret_raw = webhook_security::gen(); - let secret = crypt::encrypt(&self.crypt_key, secret_raw.as_str()) - .map_err(|x| Status::internal(x.current_context().clone().to_string()))?; - - let params = CreateEndpointParams { - id: uuid_gen::v7(), - tenant_id, - url: req.url, - description: req.description, - secret, - events_to_listen: event_types, - enabled: true, - }; + let domain = endpoint::new_req_to_domain(tenant_id, req)?; - let connection = self.get_connection().await?; - - let created = meteroid_repository::webhook_out_endpoints::create_endpoint() - .params(&connection, ¶ms) - .one() + let endpoint = self + .store + .insert_webhook_out_endpoint(domain) .await - .map_err(|e| { - WebhookApiError::DatabaseError("unable to create webhook endpoint".to_string(), e) - })?; + .map(endpoint::to_proto) + .map_err(Into::::into)?; Ok(Response::new(CreateWebhookEndpointResponse { - endpoint: Some(endpoint::to_proto(&created, &self.crypt_key)?), + endpoint: Some(endpoint), })) } @@ -74,18 +49,14 @@ impl WebhooksService for WebhooksServiceComponents { ) -> Result, Status> { let tenant_id = request.tenant()?.clone(); - let connection = self.get_connection().await?; - - let items = meteroid_repository::webhook_out_endpoints::list_endpoints() - .bind(&connection, &tenant_id) - .all() + let items = self + .store + .list_webhook_out_endpoints(tenant_id) .await - .map_err(|e| { - WebhookApiError::DatabaseError("unable to list webhook endpoints".to_string(), e) - })? - .iter() - .map(|e| endpoint::to_proto(e, &self.crypt_key)) - .collect::>()?; + .map_err(Into::::into)? + .into_iter() + .map(endpoint::to_proto) + .collect(); Ok(Response::new(ListWebhookEndpointsResponse { endpoints: items, @@ -103,48 +74,29 @@ impl WebhooksService for WebhooksServiceComponents { let endpoint_id = parse_uuid(&req.endpoint_id, "endpoint_id")?; - let connection = self.get_connection().await?; + let pagination_req = domain::PaginationRequest { + page: req.pagination.as_ref().map(|p| p.offset).unwrap_or(0), + per_page: req.pagination.as_ref().map(|p| p.limit), + }; - // make sure the endpoint belongs to the tenant - meteroid_repository::webhook_out_endpoints::get_by_id_and_tenant() - .bind(&connection, &endpoint_id, &tenant_id) - .opt() - .await - .map_err(|e| { - WebhookApiError::DatabaseError("unable to get webhook endpoint".to_string(), e) - })? - .ok_or_else(|| { - WebhookApiError::DatabaseEntityNotFoundError( - "Webhook endpoint not found".to_string(), - ) - })?; - - let params = ListEventsParams { - endpoint_id, - order_by: match req.order_by.try_into() { - Ok(SortBy::DateAsc) => "DATE_ASC", - Ok(SortBy::DateDesc) => "DATE_DESC", - Ok(SortBy::IdAsc) => "ID_ASC", - Ok(SortBy::IdDesc) => "ID_DESC", - Err(_) => "DATE_DESC", - }, - limit: req.pagination.limit(), - offset: req.pagination.offset(), + let order_by = match req.order_by.try_into() { + Ok(SortBy::DateAsc) => OrderByRequest::DateAsc, + Ok(SortBy::DateDesc) => OrderByRequest::DateDesc, + Ok(SortBy::IdAsc) => OrderByRequest::IdAsc, + Ok(SortBy::IdDesc) => OrderByRequest::IdDesc, + Err(_) => OrderByRequest::DateDesc, }; - let items = meteroid_repository::webhook_out_events::list_events() - .params(&connection, ¶ms) - .all() + let res = self + .store + .list_webhook_out_events(tenant_id, endpoint_id, pagination_req, order_by) .await - .map_err(|e| { - WebhookApiError::DatabaseError("unable to list webhook events".to_string(), e) - })?; - - let total = items.first().map(|p| p.total_count).unwrap_or(0); + .map_err(Into::::into)?; let response = ListWebhookEventsResponse { - pagination_meta: req.pagination.into_response(total as u32), - events: items + pagination_meta: req.pagination.into_response(res.total_results as u32), + events: res + .items .into_iter() .map(|l| event::to_proto(&l)) .collect::>(), diff --git a/modules/meteroid/tests/integration/test_webhooks_out.rs b/modules/meteroid/tests/integration/test_webhooks_out.rs index d356e1d0..fb327310 100644 --- a/modules/meteroid/tests/integration/test_webhooks_out.rs +++ b/modules/meteroid/tests/integration/test_webhooks_out.rs @@ -42,7 +42,7 @@ async fn test_webhook_endpoint_out() { .webhooks_out .clone() .create_webhook_endpoint(api::webhooks::out::v1::CreateWebhookEndpointRequest { - url: "https://example.com".to_string(), + url: "https://example.com/".to_string(), description: Some("Test".to_string()), events_to_listen: events_to_listen.clone(), }) @@ -52,7 +52,7 @@ async fn test_webhook_endpoint_out() { .endpoint .unwrap(); - assert_eq!(created.url.as_str(), "https://example.com"); + assert_eq!(created.url.as_str(), "https://example.com/"); assert_eq!(created.description, Some("Test".to_string())); assert_eq!(created.events_to_listen, events_to_listen.clone()); assert!(created.enabled); @@ -70,6 +70,22 @@ async fn test_webhook_endpoint_out() { assert_eq!(listed.len(), 1); assert_eq!(listed[0], created); + + // events + let events = clients + .webhooks_out + .clone() + .list_webhook_events(api::webhooks::out::v1::ListWebhookEventsRequest { + order_by: api::webhooks::out::v1::list_webhook_events_request::SortBy::DateDesc as i32, + endpoint_id: created.id, + pagination: None, + }) + .await + .unwrap() + .into_inner() + .events; + assert_eq!(events.len(), 0); + // teardown meteroid_it::container::terminate_meteroid(setup.token, setup.join_handle).await }