Skip to content

Commit 52862a1

Browse files
committed
Bump version to v3.1.0: Enhance method waitForAllExecutingJobsToComplete with optional parameter to extend the waiting behavior to include potentially pending jobs and improve documentation
1 parent 99d8ac3 commit 52862a1

10 files changed

+280
-95
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ The `ZeroBackpressureSemaphore` class provides the following methods:
5858

5959
* __startExecution__: Resolves once the given job has **started** its execution. Users can leverage this to prevent backpressure of pending jobs; If the semaphore is too busy to start a given job `X`, there is no reason to create another job `Y` until `X` has started. This method is particularly useful for background job workers that frequently retrieve job metadata from external sources, such as pulling messages from a message broker.
6060
* __waitForCompletion__: Executes the given job in a controlled manner, once there is an available slot. It resolves or rejects when the job **completes** execution, returning the job's value or propagating any error it may throw.
61-
* __waitForAllExecutingJobsToComplete__: Resolves when all **currently** executing jobs have finished, meaning once all running promises have either resolved or rejected. This is particularly useful in scenarios where you need to ensure that all jobs are completed before proceeding, such as during shutdown processes or between unit tests.
61+
* __waitForAllExecutingJobsToComplete__: Resolves when all **currently** executing jobs have finished, meaning once all running promises have either resolved or rejected. This is particularly useful in scenarios where you need to ensure that all jobs are completed before proceeding, such as during shutdown processes or between unit tests. By using the optional `considerPendingJobsBackpressure` parameter, the waiting behavior can be extended to include potentially pending jobs that induce backpressure.
6262
* __waitForAvailability__: This method resolves once at least one slot is available for job execution. In other words, it resolves when the semaphore is available to trigger a new job immediately. Note that the same effect can be achieved with `startExecution` alone, if the async logic (intended to be delayed until availability) is handled **within the job itself** rather than as a preliminary step. Therefore, `waitForAvailability` serves as a design choice rather than a strict necessity.
6363
* __extractUncaughtErrors__: Returns an array of uncaught errors, captured by the semaphore while executing background jobs added by `startExecution`. The instance will no longer hold these error references once extracted. In other words, ownership of these uncaught errors shifts to the caller, while the semaphore clears its list of uncaught errors.
6464

dist/zero-backpressure-semaphore.d.ts

