Skip to content

Commit

Permalink
refactor: relocation of eventbus to store (#189)
Browse files Browse the repository at this point in the history
  • Loading branch information
vmorarian authored Apr 25, 2024
1 parent 3c0b082 commit 67fc85e
Show file tree
Hide file tree
Showing 43 changed files with 470 additions and 414 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ members = [
"crates/common-build-info",
"crates/common-config",
"crates/common-domain",
"crates/common-eventbus",
"crates/common-grpc",
"crates/common-grpc-error-as-tonic-macros",
"crates/common-grpc-error-as-tonic-macros-impl",
Expand Down Expand Up @@ -164,6 +165,7 @@ rand_chacha = "0.3.1"
common-build-info = { path = "crates/common-build-info" }
common-domain = { path = "crates/common-domain" }
common-config = { path = "crates/common-config" }
common-eventbus = { path = "crates/common-eventbus" }
common-logging = { path = "crates/common-logging" }
common-repository = { path = "crates/common-repository" }
common-grpc = { path = "crates/common-grpc" }
Expand Down
14 changes: 14 additions & 0 deletions crates/common-eventbus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "common-eventbus"
version = "0.1.0"
rust-version.workspace = true
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait.workspace = true
thiserror.workspace = true
uuid = { workspace = true, features = ["v4", "v7"] }
chrono = { workspace = true, features = ["clock"] }

238 changes: 238 additions & 0 deletions crates/common-eventbus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
use std::sync::Arc;

use uuid::Uuid;

#[async_trait::async_trait]
pub trait EventBus<E>: Send + Sync {
async fn subscribe(&self, handler: Arc<dyn EventHandler<E>>);
async fn publish(&self, event: E) -> Result<(), EventBusError>;
}

#[derive(thiserror::Error, Debug, Clone)]
pub enum EventBusError {
#[error("Failed to publish event")]
PublishFailed,
#[error("Failed to handle event: {0}")]
EventHandlerFailed(String),
}

#[async_trait::async_trait]
pub trait EventHandler<E>: Send + Sync {
async fn handle(&self, event: E) -> Result<(), EventBusError>;
}

#[derive(Debug, Clone)]
pub struct Event {
pub event_id: Uuid,
pub event_timestamp: chrono::DateTime<chrono::Utc>,
pub event_data: EventData,
pub actor: Option<Uuid>,
}

impl Event {
pub fn new(event_data: EventData, actor: Option<Uuid>) -> Self {
Self {
event_id: Uuid::now_v7(),
event_timestamp: chrono::Utc::now(),
event_data,
actor,
}
}

pub fn api_token_created(actor: Uuid, api_token_id: Uuid) -> Self {
Self::new(
EventData::ApiTokenCreated(EventDataDetails {
entity_id: api_token_id,
}),
Some(actor),
)
}

pub fn billable_metric_created(actor: Uuid, billable_metric_id: Uuid, tenant_id: Uuid) -> Self {
Self::new(
EventData::BillableMetricCreated(TenantEventDataDetails {
tenant_id,
entity_id: billable_metric_id,
}),
Some(actor),
)
}

pub fn customer_created(actor: Uuid, customer_id: Uuid, tenant_id: Uuid) -> Self {
Self::new(
EventData::CustomerCreated(TenantEventDataDetails {
tenant_id,
entity_id: customer_id,
}),
Some(actor),
)
}
pub fn customer_patched(actor: Uuid, customer_id: Uuid, tenant_id: Uuid) -> Self {
Self::new(
EventData::CustomerPatched(TenantEventDataDetails {
tenant_id,
entity_id: customer_id,
}),
Some(actor),
)
}

pub fn instance_inited(actor: Uuid, organization_id: Uuid) -> Self {
Self::new(
EventData::InstanceInited(EventDataDetails {
entity_id: organization_id,
}),
Some(actor),
)
}

pub fn invoice_created(invoice_id: Uuid, tenant_id: Uuid) -> Self {
Self::new(
EventData::InvoiceCreated(TenantEventDataDetails {
tenant_id,
entity_id: invoice_id,
}),
None,
)
}

pub fn invoice_finalized(invoice_id: Uuid, tenant_id: Uuid) -> Self {
Self::new(
EventData::InvoiceFinalized(TenantEventDataDetails {
tenant_id,
entity_id: invoice_id,
}),
None,
)
}

pub fn plan_created_draft(actor: Uuid, plan_version_id: Uuid, tenant_id: Uuid) -> Self {
Self::new(
EventData::PlanCreatedDraft(TenantEventDataDetails {
tenant_id,
entity_id: plan_version_id,
}),
Some(actor),
)
}

pub fn plan_published_version(actor: Uuid, plan_version_id: Uuid, tenant_id: Uuid) -> Self {
Self::new(
EventData::PlanPublishedVersion(TenantEventDataDetails {
tenant_id,
entity_id: plan_version_id,
}),
Some(actor),
)
}

pub fn plan_discarded_version(actor: Uuid, plan_version_id: Uuid, tenant_id: Uuid) -> Self {
Self::new(
EventData::PlanDiscardedVersion(TenantEventDataDetails {
tenant_id,
entity_id: plan_version_id,
}),
Some(actor),
)
}

pub fn price_component_created(actor: Uuid, price_component_id: Uuid, tenant_id: Uuid) -> Self {
Self::new(
EventData::PriceComponentCreated(TenantEventDataDetails {
tenant_id,
entity_id: price_component_id,
}),
Some(actor),
)
}

pub fn price_component_edited(actor: Uuid, price_component_id: Uuid, tenant_id: Uuid) -> Self {
Self::new(
EventData::PriceComponentEdited(TenantEventDataDetails {
tenant_id,
entity_id: price_component_id,
}),
Some(actor),
)
}

pub fn price_component_removed(actor: Uuid, price_component_id: Uuid, tenant_id: Uuid) -> Self {
Self::new(
EventData::PriceComponentRemoved(TenantEventDataDetails {
tenant_id,
entity_id: price_component_id,
}),
Some(actor),
)
}

pub fn product_family_created(actor: Uuid, product_family_id: Uuid, tenant_id: Uuid) -> Self {
Self::new(
EventData::ProductFamilyCreated(TenantEventDataDetails {
tenant_id,
entity_id: product_family_id,
}),
Some(actor),
)
}

pub fn subscription_created(actor: Uuid, subscription_id: Uuid, tenant_id: Uuid) -> Self {
Self::new(
EventData::SubscriptionCreated(TenantEventDataDetails {
tenant_id,
entity_id: subscription_id,
}),
Some(actor),
)
}

pub fn subscription_canceled(actor: Uuid, subscription_id: Uuid, tenant_id: Uuid) -> Self {
Self::new(
EventData::SubscriptionCanceled(TenantEventDataDetails {
tenant_id,
entity_id: subscription_id,
}),
Some(actor),
)
}

pub fn user_created(actor: Option<Uuid>, user_id: Uuid) -> Self {
Self::new(
EventData::UserCreated(EventDataDetails { entity_id: user_id }),
actor,
)
}
}

#[derive(Debug, Clone)]
pub enum EventData {
ApiTokenCreated(EventDataDetails),
BillableMetricCreated(TenantEventDataDetails),
CustomerCreated(TenantEventDataDetails),
CustomerPatched(TenantEventDataDetails),
InstanceInited(EventDataDetails),
InvoiceCreated(TenantEventDataDetails),
InvoiceFinalized(TenantEventDataDetails),
PlanCreatedDraft(TenantEventDataDetails),
PlanPublishedVersion(TenantEventDataDetails),
PlanDiscardedVersion(TenantEventDataDetails),
PriceComponentCreated(TenantEventDataDetails),
PriceComponentEdited(TenantEventDataDetails),
PriceComponentRemoved(TenantEventDataDetails),
ProductFamilyCreated(TenantEventDataDetails),
SubscriptionCreated(TenantEventDataDetails),
SubscriptionCanceled(TenantEventDataDetails),
TenantCreated(TenantEventDataDetails),
UserCreated(EventDataDetails),
}

#[derive(Debug, Clone)]
pub struct EventDataDetails {
pub entity_id: Uuid,
}

#[derive(Debug, Clone)]
pub struct TenantEventDataDetails {
pub tenant_id: Uuid,
pub entity_id: Uuid,
}
1 change: 1 addition & 0 deletions modules/meteroid/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ rust_decimal_macros = { workspace = true }

metering-grpc = { workspace = true, features = ["client"] }
common-domain = { workspace = true }
common-eventbus = { workspace = true }
common-utils = { workspace = true, features = ["error-stack-conv"] }
common-repository = { workspace = true }
distributed-lock = { workspace = true, features = ["postgres-support"] }
Expand Down
2 changes: 2 additions & 0 deletions modules/meteroid/crates/meteroid-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ repository.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
common-eventbus.workspace = true

argon2 = { workspace = true }
base62.workspace = true
chrono.workspace = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use argon2::{
password_hash::{rand_core::OsRng, PasswordHasher, SaltString},
Argon2,
};
use common_eventbus::{Event, EventBusError};
use error_stack::Report;
use nanoid::nanoid;
use tracing_log::log;
use uuid::Uuid;
Expand Down Expand Up @@ -101,12 +103,22 @@ impl ApiTokensInterface for Store {
hint: hint,
};

let result = insertable_entity
let result: Result<ApiToken, Report<StoreError>> = insertable_entity
.insert(&mut conn)
.await
.map_err(Into::into)
.map(Into::into);

if result.is_ok() {
let _ = self
.eventbus
.publish(Event::api_token_created(
insertable_entity.created_by,
insertable_entity.id,
))
.await;
}

result.map(|res| (api_key, res))
}
}
15 changes: 13 additions & 2 deletions modules/meteroid/crates/meteroid-store/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::errors::StoreError;
use diesel_async::pooled_connection::deadpool::Object;
use diesel_async::pooled_connection::deadpool::Pool;
use std::sync::Arc;

