Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow unit testing changes for 1.10 release #1038

Merged
merged 3 commits into from
Feb 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions src/Dapr.Workflow/DaprWorkflowContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// ------------------------------------------------------------------------
// Copyright 2023 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

namespace Dapr.Workflow
{
using System;
using Microsoft.DurableTask;
using System.Threading.Tasks;
using System.Threading;

class DaprWorkflowContext : WorkflowContext
{
readonly TaskOrchestrationContext innerContext;

internal DaprWorkflowContext(TaskOrchestrationContext innerContext)
{
this.innerContext = innerContext ?? throw new ArgumentNullException(nameof(innerContext));
}

public override string Name => this.innerContext.Name;

public override string InstanceId => this.innerContext.InstanceId;

public override DateTime CurrentUtcDateTime => this.innerContext.CurrentUtcDateTime;

public override bool IsReplaying => this.innerContext.IsReplaying;

public override Task CallActivityAsync(string name, object? input = null, TaskOptions? options = null)
{
return this.innerContext.CallActivityAsync(name, input, options);
}

public override Task<T> CallActivityAsync<T>(string name, object? input = null, TaskOptions? options = null)
{
return this.innerContext.CallActivityAsync<T>(name, input, options);
}

public override Task CreateTimer(TimeSpan delay, CancellationToken cancellationToken = default)
{
return this.innerContext.CreateTimer(delay, cancellationToken);
}

public override Task CreateTimer(DateTime fireAt, CancellationToken cancellationToken)
{
return this.innerContext.CreateTimer(fireAt, cancellationToken);
}

public override Task<T> WaitForExternalEventAsync<T>(string eventName, CancellationToken cancellationToken = default)
{
return this.innerContext.WaitForExternalEvent<T>(eventName, cancellationToken);
}

public override Task<T> WaitForExternalEventAsync<T>(string eventName, TimeSpan timeout)
{
return this.innerContext.WaitForExternalEvent<T>(eventName, timeout);
}

public override void SendEvent(string instanceId, string eventName, object payload)
{
this.innerContext.SendEvent(instanceId, eventName, payload);
}

public override void SetCustomStatus(object? customStatus)
{
this.innerContext.SetCustomStatus(customStatus);
}

public override Task<TResult> CallChildWorkflowAsync<TResult>(string workflowName, object? input = null, TaskOptions? options = null)
{
return this.innerContext.CallSubOrchestratorAsync<TResult>(workflowName, input, options);
}

public override Task CallChildWorkflowAsync(string workflowName, object? input = null, TaskOptions? options = null)
{
return this.innerContext.CallSubOrchestratorAsync(workflowName, input, options);
}

public override void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvents = true)
{
this.innerContext.ContinueAsNew(newInput!, preserveUnprocessedEvents);
}

public override Guid NewGuid()
{
return this.innerContext.NewGuid();
}
}
}
74 changes: 20 additions & 54 deletions src/Dapr.Workflow/WorkflowContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,17 @@ namespace Dapr.Workflow
/// Context object used by workflow implementations to perform actions such as scheduling activities, durable timers, waiting for
/// external events, and for getting basic information about the current workflow instance.
/// </summary>
public class WorkflowContext
public abstract class WorkflowContext
{
readonly TaskOrchestrationContext innerContext;

internal WorkflowContext(TaskOrchestrationContext innerContext)
{
this.innerContext = innerContext ?? throw new ArgumentNullException(nameof(innerContext));
}

/// <summary>
/// Gets the name of the current workflow.
/// </summary>
public string Name => this.innerContext.Name;
public abstract string Name { get; }

/// <summary>
/// Gets the instance ID of the current workflow.
/// </summary>
public string InstanceId => this.innerContext.InstanceId;
public abstract string InstanceId { get; }

/// <summary>
/// Gets the current workflow time in UTC.
Expand All @@ -51,7 +44,7 @@ internal WorkflowContext(TaskOrchestrationContext innerContext)
/// the current time, such as <see cref="DateTime.UtcNow"/> and <see cref="DateTimeOffset.UtcNow"/>
/// (which should not be used).
/// </remarks>
public DateTime CurrentUtcDateTime => this.innerContext.CurrentUtcDateTime;
public abstract DateTime CurrentUtcDateTime { get; }

/// <summary>
/// Gets a value indicating whether the workflow is currently replaying a previous execution.
Expand All @@ -72,7 +65,7 @@ internal WorkflowContext(TaskOrchestrationContext innerContext)
/// <value>
/// <c>true</c> if the workflow is currently replaying a previous execution; otherwise <c>false</c>.
/// </value>
public bool IsReplaying => this.innerContext.IsReplaying;
public abstract bool IsReplaying { get; }

/// <summary>
/// Asynchronously invokes an activity by name and with the specified input value.
Expand Down Expand Up @@ -108,19 +101,16 @@ internal WorkflowContext(TaskOrchestrationContext innerContext)
/// The activity failed with an unhandled exception. The details of the failure can be found in the
/// <see cref="TaskFailedException.FailureDetails"/> property.
/// </exception>
public Task CallActivityAsync(string name, object? input = null, TaskOptions? options = null)
public virtual Task CallActivityAsync(string name, object? input = null, TaskOptions? options = null)
{
return this.innerContext.CallActivityAsync(name, input, options);
return this.CallActivityAsync<object>(name, input, options);
}

/// <returns>
/// A task that completes when the activity completes or fails. The result of the task is the activity's return value.
/// </returns>
/// <inheritdoc cref="CallActivityAsync"/>
public Task<T> CallActivityAsync<T>(string name, object? input = null, TaskOptions? options = null)
{
return this.innerContext.CallActivityAsync<T>(name, input, options);
}
public abstract Task<T> CallActivityAsync<T>(string name, object? input = null, TaskOptions? options = null);

/// <summary>
/// Creates a durable timer that expires after the specified delay.
Expand All @@ -131,9 +121,9 @@ public Task<T> CallActivityAsync<T>(string name, object? input = null, TaskOptio
/// <exception cref="InvalidOperationException">
/// Thrown if the calling thread is not the workflow dispatch thread.
/// </exception>
public Task CreateTimer(TimeSpan delay, CancellationToken cancellationToken = default)
public virtual Task CreateTimer(TimeSpan delay, CancellationToken cancellationToken = default)
{
return this.innerContext.CreateTimer(delay, cancellationToken);
return this.CreateTimer(this.CurrentUtcDateTime.Add(delay), cancellationToken);
}

/// <summary>
Expand All @@ -142,10 +132,7 @@ public Task CreateTimer(TimeSpan delay, CancellationToken cancellationToken = de
/// <param name="fireAt">The time at which the timer should expire.</param>
/// <param name="cancellationToken">Used to cancel the durable timer.</param>
/// <inheritdoc cref="CreateTimer(TimeSpan, CancellationToken)"/>
public Task CreateTimer(DateTime fireAt, CancellationToken cancellationToken)
{
return this.innerContext.CreateTimer(fireAt, cancellationToken);
}
public abstract Task CreateTimer(DateTime fireAt, CancellationToken cancellationToken);

/// <summary>
/// Waits for an event to be raised with name <paramref name="eventName"/> and returns the event data.
Expand Down Expand Up @@ -176,10 +163,7 @@ public Task CreateTimer(DateTime fireAt, CancellationToken cancellationToken)
/// <exception cref="InvalidOperationException">
/// Thrown if the calling thread is not the workflow dispatch thread.
/// </exception>
public Task<T> WaitForExternalEventAsync<T>(string eventName, CancellationToken cancellationToken = default)
{
return this.innerContext.WaitForExternalEvent<T>(eventName, cancellationToken);
}
public abstract Task<T> WaitForExternalEventAsync<T>(string eventName, CancellationToken cancellationToken = default);

/// <summary>
/// Waits for an event to be raised with name <paramref name="eventName"/> and returns the event data.
Expand All @@ -190,10 +174,7 @@ public Task<T> WaitForExternalEventAsync<T>(string eventName, CancellationToken
/// </param>
/// <param name="timeout">The amount of time to wait before cancelling the external event task.</param>
/// <inheritdoc cref="WaitForExternalEventAsync{T}(string, CancellationToken)"/>
public Task<T> WaitForExternalEventAsync<T>(string eventName, TimeSpan timeout)
{
return this.innerContext.WaitForExternalEvent<T>(eventName, timeout);
}
public abstract Task<T> WaitForExternalEventAsync<T>(string eventName, TimeSpan timeout);

/// <summary>
/// Raises an external event for the specified workflow instance.
Expand All @@ -208,10 +189,7 @@ public Task<T> WaitForExternalEventAsync<T>(string eventName, TimeSpan timeout)
/// <param name="instanceId">The ID of the workflow instance to send the event to.</param>
/// <param name="eventName">The name of the event to wait for. Event names are case-insensitive.</param>
/// <param name="payload">The serializable payload of the external event.</param>
public void SendEvent(string instanceId, string eventName, object payload)
{
this.SendEvent(instanceId, eventName, payload);
}
public abstract void SendEvent(string instanceId, string eventName, object payload);

/// <summary>
/// Assigns a custom status value to the current workflow.
Expand All @@ -226,10 +204,7 @@ public void SendEvent(string instanceId, string eventName, object payload)
/// <exception cref="InvalidOperationException">
/// Thrown if the calling thread is not the workflow dispatch thread.
/// </exception>
public void SetCustomStatus(object? customStatus)
{
this.innerContext.SetCustomStatus(customStatus);
}
public abstract void SetCustomStatus(object? customStatus);

/// <summary>
/// Executes the specified workflow as a child workflow and returns the result.
Expand All @@ -238,10 +213,7 @@ public void SetCustomStatus(object? customStatus)
/// The type into which to deserialize the child workflow's output.
/// </typeparam>
/// <inheritdoc cref="CallChildWorkflowAsync(string, object?, TaskOptions?)"/>
public Task<TResult> CallChildWorkflowAsync<TResult>(string workflowName, object? input = null, TaskOptions? options = null)
{
return this.innerContext.CallSubOrchestratorAsync<TResult>(workflowName, input, options);
}
public abstract Task<TResult> CallChildWorkflowAsync<TResult>(string workflowName, object? input = null, TaskOptions? options = null);

/// <summary>
/// Executes the specified workflow as a child workflow.
Expand Down Expand Up @@ -284,9 +256,9 @@ public Task<TResult> CallChildWorkflowAsync<TResult>(string workflowName, object
/// The child workflow failed with an unhandled exception. The details of the failure can be found in the
/// <see cref="TaskFailedException.FailureDetails"/> property.
/// </exception>
public Task CallChildWorkflowAsync(string workflowName, object? input = null, TaskOptions? options = null)
public virtual Task CallChildWorkflowAsync(string workflowName, object? input = null, TaskOptions? options = null)
{
return this.innerContext.CallSubOrchestratorAsync(workflowName, input, options);
return this.CallChildWorkflowAsync<object>(workflowName, input, options);
}

/// <summary>
Expand Down Expand Up @@ -320,10 +292,7 @@ public Task CallChildWorkflowAsync(string workflowName, object? input = null, Ta
/// history when the workflow instance restarts. If <c>false</c>, any unprocessed
/// external events will be discarded when the workflow instance restarts.
/// </param>
public void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvents = true)
{
this.innerContext.ContinueAsNew(newInput!, preserveUnprocessedEvents);
}
public abstract void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvents = true);

/// <summary>
/// Creates a new GUID that is safe for replay within a workflow.
Expand All @@ -334,9 +303,6 @@ public void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvent
/// and an internally managed sequence number.
/// </remarks>
/// <returns>The new <see cref="Guid"/> value.</returns>
public Guid NewGuid()
{
return this.innerContext.NewGuid();
}
public abstract Guid NewGuid();
}
}
5 changes: 2 additions & 3 deletions src/Dapr.Workflow/WorkflowRuntimeOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ namespace Dapr.Workflow
using System.Threading.Tasks;
using Microsoft.DurableTask;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

/// <summary>
/// Defines runtime options for workflows.
Expand Down Expand Up @@ -52,7 +51,7 @@ public void RegisterWorkflow<TInput, TOutput>(string name, Func<WorkflowContext,
{
registry.AddOrchestratorFunc<TInput, TOutput>(name, (innerContext, input) =>
{
WorkflowContext workflowContext = new(innerContext);
WorkflowContext workflowContext = new DaprWorkflowContext(innerContext);
return implementation(workflowContext, input);
});
});
Expand Down Expand Up @@ -145,7 +144,7 @@ public OrchestratorWrapper(IWorkflow workflow)

public Task<object?> RunAsync(TaskOrchestrationContext context, object? input)
{
return this.workflow.RunAsync(new WorkflowContext(context), input);
return this.workflow.RunAsync(new DaprWorkflowContext(context), input);
}
}

Expand Down