Skip to content

Commit 09659c4

Browse files
authored
Merge pull request #1 from phonnz/ft-wallet-es
Wallet ES
2 parents 1a9fcc4 + c75770a commit 09659c4

File tree

3 files changed

+194
-1
lines changed

3 files changed

+194
-1
lines changed

fullstack/lib/fullstack/application.ex

+3-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ defmodule Fullstack.Application do
2222
{Cachex, [name: :chat]},
2323
Fullstack.Servers.OperationsSupervisor,
2424
Fullstack.Services.Counters,
25-
{Fullstack.Servers.Generators.Transactions, []}
25+
Fullstack.Servers.Generators.Transactions,
26+
{Registry, [keys: :unique, name: :wallet_projectors]},
27+
{Registry, [keys: :unique, name: Fullstack.Wallet.Aggregate.WalletAggregators]}
2628
] ++ prod_child()
2729

2830
# See https://hexdocs.pm/elixir/Supervisor.html
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
defmodule Fullstack.Wallet.Aggregate.WalletAggregator do
2+
use GenServer
3+
require Logger
4+
@registry Fullstack.Wallet.Aggregate.WalletAggregators
5+
6+
defstruct [:customer_id, :value, event_type: :amount_deposited]
7+
8+
def start_link(customer_id) do
9+
GenServer.start_link(
10+
__MODULE__,
11+
customer_id,
12+
name: register_via(customer_id)
13+
)
14+
end
15+
16+
def apply_event(%{customer_id: account} = event)
17+
when is_binary(account) do
18+
case Registry.lookup(@registry, account) do
19+
[{pid, _}] ->
20+
apply_event(pid, event)
21+
22+
_ ->
23+
Logger.debug("Attempt to apply event to non-existent account, starting aggregator")
24+
{:ok, pid} = start_link(account)
25+
apply_event(pid, event)
26+
end
27+
end
28+
29+
def apply_event(pid, event) when is_pid(pid) do
30+
GenServer.cast(pid, {:handle_event, event})
31+
end
32+
33+
def get_balance(customer_id) when is_binary(customer_id) do
34+
customer_id
35+
|> lookup_aggregator
36+
|> GenServer.call(:get_balance)
37+
end
38+
39+
def get_history(customer_id) when is_binary(customer_id) do
40+
customer_id
41+
|> lookup_aggregator
42+
|> GenServer.call(:get_history)
43+
end
44+
45+
defp lookup_aggregator(customer_id) when is_binary(customer_id) do
46+
with [{pid, _}] <-
47+
Registry.lookup(@registry, customer_id) do
48+
pid
49+
else
50+
_ ->
51+
{:error, :unknown_account}
52+
end
53+
end
54+
55+
@impl true
56+
def init(customer_id) do
57+
{:ok, %{balance: 0, account_number: customer_id, commands: []}}
58+
end
59+
60+
@impl true
61+
def handle_cast({:handle_event, evt}, state) do
62+
{:noreply, handle_event(state, evt)}
63+
end
64+
65+
def handle_event(
66+
%{balance: _bal} = s,
67+
%{event_type: :amount_withdrawn, value: v} = evt
68+
) do
69+
s
70+
|> Map.update(:balance, 0, &(&1 - v))
71+
|> Map.update(:commands, [], &[evt | &1])
72+
end
73+
74+
def handle_event(
75+
%{balance: _bal} = s,
76+
%{event_type: :amount_deposited, value: v} = evt
77+
) do
78+
s
79+
|> Map.update(:balance, 0, &(&1 + v))
80+
|> Map.update(:commands, [], &[evt | &1])
81+
end
82+
83+
def handle_event(
84+
%{balance: _bal} = s,
85+
%{event_type: :fee_applied, value: v} = evt
86+
) do
87+
s
88+
|> Map.update(:balance, 0, &(&1 - v))
89+
|> Map.update(:commands, [], &[evt | &1])
90+
end
91+
92+
@impl true
93+
def handle_call(:get_balance, _from, state) do
94+
{:reply, state.balance, state}
95+
end
96+
97+
@impl true
98+
def handle_call(:get_history, _from, state) do
99+
{:reply, state.commands, state}
100+
end
101+
102+
defp register_via(customer_id) do
103+
{:via, Registry, {@registry, customer_id}}
104+
end
105+
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
defmodule Fullstack.Wallet.WalletProjector do
2+
use GenServer
3+
require Logger
4+
5+
@registry :wallet_projectors
6+
defstruct [:customer_id, :value, event_type: :amount_deposited]
7+
8+
def start_link(customer_id) do
9+
GenServer.start_link(
10+
__MODULE__,
11+
customer_id,
12+
name: register_via(customer_id)
13+
)
14+
end
15+
16+
def apply_event(%{customer_id: account} = event)
17+
when is_binary(account) do
18+
case Registry.lookup(@registry, account) do
19+
[{pid, _}] ->
20+
apply_event(pid, event)
21+
22+
_ ->
23+
Logger.debug("Attempt to apply event to non-existent account, starting projector")
24+
{:ok, pid} = start_link(account)
25+
apply_event(pid, event)
26+
end
27+
end
28+
29+
def apply_event(pid, event) when is_pid(pid) do
30+
GenServer.cast(pid, {:handle_event, event})
31+
end
32+
33+
def get_balance(pid) do
34+
GenServer.call(pid, :get_balance)
35+
end
36+
37+
def lookup_balance(customer_id) when is_binary(customer_id) do
38+
with [{pid, _}] <-
39+
Registry.lookup(@registry, customer_id) do
40+
{:ok, get_balance(pid)}
41+
else
42+
_ ->
43+
{:error, :unknown_account}
44+
end
45+
end
46+
47+
@impl true
48+
def init(customer_id) do
49+
{:ok, %{balance: 0, account_number: customer_id}}
50+
end
51+
52+
@impl true
53+
def handle_cast({:handle_event, evt}, state) do
54+
{:noreply, handle_event(state, evt)}
55+
end
56+
57+
def handle_event(
58+
%{balance: bal} = s,
59+
%{event_type: :amount_withdrawn, value: v}
60+
) do
61+
%{s | balance: bal - v}
62+
end
63+
64+
def handle_event(
65+
%{balance: bal} = s,
66+
%{event_type: :amount_deposited, value: v}
67+
) do
68+
%{s | balance: bal + v}
69+
end
70+
71+
def handle_event(
72+
%{balance: bal} = s,
73+
%{event_type: :fee_applied, value: v}
74+
) do
75+
%{s | balance: bal - v}
76+
end
77+
78+
@impl true
79+
def handle_call(:get_balance, _from, state) do
80+
{:reply, state.balance, state}
81+
end
82+
83+
defp register_via(customer_id) do
84+
{:via, Registry, {@registry, customer_id}}
85+
end
86+
end

0 commit comments

Comments
 (0)