Skip to content

Commit 290ebe6

Browse files
authored
Merge pull request #319 from thomhurst/copilot/fix-318
Add comprehensive disposal documentation and examples for processor objects
2 parents 6b88a39 + 846679d commit 290ebe6

File tree

7 files changed

+491
-27
lines changed

7 files changed

+491
-27
lines changed
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
#if NET6_0_OR_GREATER
2+
using System;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Runtime.CompilerServices;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using EnumerableAsyncProcessor.Extensions;
9+
10+
namespace EnumerableAsyncProcessor.Example;
11+
12+
/// <summary>
13+
/// Examples demonstrating proper disposal patterns for EnumerableAsyncProcessor objects.
14+
/// This addresses the common question: "How/when to correctly dispose the resulting processor objects?"
15+
///
16+
/// QUICK ANSWER: Always dispose processor objects using 'await using' or manual disposal!
17+
///
18+
/// The key pattern is:
19+
/// ❌ BAD: var processor = items.SelectAsync(...).ProcessInParallel(); // Never disposed!
20+
/// ✅ GOOD: await using var processor = items.SelectAsync(...).ProcessInParallel(); // Auto-disposed!
21+
/// </summary>
22+
public static class DisposalExample
23+
{
24+
public static async Task RunExamples()
25+
{
26+
Console.WriteLine("Disposal Pattern Examples");
27+
Console.WriteLine("========================\n");
28+
29+
// Example 1: The problematic pattern from the issue
30+
Console.WriteLine("Example 1: PROBLEMATIC - No disposal (resource leak!)");
31+
var results1 = await ProblematicPatternAsync(new[] { 1, 2, 3, 4, 5 }, CancellationToken.None);
32+
Console.WriteLine($"Results: {string.Join(", ", results1.ToList())}");
33+
Console.WriteLine("⚠️ This pattern leaks resources because the processor is never disposed!\n");
34+
35+
// Example 2: Proper disposal with await using
36+
Console.WriteLine("Example 2: PROPER - Using await using for automatic disposal");
37+
var results2 = await ProperPatternWithAwaitUsingAsync(new[] { 1, 2, 3, 4, 5 }, CancellationToken.None);
38+
Console.WriteLine($"Results: {string.Join(", ", results2.ToList())}");
39+
Console.WriteLine("✅ Resources automatically cleaned up with await using\n");
40+
41+
// Example 3: Proper disposal with manual try-finally
42+
Console.WriteLine("Example 3: PROPER - Manual disposal with try-finally");
43+
var results3 = await ProperPatternWithManualDisposalAsync(new[] { 1, 2, 3, 4, 5 }, CancellationToken.None);
44+
Console.WriteLine($"Results: {string.Join(", ", results3.ToList())}");
45+
Console.WriteLine("✅ Resources manually cleaned up in finally block\n");
46+
47+
// Example 4: Using the convenience extension (no disposal needed)
48+
Console.WriteLine("Example 4: CONVENIENT - Using extension methods (disposal handled internally)");
49+
var asyncEnumerable = GenerateAsyncEnumerable(5);
50+
var results4 = await asyncEnumerable.ProcessInParallel(async item =>
51+
{
52+
await Task.Delay(50);
53+
return item * 2;
54+
});
55+
Console.WriteLine($"Results: {string.Join(", ", results4)}");
56+
Console.WriteLine("✅ Extension methods handle disposal internally\n");
57+
58+
// Example 5: Streaming results with proper disposal
59+
Console.WriteLine("Example 5: STREAMING - Processing results as they arrive with proper disposal");
60+
await StreamingWithProperDisposalAsync(new[] { 1, 2, 3, 4, 5 }, CancellationToken.None);
61+
Console.WriteLine("✅ Streamed results with proper disposal\n");
62+
}
63+
64+
/// <summary>
65+
/// This is the PROBLEMATIC pattern from the GitHub issue - it leaks resources!
66+
/// DO NOT USE THIS PATTERN in production code.
67+
/// </summary>
68+
private static async Task<IAsyncEnumerable<int>> ProblematicPatternAsync(int[] input, CancellationToken token)
69+
{
70+
// ⚠️ PROBLEM: The processor is created but never disposed!
71+
var batchProcessor = input.SelectAsync(static v => TransformAsync(v), token).ProcessInParallel();
72+
73+
// This returns the async enumerable, but the processor that created it is never disposed
74+
return batchProcessor.GetResultsAsyncEnumerable();
75+
76+
// 🔥 RESOURCE LEAK: The processor goes out of scope without being disposed,
77+
// potentially leaving tasks running and resources uncleaned
78+
}
79+
80+
/// <summary>
81+
/// PROPER pattern using await using for automatic disposal.
82+
/// This is the recommended approach.
83+
/// </summary>
84+
private static async Task<IAsyncEnumerable<int>> ProperPatternWithAwaitUsingAsync(int[] input, CancellationToken token)
85+
{
86+
// ✅ Create processor with await using for automatic disposal
87+
await using var processor = input.SelectAsync(static v => TransformAsync(v), token).ProcessInParallel();
88+
89+
// Collect results into a list to return
90+
var results = new List<int>();
91+
await foreach (var result in processor.GetResultsAsyncEnumerable())
92+
{
93+
results.Add(result);
94+
}
95+
96+
// Return as async enumerable
97+
return results.ToAsyncEnumerable();
98+
99+
// ✅ Processor is automatically disposed here due to 'await using'
100+
}
101+
102+
/// <summary>
103+
/// PROPER pattern using manual disposal with try-finally.
104+
/// Use this when you need more control over the disposal timing.
105+
/// </summary>
106+
private static async Task<IAsyncEnumerable<int>> ProperPatternWithManualDisposalAsync(int[] input, CancellationToken token)
107+
{
108+
var processor = input.SelectAsync(static v => TransformAsync(v), token).ProcessInParallel();
109+
110+
try
111+
{
112+
// Collect results into a list to return
113+
var results = new List<int>();
114+
await foreach (var result in processor.GetResultsAsyncEnumerable())
115+
{
116+
results.Add(result);
117+
}
118+
119+
return results.ToAsyncEnumerable();
120+
}
121+
finally
122+
{
123+
// ✅ Manually dispose the processor to clean up resources
124+
await processor.DisposeAsync();
125+
}
126+
}
127+
128+
/// <summary>
129+
/// Example of streaming results while maintaining proper disposal.
130+
/// This shows how to process results as they arrive.
131+
/// </summary>
132+
private static async Task StreamingWithProperDisposalAsync(int[] input, CancellationToken token)
133+
{
134+
await using var processor = input.SelectAsync(static v => TransformAsync(v), token).ProcessInParallel();
135+
136+
var processedCount = 0;
137+
await foreach (var result in processor.GetResultsAsyncEnumerable())
138+
{
139+
processedCount++;
140+
Console.WriteLine($" Received result {processedCount}: {result}");
141+
}
142+
143+
// Processor automatically disposed here
144+
}
145+
146+
/// <summary>
147+
/// Simulates an async transformation operation
148+
/// </summary>
149+
private static async Task<int> TransformAsync(int value)
150+
{
151+
// Simulate some async work
152+
await Task.Delay(50);
153+
return value * 10;
154+
}
155+
156+
/// <summary>
157+
/// Generates an async enumerable for testing
158+
/// </summary>
159+
private static async IAsyncEnumerable<int> GenerateAsyncEnumerable(
160+
int count,
161+
[EnumeratorCancellation] CancellationToken cancellationToken = default)
162+
{
163+
for (int i = 1; i <= count; i++)
164+
{
165+
await Task.Yield();
166+
cancellationToken.ThrowIfCancellationRequested();
167+
yield return i;
168+
}
169+
}
170+
}
171+
#endif

