Skip to content

Faster Susbscriber #18

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
**Cloudtoid Interprocess** is a cross-platform shared memory queue for fast communication between processes ([Interprocess Communication or IPC][IPCWiki]). It uses a shared memory-mapped file for extremely fast and efficient communication between processes and it is used internally by Microsoft.

- [**Fast**](#performance): It is *extremely* fast.
- **Cross-platform**: It supports Windows, and Unix-based operating systems such as Linux, [MacOS][MacOSWiki], and [FreeBSD][FreeBSDOrg].
- [**API**](#Usage): Provides a simple and intuitive API to enqueue/send and dequeue/receive messages.
- **Cross-platform**: It supports Windows, and Unix-based operating systems such as Linux, [macOS][macOSWiki], and [FreeBSD][FreeBSDOrg].
- [**API**](#usage): Provides a simple and intuitive API to enqueue/send and dequeue/receive messages.
- **Multiple publishers and subscribers**: It supports multiple publishers and subscribers to a shared queue.
- [**Efficient**](#performance): Sending and receiving messages is almost heap memory allocation free reducing garbage collections.
- [**Developer**](#Author): Developed by a guy at Microsoft.
- [**Developer**](#author): Developed by a guy at Microsoft.

## NuGet Package

Expand All @@ -40,7 +40,7 @@ Creating a message queue publisher:
```csharp
var options = new QueueOptions(
queueName: "my-queue",
bytesCapacity: 1024 * 1024);
capacity: 1024 * 1024);

using var publisher = factory.CreatePublisher(options);
publisher.TryEnqueue(message);
Expand All @@ -51,7 +51,7 @@ Creating a message queue subscriber:
```csharp
options = new QueueOptions(
queueName: "my-queue",
bytesCapacity: 1024 * 1024);
capacity: 1024 * 1024);

using var subscriber = factory.CreateSubscriber(options);
subscriber.TryDequeue(messageBuffer, cancellationToken, out var message);
Expand All @@ -72,7 +72,7 @@ Creating a message queue publisher using an instance of `IQueueFactory` retrieve
```csharp
var options = new QueueOptions(
queueName: "my-queue",
bytesCapacity: 1024 * 1024);
capacity: 1024 * 1024);

using var publisher = factory.CreatePublisher(options);
publisher.TryEnqueue(message);
Expand All @@ -83,7 +83,7 @@ Creating a message queue subscriber using an instance of `IQueueFactory` retriev
```csharp
var options = new QueueOptions(
queueName: "my-queue",
bytesCapacity: 1024 * 1024);
capacity: 1024 * 1024);

using var subscriber = factory.CreateSubscriber(options);
subscriber.TryDequeue(messageBuffer, cancellationToken, out var message);
Expand All @@ -102,7 +102,7 @@ Please note that you can start multiple publishers and subscribers sending and r

A lot has gone into optimizing the implementation of this library. For instance, it is mostly heap-memory allocation free, reducing the need for garbage collection induced pauses.

**Summary**: A full enqueue followed by a dequeue takes `~250 ns` on Linux, `~650 ns` on MacOS, and `~300 ns` on Windows.
**Summary**: A full enqueue followed by a dequeue takes `~250 ns` on Linux, `~650 ns` on macOS, and `~300 ns` on Windows.

**Details**: To benchmark the performance and memory usage, we use [BenchmarkDotNet][BenchmarkOrg] and perform the following runs:

Expand Down Expand Up @@ -148,7 +148,7 @@ Results:

---

### On MacOS
### On macOS

Host:

Expand Down Expand Up @@ -194,14 +194,14 @@ Results:

This library relies on [Named Semaphores][NamedSemaphoresDoc] To signal the existence of a new message to all message subscribers and to do it across process boundaries. Named semaphores are synchronization constructs accessible across processes.

.NET Core 3.1 and .NET 6/7 do not support named semaphores on Unix-based OSs (Linux, macOS, etc.). Instead we are using P/Invoke and relying on operating system's POSIX semaphore implementation. ([Linux](src/Interprocess/Semaphore/Linux/Interop.cs) and [MacOS](src/Interprocess/Semaphore/MacOS/Interop.cs) implementations).
.NET Core 3.1 and .NET 6/7 do not support named semaphores on Unix-based OSs (Linux, macOS, etc.). Instead we are using P/Invoke and relying on operating system's POSIX semaphore implementation. ([Linux](src/Interprocess/Semaphore/Linux/Interop.cs) and [macOS](src/Interprocess/Semaphore/MacOS/Interop.cs) implementations).

This implementation will be replaced with [`System.Threading.Semaphore`][SemaphoreDoc] once .NET adds support for named semaphores on all platforms.

## How to Contribute

- Create a branch from `main`.
- Ensure that all tests pass on Windows, Linux, and MacOS.
- Ensure that all tests pass on Windows, Linux, and macOS.
- Keep the code coverage number above 80% by adding new tests or modifying the existing tests.
- Send a pull request.

Expand All @@ -226,11 +226,11 @@ Here are a couple of items that we are working on.
[DotNetPlatformBadge]:https://img.shields.io/badge/.net-%3E%206.0-blue
[NuGet]:https://www.nuget.org/packages/Cloudtoid.Interprocess/
[IPCWiki]:https://en.wikipedia.org/wiki/Inter-process_communication
[MacOSWiki]:https://en.wikipedia.org/wiki/MacOS
[macOSWiki]:https://en.wikipedia.org/wiki/macOS
[FreeBSDOrg]:https://www.freebsd.org/
[Wow64Wiki]:https://en.wikipedia.org/wiki/WoW64
[WslDoc]:https://docs.microsoft.com/en-us/windows/wsl/about
[WslDoc]:https://learn.microsoft.com/windows/wsl/about
[BenchmarkOrg]:https://benchmarkdotnet.org/
[NamedSemaphoresDoc]:https://docs.microsoft.com/en-us/dotnet/api/system.threading.semaphore#remarks
[SemaphoreDoc]:https://docs.microsoft.com/en-us/dotnet/api/system.threading.semaphore
[NamedSemaphoresDoc]:https://docs.microsoft.com/dotnet/api/system.threading.semaphore#remarks
[SemaphoreDoc]:https://docs.microsoft.com/dotnet/api/system.threading.semaphore
[PedramLinkedIn]:https://www.linkedin.com/in/pedramrezaei/
46 changes: 24 additions & 22 deletions src/Interprocess/Queue/Subscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Cloudtoid.Interprocess
{
internal sealed class Subscriber : Queue, ISubscriber
{
private static readonly long TicksForTenSeconds = TimeSpan.FromSeconds(10).Ticks;
private static readonly long TenSeconds = TimeSpan.FromSeconds(10).Ticks;
private readonly CancellationTokenSource cancellationSource = new();
private readonly CountdownEvent countdownEvent = new(1);
private readonly IInterprocessSemaphoreWaiter signal;
Expand Down Expand Up @@ -74,18 +74,13 @@ private ReadOnlyMemory<byte> DequeueCore(Memory<byte>? resultBuffer, Cancellatio

try
{
int i = -5;
while (true)
{
if (TryDequeueImpl(resultBuffer, cancellation, out var message))
return message;

if (i > 10)
signal.Wait(millisecondsTimeout: 10);
else if (i++ > 0)
signal.Wait(millisecondsTimeout: i);
else
Thread.Yield();
signal.Wait(millisecondsTimeout: 5);
cancellationSource.ThrowIfCancellationRequested(cancellation);
}
}
finally
Expand All @@ -99,25 +94,31 @@ private unsafe bool TryDequeueImpl(
CancellationToken cancellation,
out ReadOnlyMemory<byte> message)
{
cancellationSource.ThrowIfCancellationRequested(cancellation);

message = ReadOnlyMemory<byte>.Empty;
var header = *Header;

// is this an empty queue?
if (header.IsEmpty())
return false;
while (true)
{
var header = *Header;

var readLockTimestamp = header.ReadLockTimestamp;
var now = DateTime.UtcNow.Ticks;
// is this an empty queue?
if (header.IsEmpty())
return false;

// is there already a read-lock or has the previous lock timed out meaning that a subscriber crashed?
if (now - readLockTimestamp < TicksForTenSeconds)
return false;
var readLockTimestamp = header.ReadLockTimestamp;
var now = DateTime.UtcNow.Ticks;

// take a read-lock so no other thread can read a message
if (Interlocked.CompareExchange(ref Header->ReadLockTimestamp, now, readLockTimestamp) != readLockTimestamp)
return false;
// is there already a read-lock or has the previous lock timed out meaning that a subscriber crashed?
if (now - readLockTimestamp < TenSeconds)
{
Thread.Yield();
cancellationSource.ThrowIfCancellationRequested(cancellation);
continue;
}

// take a read-lock so no other thread can read a message
if (Interlocked.CompareExchange(ref Header->ReadLockTimestamp, now, readLockTimestamp) == readLockTimestamp)
break;
}

try
{
Expand All @@ -136,6 +137,7 @@ private unsafe bool TryDequeueImpl(
MessageHeader.ReadyToBeConsumedState) != MessageHeader.ReadyToBeConsumedState)
{
Thread.Yield();
cancellationSource.ThrowIfCancellationRequested(cancellation);
}

// read the message body from the queue
Expand Down