Skip to content

Commit

Permalink
chore: migrate webhook_out to diesel (#190)
Browse files Browse the repository at this point in the history
  • Loading branch information
azhur authored Apr 24, 2024
1 parent b4ad915 commit 3c0b082
Show file tree
Hide file tree
Showing 23 changed files with 515 additions and 179 deletions.
1 change: 1 addition & 0 deletions modules/meteroid/crates/diesel-models/src/extend/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod cursor_pagination;
pub mod order;
pub mod pagination;
6 changes: 6 additions & 0 deletions modules/meteroid/crates/diesel-models/src/extend/order.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub enum OrderByRequest {
IdAsc,
IdDesc,
DateAsc,
DateDesc,
}
1 change: 1 addition & 0 deletions modules/meteroid/crates/diesel-models/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ pub mod subscription_events;
pub mod subscriptions;
pub mod tenants;
pub mod users;
pub mod webhooks;
122 changes: 122 additions & 0 deletions modules/meteroid/crates/diesel-models/src/query/webhooks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use crate::errors::IntoDbResult;
use crate::extend::order::OrderByRequest;
use crate::extend::pagination::{Paginate, PaginatedVec, PaginationRequest};
use crate::webhooks::{
WebhookOutEndpoint, WebhookOutEndpointNew, WebhookOutEvent, WebhookOutEventNew,
};
use crate::{DbResult, PgConn};
use diesel::{debug_query, ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper};
use error_stack::ResultExt;

impl WebhookOutEndpointNew {
pub async fn insert(&self, conn: &mut PgConn) -> DbResult<WebhookOutEndpoint> {
use crate::schema::webhook_out_endpoint::dsl::*;
use diesel_async::RunQueryDsl;

let query = diesel::insert_into(webhook_out_endpoint).values(self);
log::debug!("{}", debug_query::<diesel::pg::Pg, _>(&query).to_string());

query
.get_result(conn)
.await
.attach_printable("Error while inserting webhook_out_endpoint")
.into_db_result()
}
}

impl WebhookOutEventNew {
pub async fn insert(&self, conn: &mut PgConn) -> DbResult<WebhookOutEvent> {
use crate::schema::webhook_out_event::dsl::*;
use diesel_async::RunQueryDsl;

let query = diesel::insert_into(webhook_out_event).values(self);
log::debug!("{}", debug_query::<diesel::pg::Pg, _>(&query).to_string());

query
.get_result(conn)
.await
.attach_printable("Error while inserting webhook_out_event")
.into_db_result()
}
}

impl WebhookOutEndpoint {
pub async fn list_by_tenant_id(
conn: &mut PgConn,
tenant_id: uuid::Uuid,
) -> DbResult<Vec<WebhookOutEndpoint>> {
use crate::schema::webhook_out_endpoint::dsl;
use diesel_async::RunQueryDsl;

let query = dsl::webhook_out_endpoint.filter(dsl::tenant_id.eq(tenant_id));

log::debug!("{}", debug_query::<diesel::pg::Pg, _>(&query).to_string());

query
.get_results(conn)
.await
.attach_printable("Error while listing webhook_out_endpoints by tenant_id")
.into_db_result()
}

pub async fn find_by_id_and_tenant_id(
conn: &mut PgConn,
id: uuid::Uuid,
tenant_id: uuid::Uuid,
) -> DbResult<WebhookOutEndpoint> {
use crate::schema::webhook_out_endpoint::dsl;
use diesel_async::RunQueryDsl;

let query = dsl::webhook_out_endpoint
.filter(dsl::tenant_id.eq(tenant_id))
.filter(dsl::id.eq(id));

log::debug!("{}", debug_query::<diesel::pg::Pg, _>(&query).to_string());

query
.first(conn)
.await
.attach_printable("Error while fetching webhook_out_endpoint by id and tenant_id")
.into_db_result()
}
}

impl WebhookOutEvent {
pub async fn list_events(
conn: &mut PgConn,
tenant_id: uuid::Uuid,
endpoint_id: uuid::Uuid,
pagination: PaginationRequest,
order_by: OrderByRequest,
) -> DbResult<PaginatedVec<WebhookOutEvent>> {
use crate::schema::webhook_out_endpoint::dsl as end_dsl;
use crate::schema::webhook_out_event::dsl as ev_dsl;

let mut query = ev_dsl::webhook_out_event
.inner_join(end_dsl::webhook_out_endpoint.on(ev_dsl::endpoint_id.eq(end_dsl::id)))
.filter(ev_dsl::endpoint_id.eq(endpoint_id))
.filter(end_dsl::tenant_id.eq(tenant_id))
.select(WebhookOutEvent::as_select())
.into_boxed();

match order_by {
OrderByRequest::IdAsc => query = query.order(ev_dsl::id.asc()),
OrderByRequest::IdDesc => query = query.order(ev_dsl::id.desc()),
OrderByRequest::DateAsc => query = query.order(ev_dsl::created_at.asc()),
OrderByRequest::DateDesc => query = query.order(ev_dsl::created_at.desc()),
}

let paginated_query = query.paginate(pagination);

log::debug!(
"{}",
debug_query::<diesel::pg::Pg, _>(&paginated_query).to_string()
);

paginated_query
.load_and_count_pages(conn)
.await
.attach_printable("Error while fetching webhook_out events")
.into_db_result()
}
}
2 changes: 1 addition & 1 deletion modules/meteroid/crates/diesel-models/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ diesel::table! {
description -> Nullable<Text>,
secret -> Text,
created_at -> Timestamp,
events_to_listen -> Array<Nullable<WebhookOutEventTypeEnum>>,
events_to_listen -> Array<WebhookOutEventTypeEnum>,
enabled -> Bool,
}
}
Expand Down
34 changes: 30 additions & 4 deletions modules/meteroid/crates/diesel-models/src/webhooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use chrono::offset::Utc;
use chrono::DateTime;
use chrono::NaiveDateTime;

