diff --git a/daprdocs/content/en/dotnet-sdk-docs/dotnet-jobs/dotnet-jobs-howto.md b/daprdocs/content/en/dotnet-sdk-docs/dotnet-jobs/dotnet-jobs-howto.md index 974b2f5ec..8723a7263 100644 --- a/daprdocs/content/en/dotnet-sdk-docs/dotnet-jobs/dotnet-jobs-howto.md +++ b/daprdocs/content/en/dotnet-sdk-docs/dotnet-jobs/dotnet-jobs-howto.md @@ -63,8 +63,8 @@ the dependency injection registration in `Program.cs`, add the following line: ```cs var builder = WebApplication.CreateBuilder(args); -//Add anywhere between these two -builder.Services.AddDaprJobsClient(); //That's it +//Add anywhere between these two lines +builder.Services.AddDaprJobsClient(); var app = builder.Build(); ``` @@ -203,7 +203,8 @@ public class MySampleClass It's easy to set up a jobs endpoint if you're at all familiar with [minimal APIs in ASP.NET Core](https://learn.microsoft.com/en-us/aspnet/core/fundamentals/minimal-apis/overview) as the syntax is the same between the two. Once dependency injection registration has been completed, configure the application the same way you would to handle mapping an HTTP request via the minimal API functionality in ASP.NET Core. Implemented as an extension method, -pass the name of the job it should be responsive to and a delegate. Services can be injected into the delegate's arguments as you wish and you can optionally pass a `JobDetails` to get information about the job that has been triggered (e.g. access its scheduling setup or payload). +pass the name of the job it should be responsive to and a delegate. Services can be injected into the delegate's arguments as you wish and the job payload can be accessed from the `ReadOnlyMemory` originally provided to the +job registration. There are two delegates you can use here. One provides an `IServiceProvider` in case you need to inject other services into the handler: @@ -216,7 +217,7 @@ builder.Services.AddDaprJobsClient(); var app = builder.Build(); //Add our endpoint registration -app.MapDaprScheduledJob("myJob", (IServiceProvider serviceProvider, string? jobName, JobDetails? jobDetails) => { +app.MapDaprScheduledJob("myJob", (IServiceProvider serviceProvider, string jobName, ReadOnlyMemory jobPayload) => { var logger = serviceProvider.GetService(); logger?.LogInformation("Received trigger invocation for '{jobName}'", "myJob"); @@ -237,7 +238,7 @@ builder.Services.AddDaprJobsClient(); var app = builder.Build(); //Add our endpoint registration -app.MapDaprScheduledJob("myJob", (string? jobName, JobDetails? jobDetails) => { +app.MapDaprScheduledJob("myJob", (string jobName, ReadOnlyMemory jobPayload) => { //Do something... }); diff --git a/daprdocs/content/en/dotnet-sdk-docs/dotnet-jobs/dotnet-jobsclient-usage.md b/daprdocs/content/en/dotnet-sdk-docs/dotnet-jobs/dotnet-jobsclient-usage.md index bbdbbdbe0..ddbf226ee 100644 --- a/daprdocs/content/en/dotnet-sdk-docs/dotnet-jobs/dotnet-jobsclient-usage.md +++ b/daprdocs/content/en/dotnet-sdk-docs/dotnet-jobs/dotnet-jobsclient-usage.md @@ -165,31 +165,18 @@ var oneWeekFromNow = now.AddDays(7); await daprJobsClient.ScheduleOneTimeJobWithPayloadAsync("myOtherJob", oneWeekFromNow, "This is a test!"); ``` -The `JobDetails` type returns the data as a `ReadOnlyMemory?` so the developer has the freedom to deserialize +The delegate handling the job invocation expects at least two arguments to be present: +- A `string` that is populated with the `jobName`, providing the name of the invoked job +- A `ReadOnlyMemory` that is populated with the bytes originally provided during the job registration. + +Because the payload is stored as a `ReadOnlyMemory`, the developer has the freedom to serialize and deserialize as they wish, but there are again two helper extensions included that can deserialize this to either a JSON-compatible type or a string. Both methods assume that the developer encoded the originally scheduled job (perhaps using the helper serialization methods) as these methods will not force the bytes to represent something they're not. To deserialize the bytes to a string, the following helper method can be used: ```cs -if (jobDetails.Payload is not null) -{ - string payloadAsString = jobDetails.Payload.DeserializeToString(); //If successful, returns a string value with the value -} -``` - -To deserialize JSON-encoded UTF-8 bytes to the corresponding type, the following helper method can be used. An -overload argument is available that permits the developer to pass in their own `JsonSerializerOptions` to be applied -during deserialization. - -```cs -public sealed record Doodad (string Name, int Value); - -//... -if (jobDetails.Payload is not null) -{ - var deserializedDoodad = jobDetails.Payload.DeserializeFromJsonBytes(); -} +var payloadAsString = Encoding.UTF8.GetString(jobPayload.Span); //If successful, returns a string with the value ``` ## Error handling diff --git a/examples/Jobs/JobsSample/Program.cs b/examples/Jobs/JobsSample/Program.cs index 30ca85ba0..86c1c7316 100644 --- a/examples/Jobs/JobsSample/Program.cs +++ b/examples/Jobs/JobsSample/Program.cs @@ -16,37 +16,29 @@ using Dapr.Jobs; using Dapr.Jobs.Extensions; using Dapr.Jobs.Models; -using Dapr.Jobs.Models.Responses; var builder = WebApplication.CreateBuilder(args); - -builder.Services.AddDaprJobsClient(); +builder.Logging.ClearProviders(); +builder.Logging.AddConsole(); var app = builder.Build(); //Set a handler to deal with incoming jobs -var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); -app.MapDaprScheduledJobHandler((string? jobName, DaprJobDetails? jobDetails, ILogger? logger, CancellationToken cancellationToken) => +var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(15)); +app.MapDaprScheduledJobHandler(async (string jobName, ReadOnlyMemory jobPayload, ILogger? logger, CancellationToken cancellationToken) => { logger?.LogInformation("Received trigger invocation for job '{jobName}'", jobName); - if (jobDetails?.Payload is not null) - { - var deserializedPayload = Encoding.UTF8.GetString(jobDetails.Payload); - logger?.LogInformation("Received invocation for the job '{jobName}' with payload '{deserializedPayload}'", - jobName, deserializedPayload); - //Do something that needs the cancellation token - } - else - { - logger?.LogWarning("Failed to deserialize payload for job '{jobName}'", jobName); - } + + var deserializedPayload = Encoding.UTF8.GetString(jobPayload.Span); + logger?.LogInformation("Received invocation for the job '{jobName}' with payload '{deserializedPayload}'", + jobName, deserializedPayload); + await Task.Delay(TimeSpan.FromSeconds(3), cancellationToken); + return Task.CompletedTask; }, cancellationTokenSource.Token); -app.Run(); - -await using var scope = app.Services.CreateAsyncScope(); -var logger = scope.ServiceProvider.GetRequiredService(); +using var scope = app.Services.CreateScope(); +var logger = scope.ServiceProvider.GetRequiredService>(); var daprJobsClient = scope.ServiceProvider.GetRequiredService(); logger.LogInformation("Scheduling one-time job 'myJob' to execute 10 seconds from now"); @@ -54,5 +46,6 @@ await daprJobsClient.ScheduleJobAsync("myJob", DaprJobSchedule.FromDateTime(Date Encoding.UTF8.GetBytes("This is a test")); logger.LogInformation("Scheduled one-time job 'myJob'"); +app.Run(); #pragma warning restore CS0618 // Type or member is obsolete diff --git a/src/Dapr.Jobs/DaprJobsGrpcClient.cs b/src/Dapr.Jobs/DaprJobsGrpcClient.cs index b548290df..5a0216aec 100644 --- a/src/Dapr.Jobs/DaprJobsGrpcClient.cs +++ b/src/Dapr.Jobs/DaprJobsGrpcClient.cs @@ -77,13 +77,22 @@ public override async Task ScheduleJobAsync(string jobName, DaprJobSchedule sche ArgumentNullException.ThrowIfNull(jobName, nameof(jobName)); ArgumentNullException.ThrowIfNull(schedule, nameof(schedule)); - var job = new Autogenerated.Job { Name = jobName, Schedule = schedule.ExpressionValue }; + var job = new Autogenerated.Job { Name = jobName }; - if (startingFrom is not null) + //Set up the schedule (recurring or point in time) + if (schedule.IsPointInTimeExpression) + { + job.DueTime = schedule.ExpressionValue; + } + else if (schedule.IsCronExpression || schedule.IsDurationExpression || schedule.IsPrefixedPeriodExpression) + { + job.Schedule = schedule.ExpressionValue; + } + else if (startingFrom is not null) { job.DueTime = ((DateTimeOffset)startingFrom).ToString("O"); } - + if (repeats is not null) { if (repeats < 0) diff --git a/src/Dapr.Jobs/Extensions/EndpointRouteBuilderExtensions.cs b/src/Dapr.Jobs/Extensions/EndpointRouteBuilderExtensions.cs index 26ef579cc..b68ea94a2 100644 --- a/src/Dapr.Jobs/Extensions/EndpointRouteBuilderExtensions.cs +++ b/src/Dapr.Jobs/Extensions/EndpointRouteBuilderExtensions.cs @@ -11,8 +11,6 @@ // limitations under the License. // ------------------------------------------------------------------------ -using System.Text.Json; -using Dapr.Jobs.Models.Responses; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Routing; @@ -29,8 +27,8 @@ public static class EndpointRouteBuilderExtensions /// /// The to add the route to. /// The asynchronous action provided by the developer that handles any inbound requests. The first two - /// parameters must be a nullable for the jobName and a nullable with the - /// payload details, but otherwise can be populated with additional services to be injected into the delegate. + /// parameters must be a for the jobName and the originally registered ReadOnlyMemory<byte> with the + /// payload value, but otherwise can be populated with additional services to be injected into the delegate. /// Cancellation token that will be passed in as the last parameter to the delegate action. public static IEndpointRouteBuilder MapDaprScheduledJobHandler(this IEndpointRouteBuilder endpoints, Delegate action, CancellationToken cancellationToken = default) @@ -40,29 +38,25 @@ public static IEndpointRouteBuilder MapDaprScheduledJobHandler(this IEndpointRou endpoints.MapPost("/job/{jobName}", async context => { - var jobName = (string?)context.Request.RouteValues["jobName"]; - DaprJobDetails? jobPayload = null; + //Retrieve the name of the job from the request path + var jobName = string.Empty; + if (context.Request.RouteValues.TryGetValue("jobName", out var capturedJobName)) + { + jobName = (string)capturedJobName!; + } + //Retrieve the job payload from the request body + ReadOnlyMemory payload = new(); if (context.Request.ContentLength is > 0) { - using var reader = new StreamReader(context.Request.Body); - var body = await reader.ReadToEndAsync(); - - try - { - var deserializedJobPayload = JsonSerializer.Deserialize(body); - jobPayload = deserializedJobPayload?.ToType() ?? null; - } - catch (JsonException) - { - jobPayload = null; - } + using var streamContent = new StreamContent(context.Request.Body); + payload = await streamContent.ReadAsByteArrayAsync(cancellationToken); } - var parameters = new Dictionary + var parameters = new Dictionary { { typeof(string), jobName }, - { typeof(DaprJobDetails), jobPayload }, + { typeof(ReadOnlyMemory), payload }, { typeof(CancellationToken), CancellationToken.None } }; diff --git a/src/Dapr.Jobs/Models/Responses/DaprJobDetails.cs b/src/Dapr.Jobs/Models/Responses/DaprJobDetails.cs index 9b940beed..1e53d95e0 100644 --- a/src/Dapr.Jobs/Models/Responses/DaprJobDetails.cs +++ b/src/Dapr.Jobs/Models/Responses/DaprJobDetails.cs @@ -11,9 +11,6 @@ // limitations under the License. // ------------------------------------------------------------------------ -using System.Text.Json.Serialization; -using Dapr.Jobs.JsonConverters; - namespace Dapr.Jobs.Models.Responses; /// @@ -46,52 +43,3 @@ public sealed record DaprJobDetails(DaprJobSchedule Schedule) /// public byte[]? Payload { get; init; } = null; } - -/// -/// A deserializable version of the . -/// -internal sealed record DeserializableDaprJobDetails -{ - /// - /// Represents the schedule that triggers the job. - /// - public string? Schedule { get; init; } - - /// - /// Allows for jobs with fixed repeat counts. - /// - public int? RepeatCount { get; init; } = null; - - /// - /// Identifies a point-in-time representing when the job schedule should start from, - /// or as a "one-shot" time if other scheduling fields are not provided. - /// - [JsonConverter(typeof(Iso8601DateTimeJsonConverter))] - public DateTimeOffset? DueTime { get; init; } = null; - - /// - /// A point-in-time value representing with the job should expire. - /// - /// - /// This must be greater than if both are set. - /// - [JsonConverter(typeof(Iso8601DateTimeJsonConverter))] - public DateTimeOffset? Ttl { get; init; } = null; - - /// - /// Stores the main payload of the job which is passed to the trigger function. - /// - public byte[]? Payload { get; init; } = null; - - public DaprJobDetails ToType() - { - var schedule = DaprJobSchedule.FromExpression(Schedule ?? string.Empty); - return new DaprJobDetails(schedule) - { - DueTime = DueTime, - Payload = Payload, - RepeatCount = RepeatCount, - Ttl = Ttl - }; - } -} diff --git a/test/Dapr.Jobs.Test/Extensions/EndpointRouteBuilderExtensionsTests.cs b/test/Dapr.Jobs.Test/Extensions/EndpointRouteBuilderExtensionsTests.cs index fa4d094e1..bb01be5ef 100644 --- a/test/Dapr.Jobs.Test/Extensions/EndpointRouteBuilderExtensionsTests.cs +++ b/test/Dapr.Jobs.Test/Extensions/EndpointRouteBuilderExtensionsTests.cs @@ -40,15 +40,7 @@ public async Task MapDaprScheduledJobHandler_ValidRequest_ExecutesAction() var client = server.CreateClient(); var serializedPayload = JsonSerializer.Serialize(new SamplePayload("Dapr", 789)); - var serializedPayloadBytes = Encoding.UTF8.GetBytes(serializedPayload); - var jobDetails = new DaprJobDetails(new DaprJobSchedule("0 0 * * *")) - { - RepeatCount = 5, - DueTime = DateTimeOffset.UtcNow, - Ttl = DateTimeOffset.UtcNow.AddHours(1), - Payload = serializedPayloadBytes - }; - var content = new StringContent(JsonSerializer.Serialize(jobDetails), Encoding.UTF8, "application/json"); + var content = new StringContent(serializedPayload, Encoding.UTF8, "application/json"); const string jobName = "testJob"; var response = await client.PostAsync($"/job/{jobName}", content); @@ -68,15 +60,7 @@ public async Task MapDaprScheduleJobHandler_HandleMissingCancellationToken() var client = server.CreateClient(); var serializedPayload = JsonSerializer.Serialize(new SamplePayload("Dapr", 789)); - var serializedPayloadBytes = Encoding.UTF8.GetBytes(serializedPayload); - var jobDetails = new DaprJobDetails(new DaprJobSchedule("0 0 * * *")) - { - RepeatCount = 5, - DueTime = DateTimeOffset.UtcNow, - Ttl = DateTimeOffset.UtcNow.AddHours(1), - Payload = serializedPayloadBytes - }; - var content = new StringContent(JsonSerializer.Serialize(jobDetails), Encoding.UTF8, "application/json"); + var content = new StringContent(serializedPayload, Encoding.UTF8, "application/json"); const string jobName = "testJob"; var response = await client.PostAsync($"/job/{jobName}", content); @@ -89,31 +73,11 @@ public async Task MapDaprScheduleJobHandler_HandleMissingCancellationToken() Assert.Equal(serializedPayload, validator.SerializedPayload); } - - [Fact] - public async Task MapDaprScheduledJobHandler_InvalidPayload() - { - // Arrange - var server = CreateTestServer(); - var client = server.CreateClient(); - - var content = new StringContent("", Encoding.UTF8, "application/json"); - - // Act - const string jobName = "testJob"; - var response = await client.PostAsync($"/job/{jobName}", content); - - var validator = server.Services.GetRequiredService(); - Assert.Equal(jobName, validator.JobName); - Assert.Null(validator.SerializedPayload); - } - private sealed record SamplePayload(string Name, int Count); public sealed class Validator { public string? JobName { get; set; } - public string? SerializedPayload { get; set; } } @@ -130,15 +94,10 @@ private static TestServer CreateTestServer() app.UseRouting(); app.UseEndpoints(endpoints => { - endpoints.MapDaprScheduledJobHandler(async (string? jobName, DaprJobDetails? jobDetails, Validator validator, CancellationToken cancellationToken) => + endpoints.MapDaprScheduledJobHandler(async (string jobName, ReadOnlyMemory jobPayload, Validator validator, CancellationToken cancellationToken) => { - if (jobName is not null) - validator.JobName = jobName; - if (jobDetails?.Payload is not null) - { - var payloadString = Encoding.UTF8.GetString(jobDetails.Payload); - validator.SerializedPayload = payloadString; - } + validator.JobName = jobName; + validator.SerializedPayload = Encoding.UTF8.GetString(jobPayload.Span); await Task.CompletedTask; }); }); @@ -160,15 +119,12 @@ private static TestServer CreateTestServer2() app.UseRouting(); app.UseEndpoints(endpoints => { - endpoints.MapDaprScheduledJobHandler(async (string? jobName, Validator validator, DaprJobDetails? jobDetails) => + endpoints.MapDaprScheduledJobHandler(async (string jobName, Validator validator, ReadOnlyMemory payload) => { - if (jobName is not null) - validator.JobName = jobName; - if (jobDetails?.Payload is not null) - { - var payloadString = Encoding.UTF8.GetString(jobDetails.Payload); - validator.SerializedPayload = payloadString; - } + validator.JobName = jobName; + + var payloadString = Encoding.UTF8.GetString(payload.Span); + validator.SerializedPayload = payloadString; await Task.CompletedTask; }); });