EnumerableAsyncProcessor.Example/ProcessInParallelExample.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,30 @@ public static async Task RunExample()
7373
Console.WriteLine($"Sequential processing time: {sequentialTime.TotalMilliseconds:F0}ms");
7474
Console.WriteLine($"Parallel processing time: {parallelTime.TotalMilliseconds:F0}ms");
7575
Console.WriteLine($"Speedup: {sequentialTime.TotalMilliseconds / parallelTime.TotalMilliseconds:F1}x");
76+
77+
// Example 5: Proper disposal with builder pattern
78+
Console.WriteLine("\nExample 5: Proper disposal when using builder pattern");
79+
var data = Enumerable.Range(1, 10).ToArray();
80+
81+
// Using await using for automatic disposal
82+
await using var processor = data
83+
.SelectAsync(async item =>
84+
{
85+
await Task.Delay(50);
86+
return item * 3;
87+
}, CancellationToken.None)
88+
.ProcessInParallel(maxConcurrency: 3);
89+
90+
// Process results as they become available
91+
var processedCount = 0;
92+
await foreach (var result in processor.GetResultsAsyncEnumerable())
93+
{
94+
processedCount++;
95+
Console.WriteLine($"Processed item {processedCount}: {result}");
96+
}
97+
98+
Console.WriteLine($"All {processedCount} items processed with proper disposal");
99+
// Processor is automatically disposed here due to 'await using'
76100
}
77101

