Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed Jobs SDK bugs #1456

Merged
merged 8 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ the dependency injection registration in `Program.cs`, add the following line:
```cs
var builder = WebApplication.CreateBuilder(args);

//Add anywhere between these two
builder.Services.AddDaprJobsClient(); //That's it
//Add anywhere between these two lines
builder.Services.AddDaprJobsClient();

var app = builder.Build();
```
Expand Down Expand Up @@ -203,7 +203,8 @@ public class MySampleClass
It's easy to set up a jobs endpoint if you're at all familiar with [minimal APIs in ASP.NET Core](https://learn.microsoft.com/en-us/aspnet/core/fundamentals/minimal-apis/overview) as the syntax is the same between the two.

Once dependency injection registration has been completed, configure the application the same way you would to handle mapping an HTTP request via the minimal API functionality in ASP.NET Core. Implemented as an extension method,
pass the name of the job it should be responsive to and a delegate. Services can be injected into the delegate's arguments as you wish and you can optionally pass a `JobDetails` to get information about the job that has been triggered (e.g. access its scheduling setup or payload).
pass the name of the job it should be responsive to and a delegate. Services can be injected into the delegate's arguments as you wish and the job payload can be accessed from the `ReadOnlyMemory<byte>` originally provided to the
job registration.

There are two delegates you can use here. One provides an `IServiceProvider` in case you need to inject other services into the handler:

Expand All @@ -216,7 +217,7 @@ builder.Services.AddDaprJobsClient();
var app = builder.Build();

//Add our endpoint registration
app.MapDaprScheduledJob("myJob", (IServiceProvider serviceProvider, string? jobName, JobDetails? jobDetails) => {
app.MapDaprScheduledJob("myJob", (IServiceProvider serviceProvider, string jobName, ReadOnlyMemory<byte> jobPayload) => {
var logger = serviceProvider.GetService<ILogger>();
logger?.LogInformation("Received trigger invocation for '{jobName}'", "myJob");

Expand All @@ -237,7 +238,7 @@ builder.Services.AddDaprJobsClient();
var app = builder.Build();

//Add our endpoint registration
app.MapDaprScheduledJob("myJob", (string? jobName, JobDetails? jobDetails) => {
app.MapDaprScheduledJob("myJob", (string jobName, ReadOnlyMemory<byte> jobPayload) => {
//Do something...
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,31 +165,18 @@ var oneWeekFromNow = now.AddDays(7);
await daprJobsClient.ScheduleOneTimeJobWithPayloadAsync("myOtherJob", oneWeekFromNow, "This is a test!");
```

The `JobDetails` type returns the data as a `ReadOnlyMemory<byte>?` so the developer has the freedom to deserialize
The delegate handling the job invocation expects at least two arguments to be present:
- A `string` that is populated with the `jobName`, providing the name of the invoked job
- A `ReadOnlyMemory<byte>` that is populated with the bytes originally provided during the job registration.

Because the payload is stored as a `ReadOnlyMemory<byte>`, the developer has the freedom to serialize and deserialize
as they wish, but there are again two helper extensions included that can deserialize this to either a JSON-compatible
type or a string. Both methods assume that the developer encoded the originally scheduled job (perhaps using the
helper serialization methods) as these methods will not force the bytes to represent something they're not.

To deserialize the bytes to a string, the following helper method can be used:
```cs
if (jobDetails.Payload is not null)
{
string payloadAsString = jobDetails.Payload.DeserializeToString(); //If successful, returns a string value with the value
}
```

To deserialize JSON-encoded UTF-8 bytes to the corresponding type, the following helper method can be used. An
overload argument is available that permits the developer to pass in their own `JsonSerializerOptions` to be applied
during deserialization.

```cs
public sealed record Doodad (string Name, int Value);

//...
if (jobDetails.Payload is not null)
{
var deserializedDoodad = jobDetails.Payload.DeserializeFromJsonBytes<Doodad>();
}
var payloadAsString = Encoding.UTF8.GetString(jobPayload.Span); //If successful, returns a string with the value
```

## Error handling
Expand Down
33 changes: 13 additions & 20 deletions examples/Jobs/JobsSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,36 @@
using Dapr.Jobs;
using Dapr.Jobs.Extensions;
using Dapr.Jobs.Models;
using Dapr.Jobs.Models.Responses;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddDaprJobsClient();
builder.Logging.ClearProviders();
builder.Logging.AddConsole();

var app = builder.Build();

//Set a handler to deal with incoming jobs
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
app.MapDaprScheduledJobHandler((string? jobName, DaprJobDetails? jobDetails, ILogger? logger, CancellationToken cancellationToken) =>
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(15));
app.MapDaprScheduledJobHandler(async (string jobName, ReadOnlyMemory<byte> jobPayload, ILogger? logger, CancellationToken cancellationToken) =>
{
logger?.LogInformation("Received trigger invocation for job '{jobName}'", jobName);
if (jobDetails?.Payload is not null)
{
var deserializedPayload = Encoding.UTF8.GetString(jobDetails.Payload);
logger?.LogInformation("Received invocation for the job '{jobName}' with payload '{deserializedPayload}'",
jobName, deserializedPayload);
//Do something that needs the cancellation token
}
else
{
logger?.LogWarning("Failed to deserialize payload for job '{jobName}'", jobName);
}

var deserializedPayload = Encoding.UTF8.GetString(jobPayload.Span);
logger?.LogInformation("Received invocation for the job '{jobName}' with payload '{deserializedPayload}'",
jobName, deserializedPayload);
await Task.Delay(TimeSpan.FromSeconds(3), cancellationToken);

return Task.CompletedTask;
}, cancellationTokenSource.Token);

app.Run();

await using var scope = app.Services.CreateAsyncScope();
var logger = scope.ServiceProvider.GetRequiredService<ILogger>();
using var scope = app.Services.CreateScope();
var logger = scope.ServiceProvider.GetRequiredService<ILogger<Program>>();
var daprJobsClient = scope.ServiceProvider.GetRequiredService<DaprJobsClient>();

logger.LogInformation("Scheduling one-time job 'myJob' to execute 10 seconds from now");
await daprJobsClient.ScheduleJobAsync("myJob", DaprJobSchedule.FromDateTime(DateTime.UtcNow.AddSeconds(10)),
Encoding.UTF8.GetBytes("This is a test"));
logger.LogInformation("Scheduled one-time job 'myJob'");

app.Run();

#pragma warning restore CS0618 // Type or member is obsolete
15 changes: 12 additions & 3 deletions src/Dapr.Jobs/DaprJobsGrpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,22 @@ public override async Task ScheduleJobAsync(string jobName, DaprJobSchedule sche
ArgumentNullException.ThrowIfNull(jobName, nameof(jobName));
ArgumentNullException.ThrowIfNull(schedule, nameof(schedule));

var job = new Autogenerated.Job { Name = jobName, Schedule = schedule.ExpressionValue };
var job = new Autogenerated.Job { Name = jobName };

if (startingFrom is not null)
//Set up the schedule (recurring or point in time)
if (schedule.IsPointInTimeExpression)
{
job.DueTime = schedule.ExpressionValue;
}
else if (schedule.IsCronExpression || schedule.IsDurationExpression || schedule.IsPrefixedPeriodExpression)
{
job.Schedule = schedule.ExpressionValue;
}
else if (startingFrom is not null)
{
job.DueTime = ((DateTimeOffset)startingFrom).ToString("O");
}

if (repeats is not null)
{
if (repeats < 0)
Expand Down
35 changes: 15 additions & 20 deletions src/Dapr.Jobs/Extensions/EndpointRouteBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
// limitations under the License.
// ------------------------------------------------------------------------

using System.Text.Json;
using Dapr.Jobs.Models.Responses;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Routing;

Expand All @@ -29,8 +27,8 @@ public static class EndpointRouteBuilderExtensions
/// </summary>
/// <param name="endpoints">The <see cref="IEndpointRouteBuilder"/> to add the route to.</param>
/// <param name="action">The asynchronous action provided by the developer that handles any inbound requests. The first two
/// parameters must be a nullable <see cref="string"/> for the jobName and a nullable <see cref="DaprJobDetails"/> with the
/// payload details, but otherwise can be populated with additional services to be injected into the delegate.</param>
/// parameters must be a <see cref="string"/> for the jobName and the originally registered ReadOnlyMemory&lt;byte&gt; with the
/// payload value, but otherwise can be populated with additional services to be injected into the delegate.</param>
/// <param name="cancellationToken">Cancellation token that will be passed in as the last parameter to the delegate action.</param>
public static IEndpointRouteBuilder MapDaprScheduledJobHandler(this IEndpointRouteBuilder endpoints,
Delegate action, CancellationToken cancellationToken = default)
Expand All @@ -40,29 +38,26 @@ public static IEndpointRouteBuilder MapDaprScheduledJobHandler(this IEndpointRou

endpoints.MapPost("/job/{jobName}", async context =>
{
var jobName = (string?)context.Request.RouteValues["jobName"];
DaprJobDetails? jobPayload = null;
//Retrieve the name of the job from the request path
var jobName = string.Empty;
if (context.Request.RouteValues.TryGetValue("jobName", out var capturedJobName))
{
jobName = (string)capturedJobName!;
}

//Retrieve the job payload from the request body
ReadOnlyMemory<byte> payload = new();
if (context.Request.ContentLength is > 0)
{
using var reader = new StreamReader(context.Request.Body);
var body = await reader.ReadToEndAsync();

try
{
var deserializedJobPayload = JsonSerializer.Deserialize<DeserializableDaprJobDetails>(body);
jobPayload = deserializedJobPayload?.ToType() ?? null;
}
catch (JsonException)
{
jobPayload = null;
}
using var memoryStream = new MemoryStream();
await context.Request.Body.CopyToAsync(memoryStream, cancellationToken);
payload = memoryStream.ToArray();
}

var parameters = new Dictionary<Type, object?>
var parameters = new Dictionary<Type, object>
{
{ typeof(string), jobName },
{ typeof(DaprJobDetails), jobPayload },
{ typeof(ReadOnlyMemory<byte>), payload },
{ typeof(CancellationToken), CancellationToken.None }
};

Expand Down
52 changes: 0 additions & 52 deletions src/Dapr.Jobs/Models/Responses/DaprJobDetails.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
// limitations under the License.
// ------------------------------------------------------------------------

using System.Text.Json.Serialization;
using Dapr.Jobs.JsonConverters;

namespace Dapr.Jobs.Models.Responses;

/// <summary>
Expand Down Expand Up @@ -46,52 +43,3 @@ public sealed record DaprJobDetails(DaprJobSchedule Schedule)
/// </summary>
public byte[]? Payload { get; init; } = null;
}

/// <summary>
/// A deserializable version of the <see cref="DaprJobDetails"/>.
/// </summary>
internal sealed record DeserializableDaprJobDetails
{
/// <summary>
/// Represents the schedule that triggers the job.
/// </summary>
public string? Schedule { get; init; }

/// <summary>
/// Allows for jobs with fixed repeat counts.
/// </summary>
public int? RepeatCount { get; init; } = null;

/// <summary>
/// Identifies a point-in-time representing when the job schedule should start from,
/// or as a "one-shot" time if other scheduling fields are not provided.
/// </summary>
[JsonConverter(typeof(Iso8601DateTimeJsonConverter))]
public DateTimeOffset? DueTime { get; init; } = null;

/// <summary>
/// A point-in-time value representing with the job should expire.
/// </summary>
/// <remarks>
/// This must be greater than <see cref="DueTime"/> if both are set.
/// </remarks>
[JsonConverter(typeof(Iso8601DateTimeJsonConverter))]
public DateTimeOffset? Ttl { get; init; } = null;

/// <summary>
/// Stores the main payload of the job which is passed to the trigger function.
/// </summary>
public byte[]? Payload { get; init; } = null;

public DaprJobDetails ToType()
{
var schedule = DaprJobSchedule.FromExpression(Schedule ?? string.Empty);
return new DaprJobDetails(schedule)
{
DueTime = DueTime,
Payload = Payload,
RepeatCount = RepeatCount,
Ttl = Ttl
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,7 @@ public async Task MapDaprScheduledJobHandler_ValidRequest_ExecutesAction()
var client = server.CreateClient();

var serializedPayload = JsonSerializer.Serialize(new SamplePayload("Dapr", 789));
var serializedPayloadBytes = Encoding.UTF8.GetBytes(serializedPayload);
var jobDetails = new DaprJobDetails(new DaprJobSchedule("0 0 * * *"))
{
RepeatCount = 5,
DueTime = DateTimeOffset.UtcNow,
Ttl = DateTimeOffset.UtcNow.AddHours(1),
Payload = serializedPayloadBytes
};
var content = new StringContent(JsonSerializer.Serialize(jobDetails), Encoding.UTF8, "application/json");
var content = new StringContent(serializedPayload, Encoding.UTF8, "application/json");

const string jobName = "testJob";
var response = await client.PostAsync($"/job/{jobName}", content);
Expand All @@ -68,15 +60,7 @@ public async Task MapDaprScheduleJobHandler_HandleMissingCancellationToken()
var client = server.CreateClient();

var serializedPayload = JsonSerializer.Serialize(new SamplePayload("Dapr", 789));
var serializedPayloadBytes = Encoding.UTF8.GetBytes(serializedPayload);
var jobDetails = new DaprJobDetails(new DaprJobSchedule("0 0 * * *"))
{
RepeatCount = 5,
DueTime = DateTimeOffset.UtcNow,
Ttl = DateTimeOffset.UtcNow.AddHours(1),
Payload = serializedPayloadBytes
};
var content = new StringContent(JsonSerializer.Serialize(jobDetails), Encoding.UTF8, "application/json");
var content = new StringContent(serializedPayload, Encoding.UTF8, "application/json");

const string jobName = "testJob";
var response = await client.PostAsync($"/job/{jobName}", content);
Expand All @@ -89,31 +73,11 @@ public async Task MapDaprScheduleJobHandler_HandleMissingCancellationToken()
Assert.Equal(serializedPayload, validator.SerializedPayload);
}


[Fact]
public async Task MapDaprScheduledJobHandler_InvalidPayload()
{
// Arrange
var server = CreateTestServer();
var client = server.CreateClient();

var content = new StringContent("", Encoding.UTF8, "application/json");

// Act
const string jobName = "testJob";
var response = await client.PostAsync($"/job/{jobName}", content);

var validator = server.Services.GetRequiredService<Validator>();
Assert.Equal(jobName, validator.JobName);
Assert.Null(validator.SerializedPayload);
}

private sealed record SamplePayload(string Name, int Count);

public sealed class Validator
{
public string? JobName { get; set; }

public string? SerializedPayload { get; set; }
}

Expand All @@ -130,15 +94,10 @@ private static TestServer CreateTestServer()
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapDaprScheduledJobHandler(async (string? jobName, DaprJobDetails? jobDetails, Validator validator, CancellationToken cancellationToken) =>
endpoints.MapDaprScheduledJobHandler(async (string jobName, ReadOnlyMemory<byte> jobPayload, Validator validator, CancellationToken cancellationToken) =>
{
if (jobName is not null)
validator.JobName = jobName;
if (jobDetails?.Payload is not null)
{
var payloadString = Encoding.UTF8.GetString(jobDetails.Payload);
validator.SerializedPayload = payloadString;
}
validator.JobName = jobName;
validator.SerializedPayload = Encoding.UTF8.GetString(jobPayload.Span);
await Task.CompletedTask;
});
});
Expand All @@ -160,15 +119,12 @@ private static TestServer CreateTestServer2()
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapDaprScheduledJobHandler(async (string? jobName, Validator validator, DaprJobDetails? jobDetails) =>
endpoints.MapDaprScheduledJobHandler(async (string jobName, Validator validator, ReadOnlyMemory<byte> payload) =>
{
if (jobName is not null)
validator.JobName = jobName;
if (jobDetails?.Payload is not null)
{
var payloadString = Encoding.UTF8.GetString(jobDetails.Payload);
validator.SerializedPayload = payloadString;
}
validator.JobName = jobName;

var payloadString = Encoding.UTF8.GetString(payload.Span);
validator.SerializedPayload = payloadString;
await Task.CompletedTask;
});
});
Expand Down
Loading