Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public IPendingRequestQueue CreateQueue(Uri endpoint)
return new Decorator(inner.CreateQueue(endpoint), cancellationTokenSource, shouldCancelOnDequeue, onResponseApplied);
}

public Task<IPendingRequestQueue> CreateQueueAsync(Uri endpoint, CancellationToken cancellationToken)
{
return Task.FromResult(CreateQueue(endpoint));
}

class Decorator : IPendingRequestQueue
{
readonly CancellationTokenSource cancellationTokenSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public IPendingRequestQueue CreateQueue(Uri endpoint)
return new Decorator(inner.CreateQueue(endpoint), cancellationTokenSources);
}

public Task<IPendingRequestQueue> CreateQueueAsync(Uri endpoint, CancellationToken cancellationToken)
{
return Task.FromResult(CreateQueue(endpoint));
}

class Decorator : IPendingRequestQueue
{
readonly CancellationTokenSource[] cancellationTokenSources;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public void SetUp()
stream.SetRemoteIdentity(new RemoteIdentity(RemoteIdentityType.Server));
var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
var activeConnectionsLimiter = new ActiveTcpConnectionsLimiter(limits);
protocol = new MessageExchangeProtocol(stream, new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), activeConnectionsLimiter, Substitute.For<ILog>());
protocol = new MessageExchangeProtocol(stream, new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), activeConnectionsLimiter, NoSubscribersObserver.Instance, Substitute.For<ILog>());
}

// TODO - ASYNC ME UP! ExchangeAsClientAsync cancellation
Expand Down
4 changes: 2 additions & 2 deletions source/Halibut.Tests/Transport/SecureClientFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public async Task SecureClientClearsPoolWhenAllConnectionsCorrupt()
var connection = Substitute.For<IConnection>();
var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
var activeConnectionLimiter = new ActiveTcpConnectionsLimiter(limits);
connection.Protocol.Returns(new MessageExchangeProtocol(stream, limits, activeConnectionLimiter, log));
connection.Protocol.Returns(new MessageExchangeProtocol(stream, limits, activeConnectionLimiter, NoSubscribersObserver.Instance, log));

await connectionManager.ReleaseConnectionAsync(endpoint, connection, CancellationToken.None);
}
Expand Down Expand Up @@ -96,7 +96,7 @@ static MessageExchangeProtocol GetProtocol(Stream stream, ILog logger)
{
var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
var activeConnectionLimiter = new ActiveTcpConnectionsLimiter(limits);
return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializerBuilder(new LogFactory()).Build(), new NoOpControlMessageObserver(), limits, logger), limits, activeConnectionLimiter, logger);
return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializerBuilder(new LogFactory()).Build(), new NoOpControlMessageObserver(), limits, logger), limits, activeConnectionLimiter, NoSubscribersObserver.Instance, logger);
}
}
}
3 changes: 2 additions & 1 deletion source/Halibut.Tests/Transport/SecureListenerFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public async Task SecureListenerDoesNotCreateHundredsOfIoEventsPerSecondOnWindow
(_, _) => UnauthorizedClientConnectResponse.BlockConnection,
timeoutsAndLimits,
new StreamFactory(),
NoOpConnectionsObserver.Instance
NoOpConnectionsObserver.Instance,
NoSubscribersObserver.Instance
);

var idleAverage = CollectCounterValues(opsPerSec)
Expand Down
7 changes: 7 additions & 0 deletions source/Halibut.Tests/Util/FuncPendingRequestQueueFactory.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Halibut.ServiceModel;

namespace Halibut.Tests.Util
Expand All @@ -16,5 +18,10 @@ public IPendingRequestQueue CreateQueue(Uri endpoint)
{
return createQueue(endpoint);
}

