diff --git a/CHANGELOG.md b/CHANGELOG.md index 9bc71ef1..7f755155 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,8 @@ ## (Unreleased) -- Add automatic retry on gateway timeout in `GrpcDurableTaskClient.WaitForInstanceCompletionAsync` in ([#412](https://github.com/microsoft/durabletask-dotnet/pull/412)) +- Add New Property Properties to TaskOrchestrationContext by @nytian in [#415](https://github.com/microsoft/durabletask-dotnet/pull/415) +- Add automatic retry on gateway timeout in `GrpcDurableTaskClient.WaitForInstanceCompletionAsync` in [#412](https://github.com/microsoft/durabletask-dotnet/pull/412)) - Add specific logging for NotFound error on worker connection by @halspang in ([#413](https://github.com/microsoft/durabletask-dotnet/pull/413)) - Add user agent header to gRPC called in ([#417](https://github.com/microsoft/durabletask-dotnet/pull/417)) - Enrich User-Agent Header in gRPC Metadata to indicate Client or Worker as caller ([#421](https://github.com/microsoft/durabletask-dotnet/pull/421)) diff --git a/src/Abstractions/TaskOrchestrationContext.cs b/src/Abstractions/TaskOrchestrationContext.cs index 45199d80..95cc116e 100644 --- a/src/Abstractions/TaskOrchestrationContext.cs +++ b/src/Abstractions/TaskOrchestrationContext.cs @@ -65,6 +65,11 @@ public abstract class TaskOrchestrationContext /// public virtual string Version => string.Empty; + /// + /// Gets the configuration settings for the orchestration context. + /// + public virtual IReadOnlyDictionary Properties { get; } = new Dictionary(); + /// /// Gets the entity feature, for interacting with entities. /// diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index 5dd39cfe..88928c3b 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -11,6 +11,7 @@ import "google/protobuf/timestamp.proto"; import "google/protobuf/duration.proto"; import "google/protobuf/wrappers.proto"; import "google/protobuf/empty.proto"; +import "google/protobuf/struct.proto"; message OrchestrationInstance { string instanceId = 1; @@ -318,6 +319,7 @@ message OrchestratorRequest { repeated HistoryEvent newEvents = 4; OrchestratorEntityParameters entityParameters = 5; bool requiresHistoryStreaming = 6; + map properties = 7; } message OrchestratorResponse { diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index 733b20c0..fc90a440 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch main at 2025-03-24 23:37:31 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/c85ef11430ff8e10e21105abb545b0803bb86c66/protos/orchestrator_service.proto +# The following files were downloaded from branch main at 2025-04-23 23:27:00 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/fbe5bb20835678099fc51a44993ed9b045dee5a6/protos/orchestrator_service.proto diff --git a/src/ScheduledTasks/Client/ScheduledTaskClient.cs b/src/ScheduledTasks/Client/ScheduledTaskClient.cs index edc7658a..e45dd7ff 100644 --- a/src/ScheduledTasks/Client/ScheduledTaskClient.cs +++ b/src/ScheduledTasks/Client/ScheduledTaskClient.cs @@ -5,7 +5,7 @@ namespace Microsoft.DurableTask.ScheduledTasks; /// /// Client for managing scheduled tasks. -/// Provides methods to retrieve a ScheduleClient, list all schedules, +/// Provides methods to retrieve a ScheduleClient, list all schedules, /// and get details of a specific schedule. /// public abstract class ScheduledTaskClient diff --git a/src/ScheduledTasks/Models/ScheduleState.cs b/src/ScheduledTasks/Models/ScheduleState.cs index 48783de4..7007c762 100644 --- a/src/ScheduledTasks/Models/ScheduleState.cs +++ b/src/ScheduledTasks/Models/ScheduleState.cs @@ -50,4 +50,4 @@ public void RefreshScheduleRunExecutionToken() { this.ExecutionToken = Guid.NewGuid().ToString("N"); } -} \ No newline at end of file +} diff --git a/src/Shared/AzureManaged/GrpcRetryPolicyDefaults.cs b/src/Shared/AzureManaged/GrpcRetryPolicyDefaults.cs index 6217d186..c0cd147b 100644 --- a/src/Shared/AzureManaged/GrpcRetryPolicyDefaults.cs +++ b/src/Shared/AzureManaged/GrpcRetryPolicyDefaults.cs @@ -39,7 +39,6 @@ sealed class GrpcRetryPolicyDefaults /// public static readonly ServiceConfig DefaultServiceConfig; - /// /// The default retry policy for gRPC operations. /// diff --git a/src/Shared/Grpc/ProtoUtils.cs b/src/Shared/Grpc/ProtoUtils.cs index 9a18c524..56907bc9 100644 --- a/src/Shared/Grpc/ProtoUtils.cs +++ b/src/Shared/Grpc/ProtoUtils.cs @@ -904,6 +904,37 @@ internal static T Base64Decode(this MessageParser parser, string encodedMessa failureDetails.IsNonRetriable); } + /// + /// Converts a instance to a corresponding C# object. + /// + /// The Protobuf Value to convert. + /// The corresponding C# object. + /// + /// Thrown when the Protobuf Value.KindCase is not one of the supported types. + /// + internal static object? ConvertValueToObject(Google.Protobuf.WellKnownTypes.Value value) + { + switch (value.KindCase) + { + case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.NullValue: + return null; + case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.NumberValue: + return value.NumberValue; + case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.StringValue: + return value.StringValue; + case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.BoolValue: + return value.BoolValue; + case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.StructValue: + return value.StructValue.Fields.ToDictionary( + pair => pair.Key, + pair => ConvertValueToObject(pair.Value)); + case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.ListValue: + return value.ListValue.Values.Select(ConvertValueToObject).ToList(); + default: + throw new NotSupportedException($"Unsupported Value kind: {value.KindCase}"); + } + } + /// /// Converts a to a grpc . /// diff --git a/src/Worker/Core/Shims/DurableTaskShimFactory.cs b/src/Worker/Core/Shims/DurableTaskShimFactory.cs index d9be8a3c..584b7eeb 100644 --- a/src/Worker/Core/Shims/DurableTaskShimFactory.cs +++ b/src/Worker/Core/Shims/DurableTaskShimFactory.cs @@ -89,6 +89,29 @@ public TaskOrchestration CreateOrchestration( return new TaskOrchestrationShim(context, orchestrator); } + /// + /// Creates a from a . + /// + /// + /// The name of the orchestration. This should be the name the orchestration was invoked with. + /// + /// The orchestration to wrap. + /// Configuration for the orchestration. + /// The orchestration parent details or null if no parent. + /// A new . + public TaskOrchestration CreateOrchestration( + TaskName name, + ITaskOrchestrator orchestrator, + IReadOnlyDictionary properties, + ParentOrchestrationInstance? parent = null) + { + Check.NotDefault(name); + Check.NotNull(orchestrator); + Check.NotNull(properties); + OrchestrationInvocationContext context = new(name, this.options, this.loggerFactory, parent); + return new TaskOrchestrationShim(context, orchestrator, properties); + } + /// /// Creates a from a . /// diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index 8ce7d512..a964020d 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -38,9 +38,27 @@ public TaskOrchestrationContextWrapper( OrchestrationContext innerContext, OrchestrationInvocationContext invocationContext, object? deserializedInput) + : this(innerContext, invocationContext, deserializedInput, new Dictionary()) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The inner orchestration context. + /// The invocation context. + /// The deserialized input. + /// The configuration for context. + public TaskOrchestrationContextWrapper( + OrchestrationContext innerContext, + OrchestrationInvocationContext invocationContext, + object? deserializedInput, + IReadOnlyDictionary properties) { this.innerContext = Check.NotNull(innerContext); this.invocationContext = Check.NotNull(invocationContext); + this.Properties = Check.NotNull(properties); + this.logger = this.CreateReplaySafeLogger("Microsoft.DurableTask"); this.deserializedInput = deserializedInput; } @@ -60,6 +78,11 @@ public TaskOrchestrationContextWrapper( /// public override DateTime CurrentUtcDateTime => this.innerContext.CurrentUtcDateTime; + /// + /// Gets the configuration settings for the orchestration. + /// + public override IReadOnlyDictionary Properties { get; } + /// public override TaskOrchestrationEntityFeature Entities { diff --git a/src/Worker/Core/Shims/TaskOrchestrationShim.cs b/src/Worker/Core/Shims/TaskOrchestrationShim.cs index a899dd22..127038a3 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationShim.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationShim.cs @@ -19,6 +19,7 @@ partial class TaskOrchestrationShim : TaskOrchestration readonly ITaskOrchestrator implementation; readonly OrchestrationInvocationContext invocationContext; readonly ILogger logger; + readonly IReadOnlyDictionary properties; TaskOrchestrationContextWrapper? wrapperContext; @@ -30,9 +31,24 @@ partial class TaskOrchestrationShim : TaskOrchestration public TaskOrchestrationShim( OrchestrationInvocationContext invocationContext, ITaskOrchestrator implementation) + : this(invocationContext, implementation, new Dictionary()) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The invocation context for this orchestration. + /// The orchestration's implementation. + /// Configuration for the orchestration. + public TaskOrchestrationShim( + OrchestrationInvocationContext invocationContext, + ITaskOrchestrator implementation, + IReadOnlyDictionary properties) { this.invocationContext = Check.NotNull(invocationContext); this.implementation = Check.NotNull(implementation); + this.properties = Check.NotNull(properties); this.logger = Logs.CreateWorkerLogger(this.invocationContext.LoggerFactory, "Orchestrations"); } @@ -48,7 +64,7 @@ public TaskOrchestrationShim( innerContext.ErrorDataConverter = converterShim; object? input = this.DataConverter.Deserialize(rawInput, this.implementation.InputType); - this.wrapperContext = new(innerContext, this.invocationContext, input); + this.wrapperContext = new(innerContext, this.invocationContext, input, this.properties); string instanceId = innerContext.OrchestrationInstance.InstanceId; if (!innerContext.IsReplaying) diff --git a/src/Worker/Grpc/GrpcOrchestrationRunner.cs b/src/Worker/Grpc/GrpcOrchestrationRunner.cs index d1737bf7..9a6e3c5f 100644 --- a/src/Worker/Grpc/GrpcOrchestrationRunner.cs +++ b/src/Worker/Grpc/GrpcOrchestrationRunner.cs @@ -91,6 +91,9 @@ public static string LoadAndRun( List pastEvents = request.PastEvents.Select(ProtoUtils.ConvertHistoryEvent).ToList(); IEnumerable newEvents = request.NewEvents.Select(ProtoUtils.ConvertHistoryEvent); + Dictionary properties = request.Properties.ToDictionary( + pair => pair.Key, + pair => ProtoUtils.ConvertValueToObject(pair.Value)); // Re-construct the orchestration state from the history. // New events must be added using the AddEvent method. @@ -108,7 +111,7 @@ public static string LoadAndRun( DurableTaskShimFactory factory = services is null ? DurableTaskShimFactory.Default : ActivatorUtilities.GetServiceOrCreateInstance(services); - TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, parent); + TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, properties, parent); TaskOrchestrationExecutor executor = new(runtimeState, shim, BehaviorOnContinueAsNew.Carryover, request.EntityParameters.ToCore(), ErrorPropagationMode.UseFailureDetails); OrchestratorExecutionResult result = executor.Execute(); diff --git a/test/Abstractions.Tests/TaskOrchestrationContextVersionTests.cs b/test/Abstractions.Tests/TaskOrchestrationContextVersionTests.cs index 00d59a6d..11247f3e 100644 --- a/test/Abstractions.Tests/TaskOrchestrationContextVersionTests.cs +++ b/test/Abstractions.Tests/TaskOrchestrationContextVersionTests.cs @@ -58,6 +58,8 @@ public TestTaskOrchestrationContext(string version) protected override ILoggerFactory LoggerFactory => throw new NotImplementedException(); + public override Dictionary Properties => throw new NotImplementedException(); + public override Task CallActivityAsync(TaskName name, object? input = null, TaskOptions? options = null) { throw new NotImplementedException();