Skip to content

Commit 3644406

Browse files
committed
[DEVEX-227] Added retry policy handling for optimistic concurrency of decision making
1 parent 4444c8c commit 3644406

File tree

1 file changed

+61
-28
lines changed
  • src/Kurrent.Client/Streams/DecisionMaking

1 file changed

+61
-28
lines changed

src/Kurrent.Client/Streams/DecisionMaking/Decider.cs

Lines changed: 61 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
using EventStore.Client;
22
using Kurrent.Client.Core.Serialization;
33
using Kurrent.Client.Streams.GettingState;
4+
using Polly;
5+
using Polly.Retry;
46

57
namespace Kurrent.Client.Streams.DecisionMaking;
68

9+
using static AsyncDecider;
10+
711
public delegate ValueTask<Message[]> CommandHandler<in TState>(TState state, CancellationToken ct = default);
812

913
public record AsyncDecider<TState, TCommand>(
@@ -56,43 +60,51 @@ resolvedEvent.DeserializedData is TEvent @event
5660
public class DecideOptions<TState> where TState : notnull {
5761
public GetStreamStateOptions<TState>? GetStateOptions { get; set; }
5862
public AppendToStreamOptions? AppendToStreamOptions { get; set; }
63+
public IAsyncPolicy<IWriteResult>? RetryPolicy { get; set; }
5964
}
6065

6166
public static class KurrentClientDecisionMakingExtensions {
62-
public static async Task<IWriteResult> DecideAsync<TState>(
67+
public static Task<IWriteResult> DecideAsync<TState>(
6368
this KurrentClient eventStore,
6469
string streamName,
6570
CommandHandler<TState> decide,
6671
IStateBuilder<TState> stateBuilder,
6772
DecideOptions<TState>? options,
68-
CancellationToken ct = default
69-
) where TState : notnull {
70-
var (state, streamPosition, position) =
71-
await eventStore.GetStateAsync(streamName, stateBuilder, options?.GetStateOptions, ct);
72-
73-
var events = await decide(state, ct);
74-
75-
if (events.Length == 0) {
76-
return new SuccessResult(
77-
streamPosition.HasValue ? StreamRevision.FromStreamPosition(streamPosition.Value) : StreamRevision.None,
78-
position ?? Position.Start
79-
);
80-
}
81-
82-
var appendToStreamOptions = options?.AppendToStreamOptions ?? new AppendToStreamOptions();
83-
84-
if (streamPosition.HasValue)
85-
appendToStreamOptions.ExpectedStreamRevision = StreamRevision.FromStreamPosition(streamPosition.Value);
86-
else
87-
appendToStreamOptions.ExpectedStreamState = StreamState.NoStream;
88-
89-
return await eventStore.AppendToStreamAsync(
90-
streamName,
91-
events.Cast<object>(),
92-
appendToStreamOptions,
93-
cancellationToken: ct
73+
CancellationToken cancellationToken = default
74+
) where TState : notnull =>
75+
DecideRetryPolicy(options).ExecuteAsync(
76+
async ct => {
77+
var (state, streamPosition, position) =
78+
await eventStore.GetStateAsync(streamName, stateBuilder, options?.GetStateOptions, ct);
79+
80+
var messages = await decide(state, ct);
81+
82+
if (messages.Length == 0) {
83+
return new SuccessResult(
84+
streamPosition.HasValue
85+
? StreamRevision.FromStreamPosition(streamPosition.Value)
86+
: StreamRevision.None,
87+
position ?? Position.Start
88+
);
89+
}
90+
91+
var appendToStreamOptions = options?.AppendToStreamOptions ?? new AppendToStreamOptions();
92+
93+
if (streamPosition.HasValue)
94+
appendToStreamOptions.ExpectedStreamRevision ??=
95+
StreamRevision.FromStreamPosition(streamPosition.Value);
96+
else
97+
appendToStreamOptions.ExpectedStreamState ??= StreamState.NoStream;
98+
99+
return await eventStore.AppendToStreamAsync(
100+
streamName,
101+
messages,
102+
appendToStreamOptions,
103+
ct
104+
);
105+
},
106+
cancellationToken
94107
);
95-
}
96108

97109
public static Task<IWriteResult> DecideAsync<TState, TCommand>(
98110
this KurrentClient eventStore,
@@ -183,3 +195,24 @@ public static Task<IWriteResult> DecideAsync<TState, TEvent>(
183195
ct
184196
);
185197
}
198+
199+
public static class AsyncDecider {
200+
public static readonly IAsyncPolicy<IWriteResult> DefaultRetryPolicy =
201+
Policy<IWriteResult>
202+
.Handle<WrongExpectedVersionException>()
203+
.WaitAndRetryAsync(
204+
retryCount: 3,
205+
sleepDurationProvider: retryAttempt => TimeSpan.FromMilliseconds(20 * retryAttempt)
206+
);
207+
208+
public static bool HasUserProvidedExpectedVersioning(AppendToStreamOptions? options) =>
209+
options != null && (options.ExpectedStreamState.HasValue || options.ExpectedStreamRevision.HasValue);
210+
211+
public static IAsyncPolicy<IWriteResult> DecideRetryPolicy<TState>(DecideOptions<TState>? options)
212+
where TState : notnull =>
213+
options?.RetryPolicy ??
214+
(HasUserProvidedExpectedVersioning(options?.AppendToStreamOptions)
215+
// it doesn't make sense to retry, as expected state will be always the same
216+
? Policy.NoOpAsync<IWriteResult>()
217+
: DefaultRetryPolicy);
218+
}

0 commit comments

Comments
 (0)