78102
private static async IAsyncEnumerable<int> GenerateAsyncEnumerable(

EnumerableAsyncProcessor.Example/Program.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,8 @@ Task<HttpResponseMessage> PingAsync()
7474
// Run ProcessInParallel examples
7575
Console.WriteLine("\n\n=== Running ProcessInParallel Extension Examples ===\n");
7676
await ProcessInParallelExample.RunExample();
77+
78+
// Run disposal pattern examples
79+
Console.WriteLine("\n\n=== Running Disposal Pattern Examples ===\n");
80+
await DisposalExample.RunExamples();
7781
#endif

EnumerableAsyncProcessor/Builders/ItemActionAsyncProcessorBuilder_2.cs

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,47 @@ internal ItemActionAsyncProcessorBuilder(IEnumerable<TInput> items, Func<TInput,
1717
_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
1818
}
1919

20+
/// <summary>
21+
/// Processes items in batches of the specified size.
22+
/// </summary>
23+
/// <param name="batchSize">The number of items to process in each batch.</param>
24+
/// <returns>An async processor that implements IDisposable and IAsyncDisposable.
25+
/// Use 'await using' or proper disposal to ensure resources are cleaned up.</returns>
26+
/// <remarks>
27+
/// The returned processor should be disposed to ensure proper cleanup of internal resources
28+
/// and cancellation of running tasks. Use 'await using var processor = ...' for automatic disposal.
29+
/// </remarks>
2030
public IAsyncProcessor<TOutput> ProcessInBatches(int batchSize)
2131
{
2232
return new ResultBatchAsyncProcessor<TInput, TOutput>(batchSize, _items, _taskSelector, _cancellationTokenSource).StartProcessing();
2333
}
2434

35+
/// <summary>
36+
/// Processes items in parallel with the specified level of parallelism.
37+
/// </summary>
38+
/// <param name="levelOfParallelism">The maximum number of concurrent operations.</param>
39+
/// <returns>An async processor that implements IDisposable and IAsyncDisposable.
40+
/// Use 'await using' or proper disposal to ensure resources are cleaned up.</returns>
41+
/// <remarks>
42+
/// The returned processor should be disposed to ensure proper cleanup of internal resources
43+
/// and cancellation of running tasks. Use 'await using var processor = ...' for automatic disposal.
44+
/// </remarks>
2545
public IAsyncProcessor<TOutput> ProcessInParallel(int levelOfParallelism)
2646
{
2747
return new ResultRateLimitedParallelAsyncProcessor<TInput, TOutput>(_items, _taskSelector, levelOfParallelism, _cancellationTokenSource).StartProcessing();
2848
}
2949

50+
/// <summary>
51+
/// Processes items in parallel with the specified level of parallelism and time constraints.
52+
/// </summary>
53+
/// <param name="levelOfParallelism">The maximum number of concurrent operations.</param>
54+
/// <param name="timeSpan">The time span constraint for rate limiting.</param>
55+
/// <returns>An async processor that implements IDisposable and IAsyncDisposable.
56+
/// Use 'await using' or proper disposal to ensure resources are cleaned up.</returns>
57+
/// <remarks>
58+
/// The returned processor should be disposed to ensure proper cleanup of internal resources
59+
/// and cancellation of running tasks. Use 'await using var processor = ...' for automatic disposal.
60+
/// </remarks>
3061
public IAsyncProcessor<TOutput> ProcessInParallel(int levelOfParallelism, TimeSpan timeSpan)
3162
{
3263
return new ResultTimedRateLimitedParallelAsyncProcessor<TInput, TOutput>(_items, _taskSelector, levelOfParallelism, timeSpan, _cancellationTokenSource).StartProcessing();
@@ -35,7 +66,12 @@ public IAsyncProcessor<TOutput> ProcessInParallel(int levelOfParallelism, TimeSp
3566
/// <summary>
3667
/// Process items in parallel without concurrency limits and return results.
3768
/// </summary>
38-
/// <returns>An async processor configured for parallel execution that returns results.</returns>
69+
/// <returns>An async processor that implements IDisposable and IAsyncDisposable.
70+
/// Use 'await using' or proper disposal to ensure resources are cleaned up.</returns>
71+
/// <remarks>
72+
/// The returned processor should be disposed to ensure proper cleanup of internal resources
73+
/// and cancellation of running tasks. Use 'await using var processor = ...' for automatic disposal.
74+
/// </remarks>
3975
public IAsyncProcessor<TOutput> ProcessInParallel()
4076
{
4177
return ProcessInParallel(null, false);
@@ -45,7 +81,12 @@ public IAsyncProcessor<TOutput> ProcessInParallel()
4581
/// Process items in parallel without concurrency limits and return results.
4682
/// </summary>
4783
/// <param name="scheduleOnThreadPool">If true, schedules tasks on thread pool to prevent blocking. Default is false for maximum performance.</param>
48-
/// <returns>An async processor configured for parallel execution that returns results.</returns>
84+
/// <returns>An async processor that implements IDisposable and IAsyncDisposable.
85+
/// Use 'await using' or proper disposal to ensure resources are cleaned up.</returns>
86+
/// <remarks>
87+
/// The returned processor should be disposed to ensure proper cleanup of internal resources
88+
/// and cancellation of running tasks. Use 'await using var processor = ...' for automatic disposal.
89+
/// </remarks>
4990
public IAsyncProcessor<TOutput> ProcessInParallel(bool scheduleOnThreadPool)
5091
{
5192
return ProcessInParallel(null, scheduleOnThreadPool);
@@ -55,7 +96,12 @@ public IAsyncProcessor<TOutput> ProcessInParallel(bool scheduleOnThreadPool)
5596
/// Process items in parallel with specified concurrency limit and return results.
5697
/// </summary>
5798
/// <param name="maxConcurrency">Maximum concurrent operations.</param>
58-
/// <returns>An async processor configured for parallel execution that returns results.</returns>
99+
/// <returns>An async processor that implements IDisposable and IAsyncDisposable.
100+
/// Use 'await using' or proper disposal to ensure resources are cleaned up.</returns>
101+
/// <remarks>
102+
/// The returned processor should be disposed to ensure proper cleanup of internal resources
103+
/// and cancellation of running tasks. Use 'await using var processor = ...' for automatic disposal.
104+
/// </remarks>
59105
public IAsyncProcessor<TOutput> ProcessInParallel(int? maxConcurrency)
60106
{
61107
return ProcessInParallel(maxConcurrency, false);
@@ -66,12 +112,26 @@ public IAsyncProcessor<TOutput> ProcessInParallel(int? maxConcurrency)
66112
/// </summary>
67113
/// <param name="maxConcurrency">Maximum concurrent operations.</param>
68114
/// <param name="scheduleOnThreadPool">If true, schedules tasks on thread pool to prevent blocking.</param>
69-
/// <returns>An async processor configured for parallel execution that returns results.</returns>
115+
/// <returns>An async processor that implements IDisposable and IAsyncDisposable.
116+
/// Use 'await using' or proper disposal to ensure resources are cleaned up.</returns>
117+
/// <remarks>
118+
/// The returned processor should be disposed to ensure proper cleanup of internal resources
119+
/// and cancellation of running tasks. Use 'await using var processor = ...' for automatic disposal.
120+
/// </remarks>
70121
public IAsyncProcessor<TOutput> ProcessInParallel(int? maxConcurrency, bool scheduleOnThreadPool)
71122
{
72123
return new ResultParallelAsyncProcessor<TInput, TOutput>(_items, _taskSelector, _cancellationTokenSource, maxConcurrency, scheduleOnThreadPool).StartProcessing();
73124
}
74125

126+
/// <summary>
127+
/// Process items one at a time sequentially.
128+
/// </summary>
129+
/// <returns>An async processor that implements IDisposable and IAsyncDisposable.
130+
/// Use 'await using' or proper disposal to ensure resources are cleaned up.</returns>
131+
/// <remarks>
132+
/// The returned processor should be disposed to ensure proper cleanup of internal resources
133+
/// and cancellation of running tasks. Use 'await using var processor = ...' for automatic disposal.
134+
/// </remarks>
75135
public IAsyncProcessor<TOutput> ProcessOneAtATime()
76136
{
77137
return new ResultOneAtATimeAsyncProcessor<TInput, TOutput>(_items, _taskSelector, _cancellationTokenSource).StartProcessing();

EnumerableAsyncProcessor/Extensions/EnumerableExtensions.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,37 @@ public static ItemAsyncProcessorBuilder<T> ToAsyncProcessorBuilder<T>(this IEnum
99
return new ItemAsyncProcessorBuilder<T>(items);
1010
}
1111

12+
/// <summary>
13+
/// Creates an async processor builder that can transform items and return results.
14+
/// </summary>
15+
/// <typeparam name="T">The input item type.</typeparam>
16+
/// <typeparam name="TOutput">The output result type.</typeparam>
17+
/// <param name="items">The items to process.</param>
18+
/// <param name="taskSelector">The async transformation function.</param>
19+
/// <param name="cancellationToken">Cancellation token for the operation.</param>
20+
/// <returns>A builder that can be configured with processing options like ProcessInParallel().</returns>
21+
/// <remarks>
22+
/// The processors created by this builder implement IDisposable/IAsyncDisposable and should be properly disposed.
23+
/// Use 'await using var processor = items.SelectAsync(...).ProcessInParallel();' for automatic disposal.
24+
/// </remarks>
1225
public static ItemActionAsyncProcessorBuilder<T, TOutput> SelectAsync<T, TOutput>(this IEnumerable<T> items, Func<T, Task<TOutput>> taskSelector, CancellationToken cancellationToken = default)
1326
{
1427
return items.ToAsyncProcessorBuilder()
1528
.SelectAsync(taskSelector, cancellationToken);
1629
}
1730

31+
/// <summary>
32+
/// Creates an async processor builder for operations that don't return results.
33+
/// </summary>
34+
/// <typeparam name="T">The input item type.</typeparam>
35+
/// <param name="items">The items to process.</param>
36+
/// <param name="taskSelector">The async operation to perform on each item.</param>
37+
/// <param name="cancellationToken">Cancellation token for the operation.</param>
38+
/// <returns>A builder that can be configured with processing options like ProcessInParallel().</returns>
39+
/// <remarks>
40+
/// The processors created by this builder implement IDisposable/IAsyncDisposable and should be properly disposed.
41+
/// Use 'await using var processor = items.ForEachAsync(...).ProcessInParallel();' for automatic disposal.
42+
/// </remarks>
1843
public static ItemActionAsyncProcessorBuilder<T> ForEachAsync<T>(this IEnumerable<T> items, Func<T, Task> taskSelector, CancellationToken cancellationToken = default)
1944
{
2045
return items.ToAsyncProcessorBuilder()

0 commit comments

Comments
 (0)