public Task<IPendingRequestQueue> CreateQueueAsync(Uri endpoint, CancellationToken cancellationToken)
{
return Task.FromResult(createQueue(endpoint));
}
}
}
2 changes: 1 addition & 1 deletion source/Halibut/DataStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Halibut
{
public class DataStream : IEquatable<DataStream>, IDataStreamInternal
{
readonly Func<Stream, CancellationToken, Task> writerAsync;
protected readonly Func<Stream, CancellationToken, Task> writerAsync;
IDataStreamReceiver? receiver;

[JsonConstructor]
Expand Down
10 changes: 7 additions & 3 deletions source/Halibut/HalibutRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class HalibutRuntime : IHalibutRuntime
readonly IConnectionsObserver connectionsObserver;
readonly IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter;
readonly IControlMessageObserver controlMessageObserver;
readonly ISubscribersObserver subscribersObserver;

internal HalibutRuntime(
IServiceFactory serviceFactory,
Expand All @@ -59,7 +60,8 @@ internal HalibutRuntime(
IStreamFactory streamFactory,
IRpcObserver rpcObserver,
IConnectionsObserver connectionsObserver,
IControlMessageObserver controlMessageObserver)
IControlMessageObserver controlMessageObserver,
ISubscribersObserver subscribersObserver)
{
this.serverCertificate = serverCertificate;
this.trustProvider = trustProvider;
Expand All @@ -74,6 +76,7 @@ internal HalibutRuntime(
TimeoutsAndLimits = halibutTimeoutsAndLimits;
this.connectionsObserver = connectionsObserver;
this.controlMessageObserver = controlMessageObserver;
this.subscribersObserver = subscribersObserver;

connectionManager = new ConnectionManagerAsync();
this.tcpConnectionFactory = new TcpConnectionFactory(serverCertificate, TimeoutsAndLimits, streamFactory);
Expand Down Expand Up @@ -106,7 +109,7 @@ public int Listen(int port)

ExchangeProtocolBuilder ExchangeProtocolBuilder()
{
return (stream, log) => new MessageExchangeProtocol(new MessageExchangeStream(stream, messageSerializer, controlMessageObserver, TimeoutsAndLimits, log), TimeoutsAndLimits, activeTcpConnectionsLimiter, log);
return (stream, log) => new MessageExchangeProtocol(new MessageExchangeStream(stream, messageSerializer, controlMessageObserver, TimeoutsAndLimits, log), TimeoutsAndLimits, activeTcpConnectionsLimiter, subscribersObserver, log);
}

public int Listen(IPEndPoint endpoint)
Expand All @@ -122,7 +125,8 @@ public int Listen(IPEndPoint endpoint)
HandleUnauthorizedClientConnect,
TimeoutsAndLimits,
streamFactory,
connectionsObserver);
connectionsObserver,
subscribersObserver);

listeners.DoWithExclusiveAccess(l =>
{
Expand Down
11 changes: 10 additions & 1 deletion source/Halibut/HalibutRuntimeBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class HalibutRuntimeBuilder
IRpcObserver? rpcObserver;
IConnectionsObserver? connectionsObserver;
IControlMessageObserver? controlMessageObserver;
ISubscribersObserver? identityObserver;

public HalibutRuntimeBuilder WithConnectionsObserver(IConnectionsObserver connectionsObserver)
{
Expand Down Expand Up @@ -125,6 +126,12 @@ public HalibutRuntimeBuilder WithRpcObserver(IRpcObserver rpcObserver)
return this;
}

public HalibutRuntimeBuilder WithSubscribersObserver(ISubscribersObserver subscribersObserver)
{
this.identityObserver = subscribersObserver;
return this;
}

public HalibutRuntime Build()
{
var halibutTimeoutsAndLimits = this.halibutTimeoutsAndLimits;
Expand Down Expand Up @@ -157,6 +164,7 @@ public HalibutRuntime Build()
var connectionsObserver = this.connectionsObserver ?? NoOpConnectionsObserver.Instance;
var rpcObserver = this.rpcObserver ?? new NoRpcObserver();
var controlMessageObserver = this.controlMessageObserver ?? new NoOpControlMessageObserver();
var identityObserver = this.identityObserver ?? NoSubscribersObserver.Instance;

var halibutRuntime = new HalibutRuntime(
serviceFactory,
Expand All @@ -171,7 +179,8 @@ public HalibutRuntime Build()
streamFactory,
rpcObserver,
connectionsObserver,
controlMessageObserver);
controlMessageObserver,
identityObserver);

if (onUnauthorizedClientConnect is not null)
{
Expand Down
3 changes: 3 additions & 0 deletions source/Halibut/ServiceModel/IPendingRequestQueueFactory.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Halibut.ServiceModel
{
public interface IPendingRequestQueueFactory
{
IPendingRequestQueue CreateQueue(Uri endpoint);
Task<IPendingRequestQueue> CreateQueueAsync(Uri endpoint, CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Halibut.Diagnostics;

namespace Halibut.ServiceModel
Expand All @@ -18,5 +20,10 @@ public IPendingRequestQueue CreateQueue(Uri endpoint)
{
return new PendingRequestQueueAsync(halibutTimeoutsAndLimits, logFactory.ForEndpoint(endpoint));
}

public Task<IPendingRequestQueue> CreateQueueAsync(Uri endpoint, CancellationToken cancellationToken)
{
return Task.FromResult(CreateQueue(endpoint));
}
}
}
25 changes: 25 additions & 0 deletions source/Halibut/Transport/Observability/ISubscribersObserver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2012-2013 Octopus Deploy Pty. Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using Halibut.Transport.Protocol;

namespace Halibut.Transport.Observability
{
public interface ISubscribersObserver
{
void SubscriberJoined(Uri subscriptionId);
void SubscriberLeft(Uri subscriptionId);
}
}
32 changes: 32 additions & 0 deletions source/Halibut/Transport/Observability/NoSubscribersObserver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2012-2013 Octopus Deploy Pty. Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using Halibut.Transport.Protocol;

namespace Halibut.Transport.Observability
{
public class NoSubscribersObserver : ISubscribersObserver
{
static NoSubscribersObserver? singleInstance;
public static NoSubscribersObserver Instance => singleInstance ??= new NoSubscribersObserver();
public void SubscriberJoined(Uri subscriptionId)
{
}

public void SubscriberLeft(Uri subscriptionId)
{
}
}
}
23 changes: 19 additions & 4 deletions source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Halibut.Diagnostics;
using Halibut.Exceptions;
using Halibut.ServiceModel;
using Halibut.Transport.Observability;

namespace Halibut.Transport.Protocol
{
Expand All @@ -20,15 +21,21 @@ public class MessageExchangeProtocol
readonly IMessageExchangeStream stream;
readonly HalibutTimeoutsAndLimits halibutTimeoutsAndLimits;
readonly IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter;
readonly ISubscribersObserver subscribersObserver;
readonly ILog log;
bool identified;
volatile bool acceptClientRequests = true;

public MessageExchangeProtocol(IMessageExchangeStream stream, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter, ILog log)
public MessageExchangeProtocol(IMessageExchangeStream stream,
HalibutTimeoutsAndLimits halibutTimeoutsAndLimits,
IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter,
ISubscribersObserver subscribersObserver,
ILog log)
{
this.stream = stream;
this.halibutTimeoutsAndLimits = halibutTimeoutsAndLimits;
this.activeTcpConnectionsLimiter = activeTcpConnectionsLimiter;
this.subscribersObserver = subscribersObserver;
this.log = log;
}

Expand Down Expand Up @@ -125,9 +132,17 @@ public async Task ExchangeAsServerAsync(Func<RequestMessage, Task<ResponseMessag
await ProcessClientRequestsAsync(incomingRequestProcessor, cancellationToken);
break;
case RemoteIdentityType.Subscriber:
var pendingRequestQueue = pendingRequests(identity);
await ProcessSubscriberAsync(pendingRequestQueue, cancellationToken);
break;
try
{
subscribersObserver.SubscriberJoined(identity.SubscriptionId);
var pendingRequestQueue = pendingRequests(identity);
await ProcessSubscriberAsync(pendingRequestQueue, cancellationToken);
break;
}
finally
{
subscribersObserver.SubscriberLeft(identity.SubscriptionId);
}
default:
log.Write(EventType.ErrorInIdentify, $"Remote with identify {identity.SubscriptionId} identified itself with an unknown identity type {identity.IdentityType}");
throw new ProtocolException("Unexpected remote identity: " + identity.IdentityType);
Expand Down
5 changes: 4 additions & 1 deletion source/Halibut/Transport/SecureListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class SecureListener : IAsyncDisposable
readonly HalibutTimeoutsAndLimits halibutTimeoutsAndLimits;
readonly IStreamFactory streamFactory;
readonly IConnectionsObserver connectionsObserver;
readonly ISubscribersObserver subscribersObserver;
ILog log;
TcpListener listener;
Thread? backgroundThread;
Expand All @@ -67,7 +68,8 @@ public SecureListener(
Func<string, string, UnauthorizedClientConnectResponse> unauthorizedClientConnect,
HalibutTimeoutsAndLimits halibutTimeoutsAndLimits,
IStreamFactory streamFactory,
IConnectionsObserver connectionsObserver)
IConnectionsObserver connectionsObserver,
ISubscribersObserver subscribersObserver)
{
this.endPoint = endPoint;
this.serverCertificate = serverCertificate;
Expand All @@ -81,6 +83,7 @@ public SecureListener(
this.halibutTimeoutsAndLimits = halibutTimeoutsAndLimits;
this.streamFactory = streamFactory;
this.connectionsObserver = connectionsObserver;
this.subscribersObserver = subscribersObserver;
this.cts = new CancellationTokenSource();
this.cancellationToken = cts.Token;

Expand Down