From cc7b542569f408e24a22ed2954c72d714bb2771d Mon Sep 17 00:00:00 2001 From: Jerome Peronne Date: Thu, 26 Jan 2023 15:58:17 +0100 Subject: [PATCH] runnable pollers splitting --- src/WorkflowCore.Testing/WorkflowTest.cs | 2 +- src/WorkflowCore/Models/WorkflowOptions.cs | 22 +- .../ServiceCollectionExtensions.cs | 5 +- .../BackgroundTasks/RunnablePoller.cs | 221 ------------------ .../BaseWorkflowRunnablePoller.cs | 34 +++ .../RunnablePoller/CommandRunnablePoller.cs | 77 ++++++ .../RunnablePoller/EventRunnablePoller.cs | 98 ++++++++ .../RunnablePoller/WorkflowRunnablePoller.cs | 97 ++++++++ .../BackgroundTasks/WorkflowConsumer.cs | 4 +- .../Scenarios/DelayScenario.cs | 2 +- .../Scenarios/ParallelEventsScenario.cs | 2 +- .../Scenarios/MongoDelayScenario.cs | 2 +- .../Scenarios/MysqlDelayScenario.cs | 2 +- .../Scenarios/PostgresDelayScenario.cs | 2 +- .../Scenarios/SqlServerDelayScenario.cs | 2 +- 15 files changed, 338 insertions(+), 234 deletions(-) delete mode 100644 src/WorkflowCore/Services/BackgroundTasks/RunnablePoller.cs create mode 100644 src/WorkflowCore/Services/BackgroundTasks/RunnablePoller/BaseWorkflowRunnablePoller.cs create mode 100644 src/WorkflowCore/Services/BackgroundTasks/RunnablePoller/CommandRunnablePoller.cs create mode 100644 src/WorkflowCore/Services/BackgroundTasks/RunnablePoller/EventRunnablePoller.cs create mode 100644 src/WorkflowCore/Services/BackgroundTasks/RunnablePoller/WorkflowRunnablePoller.cs diff --git a/src/WorkflowCore.Testing/WorkflowTest.cs b/src/WorkflowCore.Testing/WorkflowTest.cs index bf0eb97ab..da98987c5 100644 --- a/src/WorkflowCore.Testing/WorkflowTest.cs +++ b/src/WorkflowCore.Testing/WorkflowTest.cs @@ -46,7 +46,7 @@ protected void Host_OnStepError(WorkflowInstance workflow, WorkflowStep step, Ex protected virtual void ConfigureServices(IServiceCollection services) { - services.AddWorkflow(options => options.UsePollInterval(TimeSpan.FromSeconds(3))); + services.AddWorkflow(options => options.UsePollWorkflowsInterval(TimeSpan.FromSeconds(3))); } public string StartWorkflow(TData data) diff --git a/src/WorkflowCore/Models/WorkflowOptions.cs b/src/WorkflowCore/Models/WorkflowOptions.cs index 8913c32c2..2b254605f 100644 --- a/src/WorkflowCore/Models/WorkflowOptions.cs +++ b/src/WorkflowCore/Models/WorkflowOptions.cs @@ -13,7 +13,9 @@ public class WorkflowOptions internal Func LockFactory; internal Func EventHubFactory; internal Func SearchIndexFactory; - internal TimeSpan PollInterval; + internal TimeSpan PollWorkflowsInterval; + internal TimeSpan PollEventsInterval; + internal TimeSpan PollCommandsInterval; internal TimeSpan IdleTime; internal TimeSpan ErrorRetryInterval; internal int MaxConcurrentWorkflows = Math.Max(Environment.ProcessorCount, 4); @@ -23,7 +25,9 @@ public class WorkflowOptions public WorkflowOptions(IServiceCollection services) { Services = services; - PollInterval = TimeSpan.FromSeconds(10); + PollWorkflowsInterval = TimeSpan.FromSeconds(10); + PollEventsInterval = TimeSpan.FromSeconds(10); + PollCommandsInterval = TimeSpan.FromSeconds(10); IdleTime = TimeSpan.FromMilliseconds(100); ErrorRetryInterval = TimeSpan.FromSeconds(60); @@ -65,9 +69,19 @@ public void UseSearchIndex(Func factory) SearchIndexFactory = factory; } - public void UsePollInterval(TimeSpan interval) + public void UsePollWorkflowsInterval(TimeSpan interval) { - PollInterval = interval; + PollWorkflowsInterval = interval; + } + + public void UsePollEventsInterval(TimeSpan interval) + { + PollEventsInterval = interval; + } + + public void UsePollCommandsInterval(TimeSpan interval) + { + PollCommandsInterval = interval; } public void UseErrorRetryInterval(TimeSpan interval) diff --git a/src/WorkflowCore/ServiceCollectionExtensions.cs b/src/WorkflowCore/ServiceCollectionExtensions.cs index 760a89d41..aafacde28 100644 --- a/src/WorkflowCore/ServiceCollectionExtensions.cs +++ b/src/WorkflowCore/ServiceCollectionExtensions.cs @@ -7,6 +7,7 @@ using WorkflowCore.Primitives; using WorkflowCore.Services.BackgroundTasks; using WorkflowCore.Services.ErrorHandlers; +using WorkflowCore.Services.BackgroundTasks.RunnablePoller; namespace Microsoft.Extensions.DependencyInjection { @@ -50,7 +51,9 @@ public static IServiceCollection AddWorkflow(this IServiceCollection services, A if (options.EnablePolling) { - services.AddTransient(); + services.AddTransient(); + services.AddTransient(); + services.AddTransient(); } services.AddTransient(sp => sp.GetService()); diff --git a/src/WorkflowCore/Services/BackgroundTasks/RunnablePoller.cs b/src/WorkflowCore/Services/BackgroundTasks/RunnablePoller.cs deleted file mode 100644 index fcd2abd92..000000000 --- a/src/WorkflowCore/Services/BackgroundTasks/RunnablePoller.cs +++ /dev/null @@ -1,221 +0,0 @@ -using System; -using System.Diagnostics; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Extensions.Logging; -using OpenTelemetry.Trace; -using WorkflowCore.Interface; -using WorkflowCore.Models; - -namespace WorkflowCore.Services.BackgroundTasks -{ - internal class RunnablePoller : IBackgroundTask - { - private readonly IPersistenceProvider _persistenceStore; - private readonly IDistributedLockProvider _lockProvider; - private readonly IQueueProvider _queueProvider; - private readonly ILogger _logger; - private readonly IGreyList _greylist; - private readonly WorkflowOptions _options; - private readonly IDateTimeProvider _dateTimeProvider; - private Timer _pollTimer; - - public RunnablePoller(IPersistenceProvider persistenceStore, IQueueProvider queueProvider, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IWorkflowRegistry registry, IDistributedLockProvider lockProvider, IGreyList greylist, IDateTimeProvider dateTimeProvider, WorkflowOptions options) - { - _persistenceStore = persistenceStore; - _greylist = greylist; - _queueProvider = queueProvider; - _logger = loggerFactory.CreateLogger(); - _lockProvider = lockProvider; - _dateTimeProvider = dateTimeProvider; - _options = options; - } - - public void Start() - { - _pollTimer = new Timer(new TimerCallback(PollRunnables), null, TimeSpan.FromSeconds(0), _options.PollInterval); - } - - public void Stop() - { - if (_pollTimer != null) - { - _pollTimer.Dispose(); - _pollTimer = null; - } - } - - /// - /// Poll the persistence store for workflows ready to run. - /// Poll the persistence store for stashed unpublished events - /// - private async void PollRunnables(object target) - { - await PollWorkflows(); - await PollEvents(); - await PollCommands(); - } - - private async Task PollWorkflows() - { - var activity = WorkflowActivity.StartPoll("workflows"); - try - { - if (await _lockProvider.AcquireLock("poll runnables", new CancellationToken())) - { - try - { - _logger.LogDebug("Polling for runnable workflows"); - - var runnables = await _persistenceStore.GetRunnableInstances(_dateTimeProvider.Now); - foreach (var item in runnables) - { - if (_persistenceStore.SupportsScheduledCommands) - { - try - { - await _persistenceStore.ScheduleCommand(new ScheduledCommand() - { - CommandName = ScheduledCommand.ProcessWorkflow, - Data = item, - ExecuteTime = _dateTimeProvider.UtcNow.Ticks - }); - continue; - } - catch (Exception ex) - { - _logger.LogError(ex, ex.Message); - activity?.RecordException(ex); - } - } - if (_greylist.Contains($"wf:{item}")) - { - _logger.LogDebug($"Got greylisted workflow {item}"); - continue; - } - _logger.LogDebug("Got runnable instance {0}", item); - _greylist.Add($"wf:{item}"); - await _queueProvider.QueueWork(item, QueueType.Workflow); - } - } - finally - { - await _lockProvider.ReleaseLock("poll runnables"); - } - } - } - catch (Exception ex) - { - _logger.LogError(ex, ex.Message); - activity?.RecordException(ex); - } - finally - { - activity?.Dispose(); - } - } - - private async Task PollEvents() - { - var activity = WorkflowActivity.StartPoll("events"); - try - { - if (await _lockProvider.AcquireLock("unprocessed events", new CancellationToken())) - { - try - { - _logger.LogDebug("Polling for unprocessed events"); - - var events = await _persistenceStore.GetRunnableEvents(_dateTimeProvider.Now); - foreach (var item in events.ToList()) - { - if (_persistenceStore.SupportsScheduledCommands) - { - try - { - await _persistenceStore.ScheduleCommand(new ScheduledCommand() - { - CommandName = ScheduledCommand.ProcessEvent, - Data = item, - ExecuteTime = _dateTimeProvider.UtcNow.Ticks - }); - continue; - } - catch (Exception ex) - { - _logger.LogError(ex, ex.Message); - activity?.RecordException(ex); - } - } - if (_greylist.Contains($"evt:{item}")) - { - _logger.LogDebug($"Got greylisted event {item}"); - continue; - } - _logger.LogDebug($"Got unprocessed event {item}"); - _greylist.Add($"evt:{item}"); - await _queueProvider.QueueWork(item, QueueType.Event); - } - } - finally - { - await _lockProvider.ReleaseLock("unprocessed events"); - } - } - } - catch (Exception ex) - { - _logger.LogError(ex, ex.Message); - activity?.RecordException(ex); - } - finally - { - activity?.Dispose(); - } - } - - private async Task PollCommands() - { - var activity = WorkflowActivity.StartPoll("commands"); - try - { - if (!_persistenceStore.SupportsScheduledCommands) - return; - - if (await _lockProvider.AcquireLock("poll-commands", new CancellationToken())) - { - try - { - _logger.LogDebug("Polling for scheduled commands"); - await _persistenceStore.ProcessCommands(new DateTimeOffset(_dateTimeProvider.UtcNow), async (command) => - { - switch (command.CommandName) - { - case ScheduledCommand.ProcessWorkflow: - await _queueProvider.QueueWork(command.Data, QueueType.Workflow); - break; - case ScheduledCommand.ProcessEvent: - await _queueProvider.QueueWork(command.Data, QueueType.Event); - break; - } - }); - } - finally - { - await _lockProvider.ReleaseLock("poll-commands"); - } - } - } - catch (Exception ex) - { - _logger.LogError(ex, ex.Message); - activity?.RecordException(ex); - } - finally - { - activity?.Dispose(); - } - } - } -} diff --git a/src/WorkflowCore/Services/BackgroundTasks/RunnablePoller/BaseWorkflowRunnablePoller.cs b/src/WorkflowCore/Services/BackgroundTasks/RunnablePoller/BaseWorkflowRunnablePoller.cs new file mode 100644 index 000000000..e7fb07abc --- /dev/null +++ b/src/WorkflowCore/Services/BackgroundTasks/RunnablePoller/BaseWorkflowRunnablePoller.cs @@ -0,0 +1,34 @@ +using System; +using System.Threading; +using WorkflowCore.Interface; + +namespace WorkflowCore.Services.BackgroundTasks.RunnablePoller +{ + internal abstract class BaseWorkflowRunnablePoller : IBackgroundTask + { + private readonly TimeSpan _pollInterval; + private Timer _pollTimer; + + protected BaseWorkflowRunnablePoller(TimeSpan pollInterval) + { + _pollInterval = pollInterval; + } + + public void Start() + { + _pollTimer = new Timer(new TimerCallback(PollRunnables), null, TimeSpan.FromSeconds(0), _pollInterval); + } + + public void Stop() + { + if (_pollTimer != null) + { + _pollTimer.Dispose(); + _pollTimer = null; + } + } + + + protected abstract void PollRunnables(object target); + } +} diff --git a/src/WorkflowCore/Services/BackgroundTasks/RunnablePoller/CommandRunnablePoller.cs b/src/WorkflowCore/Services/BackgroundTasks/RunnablePoller/CommandRunnablePoller.cs new file mode 100644 index 000000000..6c109bf44 --- /dev/null +++ b/src/WorkflowCore/Services/BackgroundTasks/RunnablePoller/CommandRunnablePoller.cs @@ -0,0 +1,77 @@ +using System; +using System.Threading; +using Microsoft.Extensions.Logging; +using OpenTelemetry.Trace; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Services.BackgroundTasks.RunnablePoller +{ + internal class CommandRunnablePoller : BaseWorkflowRunnablePoller + { + private readonly IPersistenceProvider _persistenceStore; + private readonly IDistributedLockProvider _lockProvider; + private readonly IQueueProvider _queueProvider; + private readonly ILogger _logger; + private readonly IDateTimeProvider _dateTimeProvider; + + public CommandRunnablePoller( + IPersistenceProvider persistenceStore, + IQueueProvider queueProvider, + ILoggerFactory loggerFactory, + IDistributedLockProvider lockProvider, + IDateTimeProvider dateTimeProvider, + WorkflowOptions options) + : base(options.PollCommandsInterval) + { + _persistenceStore = persistenceStore; + _queueProvider = queueProvider; + _logger = loggerFactory.CreateLogger(); + _lockProvider = lockProvider; + _dateTimeProvider = dateTimeProvider; + } + + protected override async void PollRunnables(object target) + { + var activity = WorkflowActivity.StartPoll("commands"); + try + { + if (!_persistenceStore.SupportsScheduledCommands) + return; + + if (await _lockProvider.AcquireLock("poll-commands", new CancellationToken())) + { + try + { + _logger.LogDebug("Polling for scheduled commands"); + await _persistenceStore.ProcessCommands(new DateTimeOffset(_dateTimeProvider.UtcNow), async (command) => + { + switch (command.CommandName) + { + case ScheduledCommand.ProcessWorkflow: + await _queueProvider.QueueWork(command.Data, QueueType.Workflow); + break; + case ScheduledCommand.ProcessEvent: + await _queueProvider.QueueWork(command.Data, QueueType.Event); + break; + } + }); + } + finally + { + await _lockProvider.ReleaseLock("poll-commands"); + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, ex.Message); + activity?.RecordException(ex); + } + finally + { + activity?.Dispose(); + } + } + } +} diff --git a/src/WorkflowCore/Services/BackgroundTasks/RunnablePoller/EventRunnablePoller.cs b/src/WorkflowCore/Services/BackgroundTasks/RunnablePoller/EventRunnablePoller.cs new file mode 100644 index 000000000..b4dbfbcd2 --- /dev/null +++ b/src/WorkflowCore/Services/BackgroundTasks/RunnablePoller/EventRunnablePoller.cs @@ -0,0 +1,98 @@ +using System; +using System.Linq; +using System.Threading; +using Microsoft.Extensions.Logging; +using OpenTelemetry.Trace; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Services.BackgroundTasks.RunnablePoller +{ + internal class EventRunnablePoller : BaseWorkflowRunnablePoller + { + private readonly IPersistenceProvider _persistenceStore; + private readonly IDistributedLockProvider _lockProvider; + private readonly IQueueProvider _queueProvider; + private readonly ILogger _logger; + private readonly IGreyList _greylist; + private readonly IDateTimeProvider _dateTimeProvider; + + public EventRunnablePoller( + IPersistenceProvider persistenceStore, + IQueueProvider queueProvider, + ILoggerFactory loggerFactory, + IDistributedLockProvider lockProvider, + IGreyList greylist, + IDateTimeProvider dateTimeProvider, + WorkflowOptions options) + : base(options.PollEventsInterval) + { + _persistenceStore = persistenceStore; + _greylist = greylist; + _queueProvider = queueProvider; + _logger = loggerFactory.CreateLogger(); + _lockProvider = lockProvider; + _dateTimeProvider = dateTimeProvider; + } + + protected override async void PollRunnables(object target) + { + var activity = WorkflowActivity.StartPoll("events"); + try + { + if (await _lockProvider.AcquireLock("unprocessed events", new CancellationToken())) + { + try + { + _logger.LogDebug("Polling for unprocessed events"); + + var events = await _persistenceStore.GetRunnableEvents(_dateTimeProvider.Now); + foreach (var item in events.ToList()) + { + if (_persistenceStore.SupportsScheduledCommands) + { + try + { + await _persistenceStore.ScheduleCommand(new ScheduledCommand() + { + CommandName = ScheduledCommand.ProcessEvent, + Data = item, + ExecuteTime = _dateTimeProvider.UtcNow.Ticks + }); + continue; + } + catch (Exception ex) + { + _logger.LogError(ex, ex.Message); + activity?.RecordException(ex); + } + } + if (_greylist.Contains($"evt:{item}")) + { + _logger.LogDebug($"Got greylisted event {item}"); + continue; + } + _logger.LogDebug($"Got unprocessed event {item}"); + _greylist.Add($"evt:{item}"); + await _queueProvider.QueueWork(item, QueueType.Event); + } + } + finally + { + await _lockProvider.ReleaseLock("unprocessed events"); + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, ex.Message); + activity?.RecordException(ex); + } + finally + { + activity?.Dispose(); + } + } + + } +} diff --git a/src/WorkflowCore/Services/BackgroundTasks/RunnablePoller/WorkflowRunnablePoller.cs b/src/WorkflowCore/Services/BackgroundTasks/RunnablePoller/WorkflowRunnablePoller.cs new file mode 100644 index 000000000..edd4c64ab --- /dev/null +++ b/src/WorkflowCore/Services/BackgroundTasks/RunnablePoller/WorkflowRunnablePoller.cs @@ -0,0 +1,97 @@ +using System; +using System.Threading; +using Microsoft.Extensions.Logging; +using OpenTelemetry.Trace; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.Services.BackgroundTasks.RunnablePoller +{ + internal class WorkflowRunnablePoller : BaseWorkflowRunnablePoller + { + private readonly IPersistenceProvider _persistenceStore; + private readonly IDistributedLockProvider _lockProvider; + private readonly IQueueProvider _queueProvider; + private readonly ILogger _logger; + private readonly IGreyList _greylist; + private readonly IDateTimeProvider _dateTimeProvider; + + public WorkflowRunnablePoller( + IPersistenceProvider persistenceStore, + IQueueProvider queueProvider, + ILoggerFactory loggerFactory, + IDistributedLockProvider lockProvider, + IGreyList greylist, + IDateTimeProvider dateTimeProvider, + WorkflowOptions options) + : base(options.PollWorkflowsInterval) + { + _persistenceStore = persistenceStore; + _greylist = greylist; + _queueProvider = queueProvider; + _logger = loggerFactory.CreateLogger(); + _lockProvider = lockProvider; + _dateTimeProvider = dateTimeProvider; + } + + protected override async void PollRunnables(object target) + { + var activity = WorkflowActivity.StartPoll("workflows"); + try + { + if (await _lockProvider.AcquireLock("poll runnables", new CancellationToken())) + { + try + { + _logger.LogDebug("Polling for runnable workflows"); + + var runnables = await _persistenceStore.GetRunnableInstances(_dateTimeProvider.Now); + foreach (var item in runnables) + { + if (_persistenceStore.SupportsScheduledCommands) + { + try + { + await _persistenceStore.ScheduleCommand(new ScheduledCommand() + { + CommandName = ScheduledCommand.ProcessWorkflow, + Data = item, + ExecuteTime = _dateTimeProvider.UtcNow.Ticks + }); + continue; + } + catch (Exception ex) + { + _logger.LogError(ex, ex.Message); + activity?.RecordException(ex); + } + } + if (_greylist.Contains($"wf:{item}")) + { + _logger.LogDebug($"Got greylisted workflow {item}"); + continue; + } + _logger.LogDebug("Got runnable instance {0}", item); + _greylist.Add($"wf:{item}"); + await _queueProvider.QueueWork(item, QueueType.Workflow); + } + } + finally + { + await _lockProvider.ReleaseLock("poll runnables"); + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, ex.Message); + activity?.RecordException(ex); + } + finally + { + activity?.Dispose(); + } + } + + } +} diff --git a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs index b092f41f6..7f75ee127 100644 --- a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs +++ b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs @@ -75,7 +75,9 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue) { - var readAheadTicks = _datetimeProvider.UtcNow.Add(Options.PollInterval).Ticks; + var timeSpanBeforeNextPoll = _persistenceStore.SupportsScheduledCommands ? + Options.PollCommandsInterval : Options.PollWorkflowsInterval; + var readAheadTicks = _datetimeProvider.UtcNow.Add(timeSpanBeforeNextPoll).Ticks; if (workflow.NextExecution.Value < readAheadTicks) { new Task(() => FutureQueue(workflow, cancellationToken)).Start(); diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/DelayScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/DelayScenario.cs index 704b27d79..5fc75a205 100644 --- a/test/WorkflowCore.IntegrationTests/Scenarios/DelayScenario.cs +++ b/test/WorkflowCore.IntegrationTests/Scenarios/DelayScenario.cs @@ -44,7 +44,7 @@ public void Scenario() var workflowId = StartWorkflow(new DelayWorkflow.MyDataClass() { - WaitTime = Host.Options.PollInterval.Add(TimeSpan.FromSeconds(1)) + WaitTime = Host.Options.PollWorkflowsInterval.Add(TimeSpan.FromSeconds(1)) }); WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30)); diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/ParallelEventsScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/ParallelEventsScenario.cs index 2c3b6825f..47cb622e5 100644 --- a/test/WorkflowCore.IntegrationTests/Scenarios/ParallelEventsScenario.cs +++ b/test/WorkflowCore.IntegrationTests/Scenarios/ParallelEventsScenario.cs @@ -71,7 +71,7 @@ public ParallelEventsScenario() protected override void ConfigureServices(IServiceCollection services) { - services.AddWorkflow(s => s.UsePollInterval(TimeSpan.FromSeconds(1))); + services.AddWorkflow(s => s.UsePollEventsInterval(TimeSpan.FromSeconds(1))); } [Fact] diff --git a/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoDelayScenario.cs b/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoDelayScenario.cs index 45aca549f..a3ef0671f 100644 --- a/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoDelayScenario.cs +++ b/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoDelayScenario.cs @@ -19,7 +19,7 @@ protected override void ConfigureServices(IServiceCollection services) services.AddWorkflow(cfg => { cfg.UseMongoDB(MongoDockerSetup.ConnectionString, nameof(MongoDelayScenario)); - cfg.UsePollInterval(TimeSpan.FromSeconds(2)); + cfg.UsePollWorkflowsInterval(TimeSpan.FromSeconds(2)); }); } } diff --git a/test/WorkflowCore.Tests.MySQL/Scenarios/MysqlDelayScenario.cs b/test/WorkflowCore.Tests.MySQL/Scenarios/MysqlDelayScenario.cs index deed80f2b..01dedf6ac 100644 --- a/test/WorkflowCore.Tests.MySQL/Scenarios/MysqlDelayScenario.cs +++ b/test/WorkflowCore.Tests.MySQL/Scenarios/MysqlDelayScenario.cs @@ -13,7 +13,7 @@ protected override void ConfigureServices(IServiceCollection services) services.AddWorkflow(cfg => { cfg.UseMySQL(MysqlDockerSetup.ScenarioConnectionString, true, true); - cfg.UsePollInterval(TimeSpan.FromSeconds(2)); + cfg.UsePollWorkflowsInterval(TimeSpan.FromSeconds(2)); }); } } diff --git a/test/WorkflowCore.Tests.PostgreSQL/Scenarios/PostgresDelayScenario.cs b/test/WorkflowCore.Tests.PostgreSQL/Scenarios/PostgresDelayScenario.cs index 649632ac5..904bed06a 100644 --- a/test/WorkflowCore.Tests.PostgreSQL/Scenarios/PostgresDelayScenario.cs +++ b/test/WorkflowCore.Tests.PostgreSQL/Scenarios/PostgresDelayScenario.cs @@ -13,7 +13,7 @@ protected override void ConfigureServices(IServiceCollection services) services.AddWorkflow(cfg => { cfg.UsePostgreSQL(PostgresDockerSetup.ScenarioConnectionString, true, true); - cfg.UsePollInterval(TimeSpan.FromSeconds(2)); + cfg.UsePollWorkflowsInterval(TimeSpan.FromSeconds(2)); }); } } diff --git a/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerDelayScenario.cs b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerDelayScenario.cs index 5e623c1db..b8de4fc5f 100644 --- a/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerDelayScenario.cs +++ b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerDelayScenario.cs @@ -13,7 +13,7 @@ protected override void ConfigureServices(IServiceCollection services) services.AddWorkflow(cfg => { cfg.UseSqlServer(SqlDockerSetup.ScenarioConnectionString, true, true); - cfg.UsePollInterval(TimeSpan.FromSeconds(2)); + cfg.UsePollWorkflowsInterval(TimeSpan.FromSeconds(2)); }); } }