From 016e027cdaab847dc156829b623d44fdd7a7a9d7 Mon Sep 17 00:00:00 2001 From: arpunk Date: Fri, 6 Apr 2018 15:35:13 -0500 Subject: [PATCH 01/13] Include formatter config --- .formatter.exs | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .formatter.exs diff --git a/.formatter.exs b/.formatter.exs new file mode 100644 index 0000000..2bed17c --- /dev/null +++ b/.formatter.exs @@ -0,0 +1,3 @@ +[ + inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"] +] From 14c6c17c7c78133a5bcab295642a3b77077d5193 Mon Sep 17 00:00:00 2001 From: arpunk Date: Fri, 6 Apr 2018 15:45:31 -0500 Subject: [PATCH 02/13] Run formatter against codebase --- config/config.exs | 2 +- config/dev.exs | 1 - config/test.exs | 4 +- lib/chronik.ex | 2 + lib/chronik/aggregate.ex | 159 ++++++---- lib/chronik/aggregate/multi.ex | 4 +- lib/chronik/aggregate/supervisor.ex | 5 +- lib/chronik/application.ex | 2 +- lib/chronik/config.ex | 8 +- lib/chronik/event_record.ex | 37 ++- lib/chronik/exceptions.ex | 13 +- lib/chronik/projection.ex | 85 +++--- lib/chronik/projection/dump_to_file.ex | 4 +- lib/chronik/projection/echo.ex | 2 +- lib/chronik/pub_sub.ex | 6 +- lib/chronik/pub_sub/adapters/registry.ex | 53 ++-- lib/chronik/store.ex | 55 ++-- lib/chronik/store/adapters/ecto/ecto.ex | 283 ++++++++++-------- lib/chronik/store/adapters/ecto/repo.ex | 3 +- .../store/adapters/ecto/schemas/aggregate.ex | 11 +- .../store/adapters/ecto/schemas/atom_type.ex | 3 + .../store/adapters/ecto/schemas/blob_type.ex | 8 +- .../adapters/ecto/schemas/domain_events.ex | 12 +- lib/chronik/store/adapters/ets/ets.ex | 111 +++---- lib/chronik/utils.ex | 6 +- mix.exs | 10 +- test/chronik/aggregate/aggregate_test.exs | 66 ++-- test/chronik/projection/projection_test.exs | 41 ++- test/chronik/pub_sub/pub_sub_test.exs | 6 +- test/chronik/store/store_test.exs | 63 ++-- 30 files changed, 609 insertions(+), 456 deletions(-) diff --git a/config/config.exs b/config/config.exs index c9c59bb..8233fe9 100644 --- a/config/config.exs +++ b/config/config.exs @@ -1,3 +1,3 @@ use Mix.Config -import_config "#{Mix.env}.exs" +import_config "#{Mix.env()}.exs" diff --git a/config/dev.exs b/config/dev.exs index 4161002..73c8a74 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -9,4 +9,3 @@ config :chronik, ecto_repos: [Chronik.Store.Adapters.Ecto.ChronikRepo] config :chronik, Chronik.Store.Adapters.Ecto.ChronikRepo, adapter: Ecto.Adapters.MySQL, url: {:system, "CHRONIK_REPO_URL", "ecto://root:root@localhost/chronik"} - diff --git a/config/test.exs b/config/test.exs index 92b54cb..053aadc 100644 --- a/config/test.exs +++ b/config/test.exs @@ -12,6 +12,4 @@ config :chronik, Chronik.Aggregate.Test.Counter, shutdown_timeout: 1000, snapshot_every: 4 -config :chronik, Chronik.Store.Test.TestStore, - adapter: Chronik.Store.Adapters.ETS - +config :chronik, Chronik.Store.Test.TestStore, adapter: Chronik.Store.Adapters.ETS diff --git a/lib/chronik.ex b/lib/chronik.ex index f2811c4..a381371 100644 --- a/lib/chronik.ex +++ b/lib/chronik.ex @@ -27,9 +27,11 @@ defmodule Chronik do order. Missing events are fetch from `Chronik.Store`. Debugging can be turned off by placing + ``` config :chronik, :debug, false ``` + in a config script. """ diff --git a/lib/chronik/aggregate.ex b/lib/chronik/aggregate.ex index 104a302..b83a6b5 100644 --- a/lib/chronik/aggregate.ex +++ b/lib/chronik/aggregate.ex @@ -98,8 +98,8 @@ defmodule Chronik.Aggregate do given state, the function should return a list (or a single) of domain events. If the command is invalid the `handle_command` should raise an exception. """ - @callback handle_command(cmd :: Chronik.command(), - state :: state()) :: [Chronik.domain_event()] | no_return() + @callback handle_command(cmd :: Chronik.command(), state :: state()) :: + [Chronik.domain_event()] | no_return() @doc """ The `handle_event` is the transition function for the aggregate. After @@ -125,15 +125,17 @@ defmodule Chronik.Aggregate do alias Chronik.Aggregate.Supervisor alias Chronik.{AggregateRegistry, Config, Utils} - defstruct [:id, - :num_events, - :blocks, - :store, - :pub_sub , - :module, - :aggregate_version, - :aggregate_state, - :timer] + defstruct [ + :id, + :num_events, + :blocks, + :store, + :pub_sub, + :module, + :aggregate_version, + :aggregate_state, + :timer + ] # API @@ -145,22 +147,29 @@ defmodule Chronik.Aggregate do The results is either `:ok` or `{:error, reason}` in case of failure. """ - @spec command(module :: module(), - id :: Chronik.id(), - cmd :: term(), - timeout :: :infinity | non_neg_integer()) :: :ok | {:error, String.t} + @spec command( + module :: module(), + id :: Chronik.id(), + cmd :: term(), + timeout :: :infinity | non_neg_integer() + ) :: :ok | {:error, String.t()} def command(module, id, cmd, timeout \\ 5000) do - Utils.debug("#{inspect id}: executing command #{inspect cmd}") + Utils.debug("#{inspect(id)}: executing command #{inspect(cmd)}") + case Registry.lookup(AggregateRegistry, {module, id}) do [] -> case Supervisor.start_aggregate(module, id) do {:ok, pid} -> GenServer.call(pid, {module, cmd}, timeout) + {:error, reason} -> - {:error, "cannot create process for aggregate root " <> - "{#{module}, #{inspect id}}: #{inspect reason}"} + {:error, + "cannot create process for aggregate root " <> + "{#{module}, #{inspect(id)}}: #{inspect(reason)}"} end - [{pid, _metadata}] -> GenServer.call(pid, {module, cmd}, timeout) + + [{pid, _metadata}] -> + GenServer.call(pid, {module, cmd}, timeout) end end @@ -175,9 +184,9 @@ defmodule Chronik.Aggregate do @doc """ Start a `Chronik.Aggregate` with callbacks on `module` with id `id`. """ - @spec start_link(module :: module(), - id :: Chronik.id()) :: {:ok, pid()} - | {:error, reason :: String.t} + @spec start_link(module :: module(), id :: Chronik.id()) :: + {:ok, pid()} + | {:error, reason :: String.t()} def start_link(module, id) do GenServer.start_link(__MODULE__, {module, id}, name: via(module, id)) end @@ -187,24 +196,28 @@ defmodule Chronik.Aggregate do def init({module, id}) do # Fetch the configuration for the Store and the PubSub. {store, pub_sub} = Config.fetch_adapters() - Utils.debug("#{inspect id}: starting aggregate.") + Utils.debug("#{inspect(id)}: starting aggregate.") {:ok, version, aggregate_state} = load_from_store(module, id, store) - {:ok, %__MODULE__{id: id, - aggregate_state: aggregate_state, - aggregate_version: version, - timer: update_timer(nil, module), - num_events: 0, - blocks: 0, - store: store, - pub_sub: pub_sub, - module: module}} + {:ok, + %__MODULE__{ + id: id, + aggregate_state: aggregate_state, + aggregate_version: version, + timer: update_timer(nil, module), + num_events: 0, + blocks: 0, + store: store, + pub_sub: pub_sub, + module: module + }} end # The :state returns the current aggregate state. def handle_call(:state, _from, %__MODULE__{aggregate_state: as} = state) do {:reply, as, state} end + # When called with a function, the aggregate executes the function in # the current state and if no exceptions were raised, it stores and # publishes the events to the PubSub. @@ -214,7 +227,7 @@ defmodule Chronik.Aggregate do |> module.handle_command(as) |> List.wrap() - Utils.debug("#{inspect state.id}: newly created events: #{inspect new_events}") + Utils.debug("#{inspect(state.id)}: newly created events: #{inspect(new_events)}") new_state = Enum.reduce(new_events, as, &module.handle_event/2) store_and_publish(new_events, new_state, state) rescue @@ -231,7 +244,7 @@ defmodule Chronik.Aggregate do # :shutdown to the current process. def handle_info(:shutdown, %__MODULE__{id: id} = state) do # TODO: Do a snapshot before going down. - Utils.debug("#{inspect id}: aggregate going down gracefully due to inactivity.") + Utils.debug("#{inspect(id)}: aggregate going down gracefully due to inactivity.") {:stop, :normal, state} end @@ -246,24 +259,33 @@ defmodule Chronik.Aggregate do # for the aggregate. defp load_from_store(module, id, store) do aggregate_tuple = {module, id} + {version, state} = case store.get_snapshot(aggregate_tuple) do nil -> - Utils.debug("#{inspect id}: no snapshot found on the store.") + Utils.debug("#{inspect(id)}: no snapshot found on the store.") {:all, nil} + {version, _state} = snapshot -> - Utils.debug("#{inspect id}: found a snapshot on the store with version " <> - "#{inspect version}") + Utils.debug( + "#{inspect(id)}: found a snapshot on the store with version " <> "#{inspect(version)}" + ) + snapshot end + case store.fetch_by_aggregate(aggregate_tuple, version) do - {:error, _} -> state + {:error, _} -> + state + {:ok, version, records} -> - Utils.debug("#{inspect id}: replaying events up to version: #{inspect version}.") + Utils.debug("#{inspect(id)}: replaying events up to version: #{inspect(version)}.") + new_state = records |> Enum.map(&Map.get(&1, :domain_event)) |> apply_events(state, module) + {:ok, version, new_state} end end @@ -272,15 +294,19 @@ defmodule Chronik.Aggregate do Enum.reduce(events, state, &module.handle_event/2) end - defp store_and_publish(events, new_state, - %__MODULE__{id: id, - num_events: num_events, - blocks: blocks, - store: store, - pub_sub: pub_sub, - module: module, - aggregate_version: aggregate_version} = state) do - + defp store_and_publish( + events, + new_state, + %__MODULE__{ + id: id, + num_events: num_events, + blocks: blocks, + store: store, + pub_sub: pub_sub, + module: module, + aggregate_version: aggregate_version + } = state + ) do # Compute the expected version to be found on the Store. version = case aggregate_version do @@ -288,22 +314,25 @@ defmodule Chronik.Aggregate do v -> v end - Utils.debug("#{inspect id}: writing events to the store: #{inspect events}") + Utils.debug("#{inspect(id)}: writing events to the store: #{inspect(events)}") {new_version, records} = - case store.append({module, id}, events, [version: version]) do - {:ok, v, s} -> {v, s} + case store.append({module, id}, events, version: version) do + {:ok, v, s} -> + {v, s} + {:error, _} -> raise "a newer version of the aggregate found on the store" end - Utils.debug("#{inspect id}: broadcasting records: #{inspect records}") + Utils.debug("#{inspect(id)}: broadcasting records: #{inspect(records)}") pub_sub.broadcast(records) num_events = num_events + length(events) + blocks = if div(num_events, get_snapshot_every(module)) > blocks do - Utils.debug("#{inspect id}: saving a snapshot with version #{inspect new_version}") + Utils.debug("#{inspect(id)}: saving a snapshot with version #{inspect(new_version)}") store.snapshot({module, id}, new_state, new_version) div(num_events, get_snapshot_every(module)) else @@ -311,13 +340,14 @@ defmodule Chronik.Aggregate do end {:reply, :ok, - %__MODULE__{state | - aggregate_state: new_state, - aggregate_version: new_version, - timer: update_timer(state.timer, module), - num_events: num_events, - blocks: blocks - }} + %__MODULE__{ + state + | aggregate_state: new_state, + aggregate_version: new_version, + timer: update_timer(state.timer, module), + num_events: num_events, + blocks: blocks + }} end defp update_timer(timer, module) do @@ -325,6 +355,7 @@ defmodule Chronik.Aggregate do if timer do Process.cancel_timer(timer) + receive do :shutdown -> :ok after @@ -332,13 +363,13 @@ defmodule Chronik.Aggregate do end end - if shutdown_timeout != :infinity, - do: Process.send_after(self(), :shutdown, shutdown_timeout) + if shutdown_timeout != :infinity do + Process.send_after(self(), :shutdown, shutdown_timeout) + end end defp get_shutdown_timeout(module), do: Config.get_config(module, :shutdown_timeout, 15 * 1000 * 60) - defp get_snapshot_every(module), - do: Config.get_config(module, :snapshot_every, 100) + defp get_snapshot_every(module), do: Config.get_config(module, :snapshot_every, 100) end diff --git a/lib/chronik/aggregate/multi.ex b/lib/chronik/aggregate/multi.ex index a7418a3..2d4945c 100644 --- a/lib/chronik/aggregate/multi.ex +++ b/lib/chronik/aggregate/multi.ex @@ -29,6 +29,8 @@ defmodule Chronik.Aggregate.Multi do @type monad_state :: {Aggregate.state(), [Chronik.domain_event()], module()} + # API + @doc "Create a new state for a multi-entity command." @spec new(state :: Aggregate.state(), module :: module()) :: monad_state() def new(state, module), do: {state, [], module} @@ -40,7 +42,7 @@ defmodule Chronik.Aggregate.Multi do """ @spec delegate(ms :: monad_state(), lens :: fun(), validator_fun :: fun()) :: monad_state() def delegate({state, events, module}, lens_fun, validator_fun) - when is_function(lens_fun) and is_function(validator_fun) do + when is_function(lens_fun) and is_function(validator_fun) do new_events = state |> lens_fun.() diff --git a/lib/chronik/aggregate/supervisor.ex b/lib/chronik/aggregate/supervisor.ex index e984b99..f5b9007 100644 --- a/lib/chronik/aggregate/supervisor.ex +++ b/lib/chronik/aggregate/supervisor.ex @@ -7,8 +7,9 @@ defmodule Chronik.Aggregate.Supervisor do # API - @spec start_aggregate(aggregate :: atom(), id :: term()) :: {:ok, pid()} - | {:error, term()} + @spec start_aggregate(aggregate :: atom(), id :: term()) :: + {:ok, pid()} + | {:error, term()} def start_aggregate(aggregate, id) do Supervisor.start_child(__MODULE__, [aggregate, id]) end diff --git a/lib/chronik/application.ex b/lib/chronik/application.ex index e505a2d..88cc37b 100644 --- a/lib/chronik/application.ex +++ b/lib/chronik/application.ex @@ -8,7 +8,7 @@ defmodule Chronik.Application do def start(_type, _args) do children = [ spec([keys: :unique, name: @aggregates], @aggregates), - Chronik.Aggregate.Supervisor, + Chronik.Aggregate.Supervisor ] opts = [strategy: :one_for_one, name: Chronik.Supervisor] diff --git a/lib/chronik/config.ex b/lib/chronik/config.ex index 06b7d76..78691f1 100644 --- a/lib/chronik/config.ex +++ b/lib/chronik/config.ex @@ -4,14 +4,14 @@ defmodule Chronik.Config do @doc """ Returns the adapter configuration for a given `module` """ - @spec fetch_config(mod :: module(), opts :: Keyword.t) :: {term(), atom()} | no_return() + @spec fetch_config(mod :: module(), opts :: Keyword.t()) :: {term(), atom()} | no_return() def fetch_config(mod, opts) do # Stolen from Ecto otp_app = Keyword.fetch!(opts, :otp_app) - config = Application.get_env(otp_app, mod, []) + config = Application.get_env(otp_app, mod, []) adapter = opts[:adapter] || config[:adapter] - unless adapter, do: raise Chronik.MissingAdapterError, opts + unless adapter, do: raise(Chronik.MissingAdapterError, opts) case Code.ensure_loaded?(adapter) do true -> {config, adapter} @@ -36,7 +36,7 @@ defmodule Chronik.Config do def fetch_adapters do config = Application.get_env(:chronik, :adapters) pub_sub = Keyword.fetch!(config, :pub_sub) - store = Keyword.fetch!(config, :store) + store = Keyword.fetch!(config, :store) {store, pub_sub} end diff --git a/lib/chronik/event_record.ex b/lib/chronik/event_record.ex index 8eb6f55..069c988 100644 --- a/lib/chronik/event_record.ex +++ b/lib/chronik/event_record.ex @@ -5,28 +5,35 @@ defmodule Chronik.EventRecord do """ defstruct [ - :aggregate, # The aggregate that emitted this event. - :created_at, # Creation timestamp. - :domain_event, # Data of the domain event. - :version, # Version of the event. - :aggregate_version, # Version of the aggregate that generated the event. + # The aggregate that emitted this event. + :aggregate, + # Creation timestamp. + :created_at, + # Data of the domain event. + :domain_event, + # Version of the event. + :version, + # Version of the aggregate that generated the event. + :aggregate_version ] @type t :: %__MODULE__{ - aggregate: Chronik.Aggregate.t(), - created_at: non_neg_integer(), - domain_event: any(), - version: Chronik.Store.version(), - aggregate_version: Chronik.Store.version() - } + aggregate: Chronik.Aggregate.t(), + created_at: non_neg_integer(), + domain_event: any(), + version: Chronik.Store.version(), + aggregate_version: Chronik.Store.version() + } # API @doc "Helper function for creating records from domain events" - @spec create(domain_event :: Chronik.domain_event(), - aggregate :: Chronik.Aggregate.t(), - version :: Chronik.Store.version(), - aggregate_version :: Chronik.Store.version()) :: __MODULE__.t + @spec create( + domain_event :: Chronik.domain_event(), + aggregate :: Chronik.Aggregate.t(), + version :: Chronik.Store.version(), + aggregate_version :: Chronik.Store.version() + ) :: __MODULE__.t() def create(domain_event, aggregate, version, aggregate_version) do %__MODULE__{ created_at: System.system_time(:seconds), diff --git a/lib/chronik/exceptions.ex b/lib/chronik/exceptions.ex index ce882f7..35c6432 100644 --- a/lib/chronik/exceptions.ex +++ b/lib/chronik/exceptions.ex @@ -3,10 +3,11 @@ defmodule Chronik.MissingAdapterError do defexception [:message] def exception(opts) do - msg = "missing adapter. This happens when there is no " <> - "default adapter configured that implements the " <> - "event store behaviour.\n" <> - "options: #{inspect opts}" + msg = + "missing adapter. This happens when there is no " <> + "default adapter configured that implements the " <> + "event store behaviour.\n" <> "options: #{inspect(opts)}" + %__MODULE__{message: msg} end end @@ -16,7 +17,7 @@ defmodule Chronik.AdapterLoadError do defexception [:message] def exception(adapter) do - msg = "error loading #{inspect adapter}" + msg = "error loading #{inspect(adapter)}" %__MODULE__{message: msg} end end @@ -26,7 +27,7 @@ defmodule Chronik.MissingConfigError do defexception [:message] def exception(module, key) do - msg = "error missing configuration #{inspect key} for module #{module}" + msg = "error missing configuration #{inspect(key)} for module #{module}" %__MODULE__{message: msg} end end diff --git a/lib/chronik/projection.ex b/lib/chronik/projection.ex index e0f75b1..5b55792 100644 --- a/lib/chronik/projection.ex +++ b/lib/chronik/projection.ex @@ -21,17 +21,16 @@ defmodule Chronik.Projection do defmodule CounterState do @behaviour Chronik.Projection - alias DomainEvents.CounterCreated - alias DomainEvents.CounterIncremented + alias DomainEvents.{CounterCreated, CounterIncremented} + alias Chronik.Projection def start_link(opts), do: Projection.start_link(__MODULE__, opts) def init(_opts), do: {nil, []} - def handle_event(%CounterCreated{}, nil) do - 0 - end + def handle_event(%CounterCreated{}, nil), do: 0 + def handle_event(%CounterIncremented{increment: increment}, value) do value + increment end @@ -59,7 +58,7 @@ defmodule Chronik.Projection do the `Chronik.PubSub`. Possible values are `:eventual` (default) and `:strict`. """ - @callback init(opts :: Keyword.t) :: {state(), Keyword.t} + @callback init(opts :: Keyword.t()) :: {state(), Keyword.t()} @doc """ The `handle_event` function is executed each time an event record is @@ -78,11 +77,7 @@ defmodule Chronik.Projection do alias Chronik.{EventRecord, Config, Utils} - defstruct [:version, - :pub_sub, - :store, - :projection_state, - :projection] + defstruct [:version, :pub_sub, :store, :projection_state, :projection] # API @@ -115,18 +110,21 @@ defmodule Chronik.Projection do # First subscribe to the PubSub to start receiving records. Note # that as the GenServer is synchronous we won't process any # messages until we finish the init function. - :ok = pub_sub.subscribe([consistency: consistency]) + :ok = pub_sub.subscribe(consistency: consistency) # From the state return by the client code and its version, read # from the Store (starting at version) de replay the missing # events. {state, version} = fetch_and_replay(version, state, projection, store) - {:ok, %__MODULE__{version: version, - pub_sub: pub_sub, - store: store, - projection_state: state, - projection: projection}} + {:ok, + %__MODULE__{ + version: version, + pub_sub: pub_sub, + store: store, + projection_state: state, + projection: projection + }} end def handle_call(:state, _from, %__MODULE__{projection_state: ps} = state) do @@ -143,58 +141,63 @@ defmodule Chronik.Projection do # When configured with eventual consistency the records are # delivered as messages processed by the handle_info. - def handle_info(%EventRecord{} = e, %__MODULE__{projection: projection, - store: store, - projection_state: ps, - version: version} = state) do + def handle_info( + %EventRecord{} = e, + %__MODULE__{projection: projection, store: store, projection_state: ps, version: version} = + state + ) do + # Use the Store to compare the current version and the version + # of the incoming record. new_state = - # Use the Store to compare the current version and the version - # of the incoming record. case store.compare_version(version, e.version) do # If the record that came from the PubSub is the next_one to # the one we last saw, transition to the following state. :next_one -> - Utils.debug("#{projection} :applying event coming from the PubSub with version #{e.version}") + Utils.debug( + "#{projection} :applying event coming from the PubSub with version #{e.version}" + ) # Update the consumer state applying the record calling the # client `handle_event/2` code. - %{state | version: e.version, - projection_state: projection.handle_event(e, ps)} + %{state | version: e.version, projection_state: projection.handle_event(e, ps)} + :past -> # If the version of the event that came from the PubSub is from # the past, just ingnore it. Utils.debug("#{projection} :discarding event from the past with version #{e.version}") state + :equal -> # If we already saw this event skip it. state + :future -> # If the event that came from the PubSub is in the future (BTTF) # try to fetch the missing events from the Store and the apply # the incoming record. - Utils.debug("#{projection} :event(s) coming from the future with version " <> - "#{e.version}. Fetching missing events starting at " <> - "version #{version} from the store.") + Utils.debug( + "#{projection} :event(s) coming from the future with version " <> + "#{e.version}. Fetching missing events starting at " <> + "version #{version} from the store." + ) # Note that the catch up is a best effort approach since # events could still be missing in the Store. - {proj_state, new_version} = - catch_up(version, ps, projection, store) + {proj_state, new_version} = catch_up(version, ps, projection, store) - %{state | version: new_version, - projection_state: proj_state} + %{state | version: new_version, projection_state: proj_state} end {:noreply, new_state} end # Internal functions + defp make_stream_handler(projection_state, version, projection) do fn stream -> - Enum.reduce(stream, {projection_state, version}, - fn record, {state, _version} -> - {projection.handle_event(record, state), record.version} - end) + Enum.reduce(stream, {projection_state, version}, fn record, {state, _version} -> + {projection.handle_event(record, state), record.version} + end) end end @@ -210,9 +213,13 @@ defmodule Chronik.Projection do # There were no events on the Store to catch up. Utils.warn("#{projection} :no events found on the Store to do a catch_up") {projection_state, :empty} + {new_projection_state, new_version} -> # Found some events on the store. Update the projection state. - Utils.debug("#{projection} :catching up events from the store starting at version #{version}") + Utils.debug( + "#{projection} :catching up events from the store starting at version #{version}" + ) + {new_projection_state, new_version} end end @@ -221,6 +228,7 @@ defmodule Chronik.Projection do defp fetch_and_replay(:current, projection_state, _projection, store) do {projection_state, store.current_version()} end + defp fetch_and_replay(version, projection_state, projection, store) do from = if version == :empty, do: :all, else: version @@ -230,6 +238,7 @@ defmodule Chronik.Projection do {^projection_state, :empty} -> Utils.warn("#{projection} :no events found in the store.") {projection_state, :empty} + {new_projection_state, new_version} -> Utils.debug("#{projection} :re-playing events from version #{version}") {new_projection_state, new_version} diff --git a/lib/chronik/projection/dump_to_file.ex b/lib/chronik/projection/dump_to_file.ex index fbd9584..872ae92 100644 --- a/lib/chronik/projection/dump_to_file.ex +++ b/lib/chronik/projection/dump_to_file.ex @@ -6,7 +6,7 @@ defmodule Chronik.Projection.DumpToFile do and start it as any projection. """ - defmacro __using__([filename: filename]) do + defmacro __using__(filename: filename) do quote do @behaviour Chronik.Projection @@ -20,7 +20,7 @@ defmodule Chronik.Projection.DumpToFile do end def handle_event(%EventRecord{domain_event: event}, filename) do - :ok = File.write(filename, "[#{__MODULE__}] #{inspect event}\n", [:append]) + :ok = File.write(filename, "[#{__MODULE__}] #{inspect(event)}\n", [:append]) filename end end diff --git a/lib/chronik/projection/echo.ex b/lib/chronik/projection/echo.ex index 94ea4a8..f8269d0 100644 --- a/lib/chronik/projection/echo.ex +++ b/lib/chronik/projection/echo.ex @@ -17,7 +17,7 @@ defmodule Chronik.Projection.Echo do def init(_opts), do: {nil, []} def handle_event(%EventRecord{domain_event: event}, state) do - IO.puts "[#{__MODULE__}] #{inspect event}" + IO.puts("[#{__MODULE__}] #{inspect(event)}") state end end diff --git a/lib/chronik/pub_sub.ex b/lib/chronik/pub_sub.ex index e0d53d2..1c7710c 100644 --- a/lib/chronik/pub_sub.ex +++ b/lib/chronik/pub_sub.ex @@ -1,4 +1,4 @@ - defmodule Chronik.PubSub do +defmodule Chronik.PubSub do @moduledoc """ `Chronik.PubSub` adapter contract and API. @@ -25,7 +25,7 @@ end @typedoc "The result status of all operations on the `Chronik.PubSub`" - @type result_status :: :ok | {:error, String.t} + @type result_status :: :ok | {:error, String.t()} @doc """ Subscribes the caller to the PubSub. @@ -37,7 +37,7 @@ - `consistency`: `:eventual` (default) or `:strict` """ - @callback subscribe(opts :: Keyword.t) :: result_status() + @callback subscribe(opts :: Keyword.t()) :: result_status() @doc """ Unsubscribes the caller from the PubSub. No further events are diff --git a/lib/chronik/pub_sub/adapters/registry.ex b/lib/chronik/pub_sub/adapters/registry.ex index 5b78202..61518a6 100644 --- a/lib/chronik/pub_sub/adapters/registry.ex +++ b/lib/chronik/pub_sub/adapters/registry.ex @@ -11,32 +11,32 @@ defmodule Chronik.PubSub.Adapters.Registry do @name __MODULE__ - @spec child_spec(Keyword.t | map()) :: map() + @spec child_spec(Keyword.t() | map()) :: map() def child_spec(args) do Registry.child_spec(keys: :duplicate, name: args[:name] || @name) end def start_link(_opts) do - Registry.start_link([keys: :duplicate, name: @name]) + Registry.start_link(keys: :duplicate, name: @name) end - @spec subscribe(opts :: Keyword.t) :: :ok + @spec subscribe(opts :: Keyword.t()) :: :ok def subscribe(opts \\ []) do # See the consistency type given by the subscriber. By default # the subscription is eventual. consistency = Keyword.get(opts, :consistency, :eventual) - Utils.debug("#{inspect self()} #{consistency} subscribed.") + Utils.debug("#{inspect(self())} #{consistency} subscribed.") # There is only one stream (:stream_all). We store the type of # consistency on the registry as the metadata. - {:ok, _} = Registry.register(@name, :stream_all, [consistency: consistency]) + {:ok, _} = Registry.register(@name, :stream_all, consistency: consistency) :ok end @spec unsubscribe :: :ok def unsubscribe do - Utils.debug("#{inspect self()} un-subscribed.") + Utils.debug("#{inspect(self())} un-subscribed.") # Unregister the process from the stream_all. Registry.unregister(@name, :stream_all) @@ -44,26 +44,31 @@ defmodule Chronik.PubSub.Adapters.Registry do @spec broadcast(records :: [Chronik.EventRecord]) :: :ok def broadcast(records) do - Utils.debug("broadcasting: #{inspect records}") + Utils.debug("broadcasting: #{inspect(records)}") for record <- records do - Registry.dispatch(@name, :stream_all, - &(for {pid, opts} <- &1 do - # Check the consistency type of the subscriber. If the - # consistency is :eventual we send a normal message, if - # it is :strict then we do a synchronous call - case Keyword.fetch!(opts, :consistency) do - :eventual -> - send(pid, record) - :strict -> - case GenServer.call(pid, {:process, record}) do - :ok -> :ok - _ -> - Utils.warn("the #{inspect pid} projection replied " <> - "a non :ok result.") - end - end - end)) + Registry.dispatch( + @name, + :stream_all, + &for {pid, opts} <- &1 do + # Check the consistency type of the subscriber. If the + # consistency is :eventual we send a normal message, if + # it is :strict then we do a synchronous call + case Keyword.fetch!(opts, :consistency) do + :eventual -> + send(pid, record) + + :strict -> + case GenServer.call(pid, {:process, record}) do + :ok -> + :ok + + _ -> + Utils.warn("the #{inspect(pid)} projection replied " <> "a non :ok result.") + end + end + end + ) end :ok diff --git a/lib/chronik/store.ex b/lib/chronik/store.ex index fa8116e..a412430 100644 --- a/lib/chronik/store.ex +++ b/lib/chronik/store.ex @@ -4,7 +4,7 @@ defmodule Chronik.Store do """ @typedoc "The options given for reading events from the stream" - @type options :: Keyword.t + @type options :: Keyword.t() @typedoc """ The version of a given event record in the Store. @@ -14,7 +14,7 @@ defmodule Chronik.Store do """ @type version :: term() | :empty - @typep events :: [Chronik.domain_event] + @typep events :: [Chronik.domain_event()] @typep event_record :: Chronik.EventRecord @typep event_records :: [event_record] @@ -43,10 +43,9 @@ defmodule Chronik.Store do The return values are `{:ok, last_inserted_version, records}` on success or `{:error, message}` in case of failure. """ - @callback append(aggregate :: Chronik.Aggregate, - events :: events(), - opts :: options()) :: {:ok, version(), event_records()} - | {:error, String.t} + @callback append(aggregate :: Chronik.Aggregate, events :: events(), opts :: options()) :: + {:ok, version(), event_records()} + | {:error, String.t()} @doc """ Retrieves all events from the store starting (but not including) at @@ -62,8 +61,9 @@ defmodule Chronik.Store do If no records are found on the stream (starting at version) the function returns `{:ok, version, []}`. """ - @callback fetch(version :: version()) :: {:ok, version(), event_records()} - | {:error, String.t} + @callback fetch(version :: version()) :: + {:ok, version(), event_records()} + | {:error, String.t()} @doc """ Retrieves all events from the store for a given aggregate starting @@ -79,9 +79,9 @@ defmodule Chronik.Store do If no records are found on the stream (starting at version) the function returns `{:ok, version, []}`. """ - @callback fetch_by_aggregate(aggregate :: Chronik.Aggregate, - version :: version()) :: {:ok, version(), event_records()} - | {:error, String.t} + @callback fetch_by_aggregate(aggregate :: Chronik.Aggregate, version :: version()) :: + {:ok, version(), event_records()} + | {:error, String.t()} @doc """ This function allows the Projection module to comapre versions of @@ -91,19 +91,23 @@ defmodule Chronik.Store do implementation is to compare the integers and return the corresponding atoms. """ - @callback compare_version(version :: version(), version :: version()) :: :past - | :next_one - | :future - | :equal + @callback compare_version(version :: version(), version :: version()) :: + :past + | :next_one + | :future + | :equal @doc """ This function creates a snapshot in the store for the given `aggregate`. The Store only stores the last snapshot. """ - @callback snapshot(aggregate :: Chronik.Aggregate, - state :: Chronik.Aggregate.state, - version :: version()) :: :ok - | {:error, reason() :: String.t} + @callback snapshot( + aggregate :: Chronik.Aggregate, + state :: Chronik.Aggregate.state(), + version :: version() + ) :: + :ok + | {:error, reason() :: String.t()} @doc """ Retrives a snapshot from the Store. If there is no snapshot it @@ -113,7 +117,7 @@ defmodule Chronik.Store do `{version, state}` indicating the state of the snapshot and with wich version of the aggregate was created. """ - @callback get_snapshot(aggregate :: Chronik.Aggregate) :: {version(), Chronik.Aggregate.state} + @callback get_snapshot(aggregate :: Chronik.Aggregate) :: {version(), Chronik.Aggregate.state()} @doc """ Retrives the current version of the store. If there are no record returns :empty. @@ -124,16 +128,17 @@ defmodule Chronik.Store do Calls the `fun` function over a stream of domain events starting at version `version`. """ - @callback stream(fun :: (event_record() , any() -> any()), - version :: version()) :: any() + @callback stream(fun :: (event_record(), any() -> any()), version :: version()) :: any() @doc """ Calls the `fun` function over the `aggregate`'s domain event stream starting at version `version`. """ - @callback stream_by_aggregate(aggregate :: Chronik.Aggregate, - fun :: (event_record() , any() -> any()), - version :: version()) :: any() + @callback stream_by_aggregate( + aggregate :: Chronik.Aggregate, + fun :: (event_record(), any() -> any()), + version :: version() + ) :: any() defmacro __using__(opts) do quote bind_quoted: [opts: opts] do diff --git a/lib/chronik/store/adapters/ecto/ecto.ex b/lib/chronik/store/adapters/ecto/ecto.ex index 7e395f4..b9a60c3 100644 --- a/lib/chronik/store/adapters/ecto/ecto.ex +++ b/lib/chronik/store/adapters/ecto/ecto.ex @@ -40,7 +40,9 @@ defmodule Chronik.Store.Adapters.Ecto do end def compare_version(a, a), do: :equal + def compare_version(:empty, "0"), do: :next_one + def compare_version(a, b) when is_bitstring(a) and is_bitstring(b) do case {String.to_integer(a), String.to_integer(b)} do {v1, v2} when v2 == v1 + 1 -> :next_one @@ -48,15 +50,15 @@ defmodule Chronik.Store.Adapters.Ecto do {v1, v2} when v2 < v1 -> :past end end + def compare_version(a, :empty) when is_number(a), do: :past + def compare_version(_, _), do: :error def current_version(), do: GenServer.call(@name, :current_version) - def stream_by_aggregate(aggregate, fun, version \\ :all) do - transaction = - GenServer.call(@name, {:stream_by_aggregate, aggregate, version}) + transaction = GenServer.call(@name, {:stream_by_aggregate, aggregate, version}) {:ok, result} = transaction.(fun) result end @@ -89,54 +91,66 @@ defmodule Chronik.Store.Adapters.Ecto do def handle_call(:current_version, _from, state) do {:reply, store_version(), state} end + # Write the events to the DB. def handle_call({:append, aggregate, events, opts}, _from, state) do aggregate_version = aggregate_version(aggregate) version = Keyword.get(opts, :version) - if (version == :no_stream and aggregate_version == :empty) or - (version == :any) or - (is_bitstring(version) and version == aggregate_version) do - {:reply, do_append(aggregate, events), state} + + if (version == :no_stream and aggregate_version == :empty) or version == :any or + (is_bitstring(version) and version == aggregate_version) do + {:reply, do_append(aggregate, events), state} else {:reply, {:error, "wrong expected version"}, state} end end - # Get the events for a given aggregate starting at version. - def handle_call({:fetch_by_aggregate, - {aggregate_module, aggregate_id} = aggregate, version}, _from, state) do + # Get the events for a given aggregate starting at version. + def handle_call( + {:fetch_by_aggregate, {aggregate_module, aggregate_id} = aggregate, version}, + _from, + state + ) do starting_at = case version do :all -> -1 v -> String.to_integer(v) end - query = from e in DomainEvents, - join: a in Aggregate, - where: a.id == e.aggregate_id, - where: a.aggregate == ^aggregate_module, - where: a.aggregate_id == ^aggregate_id, - where: e.aggregate_version > ^starting_at, - order_by: e.aggregate_version, - select: %{ - version: e.version, - aggregate: {a.aggregate, a.aggregate_id}, - domain_event: e.domain_event, - aggregate_version: e.aggregate_version, - created: e.created - } + query = + from( + e in DomainEvents, + join: a in Aggregate, + where: a.id == e.aggregate_id, + where: a.aggregate == ^aggregate_module, + where: a.aggregate_id == ^aggregate_id, + where: e.aggregate_version > ^starting_at, + order_by: e.aggregate_version, + select: %{ + version: e.version, + aggregate: {a.aggregate, a.aggregate_id}, + domain_event: e.domain_event, + aggregate_version: e.aggregate_version, + created: e.created + } + ) case Repo.all(query) do - [] -> {:reply, {:ok, aggregate_version(aggregate), []}, state} + [] -> + {:reply, {:ok, aggregate_version(aggregate), []}, state} + rows -> records = Enum.map(rows, &build_record/1) + version = records |> List.last() |> Map.get(:aggregate_version) + {:reply, {:ok, version, records}, state} end end + # Fetch all the events from the all-stream starting at version. def handle_call({:fetch, version}, _from, state) do starting_at = @@ -145,64 +159,78 @@ defmodule Chronik.Store.Adapters.Ecto do v -> String.to_integer(v) end - query = from e in DomainEvents, - join: a in Aggregate, - where: a.id == e.aggregate_id, - where: e.version > ^starting_at, - order_by: e.version, - select: %{ - version: e.version, - aggregate: {a.aggregate, a.aggregate_id}, - domain_event: e.domain_event, - aggregate_version: e.aggregate_version, - created: e.created - } + query = + from( + e in DomainEvents, + join: a in Aggregate, + where: a.id == e.aggregate_id, + where: e.version > ^starting_at, + order_by: e.version, + select: %{ + version: e.version, + aggregate: {a.aggregate, a.aggregate_id}, + domain_event: e.domain_event, + aggregate_version: e.aggregate_version, + created: e.created + } + ) case Repo.all(query) do - [] -> {:reply, {:ok, store_version(), []}, state} + [] -> + {:reply, {:ok, store_version(), []}, state} + rows -> records = Enum.map(rows, &build_record/1) + version = records |> List.last() |> Map.get(:version) + {:reply, {:ok, version, records}, state} end end - # Take a snapshot and write it to the DB. - def handle_call({:snapshot, {aggregate, id}, - aggregate_state, version}, _from, state) do + # Take a snapshot and write it to the DB. + def handle_call({:snapshot, {aggregate, id}, aggregate_state, version}, _from, state) do Aggregate |> where(aggregate: ^aggregate) |> where(aggregate_id: ^id) |> Repo.update_all( - set: [snapshot_version: version, - snapshot: :erlang.term_to_binary(aggregate_state)]) + set: [snapshot_version: version, snapshot: :erlang.term_to_binary(aggregate_state)] + ) {:reply, :ok, state} end + def handle_call({:get_snapshot, {aggregate, id}}, _from, state) do query = - from a in Aggregate, - where: a.aggregate == ^aggregate, - where: a.aggregate_id == ^id, - where: not is_nil(a.snapshot_version) + from( + a in Aggregate, + where: a.aggregate == ^aggregate, + where: a.aggregate_id == ^id, + where: not is_nil(a.snapshot_version) + ) case Repo.one(query) do - nil -> {:reply, nil, state} + nil -> + {:reply, nil, state} + %Aggregate{snapshot_version: version, snapshot: snapshot} -> try do {:reply, {"#{version}", :erlang.binary_to_term(snapshot)}, state} rescue _ -> - Logger.error("could not load the snapshot for " <> - "#{inspect {aggregate, id}} from the " <> - "store. Possible data corruption.") + Logger.error( + "could not load the snapshot for " <> + "#{inspect({aggregate, id})} from the " <> "store. Possible data corruption." + ) + {:reply, nil, state} end end end + def handle_call({:stream, version}, _from, state) do starting_at = case version do @@ -210,69 +238,73 @@ defmodule Chronik.Store.Adapters.Ecto do v -> String.to_integer(v) end - query = from e in DomainEvents, - join: a in Aggregate, - where: a.id == e.aggregate_id, - where: e.version > ^starting_at, - order_by: e.version, - select: %{ - version: e.version, - aggregate: {a.aggregate, a.aggregate_id}, - domain_event: e.domain_event, - aggregate_version: e.aggregate_version, - created: e.created - } - - ret = - fn fun -> - Repo.transaction(fn() -> - query - |> Repo.stream() - |> Stream.map(&build_record/1) - |> fun.() - end) - end + query = + from( + e in DomainEvents, + join: a in Aggregate, + where: a.id == e.aggregate_id, + where: e.version > ^starting_at, + order_by: e.version, + select: %{ + version: e.version, + aggregate: {a.aggregate, a.aggregate_id}, + domain_event: e.domain_event, + aggregate_version: e.aggregate_version, + created: e.created + } + ) + + ret = fn fun -> + Repo.transaction(fn -> + query + |> Repo.stream() + |> Stream.map(&build_record/1) + |> fun.() + end) + end {:reply, ret, state} end - def handle_call({:stream_by_aggregate, - {aggregate_module, aggregate_id}, version}, _from, state) do + def handle_call({:stream_by_aggregate, {aggregate_module, aggregate_id}, version}, _from, state) do starting_at = case version do :all -> -1 v -> String.to_integer(v) end - query = from e in DomainEvents, - join: a in Aggregate, - where: a.id == e.aggregate_id, - where: a.aggregate == ^aggregate_module, - where: a.aggregate_id == ^aggregate_id, - where: e.aggregate_version > ^starting_at, - order_by: e.aggregate_version, - select: %{ - version: e.version, - aggregate: {a.aggregate, a.aggregate_id}, - domain_event: e.domain_event, - aggregate_version: e.aggregate_version, - created: e.created - } - - ret = - fn fun -> - Repo.transaction(fn() -> - query - |> Repo.stream() - |> Stream.map(&build_record/1) - |> fun.() - end) - end + query = + from( + e in DomainEvents, + join: a in Aggregate, + where: a.id == e.aggregate_id, + where: a.aggregate == ^aggregate_module, + where: a.aggregate_id == ^aggregate_id, + where: e.aggregate_version > ^starting_at, + order_by: e.aggregate_version, + select: %{ + version: e.version, + aggregate: {a.aggregate, a.aggregate_id}, + domain_event: e.domain_event, + aggregate_version: e.aggregate_version, + created: e.created + } + ) + + ret = fn fun -> + Repo.transaction(fn -> + query + |> Repo.stream() + |> Stream.map(&build_record/1) + |> fun.() + end) + end {:reply, ret, state} end # Internal functions + defp get_aggregate({aggregate, id}) do case Repo.get_by(Aggregate, aggregate: aggregate, aggregate_id: id) do nil -> %Aggregate{aggregate: aggregate, aggregate_id: id} @@ -285,7 +317,8 @@ defmodule Chronik.Store.Adapters.Ecto do domain_event(row.domain_event, row.aggregate), row.aggregate, "#{row.version}", - "#{row.aggregate_version}") + "#{row.aggregate_version}" + ) end defp aggregate_table_id(aggregate) do @@ -298,8 +331,12 @@ defmodule Chronik.Store.Adapters.Ecto do defp do_append(aggregate, events) do {records, _version, aggregate_version} = from_enum(events, aggregate) - Repo.insert_all(DomainEvents, - Enum.map(records, &insert_event(&1, aggregate_table_id(aggregate)))) + + Repo.insert_all( + DomainEvents, + Enum.map(records, &insert_event(&1, aggregate_table_id(aggregate))) + ) + {:ok, aggregate_version, records} end @@ -328,10 +365,12 @@ defmodule Chronik.Store.Adapters.Ecto do defp store_version do query = - from e in DomainEvents, - select: e.version, - order_by: [desc: e.version], - limit: 1 + from( + e in DomainEvents, + select: e.version, + order_by: [desc: e.version], + limit: 1 + ) case Repo.one(query) do nil -> :empty @@ -341,7 +380,8 @@ defmodule Chronik.Store.Adapters.Ecto do defp aggregate_version({aggregate_module, aggregate_id}) do query = - from e in DomainEvents, + from( + e in DomainEvents, join: a in Aggregate, where: e.aggregate_id == a.id, where: a.aggregate == ^aggregate_module, @@ -349,6 +389,7 @@ defmodule Chronik.Store.Adapters.Ecto do order_by: [desc: e.aggregate_version], select: e.aggregate_version, limit: 1 + ) case Repo.one(query) do nil -> :empty @@ -364,25 +405,27 @@ defmodule Chronik.Store.Adapters.Ecto do defp from_enum(events, aggregate) do events - |> Enum.reduce({[], store_version(), aggregate_version(aggregate)}, - fn e, {records, st_version, agg_version} -> - { - records ++ - [EventRecord.create(e, aggregate, next_version(st_version), - next_version(agg_version))], - next_version(st_version), - next_version(agg_version) - } - end) + |> Enum.reduce({[], store_version(), aggregate_version(aggregate)}, fn e, + {records, st_version, + agg_version} -> + { + records ++ + [EventRecord.create(e, aggregate, next_version(st_version), next_version(agg_version))], + next_version(st_version), + next_version(agg_version) + } + end) end def domain_event(event, aggregate) do :erlang.binary_to_term(event) rescue _ -> - Logger.error("could not load some events for " <> - "#{inspect aggregate} from the " <> - "store. Possible data corruption.") + Logger.error( + "could not load some events for " <> + "#{inspect(aggregate)} from the " <> "store. Possible data corruption." + ) + nil end end diff --git a/lib/chronik/store/adapters/ecto/repo.ex b/lib/chronik/store/adapters/ecto/repo.ex index 3d2840e..fcad75c 100644 --- a/lib/chronik/store/adapters/ecto/repo.ex +++ b/lib/chronik/store/adapters/ecto/repo.ex @@ -11,6 +11,7 @@ defmodule Chronik.Store.Adapters.Ecto.ChronikRepo do unless config[:url] do raise "Set url config for #{__MODULE__}!" end + {:ok, config} end @@ -19,6 +20,6 @@ defmodule Chronik.Store.Adapters.Ecto.ChronikRepo do id: __MODULE__, start: {__MODULE__, :start_link, [opts]}, type: :supervisor - } + } end end diff --git a/lib/chronik/store/adapters/ecto/schemas/aggregate.ex b/lib/chronik/store/adapters/ecto/schemas/aggregate.ex index 278cac9..db6d2bf 100644 --- a/lib/chronik/store/adapters/ecto/schemas/aggregate.ex +++ b/lib/chronik/store/adapters/ecto/schemas/aggregate.ex @@ -1,14 +1,17 @@ defmodule Chronik.Store.Adapters.Ecto.Aggregate do @moduledoc false + use Ecto.Schema + import Ecto.Changeset + alias Chronik.Store.Adapters.Ecto.Blob schema "aggregates" do - field :aggregate, Chronik.Store.Adapters.Ecto.AtomType - field :aggregate_id, :string - field :snapshot_version, :integer - field :snapshot, Blob + field(:aggregate, Chronik.Store.Adapters.Ecto.AtomType) + field(:aggregate_id, :string) + field(:snapshot_version, :integer) + field(:snapshot, Blob) end def changeset(aggregate, params \\ %{}) do diff --git a/lib/chronik/store/adapters/ecto/schemas/atom_type.ex b/lib/chronik/store/adapters/ecto/schemas/atom_type.ex index 23af3f7..c1c9370 100644 --- a/lib/chronik/store/adapters/ecto/schemas/atom_type.ex +++ b/lib/chronik/store/adapters/ecto/schemas/atom_type.ex @@ -6,11 +6,14 @@ defmodule Chronik.Store.Adapters.Ecto.AtomType do def type, do: :string def cast(nil), do: {:ok, ""} + def cast(value), do: {:ok, value} def load(""), do: {:ok, nil} + def load(value), do: {:ok, String.to_atom(value)} def dump(value) when is_atom(value), do: {:ok, Atom.to_string(value)} + def dump(_), do: :error end diff --git a/lib/chronik/store/adapters/ecto/schemas/blob_type.ex b/lib/chronik/store/adapters/ecto/schemas/blob_type.ex index aaac23c..b579e6b 100644 --- a/lib/chronik/store/adapters/ecto/schemas/blob_type.ex +++ b/lib/chronik/store/adapters/ecto/schemas/blob_type.ex @@ -1,5 +1,6 @@ defmodule Chronik.Store.Adapters.Ecto.Blob do @moduledoc false + @behaviour Ecto.Type alias Chronik.Store.Adapters.Ecto.ChronikRepo @@ -7,13 +8,16 @@ defmodule Chronik.Store.Adapters.Ecto.Blob do def type do case Application.get_env(:Chronik, ChronikRepo)[:adapter] do Ecto.Adapters.Postgres -> - :'bytea' + :bytea + _ -> - :'MEDIUMBLOB' + :MEDIUMBLOB end end def load(blob), do: {:ok, blob} + def dump(blob), do: {:ok, blob} + def cast(binary), do: {:ok, binary} end diff --git a/lib/chronik/store/adapters/ecto/schemas/domain_events.ex b/lib/chronik/store/adapters/ecto/schemas/domain_events.ex index 717c784..b0944c1 100644 --- a/lib/chronik/store/adapters/ecto/schemas/domain_events.ex +++ b/lib/chronik/store/adapters/ecto/schemas/domain_events.ex @@ -4,11 +4,11 @@ defmodule Chronik.Store.Adapters.Ecto.DomainEvents do use Ecto.Schema schema "domain_events" do - field :aggregate_version, :integer - field :domain_event, :binary - field :created, :naive_datetime - field :aggregate_id, :id - field :domain_event_json, :string - field :version, :integer + field(:aggregate_version, :integer) + field(:domain_event, :binary) + field(:created, :naive_datetime) + field(:aggregate_id, :id) + field(:domain_event_json, :string) + field(:version, :integer) end end diff --git a/lib/chronik/store/adapters/ets/ets.ex b/lib/chronik/store/adapters/ets/ets.ex index d935de6..1ab42c6 100644 --- a/lib/chronik/store/adapters/ets/ets.ex +++ b/lib/chronik/store/adapters/ets/ets.ex @@ -16,6 +16,7 @@ defmodule Chronik.Store.Adapters.ETS do @snapshot_table Module.concat(__MODULE__, SnapshotTable) # API + def append(aggregate, events, opts \\ [version: :any]) do GenServer.call(@name, {:append, aggregate, events, opts}) end @@ -37,7 +38,9 @@ defmodule Chronik.Store.Adapters.ETS do end def compare_version(a, a), do: :equal + def compare_version(:empty, "0"), do: :next_one + def compare_version(a, b) when is_bitstring(a) and is_bitstring(b) do case {String.to_integer(a), String.to_integer(b)} do {v1, v2} when v2 == v1 + 1 -> :next_one @@ -46,11 +49,10 @@ defmodule Chronik.Store.Adapters.ETS do end end - def current_version(), do: GenServer.call(@name, :current_version) + def current_version, do: GenServer.call(@name, :current_version) def stream_by_aggregate(aggregate, fun, version \\ :all) do - wrapper_fn = - GenServer.call(@name, {:stream_by_aggregate, aggregate, version}) + wrapper_fn = GenServer.call(@name, {:stream_by_aggregate, aggregate, version}) wrapper_fn.(fun) end @@ -60,6 +62,7 @@ defmodule Chronik.Store.Adapters.ETS do end # GenServer callbacks + def child_spec(_store, opts) do %{ id: __MODULE__, @@ -73,7 +76,6 @@ defmodule Chronik.Store.Adapters.ETS do GenServer.start_link(__MODULE__, [], name: @name) end - def init([]) do # We use two tables. The @table to store the domain events # and @snapshot_table to store the snapshots. @@ -87,17 +89,16 @@ defmodule Chronik.Store.Adapters.ETS do def handle_call(:current_version, _from, state) do {:reply, current_version_local(), state} end - def handle_call({:append, aggregate, events, opts}, _from, state) do + def handle_call({:append, aggregate, events, opts}, _from, state) do aggregate_version = get_aggregate_version(aggregate) version = Keyword.get(opts, :version) - Utils.debug("appending records #{inspect events} from the aggregate #{inspect aggregate}") + Utils.debug("appending records #{inspect(events)} from the aggregate #{inspect(aggregate)}") # Check that the version asked by the client is consistent with the Store. - if (version == :no_stream and aggregate_version == :empty) or - (version == :any) or - (is_bitstring(version) and version == aggregate_version) do - {:reply, do_append(events, aggregate, aggregate_version), state} + if (version == :no_stream and aggregate_version == :empty) or version == :any or + (is_bitstring(version) and version == aggregate_version) do + {:reply, do_append(events, aggregate, aggregate_version), state} else {:reply, {:error, "wrong expected version"}, state} end @@ -114,7 +115,9 @@ defmodule Chronik.Store.Adapters.ETS do fetch_version = case current_records() do - [] -> :empty + [] -> + :empty + records -> records |> List.last() @@ -122,30 +125,33 @@ defmodule Chronik.Store.Adapters.ETS do end records = Enum.drop(current_records(), drop) - Utils.debug("fetched records from #{inspect version}: #{inspect records}.") + Utils.debug("fetched records from #{inspect(version)}: #{inspect(records)}.") {:reply, {:ok, fetch_version, records}, state} end + # Returns the records for a given aggregate starting at version. def handle_call({:fetch_by_aggregate, aggregate, version}, _from, state) do - filter = - fn records -> - case version do - :all -> records - _ -> - records - |> Enum.drop_while(&(&1.aggregate_version != version)) - |> Enum.drop(1) - end + filter = fn records -> + case version do + :all -> + records + + _ -> + records + |> Enum.drop_while(&(&1.aggregate_version != version)) + |> Enum.drop(1) end + end records = current_records() |> Enum.filter(&(&1.aggregate == aggregate)) |> filter.() - Utils.debug("fetched records for aggregate #{inspect aggregate}: #{inspect records}.") + Utils.debug("fetched records for aggregate #{inspect(aggregate)}: #{inspect(records)}.") {:reply, {:ok, get_aggregate_version(aggregate), records}, state} end + def handle_call({:stream, version}, _from, state) do drop = case version do @@ -154,7 +160,7 @@ defmodule Chronik.Store.Adapters.ETS do end records = Enum.drop(current_records(), drop) - Utils.debug("fetched records from #{inspect version}: #{inspect records}.") + Utils.debug("fetched records from #{inspect(version)}: #{inspect(records)}.") ret = fn fun -> records @@ -164,24 +170,26 @@ defmodule Chronik.Store.Adapters.ETS do {:reply, ret, state} end + def handle_call({:stream_by_aggregate, aggregate, version}, _from, state) do - filter = - fn records -> - case version do - :all -> records - _ -> - records - |> Enum.drop_while(&(&1.aggregate_version != version)) - |> Enum.drop(1) - end + filter = fn records -> + case version do + :all -> + records + + _ -> + records + |> Enum.drop_while(&(&1.aggregate_version != version)) + |> Enum.drop(1) end + end records = current_records() |> Enum.filter(&(&1.aggregate == aggregate)) |> filter.() - Utils.debug("fetched records for aggregate #{inspect aggregate}: #{inspect records}.") + Utils.debug("fetched records for aggregate #{inspect(aggregate)}: #{inspect(records)}.") ret = fn fun -> records @@ -191,10 +199,11 @@ defmodule Chronik.Store.Adapters.ETS do {:reply, ret, state} end + # Take a snapshot of the aggregate state and store in the Store. def handle_call({:snapshot, aggregate, aggregate_state, version}, _from, state) do true = :ets.insert(@snapshot_table, {aggregate, {version, aggregate_state}}) - Utils.debug("doing a snapshot for aggregate #{inspect aggregate}") + Utils.debug("doing a snapshot for aggregate #{inspect(aggregate)}") {:reply, :ok, state} end @@ -204,6 +213,7 @@ defmodule Chronik.Store.Adapters.ETS do [] -> Utils.debug("no snapshot found on the store.") {:reply, nil, state} + [{^aggregate, snapshot}] -> Utils.debug("found a snapshot found on the store.") {:reply, snapshot, state} @@ -221,23 +231,24 @@ defmodule Chronik.Store.Adapters.ETS do defp do_append(events, aggregate, aggregate_version) do {new_records, new_version, aggregate_version} = - Enum.reduce(events, {[], current_version_local(), aggregate_version}, - fn (event, {records, version, aggregate_version}) -> - next_agg_version = next_version(aggregate_version) - next_version = next_version(version) - record = - EventRecord.create(event, aggregate, next_version, next_agg_version) - { - records ++ [record], - next_version, - next_agg_version - } - end) - - Utils.debug("appending records: #{inspect new_records}") + Enum.reduce(events, {[], current_version_local(), aggregate_version}, fn event, + {records, version, + aggregate_version} -> + next_agg_version = next_version(aggregate_version) + next_version = next_version(version) + record = EventRecord.create(event, aggregate, next_version, next_agg_version) + + { + records ++ [record], + next_version, + next_agg_version + } + end) + + Utils.debug("appending records: #{inspect(new_records)}") true = :ets.insert(@table, {:version, new_version}) - true = :ets.insert(@table, {:records, current_records() ++ new_records}) + true = :ets.insert(@table, {:records, current_records() ++ new_records}) :ets.lookup(@table, :records) {:ok, aggregate_version, new_records} @@ -262,7 +273,7 @@ defmodule Chronik.Store.Adapters.ETS do end end - defp current_version_local() do + defp current_version_local do case :ets.lookup(@table, :version) do [] -> :empty [{:version, version}] -> version diff --git a/lib/chronik/utils.ex b/lib/chronik/utils.ex index be162e6..127f28f 100644 --- a/lib/chronik/utils.ex +++ b/lib/chronik/utils.ex @@ -1,10 +1,12 @@ defmodule Chronik.Utils do @moduledoc "Utility module for debugging" + if Application.get_env(:chronik, :debug, true) do require Logger + defmacro debug(msg) do quote do - Logger.debug("#{inspect __MODULE__}] #{unquote(msg)}") + Logger.debug("#{inspect(__MODULE__)}] #{unquote(msg)}") end end else @@ -13,7 +15,7 @@ defmodule Chronik.Utils do defmacro warn(msg) do quote do - Logger.warn("#{inspect __MODULE__}] #{unquote(msg)}") + Logger.warn("#{inspect(__MODULE__)}] #{unquote(msg)}") end end end diff --git a/mix.exs b/mix.exs index 48adc68..8abf82f 100644 --- a/mix.exs +++ b/mix.exs @@ -8,7 +8,7 @@ defmodule Chronik.Mixfile do app: :chronik, version: @version, elixir: "~> 1.6", - start_permanent: Mix.env == :prod, + start_permanent: Mix.env() == :prod, test_coverage: [tool: ExCoveralls], preferred_cli_env: [coveralls: :test], docs: docs(), @@ -36,9 +36,10 @@ defmodule Chronik.Mixfile do defp docs do [ - source_ref: "v#{@version}", main: "Chronik", + source_ref: "v#{@version}", + main: "Chronik", canonical: "http://hexdocs.pm/chronik", - source_url: "https://github.com/parody/chronik", + source_url: "https://github.com/parody/chronik" ] end @@ -47,8 +48,7 @@ defmodule Chronik.Mixfile do maintainers: ["Cristian Rosa", "Federico Bergero", "Ricardo Lanziano"], licenses: [], links: %{"GitHub" => "https://github.com/parody/chronik"}, - files: ~w(mix.exs README.md CHANGELOG.md lib priv config example) ++ - ~w(LICENSE TODO.md) + files: ~w(mix.exs README.md CHANGELOG.md lib priv config example) ++ ~w(LICENSE TODO.md) ] end diff --git a/test/chronik/aggregate/aggregate_test.exs b/test/chronik/aggregate/aggregate_test.exs index ba3dc55..71a034c 100644 --- a/test/chronik/aggregate/aggregate_test.exs +++ b/test/chronik/aggregate/aggregate_test.exs @@ -12,12 +12,14 @@ defmodule Chronik.Aggregate.Test do alias Chronik.Aggregate alias Chronik.Aggregate.Test.Counter + alias DomainEvents.{ CounterCreated, CounterIncremented, CounterNamed, CounterMaxUpdated, - CounterDestroyed} + CounterDestroyed + } # The aggregate state is just the counter, name and max, value. defstruct [ @@ -30,15 +32,12 @@ defmodule Chronik.Aggregate.Test do # Public API for the Counter def create(id), do: Aggregate.command(__MODULE__, id, {:create, id}) - def increment(id, increment), - do: Aggregate.command(__MODULE__, id, {:increment, increment}) + def increment(id, increment), do: Aggregate.command(__MODULE__, id, {:increment, increment}) def update_name_and_max(id, name, max), - do: Aggregate.command(__MODULE__, id, - {:update_name_and_max, name, max}) + do: Aggregate.command(__MODULE__, id, {:update_name_and_max, name, max}) - def destroy(id), - do: Aggregate.command(__MODULE__, id, {:destroy}) + def destroy(id), do: Aggregate.command(__MODULE__, id, {:destroy}) # This is only for debugging purposes def state(id), do: Aggregate.state(__MODULE__, id) @@ -47,18 +46,20 @@ defmodule Chronik.Aggregate.Test do def handle_command({:create, id}, nil) do %CounterCreated{id: id, initial_value: 0} end + def handle_command({:create, id}, _state) do - raise "cart #{inspect id} already created" + raise "cart #{inspect(id)} already created" end - def handle_command({:increment, increment}, - %Counter{id: id, max: max, counter: counter}) - when counter + increment < max do + def handle_command({:increment, increment}, %Counter{id: id, max: max, counter: counter}) + when counter + increment < max do %CounterIncremented{id: id, increment: increment} end + def handle_command({:increment}, state) do - raise "cannot increment counter on state #{inspect state}" + raise "cannot increment counter on state #{inspect(state)}" end + # This is an example of a multi-entity command. # It binds the execution of two state changes of two different # entities: name and max @@ -69,18 +70,21 @@ defmodule Chronik.Aggregate.Test do state |> Multi.new(__MODULE__) - |> Multi.delegate(&(&1.name), &rename(&1, id, name)) - |> Multi.delegate(&(&1.max), &update_max(&1, id, max)) + |> Multi.delegate(& &1.name, &rename(&1, id, name)) + |> Multi.delegate(& &1.max, &update_max(&1, id, max)) |> Multi.run() end + def handle_command({:update_name_and_max, _name, _max}, state) do - raise "cannot update_name_and_max counter on state #{inspect state}" + raise "cannot update_name_and_max counter on state #{inspect(state)}" end + def handle_command({:destroy}, %Counter{id: id}) do %CounterDestroyed{id: id} end + def handle_command({:destroy}, state) do - raise "cannot destroy counter on state #{inspect state}" + raise "cannot destroy counter on state #{inspect(state)}" end # This is the state transition function for the Counter. @@ -88,17 +92,23 @@ defmodule Chronik.Aggregate.Test do def handle_event(%CounterCreated{id: id, initial_value: value}, nil) do %Counter{id: id, counter: value, max: 1000} end + # We increment the %Counter{}. - def handle_event(%CounterIncremented{id: id, increment: increment}, - %Counter{id: id, counter: counter}) do + def handle_event(%CounterIncremented{id: id, increment: increment}, %Counter{ + id: id, + counter: counter + }) do %Counter{id: id, counter: counter + increment} end + def handle_event(%CounterNamed{name: name}, state) do put_in(state.name, name) end + def handle_event(%CounterMaxUpdated{max: max}, state) do put_in(state.max, max) end + # When we destroy the counter we go to a invalid state from which # we can not transition out. def handle_event(%CounterDestroyed{}, %Counter{}) do @@ -117,13 +127,14 @@ defmodule Chronik.Aggregate.Test do defp update_max(old_max, id, max) when max > old_max do %CounterMaxUpdated{id: id, max: max} end + defp update_max(old_max, id, max) do raise "cannot reduce the max from #{old_max} to #{max} for counter #{id}" end end test "Double creating an aggregate" do - id = "1" + id = "1" # Check that we can creante an counter aggregate. # This test may failed if there is a snapshot or events in the Store. @@ -170,7 +181,7 @@ defmodule Chronik.Aggregate.Test do end test "Aggregate snapshot and replay of events" do - id = "5" + id = "5" times = 10 value = @increment * (times + 1) @@ -179,8 +190,7 @@ defmodule Chronik.Aggregate.Test do # The aggregate is configured to save a snapshot every 4 events. # So two snapshots should happen here. The last one is only kept in # the Store. - for _ <- 1..times, - do: assert :ok = @aggregate.increment(id, @increment) + for _ <- 1..times, do: assert(:ok = @aggregate.increment(id, @increment)) # Take down the aggregate pid = aggregate_pid({@aggregate, id}) @@ -202,7 +212,7 @@ defmodule Chronik.Aggregate.Test do end test "Shutdown timeout" do - id = "6" + id = "6" assert :ok = @aggregate.create(id) pid = aggregate_pid({@aggregate, id}) @@ -223,12 +233,16 @@ defmodule Chronik.Aggregate.Test do end test "Optmistic concurrency checks" do - id = "7" + id = "7" {store, _pub_sub} = Chronik.Config.fetch_adapters() assert :ok = @aggregate.create(id) - assert {:ok, _version, _records} = store.append({@aggregate, id}, - [%DomainEvents.CounterIncremented{id: id, increment: 3}]) + + assert {:ok, _version, _records} = + store.append({@aggregate, id}, [ + %DomainEvents.CounterIncremented{id: id, increment: 3} + ]) + assert {:error, _} = @aggregate.increment(id, 4) end end diff --git a/test/chronik/projection/projection_test.exs b/test/chronik/projection/projection_test.exs index ca31b7d..5e19d77 100644 --- a/test/chronik/projection/projection_test.exs +++ b/test/chronik/projection/projection_test.exs @@ -26,22 +26,26 @@ defmodule Chronik.Projection.Test do def init(_opts), do: {nil, []} # When the conuter is created the state is the initial value - def handle_event(%EventRecord{domain_event: - %CounterCreated{id: id, initial_value: value}}, nil) do - + def handle_event( + %EventRecord{domain_event: %CounterCreated{id: id, initial_value: value}}, + nil + ) do %{id => value} end - def handle_event(%EventRecord{domain_event: - %CounterCreated{id: id, initial_value: value}}, state) do - Map.put(state,id, value) + def handle_event( + %EventRecord{domain_event: %CounterCreated{id: id, initial_value: value}}, + state + ) do + Map.put(state, id, value) end # After an increment we transition to the sum # of the state and the increment. - def handle_event(%EventRecord{domain_event: - %CounterIncremented{id: id, increment: value}}, state) do - + def handle_event( + %EventRecord{domain_event: %CounterIncremented{id: id, increment: value}}, + state + ) do Map.update!(state, id, &(&1 + value)) end @@ -61,19 +65,16 @@ defmodule Chronik.Projection.Test do # FIXME: Wait a while for the projection to transition to the next state. defp wait, do: Process.sleep(100) - test "Normal flow of a projection", - %{projection: projection, store: store, pub_sub: pub_sub} do - - aggregate = {@aggregate, "3"} + test "Normal flow of a projection", %{projection: projection, store: store, pub_sub: pub_sub} do + aggregate = {@aggregate, "3"} id = "10" - initial_value = 0 + initial_value = 0 increment_value = 3 - create_event = %CounterCreated{id: id, initial_value: initial_value} + create_event = %CounterCreated{id: id, initial_value: initial_value} increment_event = %CounterIncremented{id: id, increment: increment_value} # The first event is on the Store before the Projection is starts. - assert {:ok, version, _} = - store.append(aggregate, [create_event], [version: :no_stream]) + assert {:ok, version, _} = store.append(aggregate, [create_event], version: :no_stream) # We start the projection and start listening on the aggregate stream. # In this case an event is already recorded on the Store. @@ -86,8 +87,7 @@ defmodule Chronik.Projection.Test do assert initial_value == projection.state()[id] # Store a increment domain event on the Store. - {:ok, version, records} = - store.append(aggregate, [increment_event], [version: version]) + {:ok, version, records} = store.append(aggregate, [increment_event], version: version) # and broadcast the new record to the PubSub. pub_sub.broadcast(records) wait() @@ -95,8 +95,7 @@ defmodule Chronik.Projection.Test do assert ^increment_value = projection.state()[id] # Store a increment domain event on the Store. - {:ok, _version, _records} = - store.append(aggregate, [increment_event], [version: version]) + {:ok, _version, _records} = store.append(aggregate, [increment_event], version: version) GenServer.stop(pid) diff --git a/test/chronik/pub_sub/pub_sub_test.exs b/test/chronik/pub_sub/pub_sub_test.exs index 61efb25..533fff2 100644 --- a/test/chronik/pub_sub/pub_sub_test.exs +++ b/test/chronik/pub_sub/pub_sub_test.exs @@ -15,10 +15,10 @@ defmodule Chronik.PubSub.Test do test "subscribe, broadcast and receive events", %{pub_sub: pub_sub} do # Check that we can subscribe to the PubSub - assert_ok pub_sub.subscribe() + assert_ok(pub_sub.subscribe()) # Check that we can broadcast to the PubSub - assert_ok pub_sub.broadcast([:event1, :event2, :event3]) + assert_ok(pub_sub.broadcast([:event1, :event2, :event3])) # Check that events are received (in order) assert_receive :event1 @@ -31,7 +31,7 @@ defmodule Chronik.PubSub.Test do pub_sub.broadcast([:event1]) # Check that we can unsubscribe from the PubSub - assert_ok pub_sub.unsubscribe() + assert_ok(pub_sub.unsubscribe()) # :event2 is broadcasted while we are unsubscried from the PubSub pub_sub.broadcast([:event2]) diff --git a/test/chronik/store/store_test.exs b/test/chronik/store/store_test.exs index da97446..4cc5c9f 100644 --- a/test/chronik/store/store_test.exs +++ b/test/chronik/store/store_test.exs @@ -6,12 +6,12 @@ defmodule Chronik.Store.Test do use Chronik.Store, otp_app: :chronik def child_spec(opts) do - %{ - id: __MODULE__, - start: {__MODULE__, :start_link, [opts]}, - type: :worker - } - end + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [opts]}, + type: :worker + } + end defoverridable child_spec: 1 end @@ -29,50 +29,63 @@ defmodule Chronik.Store.Test do assert :empty == TestStore.current_version() # Test that events can be appended to the Store - assert {:ok, version, [_, _]} = TestStore.append(aggregate, - [%CounterCreated{id: "11", initial_value: 0}, - %CounterIncremented{id: "11", increment: 3}], version: :any) + assert {:ok, version, [_, _]} = + TestStore.append( + aggregate, + [ + %CounterCreated{id: "11", initial_value: 0}, + %CounterIncremented{id: "11", increment: 3} + ], + version: :any + ) assert :empty != TestStore.current_version() # If the stream exists and appending with version: :no_stream an error # should occurr assert {:error, "wrong expected version"} = - TestStore.append(aggregate, - [%CounterCreated{id: "11", initial_value: 0}], version: :no_stream) + TestStore.append( + aggregate, + [%CounterCreated{id: "11", initial_value: 0}], + version: :no_stream + ) # Check that the Store is on version 1 (since two events were appended) - assert {:ok, _new_version, _} = TestStore.append(aggregate, - [%CounterIncremented{id: "11", increment: 3}], version: version) + assert {:ok, _new_version, _} = + TestStore.append( + aggregate, + [%CounterIncremented{id: "11", increment: 3}], + version: version + ) # Check now that the Store version is not 1 anymore assert {:error, "wrong expected version"} = - TestStore.append(aggregate, [nil], version: version) + TestStore.append(aggregate, [nil], version: version) aggregate = {:test_aggregate, "2"} - events = - [%CounterCreated{id: "3", initial_value: 0}, - %CounterIncremented{id: "3", increment: 3}, - %CounterIncremented{id: "3", increment: 3}] + + events = [ + %CounterCreated{id: "3", initial_value: 0}, + %CounterIncremented{id: "3", increment: 3}, + %CounterIncremented{id: "3", increment: 3} + ] # Append three events and remember the current_version of the Store {:ok, last_version, _} = TestStore.append(aggregate, events, version: :any) # Check that nothing new is returnd from the last_version - assert {:ok, ^last_version, []} = - TestStore.fetch_by_aggregate(aggregate, last_version) + assert {:ok, ^last_version, []} = TestStore.fetch_by_aggregate(aggregate, last_version) # Check that the last event is returned if we fetch from version - assert {:ok, _version, [%{domain_event: - %CounterIncremented{id: "3", increment: 3}}]} = - TestStore.fetch_by_aggregate(aggregate, version) + assert {:ok, _version, [%{domain_event: %CounterIncremented{id: "3", increment: 3}}]} = + TestStore.fetch_by_aggregate(aggregate, version) # Fecth all stored records and keep the data field data_list = aggregate |> TestStore.fetch_by_aggregate() |> elem(2) - |> Enum.map(&(&1.domain_event)) + |> Enum.map(& &1.domain_event) # Check that we got all events assert events = data_list @@ -82,7 +95,7 @@ defmodule Chronik.Store.Test do aggregate |> TestStore.fetch_by_aggregate(version) |> elem(2) - |> Enum.map(&(&1.domain_event)) + |> Enum.map(& &1.domain_event) # Check that the second (included) and all the rests were fetched assert [List.last(events)] == data_list From b57fb772ef77de9e577697b08e3621fa91edee57 Mon Sep 17 00:00:00 2001 From: arpunk Date: Mon, 9 Apr 2018 08:44:41 -0500 Subject: [PATCH 03/13] Fix state comparison clause --- lib/chronik/aggregate.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/chronik/aggregate.ex b/lib/chronik/aggregate.ex index b83a6b5..2721c8e 100644 --- a/lib/chronik/aggregate.ex +++ b/lib/chronik/aggregate.ex @@ -232,7 +232,7 @@ defmodule Chronik.Aggregate do store_and_publish(new_events, new_state, state) rescue e -> - if state do + if as do {:reply, {:error, e}, state} else {:stop, :normal, {:error, e}, state} From 30c76789c6cb72f4e52f822f9d4e14fcb89c69d3 Mon Sep 17 00:00:00 2001 From: Federico Bergero Date: Tue, 10 Apr 2018 15:18:03 -0300 Subject: [PATCH 04/13] Adding test for failing initial command on aggregate. --- test/chronik/aggregate/aggregate_test.exs | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/test/chronik/aggregate/aggregate_test.exs b/test/chronik/aggregate/aggregate_test.exs index 71a034c..80a5d7a 100644 --- a/test/chronik/aggregate/aggregate_test.exs +++ b/test/chronik/aggregate/aggregate_test.exs @@ -43,6 +43,11 @@ defmodule Chronik.Aggregate.Test do def state(id), do: Aggregate.state(__MODULE__, id) # Counter command handlers + def handle_command({:create, nil}, nil) do + # Return the PID to check that the GenServer is down. + raise "#{:erlang.pid_to_list self()}" + end + def handle_command({:create, id}, nil) do %CounterCreated{id: id, initial_value: 0} end @@ -52,7 +57,7 @@ defmodule Chronik.Aggregate.Test do end def handle_command({:increment, increment}, %Counter{id: id, max: max, counter: counter}) - when counter + increment < max do + when is_integer(increment) and counter + increment < max do %CounterIncremented{id: id, increment: increment} end @@ -245,4 +250,20 @@ defmodule Chronik.Aggregate.Test do assert {:error, _} = @aggregate.increment(id, 4) end + + test "Stop on fail initial command" do + {:error, %RuntimeError{message: pid_st}} = @aggregate.create(nil) + pid = pid_st |> String.to_charlist() |> :erlang.list_to_pid() + assert false == Process.alive?(pid) + end + + test "Fail command on initialized aggregate does not stops the GenServer" do + id = "8" + :ok = @aggregate.create(id) + pid = aggregate_pid({@aggregate, id}) + assert true == Process.alive?(pid) + # Calling increment with a non-integer fails, but keeps the aggreate alive. + assert {:error, _} = @aggregate.increment(id, nil) + assert true == Process.alive?(pid) + end end From d202262542d0d480a91000e05a0f8fa081d826fd Mon Sep 17 00:00:00 2001 From: arpunk Date: Fri, 4 May 2018 14:06:51 -0500 Subject: [PATCH 05/13] Use Jason by default for JSON encoding/decoding. Closes #37 --- config/config.exs | 2 ++ lib/chronik/store/adapters/ecto/ecto.ex | 2 +- mix.exs | 2 +- mix.lock | 1 + 4 files changed, 5 insertions(+), 2 deletions(-) diff --git a/config/config.exs b/config/config.exs index c9c59bb..d0cb4c8 100644 --- a/config/config.exs +++ b/config/config.exs @@ -1,3 +1,5 @@ use Mix.Config +config :ecto, json_library: Jason + import_config "#{Mix.env}.exs" diff --git a/lib/chronik/store/adapters/ecto/ecto.ex b/lib/chronik/store/adapters/ecto/ecto.ex index 7e395f4..e988f19 100644 --- a/lib/chronik/store/adapters/ecto/ecto.ex +++ b/lib/chronik/store/adapters/ecto/ecto.ex @@ -307,7 +307,7 @@ defmodule Chronik.Store.Adapters.Ecto do json = record.domain_event.__struct__ |> Atom.to_string() - |> Kernel.<>(Poison.encode!(record.domain_event)) + |> Kernel.<>(Jason.encode!(record.domain_event)) %{ aggregate_id: aggregate_id, diff --git a/mix.exs b/mix.exs index 48adc68..2e1cf6b 100644 --- a/mix.exs +++ b/mix.exs @@ -63,7 +63,7 @@ defmodule Chronik.Mixfile do # For Ecto-MySQL store {:ecto, "~> 2.1"}, {:mariaex, "~> 0.8.2"}, - {:poison, "~> 3.1.0"}, + {:jason, "~> 1.0.0"}, {:confex, "~> 3.2.3"} ] end diff --git a/mix.lock b/mix.lock index 4001953..e735305 100644 --- a/mix.lock +++ b/mix.lock @@ -15,6 +15,7 @@ "exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm"}, "hackney": {:hex, :hackney, "1.12.1", "8bf2d0e11e722e533903fe126e14d6e7e94d9b7983ced595b75f532e04b7fdc7", [:rebar3], [{:certifi, "2.3.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.1.1", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"}, "idna": {:hex, :idna, "5.1.1", "cbc3b2fa1645113267cc59c760bafa64b2ea0334635ef06dbac8801e42f7279c", [:rebar3], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"}, + "jason": {:hex, :jason, "1.0.0", "0f7cfa9bdb23fed721ec05419bcee2b2c21a77e926bce0deda029b5adc716fe2", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"}, "jsx": {:hex, :jsx, "2.8.3", "a05252d381885240744d955fbe3cf810504eb2567164824e19303ea59eef62cf", [:mix, :rebar3], [], "hexpm"}, "mariaex": {:hex, :mariaex, "0.8.4", "5dd42a600c3949ec020cfac162a815115c9e9e406abffc1b14ffdc611d6f84bc", [:mix], [{:db_connection, "~> 1.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:poison, ">= 0.0.0", [hex: :poison, repo: "hexpm", optional: true]}], "hexpm"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"}, From 0f7f7a08befdcaa64f58298b897edac1b1a64c0a Mon Sep 17 00:00:00 2001 From: arpunk Date: Fri, 4 May 2018 14:26:01 -0500 Subject: [PATCH 06/13] Compress events and state snapshot --- lib/chronik/store/adapters/ecto/ecto.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/chronik/store/adapters/ecto/ecto.ex b/lib/chronik/store/adapters/ecto/ecto.ex index 7e395f4..bed0c08 100644 --- a/lib/chronik/store/adapters/ecto/ecto.ex +++ b/lib/chronik/store/adapters/ecto/ecto.ex @@ -178,7 +178,7 @@ defmodule Chronik.Store.Adapters.Ecto do |> where(aggregate_id: ^id) |> Repo.update_all( set: [snapshot_version: version, - snapshot: :erlang.term_to_binary(aggregate_state)]) + snapshot: :erlang.term_to_binary(aggregate_state, compressed: 1)]) {:reply, :ok, state} end @@ -311,7 +311,7 @@ defmodule Chronik.Store.Adapters.Ecto do %{ aggregate_id: aggregate_id, - domain_event: :erlang.term_to_binary(record.domain_event), + domain_event: :erlang.term_to_binary(record.domain_event, compressed: 1), domain_event_json: json, aggregate_version: String.to_integer(record.aggregate_version), version: String.to_integer(record.version), From c891ed71780a00915144400e3dfb686ed46d9b3a Mon Sep 17 00:00:00 2001 From: arpunk Date: Wed, 9 May 2018 06:37:34 -0500 Subject: [PATCH 07/13] Make compression configurable at compile-time --- config/dev.exs | 3 +++ config/test.exs | 1 - lib/chronik/store/adapters/ecto/ecto.ex | 22 +++++++++++++++++++--- 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/config/dev.exs b/config/dev.exs index 4161002..fa24fdb 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -10,3 +10,6 @@ config :chronik, Chronik.Store.Adapters.Ecto.ChronikRepo, adapter: Ecto.Adapters.MySQL, url: {:system, "CHRONIK_REPO_URL", "ecto://root:root@localhost/chronik"} +config :chronik, Chronik.Store.Adapters.Ecto, + aggregate_snapshot_compression_level: 1, + domain_event_compression_level: 9 diff --git a/config/test.exs b/config/test.exs index 92b54cb..b4c8752 100644 --- a/config/test.exs +++ b/config/test.exs @@ -14,4 +14,3 @@ config :chronik, Chronik.Aggregate.Test.Counter, config :chronik, Chronik.Store.Test.TestStore, adapter: Chronik.Store.Adapters.ETS - diff --git a/lib/chronik/store/adapters/ecto/ecto.ex b/lib/chronik/store/adapters/ecto/ecto.ex index bed0c08..f731a36 100644 --- a/lib/chronik/store/adapters/ecto/ecto.ex +++ b/lib/chronik/store/adapters/ecto/ecto.ex @@ -1,5 +1,18 @@ defmodule Chronik.Store.Adapters.Ecto do - @moduledoc false + @moduledoc """ + Ecto adapter for `Chronik.Store` + + ## Configuration + + You can configure compression for the aggregate state and domain + events only for this adapter. By default both values are at 0 + (compression disabled). + + - `:aggregate_compression_level` + - `:domain_event_compression_level` + + Both accept an integer from 0 to 9, being 9 the highest compression. + """ @behaviour Chronik.Store @@ -17,6 +30,9 @@ defmodule Chronik.Store.Adapters.Ecto do @name __MODULE__ @epoch :calendar.datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}) + @aggregate_compression Application.get_env(:chronik, __MODULE__)[:aggregate_compression_level] || 0 + @domain_event_compression Application.get_env(:chronik, __MODULE__)[:domain_event_compression_level] || 0 + # API def append(aggregate, events, opts \\ [version: :any]) do @@ -178,7 +194,7 @@ defmodule Chronik.Store.Adapters.Ecto do |> where(aggregate_id: ^id) |> Repo.update_all( set: [snapshot_version: version, - snapshot: :erlang.term_to_binary(aggregate_state, compressed: 1)]) + snapshot: :erlang.term_to_binary(aggregate_state, compressed: @aggregate_compression)]) {:reply, :ok, state} end @@ -311,7 +327,7 @@ defmodule Chronik.Store.Adapters.Ecto do %{ aggregate_id: aggregate_id, - domain_event: :erlang.term_to_binary(record.domain_event, compressed: 1), + domain_event: :erlang.term_to_binary(record.domain_event, compressed: @domain_event_compression), domain_event_json: json, aggregate_version: String.to_integer(record.aggregate_version), version: String.to_integer(record.version), From 97dc4dc65900e8a20a242cc9addb8ae56477ccc6 Mon Sep 17 00:00:00 2001 From: arpunk Date: Wed, 9 May 2018 08:04:09 -0500 Subject: [PATCH 08/13] We only compress the aggregate state snapshot --- lib/chronik/store/adapters/ecto/ecto.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/chronik/store/adapters/ecto/ecto.ex b/lib/chronik/store/adapters/ecto/ecto.ex index 76f15be..8451593 100644 --- a/lib/chronik/store/adapters/ecto/ecto.ex +++ b/lib/chronik/store/adapters/ecto/ecto.ex @@ -4,7 +4,7 @@ defmodule Chronik.Store.Adapters.Ecto do ## Configuration - You can configure compression for the aggregate state and domain + You can configure compression for the aggregate snapshot and domain events only for this adapter. By default both values are at 0 (compression disabled). From c6f30f5fa31f7e3e2e469d0cdc35b20c6d55e449 Mon Sep 17 00:00:00 2001 From: arpunk Date: Fri, 4 May 2018 14:06:51 -0500 Subject: [PATCH 09/13] Use Jason by default for JSON encoding/decoding. Closes #37 --- config/config.exs | 4 +++- lib/chronik/store/adapters/ecto/ecto.ex | 2 +- mix.exs | 2 +- mix.lock | 1 + 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/config/config.exs b/config/config.exs index 8233fe9..d0cb4c8 100644 --- a/config/config.exs +++ b/config/config.exs @@ -1,3 +1,5 @@ use Mix.Config -import_config "#{Mix.env()}.exs" +config :ecto, json_library: Jason + +import_config "#{Mix.env}.exs" diff --git a/lib/chronik/store/adapters/ecto/ecto.ex b/lib/chronik/store/adapters/ecto/ecto.ex index 8451593..e3291c3 100644 --- a/lib/chronik/store/adapters/ecto/ecto.ex +++ b/lib/chronik/store/adapters/ecto/ecto.ex @@ -360,7 +360,7 @@ defmodule Chronik.Store.Adapters.Ecto do json = record.domain_event.__struct__ |> Atom.to_string() - |> Kernel.<>(Poison.encode!(record.domain_event)) + |> Kernel.<>(Jason.encode!(record.domain_event)) %{ aggregate_id: aggregate_id, diff --git a/mix.exs b/mix.exs index 8abf82f..5c5b56d 100644 --- a/mix.exs +++ b/mix.exs @@ -63,7 +63,7 @@ defmodule Chronik.Mixfile do # For Ecto-MySQL store {:ecto, "~> 2.1"}, {:mariaex, "~> 0.8.2"}, - {:poison, "~> 3.1.0"}, + {:jason, "~> 1.0.0"}, {:confex, "~> 3.2.3"} ] end diff --git a/mix.lock b/mix.lock index 4001953..e735305 100644 --- a/mix.lock +++ b/mix.lock @@ -15,6 +15,7 @@ "exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm"}, "hackney": {:hex, :hackney, "1.12.1", "8bf2d0e11e722e533903fe126e14d6e7e94d9b7983ced595b75f532e04b7fdc7", [:rebar3], [{:certifi, "2.3.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.1.1", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"}, "idna": {:hex, :idna, "5.1.1", "cbc3b2fa1645113267cc59c760bafa64b2ea0334635ef06dbac8801e42f7279c", [:rebar3], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"}, + "jason": {:hex, :jason, "1.0.0", "0f7cfa9bdb23fed721ec05419bcee2b2c21a77e926bce0deda029b5adc716fe2", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"}, "jsx": {:hex, :jsx, "2.8.3", "a05252d381885240744d955fbe3cf810504eb2567164824e19303ea59eef62cf", [:mix, :rebar3], [], "hexpm"}, "mariaex": {:hex, :mariaex, "0.8.4", "5dd42a600c3949ec020cfac162a815115c9e9e406abffc1b14ffdc611d6f84bc", [:mix], [{:db_connection, "~> 1.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:poison, ">= 0.0.0", [hex: :poison, repo: "hexpm", optional: true]}], "hexpm"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"}, From cba9cc79c5b8318cd9660c056371afca6cfb1843 Mon Sep 17 00:00:00 2001 From: arpunk Date: Fri, 11 May 2018 08:50:19 -0500 Subject: [PATCH 10/13] Allow for custom JSON encoder/decoder --- config/dev.exs | 4 +++- lib/chronik/store/adapters/ecto/ecto.ex | 17 ++++++++++++++--- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/config/dev.exs b/config/dev.exs index fa24fdb..68f01b6 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -12,4 +12,6 @@ config :chronik, Chronik.Store.Adapters.Ecto.ChronikRepo, config :chronik, Chronik.Store.Adapters.Ecto, aggregate_snapshot_compression_level: 1, - domain_event_compression_level: 9 + domain_event_compression_level: 9, + json_library: Jason, + dump_to_json: false diff --git a/lib/chronik/store/adapters/ecto/ecto.ex b/lib/chronik/store/adapters/ecto/ecto.ex index e3291c3..25f4f9e 100644 --- a/lib/chronik/store/adapters/ecto/ecto.ex +++ b/lib/chronik/store/adapters/ecto/ecto.ex @@ -33,6 +33,8 @@ defmodule Chronik.Store.Adapters.Ecto do @aggregate_compression Application.get_env(:chronik, __MODULE__)[:aggregate_compression_level] || 0 @domain_event_compression Application.get_env(:chronik, __MODULE__)[:domain_event_compression_level] || 0 + @dump_json Application.get_env(:chronik, __MODULE__)[:dump_to_json] || false + # API def append(aggregate, events, opts \\ [version: :any]) do @@ -101,6 +103,11 @@ defmodule Chronik.Store.Adapters.Ecto do # GenServer callbacks def init(opts) do + json_library = + Application.get_env(:chronik, Chronik.Store.Adapters.Ecto)[:json_library] || Jason + + Application.put_env(:ecto, :json_library, json_library) + Repo.start_link(opts) end @@ -358,9 +365,13 @@ defmodule Chronik.Store.Adapters.Ecto do defp insert_event(record, aggregate_id) do json = - record.domain_event.__struct__ - |> Atom.to_string() - |> Kernel.<>(Jason.encode!(record.domain_event)) + if @dump_json do + record.domain_event.__struct__ + |> Atom.to_string() + |> Kernel.<>(Jason.encode!(record.domain_event)) + else + nil + end %{ aggregate_id: aggregate_id, From 014b0f4cc11dee6fee83a352fd18bb6d4a41b1f6 Mon Sep 17 00:00:00 2001 From: arpunk Date: Fri, 11 May 2018 09:00:46 -0500 Subject: [PATCH 11/13] Remove Ecto JSON library config --- config/config.exs | 2 -- 1 file changed, 2 deletions(-) diff --git a/config/config.exs b/config/config.exs index d0cb4c8..c9c59bb 100644 --- a/config/config.exs +++ b/config/config.exs @@ -1,5 +1,3 @@ use Mix.Config -config :ecto, json_library: Jason - import_config "#{Mix.env}.exs" From 4033b3cf86c4d2331c3e82d902225d2df3160440 Mon Sep 17 00:00:00 2001 From: Ricardo Lanziano Date: Wed, 16 May 2018 09:42:44 -0500 Subject: [PATCH 12/13] Remove events from the store (#42) Remove events from the store --- lib/chronik/store.ex | 4 ++++ lib/chronik/store/adapters/ecto/ecto.ex | 28 +++++++++++++++++++---- lib/chronik/store/adapters/ets/ets.ex | 15 ++++++++++++ mix.lock | 2 +- test/chronik/aggregate/aggregate_test.exs | 27 ++++++++++++++-------- test/chronik/store/store_test.exs | 14 +++++++++++- 6 files changed, 74 insertions(+), 16 deletions(-) diff --git a/lib/chronik/store.ex b/lib/chronik/store.ex index a412430..843dac8 100644 --- a/lib/chronik/store.ex +++ b/lib/chronik/store.ex @@ -109,6 +109,9 @@ defmodule Chronik.Store do :ok | {:error, reason() :: String.t()} + @doc "Remove all events for given `aggregate`" + @callback remove_events(aggregate :: Chronik.Aggregate) :: :ok + @doc """ Retrives a snapshot from the Store. If there is no snapshot it returns `nil`. @@ -157,6 +160,7 @@ defmodule Chronik.Store do defdelegate get_snapshot(aggregate), to: @adapter defdelegate fetch(version \\ :all), to: @adapter defdelegate fetch_by_aggregate(aggregate, version \\ :all), to: @adapter + defdelegate remove_events(aggregate), to: @adapter defdelegate stream(fun, version \\ :all), to: @adapter defdelegate stream_by_aggregate(aggregate, fun, version \\ :all), to: @adapter defdelegate compare_version(version1, version2), to: @adapter diff --git a/lib/chronik/store/adapters/ecto/ecto.ex b/lib/chronik/store/adapters/ecto/ecto.ex index 25f4f9e..c041f6d 100644 --- a/lib/chronik/store/adapters/ecto/ecto.ex +++ b/lib/chronik/store/adapters/ecto/ecto.ex @@ -38,15 +38,19 @@ defmodule Chronik.Store.Adapters.Ecto do # API def append(aggregate, events, opts \\ [version: :any]) do - GenServer.call(__MODULE__, {:append, aggregate, events, opts}) + GenServer.call(@name, {:append, aggregate, events, opts}) end def snapshot(aggregate, state, version) do - GenServer.call(__MODULE__, {:snapshot, aggregate, state, version}) + GenServer.call(@name, {:snapshot, aggregate, state, version}) end def get_snapshot(aggregate) do - GenServer.call(__MODULE__, {:get_snapshot, aggregate}) + GenServer.call(@name, {:get_snapshot, aggregate}) + end + + def remove_events(aggregate) do + GenServer.call(@name, {:remove_events, aggregate}) end def fetch_by_aggregate(aggregate, version \\ :all) do @@ -128,6 +132,22 @@ defmodule Chronik.Store.Adapters.Ecto do end end + def handle_call({:remove_events, {aggregate_module, aggregate_id}}, _from, state) do + from(e in DomainEvents, + join: a in Aggregate, + where: a.id == e.aggregate_id, + where: a.aggregate == ^aggregate_module, + where: a.aggregate_id == ^aggregate_id + ) |> Repo.delete_all() + + from(a in Aggregate, + where: a.aggregate_id == ^aggregate_id, + where: a.aggregate == ^aggregate_module + ) |> Repo.delete_all() + + {:reply, :ok, state} + end + # Get the events for a given aggregate starting at version. def handle_call( {:fetch_by_aggregate, {aggregate_module, aggregate_id} = aggregate, version}, @@ -444,7 +464,7 @@ defmodule Chronik.Store.Adapters.Ecto do end) end - def domain_event(event, aggregate) do + defp domain_event(event, aggregate) do :erlang.binary_to_term(event) rescue _ -> diff --git a/lib/chronik/store/adapters/ets/ets.ex b/lib/chronik/store/adapters/ets/ets.ex index 1ab42c6..08d379d 100644 --- a/lib/chronik/store/adapters/ets/ets.ex +++ b/lib/chronik/store/adapters/ets/ets.ex @@ -33,6 +33,10 @@ defmodule Chronik.Store.Adapters.ETS do GenServer.call(@name, {:get_snapshot, aggregate}) end + def remove_events(aggregate) do + GenServer.call(@name, {:remove_events, aggregate}) + end + def fetch_by_aggregate(aggregate, version \\ :all) do GenServer.call(@name, {:fetch_by_aggregate, aggregate, version}) end @@ -86,6 +90,17 @@ defmodule Chronik.Store.Adapters.ETS do _ -> {:stop, {:error, "event store already started"}} end + def handle_call({:remove_events, aggregate}, _from, state) do + events = + current_records() + |> Enum.drop_while(&(&1.aggregate == aggregate)) + + true = :ets.insert(@table, {:records, events}) + true = :ets.delete(@snapshot_table, aggregate) + + {:reply, :ok, state} + end + def handle_call(:current_version, _from, state) do {:reply, current_version_local(), state} end diff --git a/mix.lock b/mix.lock index e735305..44dd89e 100644 --- a/mix.lock +++ b/mix.lock @@ -8,7 +8,7 @@ "decimal": {:hex, :decimal, "1.5.0", "b0433a36d0e2430e3d50291b1c65f53c37d56f83665b43d79963684865beab68", [:mix], [], "hexpm"}, "dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm"}, "earmark": {:hex, :earmark, "1.2.5", "4d21980d5d2862a2e13ec3c49ad9ad783ffc7ca5769cf6ff891a4553fbaae761", [:mix], [], "hexpm"}, - "ecto": {:hex, :ecto, "2.2.9", "031d55df9bb430cb118e6f3026a87408d9ce9638737bda3871e5d727a3594aae", [:mix], [{:db_connection, "~> 1.1", [hex: :db_connection, repo: "hexpm", optional: true]}, {:decimal, "~> 1.2", [hex: :decimal, repo: "hexpm", optional: false]}, {:mariaex, "~> 0.8.0", [hex: :mariaex, repo: "hexpm", optional: true]}, {:poison, "~> 2.2 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: true]}, {:poolboy, "~> 1.5", [hex: :poolboy, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.13.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:sbroker, "~> 1.0", [hex: :sbroker, repo: "hexpm", optional: true]}], "hexpm"}, + "ecto": {:hex, :ecto, "2.2.10", "e7366dc82f48f8dd78fcbf3ab50985ceeb11cb3dc93435147c6e13f2cda0992e", [:mix], [{:db_connection, "~> 1.1", [hex: :db_connection, repo: "hexpm", optional: true]}, {:decimal, "~> 1.2", [hex: :decimal, repo: "hexpm", optional: false]}, {:mariaex, "~> 0.8.0", [hex: :mariaex, repo: "hexpm", optional: true]}, {:poison, "~> 2.2 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: true]}, {:poolboy, "~> 1.5", [hex: :poolboy, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.13.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:sbroker, "~> 1.0", [hex: :sbroker, repo: "hexpm", optional: true]}], "hexpm"}, "ex_doc": {:hex, :ex_doc, "0.18.3", "f4b0e4a2ec6f333dccf761838a4b253d75e11f714b85ae271c9ae361367897b7", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"}, "exconstructor": {:hex, :exconstructor, "1.1.0", "272623a7b203cb2901c20cbb92c5c3ab103cc0087ff7c881979e046043346752", [], [], "hexpm"}, "excoveralls": {:hex, :excoveralls, "0.8.1", "0bbf67f22c7dbf7503981d21a5eef5db8bbc3cb86e70d3798e8c802c74fa5e27", [:mix], [{:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:hackney, ">= 0.12.0", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"}, diff --git a/test/chronik/aggregate/aggregate_test.exs b/test/chronik/aggregate/aggregate_test.exs index 80a5d7a..689f112 100644 --- a/test/chronik/aggregate/aggregate_test.exs +++ b/test/chronik/aggregate/aggregate_test.exs @@ -30,7 +30,8 @@ defmodule Chronik.Aggregate.Test do ] # Public API for the Counter - def create(id), do: Aggregate.command(__MODULE__, id, {:create, id}) + def create(id), + do: Aggregate.command(__MODULE__, id, {:create, id}) def increment(id, increment), do: Aggregate.command(__MODULE__, id, {:increment, increment}) @@ -57,7 +58,7 @@ defmodule Chronik.Aggregate.Test do end def handle_command({:increment, increment}, %Counter{id: id, max: max, counter: counter}) - when is_integer(increment) and counter + increment < max do + when counter + increment < max do %CounterIncremented{id: id, increment: increment} end @@ -123,6 +124,7 @@ defmodule Chronik.Aggregate.Test do ## ## Internal function ## + # This is a command validator on the name entity. defp rename(_name_state, id, name) do %CounterNamed{id: id, name: name} @@ -166,6 +168,7 @@ defmodule Chronik.Aggregate.Test do test "Multiple-entity command using Aggregate.Multi" do id = "3" + @aggregate.create(id) # This is a composed command to test the |> operator on executes @@ -229,14 +232,6 @@ defmodule Chronik.Aggregate.Test do assert false == Process.alive?(pid) end - # Get the pid for a given aggregate from the Regsitry. - defp aggregate_pid(aggregate) do - Chronik.AggregateRegistry - |> Registry.lookup(aggregate) - |> hd() - |> elem(0) - end - test "Optmistic concurrency checks" do id = "7" {store, _pub_sub} = Chronik.Config.fetch_adapters() @@ -266,4 +261,16 @@ defmodule Chronik.Aggregate.Test do assert {:error, _} = @aggregate.increment(id, nil) assert true == Process.alive?(pid) end + + # + # Internal functions + # + + # Get the pid for a given aggregate from the Regsitry. + defp aggregate_pid(aggregate) do + Chronik.AggregateRegistry + |> Registry.lookup(aggregate) + |> hd() + |> elem(0) + end end diff --git a/test/chronik/store/store_test.exs b/test/chronik/store/store_test.exs index 4cc5c9f..3fc4f4d 100644 --- a/test/chronik/store/store_test.exs +++ b/test/chronik/store/store_test.exs @@ -80,7 +80,7 @@ defmodule Chronik.Store.Test do assert {:ok, _version, [%{domain_event: %CounterIncremented{id: "3", increment: 3}}]} = TestStore.fetch_by_aggregate(aggregate, version) - # Fecth all stored records and keep the data field + # Fetch all stored records and keep the data field data_list = aggregate |> TestStore.fetch_by_aggregate() @@ -113,5 +113,17 @@ defmodule Chronik.Store.Test do end assert 0 < TestStore.stream(f) + + # Event removal + + assert :ok = TestStore.remove_events({:test_aggregate, "1"}) + assert {:ok, :empty, []} = TestStore.fetch_by_aggregate({:test_aggregate, "1"}) + assert nil == TestStore.get_snapshot({:test_aggregate, "1"}) + + assert {:ok, "2", _events} = TestStore.fetch_by_aggregate({:test_aggregate, "2"}) + + assert :empty != TestStore.current_version() + + assert {:ok, "5", []} = TestStore.fetch("1000") end end From 1d89ff18c277952f52d87f2f84154bfd873f8f97 Mon Sep 17 00:00:00 2001 From: Federico Bergero Date: Wed, 16 May 2018 11:44:49 -0300 Subject: [PATCH 13/13] Bumping version to 0.1.10 --- mix.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mix.exs b/mix.exs index 5c5b56d..473157e 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Chronik.Mixfile do use Mix.Project - @version "0.1.9" + @version "0.1.10" def project do [