Skip to content

Commit

Permalink
Merge pull request #43 from parody/develop
Browse files Browse the repository at this point in the history
Merge develop branch to master
  • Loading branch information
fbergero authored May 16, 2018
2 parents 1330666 + 1d89ff1 commit e6fd1a2
Show file tree
Hide file tree
Showing 31 changed files with 743 additions and 473 deletions.
3 changes: 3 additions & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[
inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
5 changes: 5 additions & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,8 @@ config :chronik, Chronik.Store.Adapters.Ecto.ChronikRepo,
adapter: Ecto.Adapters.MySQL,
url: {:system, "CHRONIK_REPO_URL", "ecto://root:root@localhost/chronik"}

config :chronik, Chronik.Store.Adapters.Ecto,
aggregate_snapshot_compression_level: 1,
domain_event_compression_level: 9,
json_library: Jason,
dump_to_json: false
1 change: 0 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,3 @@ config :chronik, Chronik.Aggregate.Test.Counter,

config :chronik, Chronik.Store.Test.TestStore,
adapter: Chronik.Store.Adapters.ETS

2 changes: 2 additions & 0 deletions lib/chronik.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ defmodule Chronik do
order. Missing events are fetch from `Chronik.Store`.
Debugging can be turned off by placing
```
config :chronik, :debug, false
```
in a config script.
"""

Expand Down
161 changes: 96 additions & 65 deletions lib/chronik/aggregate.ex
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ defmodule Chronik.Aggregate do
given state, the function should return a list (or a single) of domain events.
If the command is invalid the `handle_command` should raise an exception.
"""
@callback handle_command(cmd :: Chronik.command(),
state :: state()) :: [Chronik.domain_event()] | no_return()
@callback handle_command(cmd :: Chronik.command(), state :: state()) ::
[Chronik.domain_event()] | no_return()

@doc """
The `handle_event` is the transition function for the aggregate. After
Expand All @@ -125,15 +125,17 @@ defmodule Chronik.Aggregate do
alias Chronik.Aggregate.Supervisor
alias Chronik.{AggregateRegistry, Config, Utils}

defstruct [:id,
:num_events,
:blocks,
:store,
:pub_sub ,
:module,
:aggregate_version,
:aggregate_state,
:timer]
defstruct [
:id,
:num_events,
:blocks,
:store,
:pub_sub,
:module,
:aggregate_version,
:aggregate_state,
:timer
]

# API

Expand All @@ -145,22 +147,29 @@ defmodule Chronik.Aggregate do
The results is either `:ok` or `{:error, reason}` in case of failure.
"""
@spec command(module :: module(),
id :: Chronik.id(),
cmd :: term(),
timeout :: :infinity | non_neg_integer()) :: :ok | {:error, String.t}
@spec command(
module :: module(),
id :: Chronik.id(),
cmd :: term(),
timeout :: :infinity | non_neg_integer()
) :: :ok | {:error, String.t()}
def command(module, id, cmd, timeout \\ 5000) do
Utils.debug("#{inspect id}: executing command #{inspect cmd}")
Utils.debug("#{inspect(id)}: executing command #{inspect(cmd)}")

case Registry.lookup(AggregateRegistry, {module, id}) do
[] ->
case Supervisor.start_aggregate(module, id) do
{:ok, pid} ->
GenServer.call(pid, {module, cmd}, timeout)

{:error, reason} ->
{:error, "cannot create process for aggregate root " <>
"{#{module}, #{inspect id}}: #{inspect reason}"}
{:error,
"cannot create process for aggregate root " <>
"{#{module}, #{inspect(id)}}: #{inspect(reason)}"}
end
[{pid, _metadata}] -> GenServer.call(pid, {module, cmd}, timeout)

[{pid, _metadata}] ->
GenServer.call(pid, {module, cmd}, timeout)
end
end

Expand All @@ -175,9 +184,9 @@ defmodule Chronik.Aggregate do
@doc """
Start a `Chronik.Aggregate` with callbacks on `module` with id `id`.
"""
@spec start_link(module :: module(),
id :: Chronik.id()) :: {:ok, pid()}
| {:error, reason :: String.t}
@spec start_link(module :: module(), id :: Chronik.id()) ::
{:ok, pid()}
| {:error, reason :: String.t()}
def start_link(module, id) do
GenServer.start_link(__MODULE__, {module, id}, name: via(module, id))
end
Expand All @@ -187,24 +196,28 @@ defmodule Chronik.Aggregate do
def init({module, id}) do
# Fetch the configuration for the Store and the PubSub.
{store, pub_sub} = Config.fetch_adapters()
Utils.debug("#{inspect id}: starting aggregate.")
Utils.debug("#{inspect(id)}: starting aggregate.")
{:ok, version, aggregate_state} = load_from_store(module, id, store)

{:ok, %__MODULE__{id: id,
aggregate_state: aggregate_state,
aggregate_version: version,
timer: update_timer(nil, module),
num_events: 0,
blocks: 0,
store: store,
pub_sub: pub_sub,
module: module}}
{:ok,
%__MODULE__{
id: id,
aggregate_state: aggregate_state,
aggregate_version: version,
timer: update_timer(nil, module),
num_events: 0,
blocks: 0,
store: store,
pub_sub: pub_sub,
module: module
}}
end

# The :state returns the current aggregate state.
def handle_call(:state, _from, %__MODULE__{aggregate_state: as} = state) do
{:reply, as, state}
end

# When called with a function, the aggregate executes the function in
# the current state and if no exceptions were raised, it stores and
# publishes the events to the PubSub.
Expand All @@ -214,12 +227,12 @@ defmodule Chronik.Aggregate do
|> module.handle_command(as)
|> List.wrap()

Utils.debug("#{inspect state.id}: newly created events: #{inspect new_events}")
Utils.debug("#{inspect(state.id)}: newly created events: #{inspect(new_events)}")
new_state = Enum.reduce(new_events, as, &module.handle_event/2)
store_and_publish(new_events, new_state, state)
rescue
e ->
if state do
if as do
{:reply, {:error, e}, state}
else
{:stop, :normal, {:error, e}, state}
Expand All @@ -231,7 +244,7 @@ defmodule Chronik.Aggregate do
# :shutdown to the current process.
def handle_info(:shutdown, %__MODULE__{id: id} = state) do
# TODO: Do a snapshot before going down.
Utils.debug("#{inspect id}: aggregate going down gracefully due to inactivity.")
Utils.debug("#{inspect(id)}: aggregate going down gracefully due to inactivity.")
{:stop, :normal, state}
end

Expand All @@ -246,24 +259,33 @@ defmodule Chronik.Aggregate do
# for the aggregate.
defp load_from_store(module, id, store) do
aggregate_tuple = {module, id}

{version, state} =
case store.get_snapshot(aggregate_tuple) do
nil ->
Utils.debug("#{inspect id}: no snapshot found on the store.")
Utils.debug("#{inspect(id)}: no snapshot found on the store.")
{:all, nil}

{version, _state} = snapshot ->
Utils.debug("#{inspect id}: found a snapshot on the store with version " <>
"#{inspect version}")
Utils.debug(
"#{inspect(id)}: found a snapshot on the store with version " <> "#{inspect(version)}"
)

snapshot
end

case store.fetch_by_aggregate(aggregate_tuple, version) do
{:error, _} -> state
{:error, _} ->
state

{:ok, version, records} ->
Utils.debug("#{inspect id}: replaying events up to version: #{inspect version}.")
Utils.debug("#{inspect(id)}: replaying events up to version: #{inspect(version)}.")

new_state =
records
|> Enum.map(&Map.get(&1, :domain_event))
|> apply_events(state, module)

{:ok, version, new_state}
end
end
Expand All @@ -272,73 +294,82 @@ defmodule Chronik.Aggregate do
Enum.reduce(events, state, &module.handle_event/2)
end

defp store_and_publish(events, new_state,
%__MODULE__{id: id,
num_events: num_events,
blocks: blocks,
store: store,
pub_sub: pub_sub,
module: module,
aggregate_version: aggregate_version} = state) do

defp store_and_publish(
events,
new_state,
%__MODULE__{
id: id,
num_events: num_events,
blocks: blocks,
store: store,
pub_sub: pub_sub,
module: module,
aggregate_version: aggregate_version
} = state
) do
# Compute the expected version to be found on the Store.
version =
case aggregate_version do
:empty -> :no_stream
v -> v
end

Utils.debug("#{inspect id}: writing events to the store: #{inspect events}")
Utils.debug("#{inspect(id)}: writing events to the store: #{inspect(events)}")

{new_version, records} =
case store.append({module, id}, events, [version: version]) do
{:ok, v, s} -> {v, s}
case store.append({module, id}, events, version: version) do
{:ok, v, s} ->
{v, s}

{:error, _} ->
raise "a newer version of the aggregate found on the store"
end

Utils.debug("#{inspect id}: broadcasting records: #{inspect records}")
Utils.debug("#{inspect(id)}: broadcasting records: #{inspect(records)}")
pub_sub.broadcast(records)

num_events = num_events + length(events)

blocks =
if div(num_events, get_snapshot_every(module)) > blocks do
Utils.debug("#{inspect id}: saving a snapshot with version #{inspect new_version}")
Utils.debug("#{inspect(id)}: saving a snapshot with version #{inspect(new_version)}")
store.snapshot({module, id}, new_state, new_version)
div(num_events, get_snapshot_every(module))
else
blocks
end

{:reply, :ok,
%__MODULE__{state |
aggregate_state: new_state,
aggregate_version: new_version,
timer: update_timer(state.timer, module),
num_events: num_events,
blocks: blocks
}}
%__MODULE__{
state
| aggregate_state: new_state,
aggregate_version: new_version,
timer: update_timer(state.timer, module),
num_events: num_events,
blocks: blocks
}}
end

