From d076d09317d290f3eabf59864d1b96f58b44b4b4 Mon Sep 17 00:00:00 2001 From: prezaei Date: Sat, 26 Nov 2022 21:36:56 -0800 Subject: [PATCH 1/4] Enhancing the subscriber --- src/Interprocess/Queue/Subscriber.cs | 30 +++++++++++++++++----------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/src/Interprocess/Queue/Subscriber.cs b/src/Interprocess/Queue/Subscriber.cs index c018df2..e841a83 100644 --- a/src/Interprocess/Queue/Subscriber.cs +++ b/src/Interprocess/Queue/Subscriber.cs @@ -102,22 +102,28 @@ private unsafe bool TryDequeueImpl( cancellationSource.ThrowIfCancellationRequested(cancellation); message = ReadOnlyMemory.Empty; - var header = *Header; - // is this an empty queue? - if (header.IsEmpty()) - return false; + while (true) + { + var header = *Header; + + // is this an empty queue? + if (header.IsEmpty()) + return false; - var readLockTimestamp = header.ReadLockTimestamp; - var now = DateTime.UtcNow.Ticks; + var readLockTimestamp = header.ReadLockTimestamp; + var now = DateTime.UtcNow.Ticks; - // is there already a read-lock or has the previous lock timed out meaning that a subscriber crashed? - if (now - readLockTimestamp < TicksForTenSeconds) - return false; + // is there already a read-lock or has the previous lock timed out meaning that a subscriber crashed? + if (now - readLockTimestamp < TicksForTenSeconds) + return false; + + // take a read-lock so no other thread can read a message + if (Interlocked.CompareExchange(ref Header->ReadLockTimestamp, now, readLockTimestamp) == readLockTimestamp) + break; - // take a read-lock so no other thread can read a message - if (Interlocked.CompareExchange(ref Header->ReadLockTimestamp, now, readLockTimestamp) != readLockTimestamp) - return false; + Thread.Yield(); + } try { From 03dbbdd784d4e4837f3a376e8bf93818509e65b4 Mon Sep 17 00:00:00 2001 From: prezaei Date: Mon, 28 Nov 2022 12:41:48 -0800 Subject: [PATCH 2/4] faster subscriber --- src/Interprocess/Queue/Subscriber.cs | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/src/Interprocess/Queue/Subscriber.cs b/src/Interprocess/Queue/Subscriber.cs index e841a83..bd4c337 100644 --- a/src/Interprocess/Queue/Subscriber.cs +++ b/src/Interprocess/Queue/Subscriber.cs @@ -6,7 +6,7 @@ namespace Cloudtoid.Interprocess { internal sealed class Subscriber : Queue, ISubscriber { - private static readonly long TicksForTenSeconds = TimeSpan.FromSeconds(10).Ticks; + private static readonly long TenSeconds = TimeSpan.FromSeconds(10).Ticks; private readonly CancellationTokenSource cancellationSource = new(); private readonly CountdownEvent countdownEvent = new(1); private readonly IInterprocessSemaphoreWaiter signal; @@ -74,18 +74,13 @@ private ReadOnlyMemory DequeueCore(Memory? resultBuffer, Cancellatio try { - int i = -5; while (true) { if (TryDequeueImpl(resultBuffer, cancellation, out var message)) return message; - if (i > 10) - signal.Wait(millisecondsTimeout: 10); - else if (i++ > 0) - signal.Wait(millisecondsTimeout: i); - else - Thread.Yield(); + signal.Wait(millisecondsTimeout: 5); + cancellationSource.ThrowIfCancellationRequested(cancellation); } } finally @@ -99,8 +94,6 @@ private unsafe bool TryDequeueImpl( CancellationToken cancellation, out ReadOnlyMemory message) { - cancellationSource.ThrowIfCancellationRequested(cancellation); - message = ReadOnlyMemory.Empty; while (true) @@ -115,14 +108,16 @@ private unsafe bool TryDequeueImpl( var now = DateTime.UtcNow.Ticks; // is there already a read-lock or has the previous lock timed out meaning that a subscriber crashed? - if (now - readLockTimestamp < TicksForTenSeconds) - return false; + if (now - readLockTimestamp < TenSeconds) + { + Thread.Yield(); + cancellationSource.ThrowIfCancellationRequested(cancellation); + continue; + } // take a read-lock so no other thread can read a message if (Interlocked.CompareExchange(ref Header->ReadLockTimestamp, now, readLockTimestamp) == readLockTimestamp) break; - - Thread.Yield(); } try @@ -142,6 +137,7 @@ private unsafe bool TryDequeueImpl( MessageHeader.ReadyToBeConsumedState) != MessageHeader.ReadyToBeConsumedState) { Thread.Yield(); + cancellationSource.ThrowIfCancellationRequested(cancellation); } // read the message body from the queue From 17b659cede756fa339a69fe7e324488a711df136 Mon Sep 17 00:00:00 2001 From: prezaei Date: Fri, 9 Dec 2022 15:21:05 -0800 Subject: [PATCH 3/4] Updates to readme.md --- README.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 14e9be1..2950cdf 100644 --- a/README.md +++ b/README.md @@ -12,10 +12,10 @@ - [**Fast**](#performance): It is *extremely* fast. - **Cross-platform**: It supports Windows, and Unix-based operating systems such as Linux, [MacOS][MacOSWiki], and [FreeBSD][FreeBSDOrg]. -- [**API**](#Usage): Provides a simple and intuitive API to enqueue/send and dequeue/receive messages. +- [**API**](#usage): Provides a simple and intuitive API to enqueue/send and dequeue/receive messages. - **Multiple publishers and subscribers**: It supports multiple publishers and subscribers to a shared queue. - [**Efficient**](#performance): Sending and receiving messages is almost heap memory allocation free reducing garbage collections. -- [**Developer**](#Author): Developed by a guy at Microsoft. +- [**Developer**](#author): Developed by a guy at Microsoft. ## NuGet Package @@ -40,7 +40,7 @@ Creating a message queue publisher: ```csharp var options = new QueueOptions( queueName: "my-queue", - bytesCapacity: 1024 * 1024); + capacity: 1024 * 1024); using var publisher = factory.CreatePublisher(options); publisher.TryEnqueue(message); @@ -51,7 +51,7 @@ Creating a message queue subscriber: ```csharp options = new QueueOptions( queueName: "my-queue", - bytesCapacity: 1024 * 1024); + capacity: 1024 * 1024); using var subscriber = factory.CreateSubscriber(options); subscriber.TryDequeue(messageBuffer, cancellationToken, out var message); @@ -72,7 +72,7 @@ Creating a message queue publisher using an instance of `IQueueFactory` retrieve ```csharp var options = new QueueOptions( queueName: "my-queue", - bytesCapacity: 1024 * 1024); + capacity: 1024 * 1024); using var publisher = factory.CreatePublisher(options); publisher.TryEnqueue(message); @@ -83,7 +83,7 @@ Creating a message queue subscriber using an instance of `IQueueFactory` retriev ```csharp var options = new QueueOptions( queueName: "my-queue", - bytesCapacity: 1024 * 1024); + capacity: 1024 * 1024); using var subscriber = factory.CreateSubscriber(options); subscriber.TryDequeue(messageBuffer, cancellationToken, out var message); @@ -229,8 +229,8 @@ Here are a couple of items that we are working on. [MacOSWiki]:https://en.wikipedia.org/wiki/MacOS [FreeBSDOrg]:https://www.freebsd.org/ [Wow64Wiki]:https://en.wikipedia.org/wiki/WoW64 -[WslDoc]:https://docs.microsoft.com/en-us/windows/wsl/about +[WslDoc]:https://learn.microsoft.com/windows/wsl/about [BenchmarkOrg]:https://benchmarkdotnet.org/ -[NamedSemaphoresDoc]:https://docs.microsoft.com/en-us/dotnet/api/system.threading.semaphore#remarks -[SemaphoreDoc]:https://docs.microsoft.com/en-us/dotnet/api/system.threading.semaphore +[NamedSemaphoresDoc]:https://docs.microsoft.com/dotnet/api/system.threading.semaphore#remarks +[SemaphoreDoc]:https://docs.microsoft.com/dotnet/api/system.threading.semaphore [PedramLinkedIn]:https://www.linkedin.com/in/pedramrezaei/ From e6710ba8de7bb579315ce11fedf3bb323c223344 Mon Sep 17 00:00:00 2001 From: prezaei Date: Fri, 9 Dec 2022 15:25:39 -0800 Subject: [PATCH 4/4] One more readme! --- README.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 2950cdf..29fe2bf 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ **Cloudtoid Interprocess** is a cross-platform shared memory queue for fast communication between processes ([Interprocess Communication or IPC][IPCWiki]). It uses a shared memory-mapped file for extremely fast and efficient communication between processes and it is used internally by Microsoft. - [**Fast**](#performance): It is *extremely* fast. -- **Cross-platform**: It supports Windows, and Unix-based operating systems such as Linux, [MacOS][MacOSWiki], and [FreeBSD][FreeBSDOrg]. +- **Cross-platform**: It supports Windows, and Unix-based operating systems such as Linux, [macOS][macOSWiki], and [FreeBSD][FreeBSDOrg]. - [**API**](#usage): Provides a simple and intuitive API to enqueue/send and dequeue/receive messages. - **Multiple publishers and subscribers**: It supports multiple publishers and subscribers to a shared queue. - [**Efficient**](#performance): Sending and receiving messages is almost heap memory allocation free reducing garbage collections. @@ -102,7 +102,7 @@ Please note that you can start multiple publishers and subscribers sending and r A lot has gone into optimizing the implementation of this library. For instance, it is mostly heap-memory allocation free, reducing the need for garbage collection induced pauses. -**Summary**: A full enqueue followed by a dequeue takes `~250 ns` on Linux, `~650 ns` on MacOS, and `~300 ns` on Windows. +**Summary**: A full enqueue followed by a dequeue takes `~250 ns` on Linux, `~650 ns` on macOS, and `~300 ns` on Windows. **Details**: To benchmark the performance and memory usage, we use [BenchmarkDotNet][BenchmarkOrg] and perform the following runs: @@ -148,7 +148,7 @@ Results: --- -### On MacOS +### On macOS Host: @@ -194,14 +194,14 @@ Results: This library relies on [Named Semaphores][NamedSemaphoresDoc] To signal the existence of a new message to all message subscribers and to do it across process boundaries. Named semaphores are synchronization constructs accessible across processes. -.NET Core 3.1 and .NET 6/7 do not support named semaphores on Unix-based OSs (Linux, macOS, etc.). Instead we are using P/Invoke and relying on operating system's POSIX semaphore implementation. ([Linux](src/Interprocess/Semaphore/Linux/Interop.cs) and [MacOS](src/Interprocess/Semaphore/MacOS/Interop.cs) implementations). +.NET Core 3.1 and .NET 6/7 do not support named semaphores on Unix-based OSs (Linux, macOS, etc.). Instead we are using P/Invoke and relying on operating system's POSIX semaphore implementation. ([Linux](src/Interprocess/Semaphore/Linux/Interop.cs) and [macOS](src/Interprocess/Semaphore/MacOS/Interop.cs) implementations). This implementation will be replaced with [`System.Threading.Semaphore`][SemaphoreDoc] once .NET adds support for named semaphores on all platforms. ## How to Contribute - Create a branch from `main`. -- Ensure that all tests pass on Windows, Linux, and MacOS. +- Ensure that all tests pass on Windows, Linux, and macOS. - Keep the code coverage number above 80% by adding new tests or modifying the existing tests. - Send a pull request. @@ -226,7 +226,7 @@ Here are a couple of items that we are working on. [DotNetPlatformBadge]:https://img.shields.io/badge/.net-%3E%206.0-blue [NuGet]:https://www.nuget.org/packages/Cloudtoid.Interprocess/ [IPCWiki]:https://en.wikipedia.org/wiki/Inter-process_communication -[MacOSWiki]:https://en.wikipedia.org/wiki/MacOS +[macOSWiki]:https://en.wikipedia.org/wiki/macOS [FreeBSDOrg]:https://www.freebsd.org/ [Wow64Wiki]:https://en.wikipedia.org/wiki/WoW64 [WslDoc]:https://learn.microsoft.com/windows/wsl/about