You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
@@ -59,7 +59,7 @@ The `ZeroBackpressureSemaphore` class provides the following methods:
59
59
*__startExecution__: Resolves once the given job has **started** its execution. Users can leverage this to prevent backpressure of pending jobs; If the semaphore is too busy to start a given job `X`, there is no reason to create another job `Y` until `X` has started. This method is particularly useful for background job workers that frequently retrieve job metadata from external sources, such as pulling messages from a message broker.
60
60
*__waitForCompletion__: Executes the given job in a controlled manner, once there is an available slot. It resolves or rejects when the job **completes** execution, returning the job's value or propagating any error it may throw.
61
61
*__waitForAllExecutingJobsToComplete__: Resolves when all **currently** executing jobs have finished, meaning once all running promises have either resolved or rejected. This is particularly useful in scenarios where you need to ensure that all jobs are completed before proceeding, such as during shutdown processes or between unit tests.
62
-
*__waitForAvailability__: This method resolves once at least one slot is available for job execution. In other words, it resolves when the semaphore is available to trigger a new job immediately. Note that the same effect can be achieved with `startExecution` alone, if the async logic (intended to be delayed until availability) is handled within the job itself rather than as a preliminary step. Therefore, `waitForAvailability` serves as a design choice rather than a strict necessity.
62
+
*__waitForAvailability__: This method resolves once at least one slot is available for job execution. In other words, it resolves when the semaphore is available to trigger a new job immediately. Note that the same effect can be achieved with `startExecution` alone, if the async logic (intended to be delayed until availability) is handled **within the job itself** rather than as a preliminary step. Therefore, `waitForAvailability` serves as a design choice rather than a strict necessity.
63
63
*__extractUncaughtErrors__: Returns an array of uncaught errors, captured by the semaphore while executing background jobs added by `startExecution`. The instance will no longer hold these error references once extracted. In other words, ownership of these uncaught errors shifts to the caller, while the semaphore clears its list of uncaught errors.
64
64
65
65
If needed, refer to the code documentation for a more comprehensive description of each method.
@@ -89,7 +89,9 @@ This semaphore variant excels in eliminating backpressure when dispatching multi
89
89
Here, the start time of each job is crucial. Since a pending job cannot start its execution until the semaphore allows, there is no benefit to adding additional jobs that cannot start immediately. The `startExecution` method communicates the job's start time to the caller (resolves as soon as the job starts), which enables to create a new job as-soon-as it makes sense.
90
90
91
91
For example, consider an application managing 1M IoT sensors that require hourly data aggregation. To mitigate server load, a semaphore can be employed to limit the number of concurrent data aggregation tasks.
92
-
Rather than pre-creating 1M jobs (one for each sensor), which could potentially overwhelm the Node.js task queue and induce backpressure, the system should adopt a **just-in-time** approach. This means creating a sensor aggregation job only when the semaphore indicates availability, thereby optimizing resource utilization and maintaining system stability.
92
+
Instead of loading all sensor UIDs into memory and pre-creating 1M jobs (one for each sensor), which could potentially overwhelm the Node.js task queue and induce backpressure, the system should adopt a **just-in-time** approach. This means creating a sensor aggregation job only when the semaphore indicates availability, thereby optimizing resource utilization and maintaining system stability.
93
+
94
+
The following example demonstrates fetching sensor UIDs using an `AsyncGenerator`. Async generators and iterators are widely adopted in modern APIs, providing efficient handling of potentially large data sets. For instance, the [AWS-SDK](https://aws.amazon.com/blogs/developer/pagination-using-async-iterators-in-modular-aws-sdk-for-javascript/) utilizes them for pagination, abstracting away complexities like managing offsets. Similarly, [MongoDB's cursor](https://www.mongodb.com/docs/manual/reference/method/db.collection.find/) enables iteration over a large number of documents in a paginated and asynchronous manner. These abstractions elegantly handle pagination internally, sparing users the complexities of managing offsets and other low-level details. By awaiting the semaphore's availability, the **space complexity** is implicitly constrained to *O(max(page-size, semaphore-capacity))*, as the `AsyncGenerator` fetches a new page only after all sensors from the current page have initiated aggregation.
93
95
94
96
Note: method `waitForAllExecutingJobsToComplete` can be used to perform post-processing, after all jobs have completed. It complements the typical use-cases of `startExecution`.
95
97
@@ -101,8 +103,12 @@ const sensorAggregationSemaphore = new ZeroBackpressureSemaphore<void>(
console.info(`Finished aggregating data from ${sensorUIDs.length} IoT sensors`);
123
+
console.info(`Finished aggregating data from ${fetchedSensorsCounter} IoT sensors`);
118
124
}
119
125
120
126
/**
@@ -128,7 +134,7 @@ async function handleDataAggregation(sensorUID): Promise<void> {
128
134
}
129
135
```
130
136
131
-
If the jobs might throw errors, you don't need to worry about these errors propagating up to the event loop and potentially crashing the application. Uncaught errors from jobs triggered by `startExecution` are captured by the semaphore and can be safely accessed for post-processing purposes (e.g., metrics).
137
+
If jobs might throw errors, you don't need to worry about these errors propagating to the event loop and potentially crashing the application. Uncaught errors from jobs triggered by `startExecution` are captured by the semaphore and can be safely accessed for post-processing purposes (e.g., metrics).
132
138
Refer to the following adaptation of the above example, now utilizing the semaphore's error handling capabilities:
`Successfully aggregated data from ${successfulJobsCount} IoT sensors, `+
168
178
`with failures in aggregating data from ${errors.length} IoT sensors`
@@ -182,105 +192,6 @@ async function handleDataAggregation(sensorUID): Promise<void> {
182
192
}
183
193
```
184
194
185
-
Please note that in a real-world scenario, sensor UIDs may be consumed from a message queue (e.g., RabbitMQ, Kafka, AWS SNS) rather than from an in-memory array. This setup **highlights the benefits** of avoiding backpressure:
186
-
Working with message queues typically involves acknowledgements, which have **timeout** mechanisms. Therefore, immediate processing is crucial to ensure efficient and reliable handling of messages. Backpressure on the semaphore means that messages experience longer delays before their corresponding jobs start execution.
187
-
Refer to the following adaptation of the previous example, where sensor UIDs are consumed from a message queue. This example overlooks error handling and message validation, for simplicity.
`Successfully aggregated data from ${successfulJobsCount} IoT sensors, `+
245
-
`with failures in aggregating data from ${errors.length} IoT sensors`
246
-
);
247
-
}
248
-
```
249
-
250
-
Alternatively, the `waitForAvailability` method can address this need by checking availability as a preliminary action, **before** consuming a message.
// Consider the queue as empty, for simplicity of this example.
261
-
break;
262
-
}
263
-
264
-
++numberOfProcessedMessages;
265
-
const { uid } =message.data;
266
-
267
-
// At this point, `startExecution` will begin immediately, due to the
268
-
// preliminary `waitForAvailability` action.
269
-
awaitsensorAggregationSemaphore.startExecution(
270
-
():Promise<void> =>handleDataAggregation(uid);
271
-
);
272
-
273
-
awaitmqClient.removeMessageFromQueue(message);
274
-
} while (true);
275
-
}
276
-
```
277
-
278
-
In reference to the above example, please note that `waitForAvailability` may be considered overkill or redundant if the job's duration is significantly shorter than the message timeout.
279
-
For example, if the message queue's timeout for acknowledging a message is 1 minute and a typical job duration is 1 second, the 59 second gap provides a substantial safety margin. In such cases, the preliminary `waitForAvailability` action can be omitted.
280
-
On the other hand, given that the timeout is 30 seconds and a typical job duration is 20 seconds, using `waitForAvailability` is sensible. This is because `startExecution` might have to wait 20 seconds before the job can begin, resulting in a total of 40 seconds from the invocation of `startExecution` until the job completes.
281
-
282
-
As a general rule, `waitForAvailability` is advisable whenever a timeout mechanism is involved, and the timeout period begins **before** the job starts execution. Note that the same effect can be achieved with `startExecution` alone, if the timeout-triggering logic is included in the job itself (such as, consuming a message). Both approaches are valid.
283
-
284
195
## 2nd use-case: Single Job Execution :man_technologist:<aid="second-use-case"></a>
285
196
286
197
The `waitForCompletion` method is useful for executing a sub-procedure, for which the caller must wait before proceeding with its work.
Copy file name to clipboardExpand all lines: package.json
+1-1Lines changed: 1 addition & 1 deletion
Original file line number
Diff line number
Diff line change
@@ -1,6 +1,6 @@
1
1
{
2
2
"name": "zero-backpressure-semaphore-typescript",
3
-
"version": "3.0.8",
3
+
"version": "3.0.9",
4
4
"description": "A modern Promise-semaphore for Node.js projects, enabling users to limit the number of concurrently executing promises. Offering backpressure control for enhanced efficiency, utilizing a communicative API that signals availability, promoting a just-in-time approach. Additionally, it incorporates mechanisms for graceful termination and error handling, making it suitable for complex scenarios.",
0 commit comments