defp update_timer(timer, module) do
shutdown_timeout = get_shutdown_timeout(module)

if timer do
Process.cancel_timer(timer)

receive do
:shutdown -> :ok
after
0 -> :ok
end
end

if shutdown_timeout != :infinity,
do: Process.send_after(self(), :shutdown, shutdown_timeout)
if shutdown_timeout != :infinity do
Process.send_after(self(), :shutdown, shutdown_timeout)
end
end

defp get_shutdown_timeout(module),
do: Config.get_config(module, :shutdown_timeout, 15 * 1000 * 60)

defp get_snapshot_every(module),
do: Config.get_config(module, :snapshot_every, 100)
defp get_snapshot_every(module), do: Config.get_config(module, :snapshot_every, 100)
end
4 changes: 3 additions & 1 deletion lib/chronik/aggregate/multi.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ defmodule Chronik.Aggregate.Multi do

@type monad_state :: {Aggregate.state(), [Chronik.domain_event()], module()}

# API

@doc "Create a new state for a multi-entity command."
@spec new(state :: Aggregate.state(), module :: module()) :: monad_state()
def new(state, module), do: {state, [], module}
Expand All @@ -40,7 +42,7 @@ defmodule Chronik.Aggregate.Multi do
"""
@spec delegate(ms :: monad_state(), lens :: fun(), validator_fun :: fun()) :: monad_state()
def delegate({state, events, module}, lens_fun, validator_fun)
when is_function(lens_fun) and is_function(validator_fun) do
when is_function(lens_fun) and is_function(validator_fun) do
new_events =
state
|> lens_fun.()
Expand Down
5 changes: 3 additions & 2 deletions lib/chronik/aggregate/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ defmodule Chronik.Aggregate.Supervisor do

# API

@spec start_aggregate(aggregate :: atom(), id :: term()) :: {:ok, pid()}
| {:error, term()}
@spec start_aggregate(aggregate :: atom(), id :: term()) ::
{:ok, pid()}
| {:error, term()}
def start_aggregate(aggregate, id) do
Supervisor.start_child(__MODULE__, [aggregate, id])
end
Expand Down
2 changes: 1 addition & 1 deletion lib/chronik/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Chronik.Application do
def start(_type, _args) do
children = [
spec([keys: :unique, name: @aggregates], @aggregates),
Chronik.Aggregate.Supervisor,
Chronik.Aggregate.Supervisor
]

opts = [strategy: :one_for_one, name: Chronik.Supervisor]
Expand Down
8 changes: 4 additions & 4 deletions lib/chronik/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ defmodule Chronik.Config do
@doc """
Returns the adapter configuration for a given `module`
"""
@spec fetch_config(mod :: module(), opts :: Keyword.t) :: {term(), atom()} | no_return()
@spec fetch_config(mod :: module(), opts :: Keyword.t()) :: {term(), atom()} | no_return()
def fetch_config(mod, opts) do
# Stolen from Ecto
otp_app = Keyword.fetch!(opts, :otp_app)
config = Application.get_env(otp_app, mod, [])
config = Application.get_env(otp_app, mod, [])
adapter = opts[:adapter] || config[:adapter]

unless adapter, do: raise Chronik.MissingAdapterError, opts
unless adapter, do: raise(Chronik.MissingAdapterError, opts)

case Code.ensure_loaded?(adapter) do
true -> {config, adapter}
Expand All @@ -36,7 +36,7 @@ defmodule Chronik.Config do
def fetch_adapters do
config = Application.get_env(:chronik, :adapters)
pub_sub = Keyword.fetch!(config, :pub_sub)
store = Keyword.fetch!(config, :store)
store = Keyword.fetch!(config, :store)

{store, pub_sub}
end
Expand Down
Loading

0 comments on commit e6fd1a2

Please sign in to comment.