From 67fc85e6a2e4a1e29a4cdc838b973fa4aa4c2dc5 Mon Sep 17 00:00:00 2001 From: Vitaliy Morarian Date: Thu, 25 Apr 2024 19:19:20 +0300 Subject: [PATCH] refactor: relocation of eventbus to store (#189) --- Cargo.toml | 2 + crates/common-eventbus/Cargo.toml | 14 + crates/common-eventbus/src/lib.rs | 238 +++++++++++++ modules/meteroid/Cargo.toml | 1 + .../meteroid/crates/meteroid-store/Cargo.toml | 2 + .../src/repositories/api_tokens.rs | 14 +- .../crates/meteroid-store/src/store.rs | 15 +- modules/meteroid/src/api/apitokens/mod.rs | 12 +- modules/meteroid/src/api/apitokens/service.rs | 7 +- .../meteroid/src/api/billablemetrics/mod.rs | 2 +- .../src/api/billablemetrics/service.rs | 2 +- modules/meteroid/src/api/customers/mod.rs | 2 +- modules/meteroid/src/api/customers/service.rs | 2 +- modules/meteroid/src/api/instance/mod.rs | 2 +- modules/meteroid/src/api/instance/service.rs | 2 +- modules/meteroid/src/api/plans/mod.rs | 2 +- modules/meteroid/src/api/plans/service.rs | 2 +- .../meteroid/src/api/pricecomponents/mod.rs | 2 +- .../src/api/pricecomponents/service.rs | 2 +- .../meteroid/src/api/productfamilies/mod.rs | 2 +- .../src/api/productfamilies/service.rs | 2 +- modules/meteroid/src/api/server.rs | 30 +- modules/meteroid/src/api/subscriptions/mod.rs | 2 +- .../src/api/subscriptions/service_old.rs | 3 +- modules/meteroid/src/api/users/mod.rs | 2 +- modules/meteroid/src/api/users/service.rs | 2 +- modules/meteroid/src/bin/seeder.rs | 2 + modules/meteroid/src/bin/server.rs | 2 + .../src/eventbus/analytics_handler.rs | 5 +- modules/meteroid/src/eventbus/memory.rs | 9 +- modules/meteroid/src/eventbus/mod.rs | 321 +++--------------- .../eventbus.rs => src/eventbus/noop.rs} | 3 +- .../meteroid/src/eventbus/webhook_handler.rs | 20 +- modules/meteroid/src/singletons.rs | 33 +- .../src/workers/invoicing/draft_worker.rs | 13 +- .../src/workers/invoicing/finalize_worker.rs | 15 +- .../src/workers/invoicing/issue_worker.rs | 4 +- .../src/workers/invoicing/price_worker.rs | 8 +- modules/meteroid/tests/integration/e2e.rs | 31 +- .../integration/meteroid_it/container.rs | 2 + .../tests/integration/meteroid_it/mod.rs | 1 - .../tests/integration/test_webhooks_out.rs | 11 +- .../tests/integration/test_workers.rs | 36 +- 43 files changed, 470 insertions(+), 414 deletions(-) create mode 100644 crates/common-eventbus/Cargo.toml create mode 100644 crates/common-eventbus/src/lib.rs rename modules/meteroid/{tests/integration/meteroid_it/eventbus.rs => src/eventbus/noop.rs} (81%) diff --git a/Cargo.toml b/Cargo.toml index 69bb3242..761a8c64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ members = [ "crates/common-build-info", "crates/common-config", "crates/common-domain", + "crates/common-eventbus", "crates/common-grpc", "crates/common-grpc-error-as-tonic-macros", "crates/common-grpc-error-as-tonic-macros-impl", @@ -164,6 +165,7 @@ rand_chacha = "0.3.1" common-build-info = { path = "crates/common-build-info" } common-domain = { path = "crates/common-domain" } common-config = { path = "crates/common-config" } +common-eventbus = { path = "crates/common-eventbus" } common-logging = { path = "crates/common-logging" } common-repository = { path = "crates/common-repository" } common-grpc = { path = "crates/common-grpc" } diff --git a/crates/common-eventbus/Cargo.toml b/crates/common-eventbus/Cargo.toml new file mode 100644 index 00000000..7a40c3ef --- /dev/null +++ b/crates/common-eventbus/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "common-eventbus" +version = "0.1.0" +rust-version.workspace = true +edition.workspace = true +license.workspace = true +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait.workspace = true +thiserror.workspace = true +uuid = { workspace = true, features = ["v4", "v7"] } +chrono = { workspace = true, features = ["clock"] } + diff --git a/crates/common-eventbus/src/lib.rs b/crates/common-eventbus/src/lib.rs new file mode 100644 index 00000000..d12de252 --- /dev/null +++ b/crates/common-eventbus/src/lib.rs @@ -0,0 +1,238 @@ +use std::sync::Arc; + +use uuid::Uuid; + +#[async_trait::async_trait] +pub trait EventBus: Send + Sync { + async fn subscribe(&self, handler: Arc>); + async fn publish(&self, event: E) -> Result<(), EventBusError>; +} + +#[derive(thiserror::Error, Debug, Clone)] +pub enum EventBusError { + #[error("Failed to publish event")] + PublishFailed, + #[error("Failed to handle event: {0}")] + EventHandlerFailed(String), +} + +#[async_trait::async_trait] +pub trait EventHandler: Send + Sync { + async fn handle(&self, event: E) -> Result<(), EventBusError>; +} + +#[derive(Debug, Clone)] +pub struct Event { + pub event_id: Uuid, + pub event_timestamp: chrono::DateTime, + pub event_data: EventData, + pub actor: Option, +} + +impl Event { + pub fn new(event_data: EventData, actor: Option) -> Self { + Self { + event_id: Uuid::now_v7(), + event_timestamp: chrono::Utc::now(), + event_data, + actor, + } + } + + pub fn api_token_created(actor: Uuid, api_token_id: Uuid) -> Self { + Self::new( + EventData::ApiTokenCreated(EventDataDetails { + entity_id: api_token_id, + }), + Some(actor), + ) + } + + pub fn billable_metric_created(actor: Uuid, billable_metric_id: Uuid, tenant_id: Uuid) -> Self { + Self::new( + EventData::BillableMetricCreated(TenantEventDataDetails { + tenant_id, + entity_id: billable_metric_id, + }), + Some(actor), + ) + } + + pub fn customer_created(actor: Uuid, customer_id: Uuid, tenant_id: Uuid) -> Self { + Self::new( + EventData::CustomerCreated(TenantEventDataDetails { + tenant_id, + entity_id: customer_id, + }), + Some(actor), + ) + } + pub fn customer_patched(actor: Uuid, customer_id: Uuid, tenant_id: Uuid) -> Self { + Self::new( + EventData::CustomerPatched(TenantEventDataDetails { + tenant_id, + entity_id: customer_id, + }), + Some(actor), + ) + } + + pub fn instance_inited(actor: Uuid, organization_id: Uuid) -> Self { + Self::new( + EventData::InstanceInited(EventDataDetails { + entity_id: organization_id, + }), + Some(actor), + ) + } + + pub fn invoice_created(invoice_id: Uuid, tenant_id: Uuid) -> Self { + Self::new( + EventData::InvoiceCreated(TenantEventDataDetails { + tenant_id, + entity_id: invoice_id, + }), + None, + ) + } + + pub fn invoice_finalized(invoice_id: Uuid, tenant_id: Uuid) -> Self { + Self::new( + EventData::InvoiceFinalized(TenantEventDataDetails { + tenant_id, + entity_id: invoice_id, + }), + None, + ) + } + + pub fn plan_created_draft(actor: Uuid, plan_version_id: Uuid, tenant_id: Uuid) -> Self { + Self::new( + EventData::PlanCreatedDraft(TenantEventDataDetails { + tenant_id, + entity_id: plan_version_id, + }), + Some(actor), + ) + } + + pub fn plan_published_version(actor: Uuid, plan_version_id: Uuid, tenant_id: Uuid) -> Self { + Self::new( + EventData::PlanPublishedVersion(TenantEventDataDetails { + tenant_id, + entity_id: plan_version_id, + }), + Some(actor), + ) + } + + pub fn plan_discarded_version(actor: Uuid, plan_version_id: Uuid, tenant_id: Uuid) -> Self { + Self::new( + EventData::PlanDiscardedVersion(TenantEventDataDetails { + tenant_id, + entity_id: plan_version_id, + }), + Some(actor), + ) + } + + pub fn price_component_created(actor: Uuid, price_component_id: Uuid, tenant_id: Uuid) -> Self { + Self::new( + EventData::PriceComponentCreated(TenantEventDataDetails { + tenant_id, + entity_id: price_component_id, + }), + Some(actor), + ) + } + + pub fn price_component_edited(actor: Uuid, price_component_id: Uuid, tenant_id: Uuid) -> Self { + Self::new( + EventData::PriceComponentEdited(TenantEventDataDetails { + tenant_id, + entity_id: price_component_id, + }), + Some(actor), + ) + } + + pub fn price_component_removed(actor: Uuid, price_component_id: Uuid, tenant_id: Uuid) -> Self { + Self::new( + EventData::PriceComponentRemoved(TenantEventDataDetails { + tenant_id, + entity_id: price_component_id, + }), + Some(actor), + ) + } + + pub fn product_family_created(actor: Uuid, product_family_id: Uuid, tenant_id: Uuid) -> Self { + Self::new( + EventData::ProductFamilyCreated(TenantEventDataDetails { + tenant_id, + entity_id: product_family_id, + }), + Some(actor), + ) + } + + pub fn subscription_created(actor: Uuid, subscription_id: Uuid, tenant_id: Uuid) -> Self { + Self::new( + EventData::SubscriptionCreated(TenantEventDataDetails { + tenant_id, + entity_id: subscription_id, + }), + Some(actor), + ) + } + + pub fn subscription_canceled(actor: Uuid, subscription_id: Uuid, tenant_id: Uuid) -> Self { + Self::new( + EventData::SubscriptionCanceled(TenantEventDataDetails { + tenant_id, + entity_id: subscription_id, + }), + Some(actor), + ) + } + + pub fn user_created(actor: Option, user_id: Uuid) -> Self { + Self::new( + EventData::UserCreated(EventDataDetails { entity_id: user_id }), + actor, + ) + } +} + +#[derive(Debug, Clone)] +pub enum EventData { + ApiTokenCreated(EventDataDetails), + BillableMetricCreated(TenantEventDataDetails), + CustomerCreated(TenantEventDataDetails), + CustomerPatched(TenantEventDataDetails), + InstanceInited(EventDataDetails), + InvoiceCreated(TenantEventDataDetails), + InvoiceFinalized(TenantEventDataDetails), + PlanCreatedDraft(TenantEventDataDetails), + PlanPublishedVersion(TenantEventDataDetails), + PlanDiscardedVersion(TenantEventDataDetails), + PriceComponentCreated(TenantEventDataDetails), + PriceComponentEdited(TenantEventDataDetails), + PriceComponentRemoved(TenantEventDataDetails), + ProductFamilyCreated(TenantEventDataDetails), + SubscriptionCreated(TenantEventDataDetails), + SubscriptionCanceled(TenantEventDataDetails), + TenantCreated(TenantEventDataDetails), + UserCreated(EventDataDetails), +} + +#[derive(Debug, Clone)] +pub struct EventDataDetails { + pub entity_id: Uuid, +} + +#[derive(Debug, Clone)] +pub struct TenantEventDataDetails { + pub tenant_id: Uuid, + pub entity_id: Uuid, +} diff --git a/modules/meteroid/Cargo.toml b/modules/meteroid/Cargo.toml index d95edc94..81dddb8f 100644 --- a/modules/meteroid/Cargo.toml +++ b/modules/meteroid/Cargo.toml @@ -69,6 +69,7 @@ rust_decimal_macros = { workspace = true } metering-grpc = { workspace = true, features = ["client"] } common-domain = { workspace = true } +common-eventbus = { workspace = true } common-utils = { workspace = true, features = ["error-stack-conv"] } common-repository = { workspace = true } distributed-lock = { workspace = true, features = ["postgres-support"] } diff --git a/modules/meteroid/crates/meteroid-store/Cargo.toml b/modules/meteroid/crates/meteroid-store/Cargo.toml index 1237464e..ff85e0be 100644 --- a/modules/meteroid/crates/meteroid-store/Cargo.toml +++ b/modules/meteroid/crates/meteroid-store/Cargo.toml @@ -9,6 +9,8 @@ repository.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +common-eventbus.workspace = true + argon2 = { workspace = true } base62.workspace = true chrono.workspace = true diff --git a/modules/meteroid/crates/meteroid-store/src/repositories/api_tokens.rs b/modules/meteroid/crates/meteroid-store/src/repositories/api_tokens.rs index 41ad43fb..f78ba15b 100644 --- a/modules/meteroid/crates/meteroid-store/src/repositories/api_tokens.rs +++ b/modules/meteroid/crates/meteroid-store/src/repositories/api_tokens.rs @@ -2,6 +2,8 @@ use argon2::{ password_hash::{rand_core::OsRng, PasswordHasher, SaltString}, Argon2, }; +use common_eventbus::{Event, EventBusError}; +use error_stack::Report; use nanoid::nanoid; use tracing_log::log; use uuid::Uuid; @@ -101,12 +103,22 @@ impl ApiTokensInterface for Store { hint: hint, }; - let result = insertable_entity + let result: Result> = insertable_entity .insert(&mut conn) .await .map_err(Into::into) .map(Into::into); + if result.is_ok() { + let _ = self + .eventbus + .publish(Event::api_token_created( + insertable_entity.created_by, + insertable_entity.id, + )) + .await; + } + result.map(|res| (api_key, res)) } } diff --git a/modules/meteroid/crates/meteroid-store/src/store.rs b/modules/meteroid/crates/meteroid-store/src/store.rs index e01ca01e..1ce13a03 100644 --- a/modules/meteroid/crates/meteroid-store/src/store.rs +++ b/modules/meteroid/crates/meteroid-store/src/store.rs @@ -1,8 +1,10 @@ use crate::errors::StoreError; use diesel_async::pooled_connection::deadpool::Object; use diesel_async::pooled_connection::deadpool::Pool; +use std::sync::Arc; use crate::StoreResult; +use common_eventbus::{Event, EventBus}; use diesel_async::pooled_connection::AsyncDieselConnectionManager; use diesel_async::scoped_futures::{ScopedBoxFuture, ScopedFutureExt}; use diesel_async::{AsyncConnection, AsyncPgConnection}; @@ -15,6 +17,7 @@ pub type PgConn = Object; pub struct Store { pub pool: PgPool, pub crypt_key: secrecy::SecretString, + pub eventbus: Arc>, } pub fn diesel_make_pg_pool(db_url: String) -> StoreResult { @@ -29,10 +32,18 @@ pub fn diesel_make_pg_pool(db_url: String) -> StoreResult { } impl Store { - pub fn new(database_url: String, crypt_key: secrecy::SecretString) -> StoreResult { + pub fn new( + database_url: String, + crypt_key: secrecy::SecretString, + eventbus: Arc>, + ) -> StoreResult { let pool: PgPool = diesel_make_pg_pool(database_url)?; - Ok(Store { pool, crypt_key }) + Ok(Store { + pool, + crypt_key, + eventbus, + }) } pub async fn get_conn(&self) -> StoreResult { diff --git a/modules/meteroid/src/api/apitokens/mod.rs b/modules/meteroid/src/api/apitokens/mod.rs index 110ea821..ea130ac5 100644 --- a/modules/meteroid/src/api/apitokens/mod.rs +++ b/modules/meteroid/src/api/apitokens/mod.rs @@ -1,23 +1,15 @@ -use std::sync::Arc; - use meteroid_grpc::meteroid::api::apitokens::v1::api_tokens_service_server::ApiTokensServiceServer; use meteroid_store::Store; -use crate::eventbus::{Event, EventBus}; - mod error; mod mapping; mod service; pub struct ApiTokensServiceComponents { pub store: Store, - pub eventbus: Arc>, } -pub fn service( - store: Store, - eventbus: Arc>, -) -> ApiTokensServiceServer { - let inner = ApiTokensServiceComponents { store, eventbus }; +pub fn service(store: Store) -> ApiTokensServiceServer { + let inner = ApiTokensServiceComponents { store }; ApiTokensServiceServer::new(inner) } diff --git a/modules/meteroid/src/api/apitokens/service.rs b/modules/meteroid/src/api/apitokens/service.rs index e1049e2e..80d3ade8 100644 --- a/modules/meteroid/src/api/apitokens/service.rs +++ b/modules/meteroid/src/api/apitokens/service.rs @@ -1,3 +1,4 @@ +use common_eventbus::Event; use tonic::{Request, Response, Status}; use common_grpc::middleware::server::auth::RequestExt; @@ -9,7 +10,6 @@ use meteroid_store::domain; use meteroid_store::repositories::api_tokens::ApiTokensInterface; use crate::api::apitokens::error::ApiTokenApiError; -use crate::eventbus::Event; use crate::{api::utils::parse_uuid, parse_uuid}; use super::{mapping, ApiTokensServiceComponents}; @@ -66,11 +66,6 @@ impl ApiTokensService for ApiTokensServiceComponents { ) })?; - let _ = self - .eventbus - .publish(Event::api_token_created(actor, res.id)) - .await; - let response = CreateApiTokenResponse { api_key, details: Some(mapping::api_token::domain_to_api(res)), diff --git a/modules/meteroid/src/api/billablemetrics/mod.rs b/modules/meteroid/src/api/billablemetrics/mod.rs index 9726667b..3defc2e5 100644 --- a/modules/meteroid/src/api/billablemetrics/mod.rs +++ b/modules/meteroid/src/api/billablemetrics/mod.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use common_eventbus::{Event, EventBus}; use deadpool_postgres::{Object, Transaction}; use tonic::Status; @@ -9,7 +10,6 @@ use meteroid_grpc::meteroid::api::billablemetrics::v1::billable_metrics_service_ use meteroid_repository::Pool; use crate::db::{get_connection, get_transaction}; -use crate::eventbus::{Event, EventBus}; mod error; pub mod mapping; diff --git a/modules/meteroid/src/api/billablemetrics/service.rs b/modules/meteroid/src/api/billablemetrics/service.rs index 0d59514e..942fe46f 100644 --- a/modules/meteroid/src/api/billablemetrics/service.rs +++ b/modules/meteroid/src/api/billablemetrics/service.rs @@ -1,3 +1,4 @@ +use common_eventbus::Event; use cornucopia_async::Params; use log::error; use tonic::{Request, Response, Status}; @@ -14,7 +15,6 @@ use meteroid_repository as db; use crate::api::billablemetrics::error::BillableMetricApiError; use crate::api::utils::uuid_gen; use crate::api::utils::{parse_uuid, PaginationExt}; -use crate::eventbus::Event; use super::{mapping, BillableMetricsComponents}; diff --git a/modules/meteroid/src/api/customers/mod.rs b/modules/meteroid/src/api/customers/mod.rs index a25500c9..d218e3b2 100644 --- a/modules/meteroid/src/api/customers/mod.rs +++ b/modules/meteroid/src/api/customers/mod.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use common_eventbus::{Event, EventBus}; use deadpool_postgres::{Object, Transaction}; use tonic::Status; @@ -7,7 +8,6 @@ use meteroid_grpc::meteroid::api::customers::v1::customers_service_server::Custo use meteroid_repository::Pool; use crate::db::{get_connection, get_transaction}; -use crate::eventbus::{Event, EventBus}; pub mod error; pub mod mapping; diff --git a/modules/meteroid/src/api/customers/service.rs b/modules/meteroid/src/api/customers/service.rs index 6a67f526..dd9d8e55 100644 --- a/modules/meteroid/src/api/customers/service.rs +++ b/modules/meteroid/src/api/customers/service.rs @@ -1,3 +1,4 @@ +use common_eventbus::Event; use cornucopia_async::Params; use tonic::{Request, Response, Status}; @@ -13,7 +14,6 @@ use meteroid_repository as db; use crate::api::customers::error::CustomerApiError; use crate::api::utils::PaginationExt; use crate::api::utils::{parse_uuid, uuid_gen}; -use crate::eventbus::Event; use super::{mapping, CustomerServiceComponents}; diff --git a/modules/meteroid/src/api/instance/mod.rs b/modules/meteroid/src/api/instance/mod.rs index 000129b3..258673a8 100644 --- a/modules/meteroid/src/api/instance/mod.rs +++ b/modules/meteroid/src/api/instance/mod.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use common_eventbus::{Event, EventBus}; use deadpool_postgres::{Object, Transaction}; use tonic::Status; @@ -7,7 +8,6 @@ use meteroid_grpc::meteroid::api::instance::v1::instance_service_server::Instanc use meteroid_repository::Pool; use crate::db::{get_connection, get_transaction}; -use crate::eventbus::{Event, EventBus}; mod error; mod service; diff --git a/modules/meteroid/src/api/instance/service.rs b/modules/meteroid/src/api/instance/service.rs index d85e189d..18212944 100644 --- a/modules/meteroid/src/api/instance/service.rs +++ b/modules/meteroid/src/api/instance/service.rs @@ -14,7 +14,7 @@ use meteroid_repository::Params; use crate::api::instance::error::InstanceApiError; use crate::api::instance::InstanceServiceComponents; use crate::api::utils::uuid_gen; -use crate::eventbus::Event; +use common_eventbus::Event; #[tonic::async_trait] impl InstanceService for InstanceServiceComponents { diff --git a/modules/meteroid/src/api/plans/mod.rs b/modules/meteroid/src/api/plans/mod.rs index c4740167..f883d72d 100644 --- a/modules/meteroid/src/api/plans/mod.rs +++ b/modules/meteroid/src/api/plans/mod.rs @@ -1,5 +1,5 @@ use crate::db::{get_connection, get_transaction}; -use crate::eventbus::{Event, EventBus}; +use common_eventbus::{Event, EventBus}; use deadpool_postgres::{Object, Transaction}; use meteroid_grpc::meteroid::api::plans::v1::plans_service_server::PlansServiceServer; use meteroid_repository::Pool; diff --git a/modules/meteroid/src/api/plans/service.rs b/modules/meteroid/src/api/plans/service.rs index 55478c0b..7a56df7c 100644 --- a/modules/meteroid/src/api/plans/service.rs +++ b/modules/meteroid/src/api/plans/service.rs @@ -22,11 +22,11 @@ use crate::api::plans::error::PlanApiError; use crate::api::shared::mapping::period::billing_period_to_db; use crate::api::utils::PaginationExt; -use crate::eventbus::Event; use crate::{ api::utils::{parse_uuid, uuid_gen}, parse_uuid, }; +use common_eventbus::Event; use super::{mapping, PlanServiceComponents}; diff --git a/modules/meteroid/src/api/pricecomponents/mod.rs b/modules/meteroid/src/api/pricecomponents/mod.rs index 4cb04343..ff68870e 100644 --- a/modules/meteroid/src/api/pricecomponents/mod.rs +++ b/modules/meteroid/src/api/pricecomponents/mod.rs @@ -4,7 +4,7 @@ use meteroid_grpc::meteroid::api::components::v1::price_components_service_serve use meteroid_store::Store; -use crate::eventbus::{Event, EventBus}; +use common_eventbus::{Event, EventBus}; mod error; pub(crate) mod ext; diff --git a/modules/meteroid/src/api/pricecomponents/service.rs b/modules/meteroid/src/api/pricecomponents/service.rs index bb9e769a..3d743dd5 100644 --- a/modules/meteroid/src/api/pricecomponents/service.rs +++ b/modules/meteroid/src/api/pricecomponents/service.rs @@ -13,8 +13,8 @@ use meteroid_store::repositories::price_components::PriceComponentInterface; use crate::api::pricecomponents::error::PriceComponentApiError; use crate::api::shared::conversions::ProtoConv; -use crate::eventbus::Event; use crate::{api::utils::parse_uuid, parse_uuid}; +use common_eventbus::Event; use super::{mapping, PriceComponentServiceComponents}; diff --git a/modules/meteroid/src/api/productfamilies/mod.rs b/modules/meteroid/src/api/productfamilies/mod.rs index 94db3523..da3f63d8 100644 --- a/modules/meteroid/src/api/productfamilies/mod.rs +++ b/modules/meteroid/src/api/productfamilies/mod.rs @@ -7,7 +7,7 @@ use meteroid_grpc::meteroid::api::productfamilies::v1::product_families_service_ use meteroid_repository::Pool; use crate::db::{get_connection, get_transaction}; -use crate::eventbus::{Event, EventBus}; +use common_eventbus::{Event, EventBus}; mod error; mod mapping; diff --git a/modules/meteroid/src/api/productfamilies/service.rs b/modules/meteroid/src/api/productfamilies/service.rs index 3f0b0276..e3ec1aad 100644 --- a/modules/meteroid/src/api/productfamilies/service.rs +++ b/modules/meteroid/src/api/productfamilies/service.rs @@ -11,7 +11,7 @@ use meteroid_repository as db; use crate::api::productfamilies::error::ProductFamilyApiError; use crate::api::utils::uuid_gen; -use crate::eventbus::Event; +use common_eventbus::Event; use super::{mapping, ProductFamilyServiceComponents}; diff --git a/modules/meteroid/src/api/server.rs b/modules/meteroid/src/api/server.rs index 2e409548..9674b2e5 100644 --- a/modules/meteroid/src/api/server.rs +++ b/modules/meteroid/src/api/server.rs @@ -18,7 +18,6 @@ use crate::compute::clients::usage::MeteringUsageClient; use crate::compute::InvoiceEngine; use crate::eventbus::analytics_handler::AnalyticsHandler; use crate::eventbus::webhook_handler::WebhookHandler; -use crate::eventbus::{Event, EventBus}; use super::super::config::Config; @@ -53,13 +52,12 @@ pub async fn start_api_server( Arc::new(store.clone()), )); - let eventbus: Arc> = Arc::new(crate::eventbus::memory::InMemory::new()); - // meteroid_store is intended as a replacement for meteroid_repository. It adds an extra domain layer, and replaces cornucopia with diesel // the pools are incompatible, without some refacto // let store = meteroid_store::Store::from_pool(pool.clone()); - eventbus + store + .eventbus .subscribe(Arc::new(WebhookHandler::new( pool.clone(), config.secrets_crypt_key.clone(), @@ -76,7 +74,8 @@ pub async fn start_api_server( } }; - eventbus + store + .eventbus .subscribe(Arc::new(AnalyticsHandler::new( config.analytics.clone(), pool.clone(), @@ -109,35 +108,38 @@ pub async fn start_api_server( .add_service(reflection_service) .add_service(api::billablemetrics::service( pool.clone(), - eventbus.clone(), + store.eventbus.clone(), metering_service, )) - .add_service(api::customers::service(pool.clone(), eventbus.clone())) + .add_service(api::customers::service( + pool.clone(), + store.eventbus.clone(), + )) .add_service(api::tenants::service(store.clone())) - .add_service(api::apitokens::service(store.clone(), eventbus.clone())) + .add_service(api::apitokens::service(store.clone())) .add_service(api::pricecomponents::service( store.clone(), - eventbus.clone(), + store.eventbus.clone(), )) - .add_service(api::plans::service(pool.clone(), eventbus.clone())) + .add_service(api::plans::service(pool.clone(), store.eventbus.clone())) .add_service(api::schedules::service(pool.clone())) .add_service(api::productitems::service(pool.clone())) .add_service(api::productfamilies::service( pool.clone(), - eventbus.clone(), + store.eventbus.clone(), )) - .add_service(api::instance::service(pool.clone(), eventbus.clone())) + .add_service(api::instance::service(pool.clone(), store.eventbus.clone())) .add_service(api::invoices::service(pool.clone())) .add_service(api::stats::service(pool.clone())) .add_service(api::users::service( pool.clone(), - eventbus.clone(), + store.eventbus.clone(), config.jwt_secret.clone(), )) .add_service(api::subscriptions::service( store.clone(), compute_service, - eventbus.clone(), + store.eventbus.clone(), )) .add_service(api::webhooksout::service(store.clone())) .add_service(api::internal::service(pool.clone())) diff --git a/modules/meteroid/src/api/subscriptions/mod.rs b/modules/meteroid/src/api/subscriptions/mod.rs index f94c8f78..b61d5611 100644 --- a/modules/meteroid/src/api/subscriptions/mod.rs +++ b/modules/meteroid/src/api/subscriptions/mod.rs @@ -1,6 +1,6 @@ use crate::compute::InvoiceEngine; -use crate::eventbus::{Event, EventBus}; +use common_eventbus::{Event, EventBus}; use meteroid_grpc::meteroid::api::subscriptions::v1::subscriptions_service_server::SubscriptionsServiceServer; diff --git a/modules/meteroid/src/api/subscriptions/service_old.rs b/modules/meteroid/src/api/subscriptions/service_old.rs index 23c3c893..3275cdea 100644 --- a/modules/meteroid/src/api/subscriptions/service_old.rs +++ b/modules/meteroid/src/api/subscriptions/service_old.rs @@ -6,7 +6,6 @@ Kept for reference until the EventBus is implemented in the service, and the Slo */ - // use cornucopia_async::{GenericClient, Params}; // use time::Time; // use tonic::{Request, Response, Status}; @@ -36,7 +35,7 @@ Kept for reference until the EventBus is implemented in the service, and the Slo // // use crate::compute::fees::shared::CadenceExtractor; // // use crate::compute::fees::ComputeInvoiceLine; // use crate::compute::period; -// use crate::eventbus::Event; +// use common_eventbus::Event; // use crate::mapping::common::{chrono_to_date, chrono_to_datetime}; // use crate::models::InvoiceLine; // use crate::{parse_uuid, parse_uuid_opt}; diff --git a/modules/meteroid/src/api/users/mod.rs b/modules/meteroid/src/api/users/mod.rs index 2d06216d..7a046a64 100644 --- a/modules/meteroid/src/api/users/mod.rs +++ b/modules/meteroid/src/api/users/mod.rs @@ -7,7 +7,7 @@ use tonic::Status; use meteroid_grpc::meteroid::api::users::v1::users_service_server::UsersServiceServer; use crate::db::{get_connection, get_transaction}; -use crate::eventbus::{Event, EventBus}; +use common_eventbus::{Event, EventBus}; mod error; pub mod mapping; diff --git a/modules/meteroid/src/api/users/service.rs b/modules/meteroid/src/api/users/service.rs index 2dcc1f2e..bcc38d75 100644 --- a/modules/meteroid/src/api/users/service.rs +++ b/modules/meteroid/src/api/users/service.rs @@ -21,8 +21,8 @@ use meteroid_repository::{OrganizationUserRole, Params}; use crate::api::users::error::UserApiError; use crate::api::utils::uuid_gen; -use crate::eventbus::Event; use crate::{api::utils::parse_uuid, parse_uuid}; +use common_eventbus::Event; use super::{mapping, UsersServiceComponents}; diff --git a/modules/meteroid/src/bin/seeder.rs b/modules/meteroid/src/bin/seeder.rs index 9f0f295b..01e36b9f 100644 --- a/modules/meteroid/src/bin/seeder.rs +++ b/modules/meteroid/src/bin/seeder.rs @@ -4,6 +4,7 @@ use tokio::signal; use common_logging::init::init_regular_logging; use error_stack::ResultExt; +use meteroid::eventbus::create_eventbus_noop; use meteroid::seeder::domain; use meteroid::seeder::errors::SeederError; use meteroid::seeder::runner; @@ -25,6 +26,7 @@ async fn main() -> error_stack::Result<(), SeederError> { let store = Store::new( env::var("DATABASE_URL").change_context(SeederError::InitializationError)?, crypt_key, + create_eventbus_noop().await, ) .change_context(SeederError::InitializationError)?; diff --git a/modules/meteroid/src/bin/server.rs b/modules/meteroid/src/bin/server.rs index 7a52d55f..81b9ca5b 100644 --- a/modules/meteroid/src/bin/server.rs +++ b/modules/meteroid/src/bin/server.rs @@ -6,6 +6,7 @@ use common_build_info::BuildInfo; use common_logging::init::init_telemetry; use meteroid::adapters::stripe::Stripe; use meteroid::config::Config; +use meteroid::eventbus::create_eventbus_memory; use meteroid::singletons::get_pool; use meteroid::webhook_in_api; use meteroid_repository::migrations; @@ -31,6 +32,7 @@ async fn main() -> Result<(), Box> { let store = meteroid_store::Store::new( config.database_url.clone(), config.secrets_crypt_key.clone(), + create_eventbus_memory(pool.clone(), config.clone()).await, )?; let private_server = diff --git a/modules/meteroid/src/eventbus/analytics_handler.rs b/modules/meteroid/src/eventbus/analytics_handler.rs index f2bcf9fc..19cd032c 100644 --- a/modules/meteroid/src/eventbus/analytics_handler.rs +++ b/modules/meteroid/src/eventbus/analytics_handler.rs @@ -6,12 +6,11 @@ use uuid::Uuid; use common_build_info::BuildInfo; use common_config::analytics::AnalyticsConfig; +use common_eventbus::{EventBusError, EventHandler}; use common_logging::unwrapper::UnwrapLogger; use common_repository::Pool; -use crate::eventbus::{ - Event, EventBusError, EventData, EventDataDetails, EventHandler, TenantEventDataDetails, -}; +use common_eventbus::{Event, EventData, EventDataDetails, TenantEventDataDetails}; pub struct AnalyticsHandler { pool: Pool, diff --git a/modules/meteroid/src/eventbus/memory.rs b/modules/meteroid/src/eventbus/memory.rs index 9d98f9d2..8486e7a4 100644 --- a/modules/meteroid/src/eventbus/memory.rs +++ b/modules/meteroid/src/eventbus/memory.rs @@ -1,4 +1,5 @@ -use crate::eventbus::{EventBus, EventBusError, EventHandler}; +use common_eventbus::EventBus; +use common_eventbus::{EventBusError, EventHandler}; use std::fmt::Debug; use std::sync::Arc; use tokio::sync::broadcast::error::RecvError; @@ -64,9 +65,9 @@ impl InMemory { #[cfg(test)] mod tests { - use crate::eventbus; use crate::eventbus::memory::InMemory; - use crate::eventbus::{EventBus, EventHandler}; + use common_eventbus::EventBus; + use common_eventbus::{EventBusError, EventHandler}; use std::collections::HashSet; use std::sync::{Arc, Mutex}; @@ -89,7 +90,7 @@ mod tests { #[async_trait::async_trait] impl EventHandler for CapturingEventHandler { - async fn handle(&self, event: u8) -> Result<(), eventbus::EventBusError> { + async fn handle(&self, event: u8) -> Result<(), EventBusError> { let mut guard = self.items.lock().unwrap(); guard.push(event); Ok(()) diff --git a/modules/meteroid/src/eventbus/mod.rs b/modules/meteroid/src/eventbus/mod.rs index 7bc7debe..1632be41 100644 --- a/modules/meteroid/src/eventbus/mod.rs +++ b/modules/meteroid/src/eventbus/mod.rs @@ -1,299 +1,74 @@ -use std::fmt::Debug; use std::sync::Arc; -use uuid::Uuid; +use common_eventbus::{Event, EventBus}; -use crate::api::utils::uuid_gen; use crate::config::Config; +use crate::eventbus::analytics_handler::AnalyticsHandler; +use crate::eventbus::memory::InMemory; +use crate::eventbus::noop::NoopEventBus; +use crate::eventbus::webhook_handler::WebhookHandler; use crate::singletons; pub mod analytics_handler; pub mod memory; +pub mod noop; pub mod webhook_handler; -static CONFIG: tokio::sync::OnceCell>> = tokio::sync::OnceCell::const_new(); +static EVENTBUS_MEMORY: tokio::sync::OnceCell>> = + tokio::sync::OnceCell::const_new(); -#[derive(thiserror::Error, Debug, Clone)] -pub enum EventBusError { - #[error("Failed to publish event")] - PublishFailed, - #[error("Failed to handle event: {0}")] - EventHandlerFailed(String), -} +pub struct EventBusStatic; -#[async_trait::async_trait] -pub trait EventHandler: Send + Sync { - async fn handle(&self, event: E) -> Result<(), EventBusError>; +pub async fn create_eventbus_noop() -> Arc> { + Arc::new(NoopEventBus::new()) } -#[async_trait::async_trait] -pub trait EventBus: Send + Sync { - async fn subscribe(&self, handler: Arc>); - async fn publish(&self, event: E) -> Result<(), EventBusError>; +pub async fn create_eventbus_memory( + pool: deadpool_postgres::Pool, + config: Config, +) -> Arc> { + let eventbus = Arc::new(InMemory::new()); + + eventbus + .subscribe(Arc::new(WebhookHandler::new( + pool.clone(), + config.secrets_crypt_key.clone(), + true, + ))) + .await; + + if config.analytics.enabled { + let country = match analytics_handler::get_geoip().await { + Ok(geoip) => Some(geoip.country), + Err(err) => { + log::warn!("Failed to obtain data for analytics: {}", err); + None + } + }; + + eventbus + .subscribe(Arc::new(AnalyticsHandler::new( + config.analytics.clone(), + pool.clone(), + country, + ))) + .await; + } else { + log::info!("Analytics is disabled"); + } + + eventbus } -pub struct EventBusStatic; - impl EventBusStatic { pub async fn get() -> &'static Arc> { - CONFIG + EVENTBUS_MEMORY .get_or_init(|| async { let config = Config::get(); let pool = singletons::get_pool(); - let bus: Arc> = Arc::new(memory::InMemory::new()); - - bus.subscribe(Arc::new(webhook_handler::WebhookHandler::new( - pool.clone(), - config.secrets_crypt_key.clone(), - true, - ))) - .await; - - bus.subscribe(Arc::new(webhook_handler::WebhookHandler::new( - pool.clone(), - config.secrets_crypt_key.clone(), - true, - ))) - .await; - - if config.analytics.enabled { - let country = match crate::eventbus::analytics_handler::get_geoip().await { - Ok(geoip) => Some(geoip.country), - Err(err) => { - log::warn!("Failed to obtain data for analytics: {}", err); - None - } - }; - - bus.subscribe(Arc::new(analytics_handler::AnalyticsHandler::new( - config.analytics.clone(), - pool.clone(), - country, - ))) - .await; - } else { - log::info!("Analytics is disabled"); - } - - bus + create_eventbus_memory(pool.clone(), config.clone()).await }) .await } } - -#[derive(Debug, Clone)] -pub struct Event { - pub event_id: Uuid, - pub event_timestamp: chrono::DateTime, - pub event_data: EventData, - pub actor: Option, -} - -impl Event { - pub fn new(event_data: EventData, actor: Option) -> Self { - Self { - event_id: uuid_gen::v7(), - event_timestamp: chrono::Utc::now(), - event_data, - actor, - } - } - - pub fn api_token_created(actor: Uuid, api_token_id: Uuid) -> Self { - Self::new( - EventData::ApiTokenCreated(EventDataDetails { - entity_id: api_token_id, - }), - Some(actor), - ) - } - - pub fn billable_metric_created(actor: Uuid, billable_metric_id: Uuid, tenant_id: Uuid) -> Self { - Self::new( - EventData::BillableMetricCreated(TenantEventDataDetails { - tenant_id, - entity_id: billable_metric_id, - }), - Some(actor), - ) - } - - pub fn customer_created(actor: Uuid, customer_id: Uuid, tenant_id: Uuid) -> Self { - Self::new( - EventData::CustomerCreated(TenantEventDataDetails { - tenant_id, - entity_id: customer_id, - }), - Some(actor), - ) - } - pub fn customer_patched(actor: Uuid, customer_id: Uuid, tenant_id: Uuid) -> Self { - Self::new( - EventData::CustomerPatched(TenantEventDataDetails { - tenant_id, - entity_id: customer_id, - }), - Some(actor), - ) - } - - pub fn instance_inited(actor: Uuid, organization_id: Uuid) -> Self { - Self::new( - EventData::InstanceInited(EventDataDetails { - entity_id: organization_id, - }), - Some(actor), - ) - } - - pub fn invoice_created(invoice_id: Uuid, tenant_id: Uuid) -> Self { - Self::new( - EventData::InvoiceCreated(TenantEventDataDetails { - tenant_id, - entity_id: invoice_id, - }), - None, - ) - } - - pub fn invoice_finalized(invoice_id: Uuid, tenant_id: Uuid) -> Self { - Self::new( - EventData::InvoiceFinalized(TenantEventDataDetails { - tenant_id, - entity_id: invoice_id, - }), - None, - ) - } - - pub fn plan_created_draft(actor: Uuid, plan_version_id: Uuid, tenant_id: Uuid) -> Self { - Self::new( - EventData::PlanCreatedDraft(TenantEventDataDetails { - tenant_id, - entity_id: plan_version_id, - }), - Some(actor), - ) - } - - pub fn plan_published_version(actor: Uuid, plan_version_id: Uuid, tenant_id: Uuid) -> Self { - Self::new( - EventData::PlanPublishedVersion(TenantEventDataDetails { - tenant_id, - entity_id: plan_version_id, - }), - Some(actor), - ) - } - - pub fn plan_discarded_version(actor: Uuid, plan_version_id: Uuid, tenant_id: Uuid) -> Self { - Self::new( - EventData::PlanDiscardedVersion(TenantEventDataDetails { - tenant_id, - entity_id: plan_version_id, - }), - Some(actor), - ) - } - - pub fn price_component_created(actor: Uuid, price_component_id: Uuid, tenant_id: Uuid) -> Self { - Self::new( - EventData::PriceComponentCreated(TenantEventDataDetails { - tenant_id, - entity_id: price_component_id, - }), - Some(actor), - ) - } - - pub fn price_component_edited(actor: Uuid, price_component_id: Uuid, tenant_id: Uuid) -> Self { - Self::new( - EventData::PriceComponentEdited(TenantEventDataDetails { - tenant_id, - entity_id: price_component_id, - }), - Some(actor), - ) - } - - pub fn price_component_removed(actor: Uuid, price_component_id: Uuid, tenant_id: Uuid) -> Self { - Self::new( - EventData::PriceComponentRemoved(TenantEventDataDetails { - tenant_id, - entity_id: price_component_id, - }), - Some(actor), - ) - } - - pub fn product_family_created(actor: Uuid, product_family_id: Uuid, tenant_id: Uuid) -> Self { - Self::new( - EventData::ProductFamilyCreated(TenantEventDataDetails { - tenant_id, - entity_id: product_family_id, - }), - Some(actor), - ) - } - - pub fn subscription_created(actor: Uuid, subscription_id: Uuid, tenant_id: Uuid) -> Self { - Self::new( - EventData::SubscriptionCreated(TenantEventDataDetails { - tenant_id, - entity_id: subscription_id, - }), - Some(actor), - ) - } - - pub fn subscription_canceled(actor: Uuid, subscription_id: Uuid, tenant_id: Uuid) -> Self { - Self::new( - EventData::SubscriptionCanceled(TenantEventDataDetails { - tenant_id, - entity_id: subscription_id, - }), - Some(actor), - ) - } - - pub fn user_created(actor: Option, user_id: Uuid) -> Self { - Self::new( - EventData::UserCreated(EventDataDetails { entity_id: user_id }), - actor, - ) - } -} - -#[derive(Debug, Clone)] -pub enum EventData { - ApiTokenCreated(EventDataDetails), - BillableMetricCreated(TenantEventDataDetails), - CustomerCreated(TenantEventDataDetails), - CustomerPatched(TenantEventDataDetails), - InstanceInited(EventDataDetails), - InvoiceCreated(TenantEventDataDetails), - InvoiceFinalized(TenantEventDataDetails), - PlanCreatedDraft(TenantEventDataDetails), - PlanPublishedVersion(TenantEventDataDetails), - PlanDiscardedVersion(TenantEventDataDetails), - PriceComponentCreated(TenantEventDataDetails), - PriceComponentEdited(TenantEventDataDetails), - PriceComponentRemoved(TenantEventDataDetails), - ProductFamilyCreated(TenantEventDataDetails), - SubscriptionCreated(TenantEventDataDetails), - SubscriptionCanceled(TenantEventDataDetails), - TenantCreated(TenantEventDataDetails), - UserCreated(EventDataDetails), -} - -#[derive(Debug, Clone)] -pub struct EventDataDetails { - pub entity_id: Uuid, -} - -#[derive(Debug, Clone)] -pub struct TenantEventDataDetails { - pub tenant_id: Uuid, - pub entity_id: Uuid, -} diff --git a/modules/meteroid/tests/integration/meteroid_it/eventbus.rs b/modules/meteroid/src/eventbus/noop.rs similarity index 81% rename from modules/meteroid/tests/integration/meteroid_it/eventbus.rs rename to modules/meteroid/src/eventbus/noop.rs index b5f2aa37..322ed855 100644 --- a/modules/meteroid/tests/integration/meteroid_it/eventbus.rs +++ b/modules/meteroid/src/eventbus/noop.rs @@ -1,4 +1,5 @@ -use meteroid::eventbus::{Event, EventBus, EventBusError, EventHandler}; +use common_eventbus::{Event, EventBus}; +use common_eventbus::{EventBusError, EventHandler}; use std::sync::Arc; pub struct NoopEventBus; diff --git a/modules/meteroid/src/eventbus/webhook_handler.rs b/modules/meteroid/src/eventbus/webhook_handler.rs index d5ba7e94..ef4a30b8 100644 --- a/modules/meteroid/src/eventbus/webhook_handler.rs +++ b/modules/meteroid/src/eventbus/webhook_handler.rs @@ -1,20 +1,22 @@ -use crate::eventbus::{Event, EventBusError, EventData, EventHandler, TenantEventDataDetails}; -use crate::mapping::common::date_to_chrono; -use crate::webhook; -use crate::webhook::Webhook; use cached::proc_macro::cached; -use common_repository::Pool; use cornucopia_async::Params; -use meteroid_repository::WebhookOutEventTypeEnum; +use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; +use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; use secrecy::{ExposeSecret, SecretString}; use serde::Serialize; use uuid::Uuid; -use crate::api::utils::uuid_gen; +use common_eventbus::{EventBusError, EventHandler}; +use common_repository::Pool; use meteroid_repository::webhook_out_events::CreateEventParams; +use meteroid_repository::WebhookOutEventTypeEnum; use meteroid_store::crypt; -use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; -use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; + +use crate::api::utils::uuid_gen; +use crate::mapping::common::date_to_chrono; +use crate::webhook; +use crate::webhook::Webhook; +use common_eventbus::{Event, EventData, TenantEventDataDetails}; const ENDPOINT_REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); const ENDPOINT_RETRIES: u32 = 3; diff --git a/modules/meteroid/src/singletons.rs b/modules/meteroid/src/singletons.rs index 493839a2..ffdc3d66 100644 --- a/modules/meteroid/src/singletons.rs +++ b/modules/meteroid/src/singletons.rs @@ -1,7 +1,11 @@ -use crate::config::Config; +use std::sync::OnceLock; + use deadpool_postgres::Pool; + use meteroid_store::Store; -use std::sync::OnceLock; + +use crate::config::Config; +use crate::eventbus::EventBusStatic; static POOL: OnceLock = OnceLock::new(); @@ -12,15 +16,18 @@ pub fn get_pool() -> &'static Pool { }) } -static STORE: OnceLock = OnceLock::new(); - -pub fn get_store() -> &'static Store { - STORE.get_or_init(|| { - let config = Config::get(); - Store::new( - config.database_url.clone(), - config.secrets_crypt_key.clone(), - ) - .expect("Failed to initialize store") - }) +static STORE: tokio::sync::OnceCell = tokio::sync::OnceCell::const_new(); +pub async fn get_store() -> &'static Store { + STORE + .get_or_init(|| async { + let config = Config::get(); + let eventbus = EventBusStatic::get().await; + Store::new( + config.database_url.clone(), + config.secrets_crypt_key.clone(), + eventbus.clone(), + ) + .expect("Failed to initialize store") + }) + .await } diff --git a/modules/meteroid/src/workers/invoicing/draft_worker.rs b/modules/meteroid/src/workers/invoicing/draft_worker.rs index da06f8f6..ef2f1e73 100644 --- a/modules/meteroid/src/workers/invoicing/draft_worker.rs +++ b/modules/meteroid/src/workers/invoicing/draft_worker.rs @@ -7,12 +7,13 @@ use error_stack::{Result, ResultExt}; use fang::{AsyncQueueable, AsyncRunnable, Deserialize, FangError, Scheduled, Serialize}; use futures::StreamExt; use meteroid_repository as db; -use std::ops::Deref; + use time::Date; -use crate::eventbus::{Event, EventBus, EventBusStatic}; +use common_eventbus::Event; use meteroid_repository::subscriptions::SubscriptionToInvoice; use meteroid_store::domain::enums::BillingPeriodEnum; +use meteroid_store::Store; const BATCH_SIZE: usize = 100; @@ -26,11 +27,10 @@ impl AsyncRunnable for DraftWorker { #[tracing::instrument(skip_all)] async fn run(&self, _queue: &mut dyn AsyncQueueable) -> core::result::Result<(), FangError> { let pool = singletons::get_pool(); - let eventbus = EventBusStatic::get().await; draft_worker( + singletons::get_store().await, pool, - eventbus.deref(), time::OffsetDateTime::now_utc().date(), ) .timed(|res, elapsed| record_call("draft", res, elapsed)) @@ -59,8 +59,8 @@ impl AsyncRunnable for DraftWorker { #[tracing::instrument(skip_all)] pub async fn draft_worker( + store: &Store, pool: &Pool, - eventbus: &dyn EventBus, today: Date, ) -> Result<(), errors::WorkerError> { let db_client = pool @@ -124,7 +124,8 @@ pub async fn draft_worker( .change_context(errors::WorkerError::DatabaseError)?; for param in ¶ms { - let _ = eventbus + let _ = store + .eventbus .publish(Event::invoice_created(param.id, param.tenant_id)) .await; } diff --git a/modules/meteroid/src/workers/invoicing/finalize_worker.rs b/modules/meteroid/src/workers/invoicing/finalize_worker.rs index 52224319..a6351407 100644 --- a/modules/meteroid/src/workers/invoicing/finalize_worker.rs +++ b/modules/meteroid/src/workers/invoicing/finalize_worker.rs @@ -6,7 +6,7 @@ use meteroid_repository as db; use crate::{compute::InvoiceEngine, errors, singletons}; use crate::compute::clients::usage::MeteringUsageClient; -use crate::eventbus::{Event, EventBus, EventBusStatic}; +use common_eventbus::Event; use common_utils::timed::TimedExt; use error_stack::{Result, ResultExt}; use fang::{AsyncQueueable, AsyncRunnable, Deserialize, FangError, Scheduled, Serialize}; @@ -30,12 +30,10 @@ pub struct FinalizeWorker; impl AsyncRunnable for FinalizeWorker { #[tracing::instrument(skip_all)] async fn run(&self, _queue: &mut dyn AsyncQueueable) -> core::result::Result<(), FangError> { - let eventbus = EventBusStatic::get().await; finalize_worker( - singletons::get_pool().clone(), + singletons::get_store().await, + singletons::get_pool(), MeteringClient::get().clone(), - eventbus.clone(), - singletons::get_store().clone(), ) .timed(|res, elapsed| record_call("finalize", res, elapsed)) .await @@ -63,10 +61,9 @@ impl AsyncRunnable for FinalizeWorker { #[tracing::instrument(skip_all)] pub async fn finalize_worker( - db_pool: Pool, + store: &Store, + db_pool: &Pool, metering_client: MeteringClient, - eventbus: Arc>, - store: Store, ) -> Result<(), errors::WorkerError> { let connection = db_pool .get() @@ -107,7 +104,7 @@ pub async fn finalize_worker( .await .change_context(errors::WorkerError::DatabaseError)?; - let eventbus = eventbus.clone(); + let eventbus = store.eventbus.clone(); let task = tokio::spawn(async move { let _permit = permit; // Moves permit into the async block diff --git a/modules/meteroid/src/workers/invoicing/issue_worker.rs b/modules/meteroid/src/workers/invoicing/issue_worker.rs index ab7f7e31..8f7bc697 100644 --- a/modules/meteroid/src/workers/invoicing/issue_worker.rs +++ b/modules/meteroid/src/workers/invoicing/issue_worker.rs @@ -24,9 +24,9 @@ impl AsyncRunnable for IssueWorker { #[tracing::instrument(skip(self, _queue))] async fn run(&self, _queue: &mut dyn AsyncQueueable) -> core::result::Result<(), FangError> { issue_worker( + singletons::get_store().await, singletons::get_pool(), Stripe::get(), - singletons::get_store(), ) .timed(|res, elapsed| record_call("issue", res, elapsed)) .await @@ -54,9 +54,9 @@ impl AsyncRunnable for IssueWorker { #[tracing::instrument(skip_all)] async fn issue_worker( + store: &Store, pool: &Pool, stripe_adapter: &Stripe, - store: &Store, ) -> Result<(), errors::WorkerError> { // fetch all invoices with issue=false and send to stripe diff --git a/modules/meteroid/src/workers/invoicing/price_worker.rs b/modules/meteroid/src/workers/invoicing/price_worker.rs index e8768213..aa07b1d9 100644 --- a/modules/meteroid/src/workers/invoicing/price_worker.rs +++ b/modules/meteroid/src/workers/invoicing/price_worker.rs @@ -42,8 +42,8 @@ impl AsyncRunnable for PriceWorker { #[tracing::instrument(skip_all)] async fn run(&self, _queue: &mut dyn AsyncQueueable) -> core::result::Result<(), FangError> { price_worker( - singletons::get_pool().clone(), - singletons::get_store().clone(), + singletons::get_store().await, + singletons::get_pool(), MeteringClient::get().clone(), ) .timed(|res, elapsed| record_call("price", res, elapsed)) @@ -72,8 +72,8 @@ impl AsyncRunnable for PriceWorker { #[tracing::instrument(skip_all)] pub async fn price_worker( - db_pool: Pool, - store: Store, + store: &Store, + db_pool: &Pool, metering_client: MeteringClient, ) -> Result<(), errors::WorkerError> { // fetch all invoice not finalized/voided and not updated since > 1h diff --git a/modules/meteroid/tests/integration/e2e.rs b/modules/meteroid/tests/integration/e2e.rs index c388ee5d..4eb7c55e 100644 --- a/modules/meteroid/tests/integration/e2e.rs +++ b/modules/meteroid/tests/integration/e2e.rs @@ -1,25 +1,17 @@ -use chrono::{Datelike, Days, Months}; -use cornucopia_async::{GenericClient, Params}; - -use opentelemetry::propagation::Injector; use std::collections::HashMap; -use std::ops::Deref; use std::str::FromStr; -use std::sync::Arc; +use chrono::{Datelike, Days, Months}; +use cornucopia_async::{GenericClient, Params}; +use opentelemetry::propagation::Injector; use rust_decimal::Decimal; use testcontainers::clients::Cli; use tonic::Request; use uuid::{uuid, Uuid}; -use crate::metering_it; -use crate::{helpers, meteroid_it}; - -use crate::meteroid_it::eventbus::NoopEventBus; use metering::utils::datetime_to_timestamp; use metering_grpc::meteroid::metering::v1::{event::CustomerId, Event, IngestRequest}; use meteroid::db::get_connection; -use meteroid::eventbus::EventBus; use meteroid::mapping::common::chrono_to_date; use meteroid::models::{InvoiceLine, InvoiceLinePeriod}; use meteroid_grpc::meteroid::api; @@ -30,10 +22,12 @@ use meteroid_grpc::meteroid::api::billablemetrics::v1::segmentation_matrix::{ use meteroid_grpc::meteroid::api::billablemetrics::v1::{ Aggregation, CreateBillableMetricRequest, SegmentationMatrix, }; - use meteroid_grpc::meteroid::api::plans::v1::PlanType; use meteroid_repository::invoices::ListInvoice; +use crate::metering_it; +use crate::{helpers, meteroid_it}; + /* Plan with Capacity (aka fixed advance fee + usage fee) @@ -477,12 +471,10 @@ async fn test_metering_e2e() { ] ); - let eventbus: Arc> = Arc::new(NoopEventBus::new()); - // DRAFT WORKER meteroid::workers::invoicing::draft_worker::draft_worker( + &meteroid_setup.store, &meteroid_setup.pool, - eventbus.deref(), chrono_to_date(now.date_naive()).unwrap(), ) .await @@ -524,8 +516,8 @@ async fn test_metering_e2e() { // PRICE WORKER meteroid::workers::invoicing::price_worker::price_worker( - meteroid_setup.pool.clone(), - meteroid_setup.store.clone(), + &meteroid_setup.store, + &meteroid_setup.pool, metering_client.clone(), ) .await @@ -588,10 +580,9 @@ async fn test_metering_e2e() { // FINALIZER meteroid::workers::invoicing::finalize_worker::finalize_worker( - meteroid_setup.pool.clone(), + &meteroid_setup.store, + &meteroid_setup.pool, metering_client.clone(), - eventbus.clone(), - meteroid_setup.store.clone(), ) .await .unwrap(); diff --git a/modules/meteroid/tests/integration/meteroid_it/container.rs b/modules/meteroid/tests/integration/meteroid_it/container.rs index ffae9dd1..2cf148c5 100644 --- a/modules/meteroid/tests/integration/meteroid_it/container.rs +++ b/modules/meteroid/tests/integration/meteroid_it/container.rs @@ -10,6 +10,7 @@ use tokio_util::sync::CancellationToken; use tonic::transport::Channel; use meteroid::config::Config; +use meteroid::eventbus::create_eventbus_memory; use meteroid_repository::migrations; use crate::helpers; @@ -49,6 +50,7 @@ pub async fn start_meteroid_with_port( let store = meteroid_store::Store::new( config.database_url.clone(), config.secrets_crypt_key.clone(), + create_eventbus_memory(pool.clone(), config.clone()).await, ) .expect("Could not create store"); diff --git a/modules/meteroid/tests/integration/meteroid_it/mod.rs b/modules/meteroid/tests/integration/meteroid_it/mod.rs index 22b6f692..71a9a5b8 100644 --- a/modules/meteroid/tests/integration/meteroid_it/mod.rs +++ b/modules/meteroid/tests/integration/meteroid_it/mod.rs @@ -4,5 +4,4 @@ pub mod clients; pub mod config; pub mod container; pub mod db; -pub mod eventbus; pub mod svc_auth; diff --git a/modules/meteroid/tests/integration/test_webhooks_out.rs b/modules/meteroid/tests/integration/test_webhooks_out.rs index fb327310..73b56497 100644 --- a/modules/meteroid/tests/integration/test_webhooks_out.rs +++ b/modules/meteroid/tests/integration/test_webhooks_out.rs @@ -1,6 +1,7 @@ use chrono::DateTime; +use common_eventbus::Event; +use common_eventbus::EventHandler; use meteroid::eventbus::webhook_handler::WebhookHandler; -use meteroid::eventbus::{Event, EventHandler}; use std::str::FromStr; use testcontainers::clients::Cli; @@ -167,8 +168,8 @@ async fn test_webhook_out_handler() { event_timestamp: DateTime::parse_from_rfc3339("2024-01-01T23:22:15Z") .unwrap() .to_utc(), - event_data: meteroid::eventbus::EventData::SubscriptionCreated( - meteroid::eventbus::TenantEventDataDetails { + event_data: common_eventbus::EventData::SubscriptionCreated( + common_eventbus::TenantEventDataDetails { tenant_id: TENANT_ID, entity_id: SUBSCRIPTION_SPORTIFY_ID1, }, @@ -200,8 +201,8 @@ async fn test_webhook_out_handler() { event_timestamp: DateTime::parse_from_rfc3339("2024-02-01T23:22:15Z") .unwrap() .to_utc(), - event_data: meteroid::eventbus::EventData::CustomerCreated( - meteroid::eventbus::TenantEventDataDetails { + event_data: common_eventbus::EventData::CustomerCreated( + common_eventbus::TenantEventDataDetails { tenant_id: TENANT_ID, entity_id: CUSTOMER_UBER_ID, }, diff --git a/modules/meteroid/tests/integration/test_workers.rs b/modules/meteroid/tests/integration/test_workers.rs index 706d4430..86ba4078 100644 --- a/modules/meteroid/tests/integration/test_workers.rs +++ b/modules/meteroid/tests/integration/test_workers.rs @@ -1,20 +1,22 @@ -use crate::helpers; -use crate::meteroid_it; -use crate::meteroid_it::db::seed::*; -use crate::meteroid_it::eventbus::NoopEventBus; +use std::collections::HashSet; + use cornucopia_async::Params; use deadpool_postgres::Pool; -use meteroid::eventbus::EventBus; -use meteroid::workers::invoicing::draft_worker::draft_worker; -use meteroid_repository::invoices::ListInvoice; -use meteroid_repository::InvoiceStatusEnum; -use std::collections::HashSet; -use std::ops::Deref; -use std::sync::Arc; use testcontainers::clients::Cli; use time::macros::date; use uuid::Uuid; +use meteroid::eventbus::create_eventbus_noop; +use meteroid::singletons; +use meteroid::workers::invoicing::draft_worker::draft_worker; +use meteroid_repository::invoices::ListInvoice; +use meteroid_repository::InvoiceStatusEnum; +use meteroid_store::Store; + +use crate::helpers; +use crate::meteroid_it; +use crate::meteroid_it::db::seed::*; + #[tokio::test] async fn test_draft_worker() { helpers::init::logging(); @@ -31,9 +33,15 @@ async fn test_draft_worker() { let worker_run_date = date!(2023 - 11 - 04); - let eventbus: Arc> = Arc::new(NoopEventBus::new()); + let store = Store::new( + postgres_connection_string, + secrecy::SecretString::new("test-key".into()), + //create_eventbus_noop().await, + singletons::get_store().await.eventbus.clone(), + ) + .unwrap(); - draft_worker(&pool, eventbus.deref(), worker_run_date.clone()) + draft_worker(&store, &pool, worker_run_date.clone()) .await .unwrap(); @@ -90,7 +98,7 @@ async fn test_draft_worker() { } // second run should not create new invoices - draft_worker(&pool, eventbus.deref(), worker_run_date.next_day().unwrap()) + draft_worker(&store, &pool, worker_run_date.next_day().unwrap()) .await .unwrap();