Skip to content

Add New Property Properties to TaskOrchestrationContext #415

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
May 2, 2025
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

## (Unreleased)

- Add automatic retry on gateway timeout in `GrpcDurableTaskClient.WaitForInstanceCompletionAsync` in ([#412](https://github.com/microsoft/durabletask-dotnet/pull/412))
- Add New Property Properties to TaskOrchestrationContext by @nytian in [#415](https://github.com/microsoft/durabletask-dotnet/pull/415)
- Add automatic retry on gateway timeout in `GrpcDurableTaskClient.WaitForInstanceCompletionAsync` in [#412](https://github.com/microsoft/durabletask-dotnet/pull/412))
- Add specific logging for NotFound error on worker connection by @halspang in ([#413](https://github.com/microsoft/durabletask-dotnet/pull/413))
- Add user agent header to gRPC called in ([#417](https://github.com/microsoft/durabletask-dotnet/pull/417))
- Enrich User-Agent Header in gRPC Metadata to indicate Client or Worker as caller ([#421](https://github.com/microsoft/durabletask-dotnet/pull/421))
Expand Down
5 changes: 5 additions & 0 deletions src/Abstractions/TaskOrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public abstract class TaskOrchestrationContext
/// </summary>
public virtual string Version => string.Empty;

/// <summary>
/// Gets the configuration settings for the orchestration context.
/// </summary>
public virtual IReadOnlyDictionary<string, object?> Properties { get; } = new Dictionary<string, object?>();

/// <summary>
/// Gets the entity feature, for interacting with entities.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions src/Grpc/orchestrator_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/wrappers.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/struct.proto";

message OrchestrationInstance {
string instanceId = 1;
Expand Down Expand Up @@ -318,6 +319,7 @@ message OrchestratorRequest {
repeated HistoryEvent newEvents = 4;
OrchestratorEntityParameters entityParameters = 5;
bool requiresHistoryStreaming = 6;
map<string, google.protobuf.Value> properties = 7;
}

message OrchestratorResponse {
Expand Down
4 changes: 2 additions & 2 deletions src/Grpc/versions.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# The following files were downloaded from branch main at 2025-03-24 23:37:31 UTC
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/c85ef11430ff8e10e21105abb545b0803bb86c66/protos/orchestrator_service.proto
# The following files were downloaded from branch main at 2025-04-23 23:27:00 UTC
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/fbe5bb20835678099fc51a44993ed9b045dee5a6/protos/orchestrator_service.proto
2 changes: 1 addition & 1 deletion src/ScheduledTasks/Client/ScheduledTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Microsoft.DurableTask.ScheduledTasks;

/// <summary>
/// Client for managing scheduled tasks.
/// Provides methods to retrieve a ScheduleClient, list all schedules,
/// Provides methods to retrieve a ScheduleClient, list all schedules,
/// and get details of a specific schedule.
/// </summary>
public abstract class ScheduledTaskClient
Expand Down
2 changes: 1 addition & 1 deletion src/ScheduledTasks/Models/ScheduleState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ public void RefreshScheduleRunExecutionToken()
{
this.ExecutionToken = Guid.NewGuid().ToString("N");
}
}
}
1 change: 0 additions & 1 deletion src/Shared/AzureManaged/GrpcRetryPolicyDefaults.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ sealed class GrpcRetryPolicyDefaults
/// </remarks>
public static readonly ServiceConfig DefaultServiceConfig;


/// <summary>
/// The default retry policy for gRPC operations.
/// </summary>
Expand Down
31 changes: 31 additions & 0 deletions src/Shared/Grpc/ProtoUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,37 @@ internal static T Base64Decode<T>(this MessageParser parser, string encodedMessa
failureDetails.IsNonRetriable);
}

/// <summary>
/// Converts a <see cref="Google.Protobuf.WellKnownTypes.Value"/> instance to a corresponding C# object.
/// </summary>
/// <param name="value">The Protobuf Value to convert.</param>
/// <returns>The corresponding C# object.</returns>
/// <exception cref="NotSupportedException">
/// Thrown when the Protobuf Value.KindCase is not one of the supported types.
/// </exception>
internal static object? ConvertValueToObject(Google.Protobuf.WellKnownTypes.Value value)
{
switch (value.KindCase)
{
case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.NullValue:
return null;
case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.NumberValue:
return value.NumberValue;
case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.StringValue:
return value.StringValue;
case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.BoolValue:
return value.BoolValue;
case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.StructValue:
return value.StructValue.Fields.ToDictionary(
pair => pair.Key,
pair => ConvertValueToObject(pair.Value));
case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.ListValue:
return value.ListValue.Values.Select(ConvertValueToObject).ToList();
default:
throw new NotSupportedException($"Unsupported Value kind: {value.KindCase}");
}
}

/// <summary>
/// Converts a <see cref="FailureDetails" /> to a grpc <see cref="P.TaskFailureDetails" />.
/// </summary>
Expand Down
23 changes: 23 additions & 0 deletions src/Worker/Core/Shims/DurableTaskShimFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,29 @@ public TaskOrchestration CreateOrchestration(
return new TaskOrchestrationShim(context, orchestrator);
}

/// <summary>
/// Creates a <see cref="TaskOrchestration" /> from a <see cref="ITaskOrchestrator" />.
/// </summary>
/// <param name="name">
/// The name of the orchestration. This should be the name the orchestration was invoked with.
/// </param>
/// <param name="orchestrator">The orchestration to wrap.</param>
/// <param name ="properties">Configuration for the orchestration.</param>
/// <param name="parent">The orchestration parent details or <c>null</c> if no parent.</param>
/// <returns>A new <see cref="TaskOrchestration" />.</returns>
public TaskOrchestration CreateOrchestration(
TaskName name,
ITaskOrchestrator orchestrator,
IReadOnlyDictionary<string, object?> properties,
ParentOrchestrationInstance? parent = null)
{
Check.NotDefault(name);
Check.NotNull(orchestrator);
Check.NotNull(properties);
OrchestrationInvocationContext context = new(name, this.options, this.loggerFactory, parent);
return new TaskOrchestrationShim(context, orchestrator, properties);
}

/// <summary>
/// Creates a <see cref="TaskOrchestration" /> from a <see cref="ITaskOrchestrator" />.
/// </summary>
Expand Down
23 changes: 23 additions & 0 deletions src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,27 @@ public TaskOrchestrationContextWrapper(
OrchestrationContext innerContext,
OrchestrationInvocationContext invocationContext,
object? deserializedInput)
: this(innerContext, invocationContext, deserializedInput, new Dictionary<string, object?>())
{
}

/// <summary>
/// Initializes a new instance of the <see cref="TaskOrchestrationContextWrapper"/> class.
/// </summary>
/// <param name="innerContext">The inner orchestration context.</param>
/// <param name="invocationContext">The invocation context.</param>
/// <param name="deserializedInput">The deserialized input.</param>
/// <param name="properties">The configuration for context.</param>
public TaskOrchestrationContextWrapper(
OrchestrationContext innerContext,
OrchestrationInvocationContext invocationContext,
object? deserializedInput,
IReadOnlyDictionary<string, object?> properties)
{
this.innerContext = Check.NotNull(innerContext);
this.invocationContext = Check.NotNull(invocationContext);
this.Properties = Check.NotNull(properties);

this.logger = this.CreateReplaySafeLogger("Microsoft.DurableTask");
this.deserializedInput = deserializedInput;
}
Expand All @@ -60,6 +78,11 @@ public TaskOrchestrationContextWrapper(
/// <inheritdoc/>
public override DateTime CurrentUtcDateTime => this.innerContext.CurrentUtcDateTime;

/// <summary>
/// Gets the configuration settings for the orchestration.
/// </summary>
public override IReadOnlyDictionary<string, object?> Properties { get; }

/// <inheritdoc/>
public override TaskOrchestrationEntityFeature Entities
{
Expand Down
18 changes: 17 additions & 1 deletion src/Worker/Core/Shims/TaskOrchestrationShim.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ partial class TaskOrchestrationShim : TaskOrchestration
readonly ITaskOrchestrator implementation;
readonly OrchestrationInvocationContext invocationContext;
readonly ILogger logger;
readonly IReadOnlyDictionary<string, object?> properties;

TaskOrchestrationContextWrapper? wrapperContext;

Expand All @@ -30,9 +31,24 @@ partial class TaskOrchestrationShim : TaskOrchestration
public TaskOrchestrationShim(
OrchestrationInvocationContext invocationContext,
ITaskOrchestrator implementation)
: this(invocationContext, implementation, new Dictionary<string, object?>())
{
}

/// <summary>
/// Initializes a new instance of the <see cref="TaskOrchestrationShim"/> class.
/// </summary>
/// <param name="invocationContext">The invocation context for this orchestration.</param>
/// <param name="implementation">The orchestration's implementation.</param>
/// <param name="properties">Configuration for the orchestration.</param>
public TaskOrchestrationShim(
OrchestrationInvocationContext invocationContext,
ITaskOrchestrator implementation,
IReadOnlyDictionary<string, object?> properties)
{
this.invocationContext = Check.NotNull(invocationContext);
this.implementation = Check.NotNull(implementation);
this.properties = Check.NotNull(properties);

this.logger = Logs.CreateWorkerLogger(this.invocationContext.LoggerFactory, "Orchestrations");
}
Expand All @@ -48,7 +64,7 @@ public TaskOrchestrationShim(
innerContext.ErrorDataConverter = converterShim;

object? input = this.DataConverter.Deserialize(rawInput, this.implementation.InputType);
this.wrapperContext = new(innerContext, this.invocationContext, input);
this.wrapperContext = new(innerContext, this.invocationContext, input, this.properties);

string instanceId = innerContext.OrchestrationInstance.InstanceId;
if (!innerContext.IsReplaying)
Expand Down
5 changes: 4 additions & 1 deletion src/Worker/Grpc/GrpcOrchestrationRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ public static string LoadAndRun(

List<HistoryEvent> pastEvents = request.PastEvents.Select(ProtoUtils.ConvertHistoryEvent).ToList();
IEnumerable<HistoryEvent> newEvents = request.NewEvents.Select(ProtoUtils.ConvertHistoryEvent);
Dictionary<string, object?> properties = request.Properties.ToDictionary(
pair => pair.Key,
pair => ProtoUtils.ConvertValueToObject(pair.Value));

// Re-construct the orchestration state from the history.
// New events must be added using the AddEvent method.
Expand All @@ -108,7 +111,7 @@ public static string LoadAndRun(
DurableTaskShimFactory factory = services is null
? DurableTaskShimFactory.Default
: ActivatorUtilities.GetServiceOrCreateInstance<DurableTaskShimFactory>(services);
TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, parent);
TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, properties, parent);
TaskOrchestrationExecutor executor = new(runtimeState, shim, BehaviorOnContinueAsNew.Carryover, request.EntityParameters.ToCore(), ErrorPropagationMode.UseFailureDetails);
OrchestratorExecutionResult result = executor.Execute();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public TestTaskOrchestrationContext(string version)

protected override ILoggerFactory LoggerFactory => throw new NotImplementedException();

public override Dictionary<string, object?> Properties => throw new NotImplementedException();

public override Task<TResult> CallActivityAsync<TResult>(TaskName name, object? input = null, TaskOptions? options = null)
{
throw new NotImplementedException();
Expand Down
Loading