use diesel::Queryable;
use diesel::{Identifiable, Insertable, Queryable, Selectable};
use uuid::Uuid;

#[derive(Queryable, Debug)]
Expand All @@ -20,7 +20,7 @@ pub struct WebhookInEvent {
pub provider_config_id: Uuid,
}

#[derive(Queryable, Debug)]
#[derive(Queryable, Identifiable, Debug)]
#[diesel(table_name = crate::schema::webhook_out_endpoint)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct WebhookOutEndpoint {
Expand All @@ -30,11 +30,24 @@ pub struct WebhookOutEndpoint {
pub description: Option<String>,
pub secret: String,
pub created_at: NaiveDateTime,
pub events_to_listen: Vec<Option<WebhookOutEventTypeEnum>>,
pub events_to_listen: Vec<WebhookOutEventTypeEnum>,
pub enabled: bool,
}

#[derive(Queryable, Debug)]
#[derive(Insertable, Debug)]
#[diesel(table_name = crate::schema::webhook_out_endpoint)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct WebhookOutEndpointNew {
pub id: Uuid,
pub tenant_id: Uuid,
pub url: String,
pub description: Option<String>,
pub secret: String,
pub events_to_listen: Vec<WebhookOutEventTypeEnum>,
pub enabled: bool,
}

