Skip to content

Commit f983649

Browse files
committed
[DEVEX-227] Added state cache for ProjectState function
1 parent dd8cf4c commit f983649

File tree

2 files changed

+82
-48
lines changed

2 files changed

+82
-48
lines changed
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
using System.Runtime.CompilerServices;
2+
using EventStore.Client;
3+
4+
namespace Kurrent.Client.Streams.GettingState;
5+
6+
public class ProjectStateOptions<TState> {
7+
public Func<ResolvedEvent, string>? GetProjectedId { get; set; }
8+
9+
public IStateCache<TState>? StateCache { get; set; }
10+
}
11+
12+
public static class KurrentClientProjectStateExtensions {
13+
public static async IAsyncEnumerable<StateAtPointInTime<TState>> ProjectState<TState>(
14+
this IAsyncEnumerable<ResolvedEvent> messages,
15+
TState initialState,
16+
Func<TState, ResolvedEvent, TState> evolve,
17+
ProjectStateOptions<TState>? options,
18+
[EnumeratorCancellation] CancellationToken ct
19+
) where TState : notnull {
20+
if (messages is KurrentClient.ReadStreamResult readStreamResult) {
21+
if (await readStreamResult.ReadState.ConfigureAwait(false) == ReadState.StreamNotFound) {
22+
yield return new StateAtPointInTime<TState>(initialState);
23+
24+
yield break;
25+
}
26+
}
27+
28+
var getProjectedId = options?.GetProjectedId ?? (resolvedEvent => resolvedEvent.OriginalStreamId);
29+
var stateCache = options?.StateCache ?? new DictionaryStateCache<TState>();
30+
31+
await foreach (var resolvedEvent in messages.WithCancellation(ct)) {
32+
var projectedId = getProjectedId(resolvedEvent);
33+
34+
var state = await stateCache.GetValueOrDefaultAsync(projectedId, initialState, ct).ConfigureAwait(false);
35+
36+
state = evolve(state, resolvedEvent);
37+
38+
await stateCache.SetValueAsync(projectedId, state, ct).ConfigureAwait(false);
39+
40+
yield return new StateAtPointInTime<TState>(
41+
state,
42+
resolvedEvent.Event.EventNumber,
43+
resolvedEvent.Event.Position
44+
);
45+
}
46+
}
47+
48+
public static IAsyncEnumerable<StateAtPointInTime<TState>> ProjectState<TState>(
49+
this IAsyncEnumerable<ResolvedEvent> messages,
50+
TState initialState,
51+
Func<TState, ResolvedEvent, TState> evolve,
52+
CancellationToken ct
53+
) where TState : notnull =>
54+
messages.ProjectState(initialState, evolve, null, ct);
55+
}
56+
57+
58+
public interface IStateCache<TState> {
59+
public ValueTask<TState> GetValueOrDefaultAsync(string key, TState defaultValue, CancellationToken ct = default);
60+
61+
public ValueTask SetValueAsync(string key, TState state, CancellationToken ct = default);
62+
}
63+
64+
public class DictionaryStateCache<TState> : IStateCache<TState> {
65+
readonly Dictionary<string, TState> _states = new Dictionary<string, TState>();
66+
67+
public ValueTask<TState> GetValueOrDefaultAsync(string key, TState defaultValue, CancellationToken ct = default) {
68+
#if NET48
69+
var state = _states.TryGetValue(key, out TState? value) ? value : defaultValue;
70+
#else
71+
var state = _states.GetValueOrDefault(key, defaultValue);
72+
#endif
73+
return new ValueTask<TState>(state);
74+
}
75+
76+
public ValueTask SetValueAsync(string key, TState state, CancellationToken ct = default) {
77+
_states[key] = state;
78+
79+
return new ValueTask();
80+
}
81+
}

src/Kurrent.Client/Streams/GettingState/StateBuilder.cs

Lines changed: 1 addition & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ public static async Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
172172
return await eventStore.ReadStreamAsync(streamName, options, ct)
173173
.GetStateAsync(stateBuilder, ct);
174174
}
175-
175+
176176
public static async Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
177177
this IAsyncEnumerable<ResolvedEvent> messages,
178178
TState initialState,
@@ -197,53 +197,6 @@ CancellationToken ct
197197
return new StateAtPointInTime<TState>(state, lastEvent?.Event.EventNumber, lastEvent?.Event.Position);
198198
}
199199

200-
public static async IAsyncEnumerable<StateAtPointInTime<TState>> ProjectState<TState>(
201-
this IAsyncEnumerable<ResolvedEvent> messages,
202-
TState initialState,
203-
Func<TState, ResolvedEvent, TState> evolve,
204-
Func<ResolvedEvent, string>? getProjectedId,
205-
[EnumeratorCancellation] CancellationToken ct
206-
) where TState : notnull {
207-
if (messages is KurrentClient.ReadStreamResult readStreamResult) {
208-
if (await readStreamResult.ReadState.ConfigureAwait(false) == ReadState.StreamNotFound) {
209-
yield return new StateAtPointInTime<TState>(initialState);
210-
211-
yield break;
212-
}
213-
}
214-
215-
var states = new Dictionary<string, TState>();
216-
217-
getProjectedId ??= resolvedEvent => resolvedEvent.OriginalStreamId;
218-
219-
await foreach (var resolvedEvent in messages.WithCancellation(ct)) {
220-
var projectedId = getProjectedId(resolvedEvent);
221-
#if NET48
222-
var state = states.TryGetValue(projectedId, out TState? value) ? value : initialState;
223-
#else
224-
var state = states.GetValueOrDefault(projectedId, initialState);
225-
#endif
226-
227-
state = evolve(state, resolvedEvent);
228-
229-
states[projectedId] = state;
230-
231-
yield return new StateAtPointInTime<TState>(
232-
state,
233-
resolvedEvent.Event.EventNumber,
234-
resolvedEvent.Event.Position
235-
);
236-
}
237-
}
238-
239-
public static IAsyncEnumerable<StateAtPointInTime<TState>> ProjectState<TState>(
240-
this IAsyncEnumerable<ResolvedEvent> messages,
241-
TState initialState,
242-
Func<TState, ResolvedEvent, TState> evolve,
243-
CancellationToken ct
244-
) where TState : notnull =>
245-
messages.ProjectState(initialState, evolve, null, ct);
246-
247200
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
248201
this KurrentClient eventStore,
249202
string streamName,

0 commit comments

Comments
 (0)