use crate::StoreResult;
use common_eventbus::{Event, EventBus};
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use diesel_async::scoped_futures::{ScopedBoxFuture, ScopedFutureExt};
use diesel_async::{AsyncConnection, AsyncPgConnection};
Expand All @@ -15,6 +17,7 @@ pub type PgConn = Object<AsyncPgConnection>;
pub struct Store {
pub pool: PgPool,
pub crypt_key: secrecy::SecretString,
pub eventbus: Arc<dyn EventBus<Event>>,
}

pub fn diesel_make_pg_pool(db_url: String) -> StoreResult<PgPool> {
Expand All @@ -29,10 +32,18 @@ pub fn diesel_make_pg_pool(db_url: String) -> StoreResult<PgPool> {
}

impl Store {
pub fn new(database_url: String, crypt_key: secrecy::SecretString) -> StoreResult<Self> {
pub fn new(
database_url: String,
crypt_key: secrecy::SecretString,
eventbus: Arc<dyn EventBus<Event>>,
) -> StoreResult<Self> {
let pool: PgPool = diesel_make_pg_pool(database_url)?;

Ok(Store { pool, crypt_key })
Ok(Store {
pool,
crypt_key,
eventbus,
})
}

pub async fn get_conn(&self) -> StoreResult<PgConn> {
Expand Down
12 changes: 2 additions & 10 deletions modules/meteroid/src/api/apitokens/mod.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,15 @@
use std::sync::Arc;

use meteroid_grpc::meteroid::api::apitokens::v1::api_tokens_service_server::ApiTokensServiceServer;
use meteroid_store::Store;

use crate::eventbus::{Event, EventBus};

mod error;
mod mapping;
mod service;

pub struct ApiTokensServiceComponents {
pub store: Store,
pub eventbus: Arc<dyn EventBus<Event>>,
}

pub fn service(
store: Store,
eventbus: Arc<dyn EventBus<Event>>,
) -> ApiTokensServiceServer<ApiTokensServiceComponents> {
let inner = ApiTokensServiceComponents { store, eventbus };
pub fn service(store: Store) -> ApiTokensServiceServer<ApiTokensServiceComponents> {
let inner = ApiTokensServiceComponents { store };
ApiTokensServiceServer::new(inner)
}
Loading

0 comments on commit 67fc85e

Please sign in to comment.