#[derive(Queryable, Identifiable, Debug, Selectable)]
#[diesel(table_name = crate::schema::webhook_out_event)]
pub struct WebhookOutEvent {
pub id: Uuid,
Expand All @@ -46,3 +59,16 @@ pub struct WebhookOutEvent {
pub http_status_code: Option<i16>,
pub error_message: Option<String>,
}

#[derive(Insertable, Debug)]
#[diesel(table_name = crate::schema::webhook_out_event)]
pub struct WebhookOutEventNew {
pub id: Uuid,
pub endpoint_id: Uuid,
pub created_at: NaiveDateTime,
pub event_type: WebhookOutEventTypeEnum,
pub request_body: String,
pub response_body: Option<String>,
pub http_status_code: Option<i16>,
pub error_message: Option<String>,
}
3 changes: 3 additions & 0 deletions modules/meteroid/crates/meteroid-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ itertools.workspace = true
secrecy = { workspace = true, features = ["default", "serde"] }
chacha20poly1305 = { workspace = true }
hex = { workspace = true }
url = { workspace = true }
base64 = { workspace = true }
fastrand = { workspace = true }

[dev-dependencies]
rstest = { workspace = true }
14 changes: 7 additions & 7 deletions modules/meteroid/crates/meteroid-store/src/domain/configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,19 +82,19 @@ pub struct ProviderConfigNew {
}

impl ProviderConfigNew {
pub fn domain_to_row(
pub fn to_row(
&self,
key: &SecretString,
domain: &ProviderConfigNew,
) -> StoreResult<diesel_models::configs::ProviderConfigNew> {
let wh_sec_enc = WebhookSecurity {
secret: crate::crypt::encrypt(key, domain.webhook_security.secret.as_str())
secret: crate::crypt::encrypt(key, self.webhook_security.secret.as_str())
.change_context(StoreError::CryptError(
"webhook_security encryption error".into(),
))?,
};

let api_sec_enc = ApiSecurity {
api_key: crate::crypt::encrypt(key, domain.api_security.api_key.as_str())
api_key: crate::crypt::encrypt(key, self.api_security.api_key.as_str())
.change_context(StoreError::CryptError(
"api_security encryption error".into(),
))?,
Expand All @@ -110,9 +110,9 @@ impl ProviderConfigNew {

Ok(diesel_models::configs::ProviderConfigNew {
id: Uuid::now_v7(),
tenant_id: domain.tenant_id,
invoicing_provider: domain.invoicing_provider.clone().into(),
enabled: domain.enabled,
tenant_id: self.tenant_id,
invoicing_provider: self.invoicing_provider.clone().into(),
enabled: self.enabled,
webhook_security: wh_sec,
api_security: api_sec,
})
Expand Down
10 changes: 10 additions & 0 deletions modules/meteroid/crates/meteroid-store/src/domain/misc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use chrono::NaiveDate;
use o2o::o2o;
use uuid::Uuid;

pub struct PaginationRequest {
Expand Down Expand Up @@ -103,3 +104,12 @@ pub struct ComponentPeriods {
pub advance: Option<Period>,
pub proration_factor: Option<f64>,
}

#[derive(Debug, Clone, o2o)]
#[map_owned(diesel_models::extend::order::OrderByRequest)]
pub enum OrderByRequest {
IdAsc,
IdDesc,
DateAsc,
DateDesc,
}
1 change: 1 addition & 0 deletions modules/meteroid/crates/meteroid-store/src/domain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ pub mod product_families;
pub mod schedules;
pub mod subscription_components;
pub mod subscriptions;
pub mod webhooks;
97 changes: 97 additions & 0 deletions modules/meteroid/crates/meteroid-store/src/domain/webhooks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use crate::domain::enums::WebhookOutEventTypeEnum;
use crate::errors::StoreError;
use crate::utils::gen::webhook_security;
use crate::StoreResult;
use chrono::NaiveDateTime;
use error_stack::ResultExt;
use itertools::Itertools;
use o2o::o2o;
use secrecy::{ExposeSecret, SecretString};
use url::Url;
use uuid::Uuid;

#[derive(Clone, Debug)]
pub struct WebhookOutEndpoint {
pub id: Uuid,
pub tenant_id: Uuid,
pub url: Url,
pub description: Option<String>,
pub secret: SecretString,
pub created_at: NaiveDateTime,
pub events_to_listen: Vec<WebhookOutEventTypeEnum>,
pub enabled: bool,
}

impl WebhookOutEndpoint {
pub fn from_row(
key: &SecretString,
row: diesel_models::webhooks::WebhookOutEndpoint,
) -> StoreResult<WebhookOutEndpoint> {
let dec_sec = crate::crypt::decrypt(key, row.secret.as_str())
.change_context(StoreError::CryptError("secret decryption error".into()))?;

let dec_url = Url::parse(row.url.as_str())
.change_context(StoreError::InvalidArgument("invalid url value".into()))?;

Ok(WebhookOutEndpoint {
id: row.id,
tenant_id: row.tenant_id,
url: dec_url,
description: row.description,
secret: dec_sec,
created_at: row.created_at,
events_to_listen: row.events_to_listen.into_iter().map_into().collect(),
enabled: row.enabled,
})
}
}

#[derive(Clone, Debug)]
pub struct WebhookOutEndpointNew {
pub tenant_id: Uuid,
pub url: Url,
pub description: Option<String>,
pub events_to_listen: Vec<WebhookOutEventTypeEnum>,
pub enabled: bool,
}

impl WebhookOutEndpointNew {
pub fn to_row(
&self,
key: &SecretString,
) -> StoreResult<diesel_models::webhooks::WebhookOutEndpointNew> {
let enc_secret =
crate::crypt::encrypt(key, webhook_security::gen().expose_secret().as_str())
.change_context(StoreError::CryptError("secret decryption error".into()))?;

Ok(diesel_models::webhooks::WebhookOutEndpointNew {
id: Uuid::now_v7(),
tenant_id: self.tenant_id,
url: self.url.to_string(),
description: self.description.clone(),
secret: enc_secret,
events_to_listen: self
.events_to_listen
.clone()
.into_iter()
.map_into()
.collect(),
enabled: self.enabled,
})
}
}

#[derive(Clone, Debug, o2o)]
#[from_owned(diesel_models::webhooks::WebhookOutEvent)]
#[owned_into(diesel_models::webhooks::WebhookOutEvent)]
pub struct WebhookOutEvent {
pub id: Uuid,
pub endpoint_id: Uuid,
pub created_at: NaiveDateTime,
#[map(~.into())]
pub event_type: WebhookOutEventTypeEnum,
pub request_body: String,
pub response_body: Option<String>,
pub http_status_code: Option<i16>,
pub error_message: Option<String>,
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl ConfigsInterface for Store {
&self,
config: ProviderConfigNew,
) -> StoreResult<ProviderConfig> {
let insertable = ProviderConfigNew::domain_to_row(&self.crypt_key, &config)?;
let insertable = config.to_row(&self.crypt_key)?;

let mut conn = self.get_conn().await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ pub mod configs;
pub mod price_components;
pub mod product_families;
pub mod subscriptions;
pub mod webhooks;
Loading

0 comments on commit 3c0b082

Please sign in to comment.