diff --git a/src/TinyMessenger/TinyMessenger.Tests/TestData/TinyMessengerTestData.cs b/src/TinyMessenger/TinyMessenger.Tests/TestData/TinyMessengerTestData.cs index 2f8cbde..a7e0dc5 100644 --- a/src/TinyMessenger/TinyMessenger.Tests/TestData/TinyMessengerTestData.cs +++ b/src/TinyMessenger/TinyMessenger.Tests/TestData/TinyMessengerTestData.cs @@ -32,14 +32,20 @@ public interface ITestMessageInterface : ITinyMessage public class InterfaceDerivedMessage : ITestMessageInterface { public object Sender { get; private set; } + public bool Consume { get; private set; } public TThings Things { get; set; } - public InterfaceDerivedMessage(object sender) + public InterfaceDerivedMessage(object sender) : this(sender, false) + { + } + + public InterfaceDerivedMessage(object sender, bool consume) { this.Sender = sender; + this.Consume = consume; } -} + } public class TestProxy : ITinyMessageProxy { diff --git a/src/TinyMessenger/TinyMessenger.Tests/TinyMessageSubscriptionTokenTests.cs b/src/TinyMessenger/TinyMessenger.Tests/TinyMessageSubscriptionTokenTests.cs index 5c1b170..7608444 100644 --- a/src/TinyMessenger/TinyMessenger.Tests/TinyMessageSubscriptionTokenTests.cs +++ b/src/TinyMessenger/TinyMessenger.Tests/TinyMessageSubscriptionTokenTests.cs @@ -15,11 +15,12 @@ public class TinyMessageSubscriptionTokenTests public void Dispose_WithValidHubReference_UnregistersWithHub() { var messengerMock = new Moq.Mock(); + messengerMock.Setup((messenger) => messenger.Unsubscribe(Moq.It.IsAny())).Verifiable(); + var token = new TinyMessageSubscriptionToken(messengerMock.Object, typeof(TestMessage)); token.Dispose(); - messengerMock.VerifyAll(); } @@ -27,6 +28,7 @@ public void Dispose_WithValidHubReference_UnregistersWithHub() public void Dispose_WithInvalidHubReference_DoesNotThrow() { var token = UtilityMethods.GetTokenWithOutOfScopeMessenger(); + GC.Collect(); GC.WaitForFullGCComplete(2000); @@ -38,7 +40,6 @@ public void Dispose_WithInvalidHubReference_DoesNotThrow() public void Ctor_NullHub_ThrowsArgumentNullException() { var messenger = UtilityMethods.GetMessenger(); - var token = new TinyMessageSubscriptionToken(null, typeof(ITinyMessage)); } @@ -47,7 +48,6 @@ public void Ctor_NullHub_ThrowsArgumentNullException() public void Ctor_InvalidMessageType_ThrowsArgumentOutOfRangeException() { var messenger = UtilityMethods.GetMessenger(); - var token = new TinyMessageSubscriptionToken(messenger, typeof(object)); } @@ -55,7 +55,6 @@ public void Ctor_InvalidMessageType_ThrowsArgumentOutOfRangeException() public void Ctor_ValidHubAndMessageType_DoesNotThrow() { var messenger = UtilityMethods.GetMessenger(); - var token = new TinyMessageSubscriptionToken(messenger, typeof(TestMessage)); } } diff --git a/src/TinyMessenger/TinyMessenger/TinyMessenger.cs b/src/TinyMessenger/TinyMessenger/TinyMessenger.cs index 96c6b9a..c81cb26 100644 --- a/src/TinyMessenger/TinyMessenger/TinyMessenger.cs +++ b/src/TinyMessenger/TinyMessenger/TinyMessenger.cs @@ -30,8 +30,7 @@ public class DefaultSubscriberErrorHandler : ISubscriberErrorHandler { public void Handle(ITinyMessage message, Exception exception) { - //default behaviour is to do nothing - + // Default behaviour is to do nothing } } @@ -44,6 +43,11 @@ public interface ITinyMessage /// The sender of the message, or null if not supported by the message implementation. /// object Sender { get; } + + /// + /// Get if the message should be completely consumed by the first subscriber to process it. + /// + bool Consume { get; } } /// @@ -55,25 +59,42 @@ public abstract class TinyMessageBase : ITinyMessage /// Store a WeakReference to the sender just in case anyone is daft enough to /// keep the message around and prevent the sender from being collected. /// - private WeakReference _Sender; + private WeakReference _sender; + public object Sender { get { - return (_Sender == null) ? null : _Sender.Target; + return _sender == null ? null : _sender.Target; } } + /// + /// Should the message be consumed completely by the subscriber. + /// + public bool Consume { get; } + /// /// Initializes a new instance of the MessageBase class. /// /// Message sender (usually "this") - public TinyMessageBase(object sender) + public TinyMessageBase(object sender) : this(sender, false) + { + } + + /// + /// Initializes a new instance of the MessageBase class. + /// + /// Message sender (usually "this") + /// Should the message be consumed by the subscriber + public TinyMessageBase(object sender, bool consume) { if (sender == null) throw new ArgumentNullException("sender"); - _Sender = new WeakReference(sender); + _sender = new WeakReference(sender); + + Consume = consume; } } @@ -93,8 +114,18 @@ public class GenericTinyMessage : TinyMessageBase /// /// Message sender (usually "this") /// Contents of the message - public GenericTinyMessage(object sender, TContent content) - : base(sender) + public GenericTinyMessage(object sender, TContent content) : base(sender) + { + Content = content; + } + + /// + /// Create a new instance of the GenericTinyMessage class. + /// + /// Message sender (usually "this") + /// Should the message be consumed by the subscriber + /// Contents of the message + public GenericTinyMessage(object sender, bool consume, TContent content) : base(sender, consume) { Content = content; } @@ -122,8 +153,7 @@ public class CancellableGenericTinyMessage : TinyMessageBase /// Message sender (usually "this") /// Contents of the message /// Action to call for cancellation - public CancellableGenericTinyMessage(object sender, TContent content, Action cancelAction) - : base(sender) + public CancellableGenericTinyMessage(object sender, TContent content, Action cancelAction) : base(sender) { if (cancelAction == null) throw new ArgumentNullException("cancelAction"); @@ -138,8 +168,8 @@ public CancellableGenericTinyMessage(object sender, TContent content, Action can /// public sealed class TinyMessageSubscriptionToken : IDisposable { - private WeakReference _Hub; - private Type _MessageType; + private WeakReference _hub; + private Type _messageType; /// /// Initializes a new instance of the TinyMessageSubscriptionToken class. @@ -152,21 +182,35 @@ public TinyMessageSubscriptionToken(ITinyMessengerHub hub, Type messageType) if (!typeof(ITinyMessage).IsAssignableFrom(messageType)) throw new ArgumentOutOfRangeException("messageType"); - _Hub = new WeakReference(hub); - _MessageType = messageType; + _hub = new WeakReference(hub); + _messageType = messageType; } public void Dispose() { - if (_Hub.IsAlive) + if (_hub.IsAlive) { - var hub = _Hub.Target as ITinyMessengerHub; + var hub = _hub.Target as ITinyMessengerHub; if (hub != null) { - var unsubscribeMethod = typeof(ITinyMessengerHub).GetMethod("Unsubscribe", new Type[] {typeof(TinyMessageSubscriptionToken)}); - unsubscribeMethod = unsubscribeMethod.MakeGenericMethod(_MessageType); - unsubscribeMethod.Invoke(hub, new object[] {this}); + var unsubscribeMethods = typeof(ITinyMessengerHub).GetMethods() + .Where(method => + { + if (!method.IsGenericMethod && method.Name != "Unsubscribe") + return false; + + var parameters = method.GetParameters(); + + if (parameters.Length != 1 || !(typeof(TinyMessageSubscriptionToken).Equals(parameters[0].ParameterType))) + return false; + + return true; + }); + var unsubscribeMethod = unsubscribeMethods.First(); + + unsubscribeMethod = unsubscribeMethod.MakeGenericMethod(_messageType); + unsubscribeMethod.Invoke(hub, new object[] { this }); } } @@ -216,7 +260,7 @@ public interface ITinyMessageProxy /// public sealed class DefaultTinyMessageProxy : ITinyMessageProxy { - private static readonly DefaultTinyMessageProxy _Instance = new DefaultTinyMessageProxy(); + private static readonly DefaultTinyMessageProxy _instance = new DefaultTinyMessageProxy(); static DefaultTinyMessageProxy() { @@ -229,7 +273,7 @@ public static DefaultTinyMessageProxy Instance { get { - return _Instance; + return _instance; } } @@ -242,9 +286,11 @@ public void Deliver(ITinyMessage message, ITinyMessageSubscription subscription) subscription.Deliver(message); } } + #endregion #region Exceptions + /// /// Thrown when an exceptions occurs while subscribing to a message type /// @@ -264,9 +310,11 @@ public TinyMessengerSubscriptionException(Type messageType, string reason, Excep } } + #endregion #region Hub Interface + /// /// Messenger hub responsible for taking subscriptions/publications and delivering of messages. /// @@ -409,40 +457,46 @@ public interface ITinyMessengerHub /// AsyncCallback called on completion void PublishAsync(TMessage message, AsyncCallback callback) where TMessage : class, ITinyMessage; } + #endregion #region Hub Implementation + /// /// Messenger hub responsible for taking subscriptions/publications and delivering of messages. /// public sealed class TinyMessengerHub : ITinyMessengerHub { - readonly ISubscriberErrorHandler _SubscriberErrorHandler; + private readonly ISubscriberErrorHandler _subscriberErrorHandler; #region ctor methods public TinyMessengerHub() { - _SubscriberErrorHandler = new DefaultSubscriberErrorHandler(); + _subscriberErrorHandler = new DefaultSubscriberErrorHandler(); } public TinyMessengerHub(ISubscriberErrorHandler subscriberErrorHandler) { - _SubscriberErrorHandler = subscriberErrorHandler; + _subscriberErrorHandler = subscriberErrorHandler; } + #endregion #region Private Types and Interfaces - private class WeakTinyMessageSubscription : ITinyMessageSubscription - where TMessage : class, ITinyMessage + + private class WeakTinyMessageSubscription : ITinyMessageSubscription where TMessage : class, ITinyMessage { - protected TinyMessageSubscriptionToken _SubscriptionToken; - protected WeakReference _DeliveryAction; - protected WeakReference _MessageFilter; + protected TinyMessageSubscriptionToken _subscriptionToken; + protected WeakReference _deliveryAction; + protected WeakReference _messageFilter; public TinyMessageSubscriptionToken SubscriptionToken { - get { return _SubscriptionToken; } + get + { + return _subscriptionToken; + } } public bool ShouldAttemptDelivery(ITinyMessage message) @@ -453,13 +507,13 @@ public bool ShouldAttemptDelivery(ITinyMessage message) if (!(typeof(TMessage).IsAssignableFrom(message.GetType()))) return false; - if (!_DeliveryAction.IsAlive) + if (!_deliveryAction.IsAlive) return false; - if (!_MessageFilter.IsAlive) + if (!_messageFilter.IsAlive) return false; - return ((Func)_MessageFilter.Target).Invoke(message as TMessage); + return ((Func)_messageFilter.Target).Invoke(message as TMessage); } public void Deliver(ITinyMessage message) @@ -467,10 +521,10 @@ public void Deliver(ITinyMessage message) if (!(message is TMessage)) throw new ArgumentException("Message is not the correct type"); - if (!_DeliveryAction.IsAlive) + if (!_deliveryAction.IsAlive) return; - ((Action)_DeliveryAction.Target).Invoke(message as TMessage); + ((Action)_deliveryAction.Target).Invoke(message as TMessage); } /// @@ -490,22 +544,24 @@ public WeakTinyMessageSubscription(TinyMessageSubscriptionToken subscriptionToke if (messageFilter == null) throw new ArgumentNullException("messageFilter"); - _SubscriptionToken = subscriptionToken; - _DeliveryAction = new WeakReference(deliveryAction); - _MessageFilter = new WeakReference(messageFilter); + _subscriptionToken = subscriptionToken; + _deliveryAction = new WeakReference(deliveryAction); + _messageFilter = new WeakReference(messageFilter); } } - private class StrongTinyMessageSubscription : ITinyMessageSubscription - where TMessage : class, ITinyMessage + private class StrongTinyMessageSubscription : ITinyMessageSubscription where TMessage : class, ITinyMessage { - protected TinyMessageSubscriptionToken _SubscriptionToken; - protected Action _DeliveryAction; - protected Func _MessageFilter; + protected TinyMessageSubscriptionToken _subscriptionToken; + protected Action _deliveryAction; + protected Func _messageFilter; public TinyMessageSubscriptionToken SubscriptionToken { - get { return _SubscriptionToken; } + get + { + return _subscriptionToken; + } } public bool ShouldAttemptDelivery(ITinyMessage message) @@ -516,7 +572,7 @@ public bool ShouldAttemptDelivery(ITinyMessage message) if (!(typeof(TMessage).IsAssignableFrom(message.GetType()))) return false; - return _MessageFilter.Invoke(message as TMessage); + return _messageFilter.Invoke(message as TMessage); } public void Deliver(ITinyMessage message) @@ -524,7 +580,7 @@ public void Deliver(ITinyMessage message) if (!(message is TMessage)) throw new ArgumentException("Message is not the correct type"); - _DeliveryAction.Invoke(message as TMessage); + _deliveryAction.Invoke(message as TMessage); } /// @@ -544,14 +600,16 @@ public StrongTinyMessageSubscription(TinyMessageSubscriptionToken subscriptionTo if (messageFilter == null) throw new ArgumentNullException("messageFilter"); - _SubscriptionToken = subscriptionToken; - _DeliveryAction = deliveryAction; - _MessageFilter = messageFilter; + _subscriptionToken = subscriptionToken; + _deliveryAction = deliveryAction; + _messageFilter = messageFilter; } } + #endregion #region Subscription dictionary + private class SubscriptionItem { public ITinyMessageProxy Proxy { get; private set; } @@ -564,11 +622,13 @@ public SubscriptionItem(ITinyMessageProxy proxy, ITinyMessageSubscription subscr } } - private readonly object _SubscriptionsPadlock = new object(); - private readonly List _Subscriptions = new List(); + private readonly object _subscriptionsPadlock = new object(); + private readonly List _subscriptions = new List(); + #endregion #region Public API + /// /// Subscribe to a message type with the given destination and delivery action. /// All references are held with strong references @@ -744,11 +804,13 @@ public void PublishAsync(TMessage message, AsyncCallback callback) whe { PublishAsyncInternal(message, callback); } + #endregion #region Internal Methods + private TinyMessageSubscriptionToken AddSubscriptionInternal(Action deliveryAction, Func messageFilter, bool strongReference, ITinyMessageProxy proxy) - where TMessage : class, ITinyMessage + where TMessage : class, ITinyMessage { if (deliveryAction == null) throw new ArgumentNullException("deliveryAction"); @@ -759,17 +821,17 @@ private TinyMessageSubscriptionToken AddSubscriptionInternal(Action(subscriptionToken, deliveryAction, messageFilter); else subscription = new WeakTinyMessageSubscription(subscriptionToken, deliveryAction, messageFilter); - _Subscriptions.Add(new SubscriptionItem(proxy, subscription)); + _subscriptions.Add(new SubscriptionItem(proxy, subscription)); return subscriptionToken; } @@ -781,30 +843,37 @@ private void RemoveSubscriptionInternal(TinyMessageSubscriptionToken s if (subscriptionToken == null) throw new ArgumentNullException("subscriptionToken"); - lock (_SubscriptionsPadlock) + lock (_subscriptionsPadlock) { - var currentlySubscribed = (from sub in _Subscriptions - where object.ReferenceEquals(sub.Subscription.SubscriptionToken, subscriptionToken) - select sub).ToList(); + var currentlySubscribed = ( + from sub in _subscriptions + where object.ReferenceEquals(sub.Subscription.SubscriptionToken, subscriptionToken) + select sub + ).ToList(); - currentlySubscribed.ForEach(sub => _Subscriptions.Remove(sub)); + currentlySubscribed.ForEach(sub => _subscriptions.Remove(sub)); } } - private void PublishInternal(TMessage message) - where TMessage : class, ITinyMessage + private void PublishInternal(TMessage message) where TMessage : class, ITinyMessage { if (message == null) throw new ArgumentNullException("message"); List currentlySubscribed; - lock (_SubscriptionsPadlock) + + lock (_subscriptionsPadlock) { - currentlySubscribed = (from sub in _Subscriptions - where sub.Subscription.ShouldAttemptDelivery(message) - select sub).ToList(); + currentlySubscribed = ( + from sub in _subscriptions + where sub.Subscription.ShouldAttemptDelivery(message) + select sub + ).ToList(); } + if (message.Consume && currentlySubscribed.Count > 1) + currentlySubscribed = currentlySubscribed.Take(1).ToList(); + currentlySubscribed.ForEach(sub => { try @@ -814,18 +883,23 @@ where sub.Subscription.ShouldAttemptDelivery(message) catch (Exception exception) { // By default ignore any errors and carry on - _SubscriberErrorHandler.Handle(message, exception); + _subscriberErrorHandler.Handle(message, exception); } }); } private void PublishAsyncInternal(TMessage message, AsyncCallback callback) where TMessage : class, ITinyMessage { - Action publishAction = () => { PublishInternal(message); }; + Action publishAction = () => + { + PublishInternal(message); + }; publishAction.BeginInvoke(callback, null); } + #endregion } + #endregion }