Skip to content

Commit

Permalink
Merge pull request #61 from deveel/41-batch-sending-of-webhooks
Browse files Browse the repository at this point in the history
Batch Sending of Webhooks
  • Loading branch information
tsutomi authored Feb 16, 2024
2 parents 6ca5334 + d772220 commit 3fb75fa
Show file tree
Hide file tree
Showing 29 changed files with 788 additions and 160 deletions.
18 changes: 11 additions & 7 deletions samples/WebhookNotifierApp/Services/UserCreatedWebhookFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,22 @@ public UserCreatedWebhookFactory(IUserResolver userResolver) {
this.userResolver = userResolver;
}

public async Task<IdentityWebhook> CreateAsync(IWebhookSubscription subscription, EventInfo eventInfo, CancellationToken cancellationToken = default) {
var userCreated = (UserCreatedEvent?)eventInfo.Data;
public async Task<IList<IdentityWebhook>> CreateAsync(IWebhookSubscription subscription, EventNotification notification, CancellationToken cancellationToken = default) {
var @event = notification.Events[0];

var userCreated = (UserCreatedEvent?)@event.Data;
var user = await userResolver.ResolveUserAsync(userCreated!.UserId, cancellationToken);

if (user == null)
throw new InvalidOperationException();

return new IdentityWebhook {
EventId = eventInfo.Id,
EventType = "user_created",
TimeStamp = eventInfo.TimeStamp,
User = user
return new [] {
new IdentityWebhook {
EventId = @event.Id,
EventType = @event.EventType,
TimeStamp = @event.TimeStamp,
User = user
}
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Collections.Concurrent;
using System.Linq.Dynamic.Core;
using System.Linq.Expressions;

Expand All @@ -23,7 +24,7 @@ namespace Deveel.Webhooks {
/// </summary>
/// <typeparam name="TWebhook"></typeparam>
public sealed class LinqWebhookFilterEvaluator<TWebhook> : IWebhookFilterEvaluator<TWebhook> where TWebhook : class {
private readonly IDictionary<string, Func<object, bool>> filterCache;
private readonly IDictionary<FilterKey, Func<object, bool>> filterCache;
private readonly WebhookSenderOptions<TWebhook> senderOptions;

/// <summary>
Expand All @@ -35,7 +36,7 @@ public sealed class LinqWebhookFilterEvaluator<TWebhook> : IWebhookFilterEvaluat
/// the filter evaluator.
/// </param>
public LinqWebhookFilterEvaluator(IOptions<WebhookSenderOptions<TWebhook>> senderOptions) {
filterCache = new Dictionary<string, Func<object, bool>>();
filterCache = new ConcurrentDictionary<FilterKey, Func<object, bool>>();
this.senderOptions = senderOptions.Value;
}

Expand All @@ -51,15 +52,16 @@ static LinqWebhookFilterEvaluator() {
string IWebhookFilterEvaluator<TWebhook>.Format => "linq";

private Func<object, bool> Compile(Type objType, string filter) {
if (!filterCache.TryGetValue(filter, out var compiled)) {
var key = new FilterKey(objType.FullName!, filter);
if (!filterCache.TryGetValue(key, out var compiled)) {
var config = ParsingConfig.Default;

var parameters = new[] {
Expression.Parameter(objType, "hook")
};
var parsed = DynamicExpressionParser.ParseLambda(config, parameters, typeof(bool), filter).Compile();
compiled = hook => (bool)(parsed.DynamicInvoke(hook)!);
filterCache[filter] = compiled;
filterCache[key] = compiled;
}

return compiled;
Expand All @@ -84,10 +86,8 @@ private Func<object, bool> Compile(Type objType, IList<string> filters) {

/// <inheritdoc/>
public async Task<bool> MatchesAsync(WebhookSubscriptionFilter filter, TWebhook webhook, CancellationToken cancellationToken) {
if (filter is null)
throw new ArgumentNullException(nameof(filter));
if (webhook is null)
throw new ArgumentNullException(nameof(webhook));
ArgumentNullException.ThrowIfNull(filter, nameof(filter));
ArgumentNullException.ThrowIfNull(webhook, nameof(webhook));

if (filter.FilterFormat != "linq")
throw new ArgumentException($"Filter format '{filter.FilterFormat}' not supported by the LINQ evaluator");
Expand All @@ -111,7 +111,27 @@ public async Task<bool> MatchesAsync(WebhookSubscriptionFilter filter, TWebhook
} catch(Exception ex) {
throw new WebhookException("Unable to evaluate the filter", ex);
}
}

readonly struct FilterKey {
public FilterKey(string typeName, string filter) : this() {
TypeName = typeName;
Filter = filter;
}

public string TypeName { get; }

public string Filter { get; }

public override bool Equals(object? obj) {
return obj is FilterKey key &&
TypeName == key.TypeName &&
Filter == key.Filter;
}

public override int GetHashCode() {
return HashCode.Combine(TypeName, Filter);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public class DefaultMongoWebhookConverter<TWebhook> : IMongoWebhookConverter<TWe
/// Converts the given <see cref="EventInfo"/> and the webhook object into
/// a <see cref="MongoWebhook"/> object.
/// </summary>
/// <param name="eventInfo">
/// The event information that is being notified.
/// <param name="notification">
/// The event notification that was sent to the subscribers.
/// </param>
/// <param name="webhook">
/// The webhook that was notified to the subscribers.
Expand All @@ -42,7 +42,7 @@ public class DefaultMongoWebhookConverter<TWebhook> : IMongoWebhookConverter<TWe
/// Returns an instance of <see cref="MongoWebhook"/> that represents the
/// webhook that can be stored into the database.
/// </returns>
public MongoWebhook ConvertWebhook(EventInfo eventInfo, TWebhook webhook) {
public MongoWebhook ConvertWebhook(EventNotification notification, TWebhook webhook) {
if (webhook is IWebhook obj) {
return new MongoWebhook {
WebhookId = obj.Id,

Check warning on line 48 in src/Deveel.Webhooks.Service.MongoDb/Webhooks/DefaultMongoWebhookConverter.cs

View workflow job for this annotation

GitHub Actions / Build and Test

Possible null reference assignment.
Expand All @@ -52,10 +52,14 @@ public MongoWebhook ConvertWebhook(EventInfo eventInfo, TWebhook webhook) {
};
}

// TODO: we should support multiple events in a single notification

var firstEvent = notification.Events.First();

return new MongoWebhook {
EventType = eventInfo.EventType,
TimeStamp = eventInfo.TimeStamp,
WebhookId = eventInfo.Id,
EventType = notification.EventType,
TimeStamp = firstEvent.TimeStamp,
WebhookId = notification.NotificationId,
Data = BsonValueUtil.ConvertData(webhook)
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ public interface IMongoWebhookConverter<TWebhook>
/// Converts the given webhook to an object that can be stored
/// in a MongoDB database.
/// </summary>
/// <param name="eventInfo">
/// The information about the event that triggered the
/// notification of the webhook.
/// <param name="notification">
/// The event notification that was sent to the subscribers.
/// </param>
/// <param name="webhook">
/// The instance of the webhook to be converted.
Expand All @@ -37,6 +36,6 @@ public interface IMongoWebhookConverter<TWebhook>
/// Returns an instance of <see cref="MongoWebhook"/>
/// that can be stored in a MongoDB database.
/// </returns>
MongoWebhook ConvertWebhook(EventInfo eventInfo, TWebhook webhook);
MongoWebhook ConvertWebhook(EventNotification notification, TWebhook webhook);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Collections;
using System.Reflection;

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

using MongoDB.Bson;

namespace Deveel.Webhooks {
/// <summary>
/// An implementation of <see cref="IWebhookDeliveryResultLogger{TWebhook}"/> that
Expand Down Expand Up @@ -75,6 +70,9 @@ public MongoDbWebhookDeliveryResultLogger(
/// Converts the given result to an object that can be stored in the
/// MongoDB database collection.
/// </summary>
/// <param name="notification">
/// The aggregate of the events that are being delivered to the receiver.
/// </param>
/// <param name="eventInfo">
/// The information about the event that triggered the delivery of the webhook.
/// </param>
Expand All @@ -87,14 +85,14 @@ public MongoDbWebhookDeliveryResultLogger(
/// <returns>
/// Returns an object that can be stored in the MongoDB database collection.
/// </returns>
protected virtual TResult ConvertResult(EventInfo eventInfo, IWebhookSubscription subscription, WebhookDeliveryResult<TWebhook> result) {
protected virtual TResult ConvertResult(EventNotification notification, EventInfo eventInfo, IWebhookSubscription subscription, WebhookDeliveryResult<TWebhook> result) {
var obj = new TResult();

obj.TenantId = subscription.TenantId;
obj.OperationId = result.OperationId;
obj.EventInfo = CreateEvent(eventInfo);
obj.Receiver = CreateReceiver(subscription);
obj.Webhook = ConvertWebhook(eventInfo, result.Webhook);
obj.Webhook = ConvertWebhook(notification, result.Webhook);
obj.DeliveryAttempts = result.Attempts?.Select(ConvertDeliveryAttempt).ToList()
?? new List<MongoWebhookDeliveryAttempt>();

Expand Down Expand Up @@ -162,8 +160,8 @@ protected virtual MongoWebhookDeliveryAttempt ConvertDeliveryAttempt(WebhookDeli
/// Converts the given webhook to an object that can be stored in the
/// MongoDB database collection.
/// </summary>
/// <param name="eventInfo">
/// The information about the event that triggered the delivery of the webhook.
/// <param name="notification">
/// The aggregate of the events that are being delivered to the receiver.
/// </param>
/// <param name="webhook">
/// The instance of the webhook to convert.
Expand All @@ -175,15 +173,15 @@ protected virtual MongoWebhookDeliveryAttempt ConvertDeliveryAttempt(WebhookDeli
/// Thrown when the given type of webhook is not supported by this instance and
/// no converter was provided.
/// </exception>
protected virtual MongoWebhook ConvertWebhook(EventInfo eventInfo, TWebhook webhook) {
protected virtual MongoWebhook ConvertWebhook(EventNotification notification, TWebhook webhook) {
if (webhookConverter != null)
return webhookConverter.ConvertWebhook(eventInfo, webhook);
return webhookConverter.ConvertWebhook(notification, webhook);

throw new NotSupportedException("The given type of webhook is not supported by this instance of the logger");
}

/// <inheritdoc/>
public async Task LogResultAsync(EventInfo eventInfo, IWebhookSubscription subscription, WebhookDeliveryResult<TWebhook> result, CancellationToken cancellationToken) {
public async Task LogResultAsync(EventNotification notification, IWebhookSubscription subscription, WebhookDeliveryResult<TWebhook> result, CancellationToken cancellationToken) {
ArgumentNullException.ThrowIfNull(result, nameof(result));
ArgumentNullException.ThrowIfNull(subscription, nameof(subscription));

Expand All @@ -195,11 +193,11 @@ public async Task LogResultAsync(EventInfo eventInfo, IWebhookSubscription subsc
typeof(TWebhook), subscription.TenantId);

try {
var resultObj = ConvertResult(eventInfo, subscription, result);
var results = notification.Select(e => ConvertResult(notification, e, subscription, result));

var repository = await RepositoryProvider.GetRepositoryAsync(subscription.TenantId, cancellationToken);

await repository.AddAsync(resultObj, cancellationToken);
await repository.AddRangeAsync(results, cancellationToken);
} catch (Exception ex) {
Logger.LogError(ex, "Could not log the result of the delivery of the Webhook of type '{WebhookType}' for tenant '{TenantId}' because of an error",
typeof(TWebhook), subscription.TenantId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public MongoDbWebhookStorageBuilder<TSubscription> UseWebhookConverter<TWebhook,
/// <returns>
/// Returns the current instance of the builder for chaining.
/// </returns>
public MongoDbWebhookStorageBuilder<TSubscription> UseWebhookConverter<TWebhook>(Func<EventInfo, TWebhook, MongoWebhook> converter)
public MongoDbWebhookStorageBuilder<TSubscription> UseWebhookConverter<TWebhook>(Func<EventNotification, TWebhook, MongoWebhook> converter)
where TWebhook : class {

Services.AddSingleton<IMongoWebhookConverter<TWebhook>>(new MongoWebhookConverterWrapper<TWebhook>(converter));
Expand All @@ -211,13 +211,14 @@ public MongoDbWebhookStorageBuilder<TSubscription> UseWebhookConverter<TWebhook>
}

private class MongoWebhookConverterWrapper<TWebhook> : IMongoWebhookConverter<TWebhook> where TWebhook : class {
private readonly Func<EventInfo, TWebhook, MongoWebhook> converter;
private readonly Func<EventNotification, TWebhook, MongoWebhook> converter;

public MongoWebhookConverterWrapper(Func<EventInfo, TWebhook, MongoWebhook> converter) {
public MongoWebhookConverterWrapper(Func<EventNotification, TWebhook, MongoWebhook> converter) {
this.converter = converter;
}

public MongoWebhook ConvertWebhook(EventInfo eventInfo, TWebhook webhook) => converter.Invoke(eventInfo, webhook);
public MongoWebhook ConvertWebhook(EventNotification notification, TWebhook webhook)
=> converter.Invoke(notification, webhook);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public async Task<IList<IWebhookSubscription>> ResolveSubscriptionsAsync(string
var list = await GetCachedAsync(eventType, cancellationToken);

if (list == null) {
logger.LogTrace("No webhook subscriptions to event {EventType} of tenant {TenantId} were found in cache", eventType);
logger.LogTrace("No webhook subscriptions to event {EventType} were found in cache", eventType);

var result = await repository.GetByEventTypeAsync(eventType, activeOnly, cancellationToken);
list = result.Cast<IWebhookSubscription>().ToList();
Expand Down
4 changes: 4 additions & 0 deletions src/Deveel.Webhooks/Webhooks/DefaultWebhookFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using Microsoft.Extensions.Options;

namespace Deveel.Webhooks {
/// <summary>
/// A default implementation of the <see cref="IWebhookFactory{TWebhook}"/>
/// that creates a <see cref="Webhook"/> instance using the information
/// provided by the subscription and the event.
/// </summary>
public sealed class DefaultWebhookFactory : DefaultWebhookFactory<Webhook> {
public DefaultWebhookFactory(IOptions<WebhookFactoryOptions<Webhook>> options) : base(options) {

Check warning on line 24 in src/Deveel.Webhooks/Webhooks/DefaultWebhookFactory.cs

View workflow job for this annotation

GitHub Actions / Build and Test

Missing XML comment for publicly visible type or member 'DefaultWebhookFactory.DefaultWebhookFactory(IOptions<WebhookFactoryOptions<Webhook>>)'

Check warning on line 24 in src/Deveel.Webhooks/Webhooks/DefaultWebhookFactory.cs

View workflow job for this annotation

GitHub Actions / Build and Test

Missing XML comment for publicly visible type or member 'DefaultWebhookFactory.DefaultWebhookFactory(IOptions<WebhookFactoryOptions<Webhook>>)'

Check warning on line 24 in src/Deveel.Webhooks/Webhooks/DefaultWebhookFactory.cs

View workflow job for this annotation

GitHub Actions / Build and Test

Missing XML comment for publicly visible type or member 'DefaultWebhookFactory.DefaultWebhookFactory(IOptions<WebhookFactoryOptions<Webhook>>)'

Check warning on line 24 in src/Deveel.Webhooks/Webhooks/DefaultWebhookFactory.cs

View workflow job for this annotation

GitHub Actions / Build and Test

Missing XML comment for publicly visible type or member 'DefaultWebhookFactory.DefaultWebhookFactory(IOptions<WebhookFactoryOptions<Webhook>>)'
}
}
}
Loading

0 comments on commit 3fb75fa

Please sign in to comment.