Lines changed: 44 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,13 @@ export type SemaphoreJob<T> = () => Promise<T>;
2626
*
2727
* ### Graceful Termination
2828
* All the job execution promises are tracked by the semaphore instance, ensuring no dangling promises.
29-
* This enables graceful termination via the `waitForAllExecutingJobsToComplete` method, which is
30-
* particularly useful for the multiple jobs execution use-case. This can help perform necessary
31-
* post-processing logic, and ensure a clear state between unit-tests.
32-
* If your component has a termination method (`stop`, `terminate`, or similar), keep that in mind.
29+
* This enables graceful termination via the `waitForAllExecutingJobsToComplete` method, in scenarios
30+
* where it is essential to ensure that all the currently executing or pending jobs are fully processed
31+
* before proceeding.
32+
* Examples include application shutdowns (e.g., `onModuleDestroy` in Nest.js applications) or
33+
* maintaining a clear state between unit-tests.
34+
* Should graceful teardown be a concern for your component, consider how its termination method (e.g.,
35+
* `stop`, `terminate`, `onModuleDestroy`, etc) aligns with this capability.
3336
*
3437
* ### Error Handling for Background Jobs
3538
* Background jobs triggered by `startExecution` may throw errors. Unlike the `waitForCompletion` case,
@@ -38,20 +41,23 @@ export type SemaphoreJob<T> = () => Promise<T>;
3841
* the `extractUncaughtErrors` method. The number of accumulated uncaught errors can be obtained via
3942
* the `amountOfUncaughtErrors` getter method. This can be useful, for example, if the user wants to
4043
* handle uncaught errors only after a certain threshold is reached.
41-
*
42-
* ### Complexity
43-
* - **Initialization**: O(maxConcurrentJobs) for both time and space.
44-
* - **startExecution, waitForCompletion**: O(1) for both time and space, excluding the job execution itself.
45-
* - **waitForAllExecutingJobsToComplete**: O(maxConcurrentJobs) for both time and space, excluding job executions.
46-
* - All the getter methods have O(1) complexity for both time and space.
47-
*
4844
*/
4945
export declare class ZeroBackpressureSemaphore<T, UncaughtErrorType = Error> {
5046
private readonly _availableSlotsStack;
5147
private readonly _slots;
5248
private _waitForAvailableSlot?;
5349
private _notifyAvailableSlotExists?;
5450
private _uncaughtErrors;
51+
/**
52+
* Constructor.
53+
*
54+
* Initializes the semaphore with the specified maximum number of concurrently
55+
* executing jobs. This sets up the internal structures to enforce the concurrency
56+
* limit for job execution.
57+
*
58+
* @param maxConcurrentJobs The maximum number of jobs that can execute concurrently.
59+
* @throws Error if `maxConcurrentJobs` is not a natural number (i.e., a positive integer).
60+
*/
5561
constructor(maxConcurrentJobs: number);
5662
/**
5763
* maxConcurrentJobs
@@ -134,19 +140,34 @@ export declare class ZeroBackpressureSemaphore<T, UncaughtErrorType = Error> {
134140
/**
135141
* waitForAllExecutingJobsToComplete
136142
*
137-
* This method allows the caller to wait until all *currently* executing jobs have finished,
138-
* meaning once all running promises have either resolved or rejected.
139-
*
140-
* This is particularly useful in scenarios where you need to ensure that all jobs are completed
141-
* before proceeding, such as during shutdown processes or between unit tests.
142-
*
143-
* Note that the returned promise only awaits jobs that were executed at the time this method
144-
* was called. Specifically, it awaits all jobs initiated by this instance that had not completed
145-
* at the time of invocation.
146-
*
147-
* @returns A promise that resolves when all currently executing jobs are completed.
143+
* Waits for all **currently executing jobs** to finish, ensuring that all active promises
144+
* have either resolved or rejected before proceeding. This enables graceful termination in
145+
* scenarios such as:
146+
* - Application shutdowns (e.g., `onModuleDestroy` in Nest.js applications).
147+
* - Ensuring a clean state between unit tests.
148+
*
149+
* ### Considering Backpressure from Pending Jobs
150+
* By default, this method only waits for jobs that are already **executing** at the time of
151+
* invocation. In other words, the default behavior does **not** consider potential jobs that
152+
* are still queued (pending execution).
153+
* A backpressure of pending jobs may happen when multiple different callers share the same semaphore
154+
* instance, each being unaware of the others.
155+
* To extend the waiting behavior to include **potentially pending jobs** which account for backpressure,
156+
* use the optional `considerPendingJobsBackpressure` parameter set to `true`. When this flag is enabled,
157+
* the method will account for both existing and future backpressure, even if the backpressure arises
158+
* after the method is invoked.
159+
*
160+
* @param considerPendingJobsBackpressure A boolean indicating whether this method should also wait for
161+
* the resolution of all potentially queued jobs (i.e., those not
162+
* yet executed when the method was invoked).
163+
* This is especially relevant when multiple different callers
164+
* share the same semaphore instance, each being unaware of the
165+
* others.
166+
* @returns A promise that resolves once all currently executing jobs have completed.
167+
* If `considerPendingJobsBackpressure` is `true`, the promise will additionally
168+
* wait until all queued jobs have been executed, ensuring no pending job backpressure remains.
148169
*/
149-
waitForAllExecutingJobsToComplete(): Promise<void>;
170+
waitForAllExecutingJobsToComplete(considerPendingJobsBackpressure?: boolean): Promise<void>;
150171
/**
151172
* waitForAvailability
152173
*

dist/zero-backpressure-semaphore.js

Lines changed: 55 additions & 31 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)