Pranay Rana: Publisher/Subscriber pattern with Event/Delegate and EventAggregator

Sunday, January 18, 2015

Publisher/Subscriber pattern

The Publisher/Subscriber pattern is one of the variations of the Observer design pattern described by the Gang of Four (GoF). In this pattern, a publisher (the entity responsible for publishing messages) publishes messages, and one or more subscribers (entities that have registered interest in a particular message type) receive them. The image below illustrates the scenario: the publisher publishes two types of message (MessageA and MessageB), and each subscriber receives only the message type it subscribed to (Subscriber1 receives MessageA, while Subscriber2 and Subscriber3 receive MessageB).


To understand this, consider a real-life scenario in which a mobile operator sends messages to its customers.

As in the image above, the mobile operator publishes (broadcasts) two kinds of message (cricket scores and latest news), and each message is received by the handsets of the customers who subscribed to it (Customer1 and Customer2 receive the cricket score message, while Customer3 and Customer4 receive the latest news message).

Implementation with Event

One way to achieve the Publisher/Subscriber pattern in an application is to use Event/Delegate, i.e. the support built into the framework. Below is a detailed description of this publisher/subscriber implementation.

Message - Below is the class that represents a message which is published by the Publisher and captured by the interested Subscriber.

public class MessageArgument<T> : EventArgs
    {
        public T Message { get;  private set; }
        public MessageArgument(T message)
        {
            Message = message;
        }
    }
In technical terms, MessageArgument is a generic class, so an instance of it can carry a message of any type, represented by the type parameter T.

Publisher - As already described in the definition above, the Publisher is responsible for publishing messages of different types.
 public interface IPublisher<T>
    {
        event EventHandler<MessageArgument<T>> DataPublisher;
        //void OnDataPublisher(MessageArgument<T> args);
        void PublishData(T data);
    }

    public class Publisher<T> : IPublisher<T>
    {
        //Defined datapublisher event
        public event EventHandler<MessageArgument<T>> DataPublisher;

        private void OnDataPublisher(MessageArgument<T> args)
        {
            var handler = DataPublisher;
            if (handler != null)
                handler(this, args);
        }


        public void PublishData(T data)
        {
            // Wrap the payload in the event-args type; a plain constructor call is enough, no reflection needed
            MessageArgument<T> message = new MessageArgument<T>(data);
            OnDataPublisher(message);
        }
    }
Technically, Publisher is a generic class that implements the IPublisher interface. Because it is generic, an instance can be created for any message type, represented by the type parameter T. A Publisher instance created for a given type publishes only that type of message, so publishing different types of message requires creating a separate Publisher instance for each type. To understand this better, read below how the publisher is used in the client class code.

The Publisher class provides the DataPublisher event; subscribers attach themselves to this event to listen for messages.

PublishData is the Publisher method that publishes data to the subscribers.
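
To make the "one publisher instance per message type" point concrete, here is a minimal, self-contained sketch. It redeclares trimmed copies of the MessageArgument and Publisher classes from above (without the interface) so that it compiles on its own. PublisherPerTypeDemo and its Run method are names invented for this illustration and are not part of the original code.

```csharp
using System;
using System.Collections.Generic;

// Trimmed copy of the message class shown earlier.
public class MessageArgument<T> : EventArgs
{
    public T Message { get; private set; }
    public MessageArgument(T message) { Message = message; }
}

// Trimmed copy of the publisher shown earlier (interface omitted for brevity).
public class Publisher<T>
{
    public event EventHandler<MessageArgument<T>> DataPublisher;

    public void PublishData(T data)
    {
        var handler = DataPublisher;
        if (handler != null)
            handler(this, new MessageArgument<T>(data));
    }
}

// Hypothetical demo class, an assumption made for this example only.
public static class PublisherPerTypeDemo
{
    public static List<string> Run()
    {
        var received = new List<string>();

        // One publisher instance per message type.
        var intPublisher = new Publisher<int>();
        var stringPublisher = new Publisher<string>();

        // Each handler is attached to the publisher of the type it cares about.
        intPublisher.DataPublisher += (sender, e) => received.Add("int: " + e.Message);
        stringPublisher.DataPublisher += (sender, e) => received.Add("string: " + e.Message);

        intPublisher.PublishData(10);
        stringPublisher.PublishData("hello");

        return received;
    }
}
```

Calling PublisherPerTypeDemo.Run() from a Main method returns the two entries in publishing order: the integer handler sees only the integer message and the string handler sees only the string message.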

Subscriber - A Subscriber captures messages of the type it is interested in.
 public class Subscriber<T>
    {
        public IPublisher<T> Publisher { get; private set; }
        public Subscriber(IPublisher<T> publisher)
        {
            Publisher = publisher;
        }
    }

Technically, Subscriber is a generic class, which allows multiple subscriber instances to be created; each subscriber uses the publisher to subscribe to the messages it is interested in.

A Subscriber is passed an instance of the publisher for a particular type, so that it can capture the messages published by that Publisher.
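
The Subscriber above only stores the publisher; the handlers themselves are attached by the client. As one possible variation (not what the class above does), a subscriber can attach its own handler in the constructor and detach it again when it is disposed. The sketch below assumes trimmed copies of the earlier types; SelfAttachingSubscriber and SubscriberDemo are hypothetical names introduced for this example.

```csharp
using System;
using System.Collections.Generic;

public class MessageArgument<T> : EventArgs
{
    public T Message { get; private set; }
    public MessageArgument(T message) { Message = message; }
}

public interface IPublisher<T>
{
    event EventHandler<MessageArgument<T>> DataPublisher;
    void PublishData(T data);
}

public class Publisher<T> : IPublisher<T>
{
    public event EventHandler<MessageArgument<T>> DataPublisher;

    public void PublishData(T data)
    {
        var handler = DataPublisher;
        if (handler != null)
            handler(this, new MessageArgument<T>(data));
    }
}

// Hypothetical variation: subscribes in the constructor, unsubscribes in Dispose.
public class SelfAttachingSubscriber<T> : IDisposable
{
    private readonly IPublisher<T> publisher;
    public List<T> Received { get; private set; }

    public SelfAttachingSubscriber(IPublisher<T> publisher)
    {
        this.publisher = publisher;
        Received = new List<T>();
        publisher.DataPublisher += OnData;
    }

    private void OnData(object sender, MessageArgument<T> e)
    {
        Received.Add(e.Message);
    }

    // Detaching the handler stops delivery and lets the publisher release this object.
    public void Dispose()
    {
        publisher.DataPublisher -= OnData;
    }
}

public static class SubscriberDemo
{
    public static int Run()
    {
        var publisher = new Publisher<int>();
        var subscriber = new SelfAttachingSubscriber<int>(publisher);

        publisher.PublishData(1);   // delivered
        subscriber.Dispose();       // handler detached
        publisher.PublishData(2);   // no longer delivered

        return subscriber.Received.Count;
    }
}
```

Detaching matters with events because the publisher's delegate list holds a strong reference to every attached subscriber; a subscriber that never detaches stays alive as long as the publisher does.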

How it works
 public class Client
    {
        private readonly IPublisher<int> IntPublisher;
        private readonly Subscriber<int> IntSubscriber1;
        private readonly Subscriber<int> IntSubscriber2;

        public Client()
        {
            IntPublisher = new Publisher<int>();//create publisher of type integer

            IntSubscriber1 = new Subscriber<int>(IntPublisher);//subscriber 1 subscribes to integer publisher
            IntSubscriber1.Publisher.DataPublisher += publisher_DataPublisher1;//event method to listen for published data

            IntSubscriber2 = new Subscriber<int>(IntPublisher);//subscriber 2 subscribes to integer publisher
            IntSubscriber2.Publisher.DataPublisher += publisher_DataPublisher2;//event method to listen for published data

            IntPublisher.PublishData(10); // publisher publishes message
        }
        }

        void publisher_DataPublisher1(object sender, MessageArgument<int> e)
        {
            Console.WriteLine("Subscriber 1 : " + e.Message);
        }

        void publisher_DataPublisher2(object sender, MessageArgument<int> e)
        {
            Console.WriteLine("Subscriber 2 : " + e.Message);
        }
    }
As you can see in the code above, Client is the class that creates the publisher and the subscribers. It creates a Publisher of integer type (in practice you can create a publisher of any type) and two Subscriber instances, which subscribe to the publisher's messages by attaching handlers to the DataPublisher event provided by the Publisher class.

So when you create an instance of the Client class, you will receive the following output:

Subscriber 1 : 10
Subscriber 2 : 10

As the output shows, the Publisher of integer type published the message "10", and the two Subscribers that subscribed to the publisher captured the message and displayed it.

In a practical scenario, i.e. in an actual application, you need to create all publishers at the application's start point, i.e. at the entry point of the app, and pass the publisher instance in when creating each subscriber.

For example, in a Windows application create the publisher in the Main() method; in a web application create it in the Application_Start method of Global.asax; or make use of Dependency Injection, in which you register your publisher and let the container create it when needed.
Once it is created, you can pass the publisher to the Subscriber as done in the client class code above.

Implementation with EventAggregator

EventAggregator - As the name suggests, it aggregates events. In Publisher/Subscriber, the EventAggregator works as a hub whose task is to collect all published messages and send each message to the interested subscribers.

As you can see in the image above, the EventAggregator sits as a hub between publishers and subscribers. It works like this:
  1. A publisher publishes a message
  2. The EventAggregator receives the message sent by the publisher
  3. The EventAggregator gets the list of all subscribers interested in that message
  4. The EventAggregator sends the message to the interested subscribers
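
Before looking at the full implementation, the four steps can be sketched in a deliberately stripped-down hub. This is only an illustration under simplifying assumptions: there is no locking, no weak references and no subscription token, and TinyHub and TinyHubDemo are hypothetical names that do not appear in the implementation that follows.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal hub: maps a message type to the handlers registered for it.
public class TinyHub
{
    private readonly Dictionary<Type, List<Delegate>> handlers =
        new Dictionary<Type, List<Delegate>>();

    public void Subscribe<TMessage>(Action<TMessage> handler)
    {
        List<Delegate> list;
        if (!handlers.TryGetValue(typeof(TMessage), out list))
        {
            list = new List<Delegate>();
            handlers.Add(typeof(TMessage), list);
        }
        list.Add(handler);
    }

    // Steps 1 and 2: a publisher calls Publish and the hub receives the message.
    public void Publish<TMessage>(TMessage message)
    {
        List<Delegate> list;

        // Step 3: look up the subscribers registered for this message type.
        if (!handlers.TryGetValue(typeof(TMessage), out list))
            return;

        // Step 4: deliver the message to each interested subscriber.
        foreach (Action<TMessage> handler in list.ToArray())
            handler(message);
    }
}

public static class TinyHubDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var hub = new TinyHub();

        hub.Subscribe<int>(n => log.Add("int subscriber got " + n));
        hub.Subscribe<string>(s => log.Add("string subscriber got " + s));

        hub.Publish(42);
        hub.Publish("news");
        hub.Publish(3.5);   // nobody subscribed to double, so nothing is delivered

        return log;
    }
}
```

Neither lambda knows who published the message, and the code calling Publish does not know who receives it; both sides depend only on the hub.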
EventAggregator Implementation

Subscription - This is the class used to create a subscription token. When a Subscriber subscribes to a message type it is interested in, the EventAggregator returns a Subscription token, which the Subscriber then uses to keep track of its subscription.

 //Used by EventAggregator to hold a subscription
    public class Subscription<Tmessage> : IDisposable
    {
        public readonly MethodInfo MethodInfo;
        private readonly EventAggregator EventAggregator;
        public readonly WeakReference TargetObjet;
        public readonly bool IsStatic;

        private bool isDisposed;
        public Subscription(Action<Tmessage> action, EventAggregator eventAggregator)
        {
            MethodInfo = action.Method;
            if (action.Target == null)
                IsStatic = true;
            TargetObjet = new WeakReference(action.Target);
            EventAggregator = eventAggregator;
        }

        ~Subscription()
        {
            if (!isDisposed)
                Dispose();
        }

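        public void Dispose()
        {
            if (isDisposed)
                return; // already unsubscribed, so repeated calls are harmless
            EventAggregator.UnSbscribe(this);
            isDisposed = true;
            GC.SuppressFinalize(this); // the finalizer has nothing left to do
        }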

        public Action<Tmessage> CreatAction()
        {
            // Static methods have no target instance to track
            if (this.IsStatic)
                return (Action<Tmessage>)Delegate.CreateDelegate(typeof(Action<Tmessage>), MethodInfo);

            // Read the weak reference once, so the target cannot be collected between the check and the call
            object target = TargetObjet.Target;
            if (target != null)
                return (Action<Tmessage>)Delegate.CreateDelegate(typeof(Action<Tmessage>), target, MethodInfo);

            return null;
        }
    }
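
The core trick in Subscription is splitting a delegate into its MethodInfo and a weakly referenced target, then rebuilding the delegate on demand. The following is a small sketch of just that step in isolation. Greeter and WeakDelegateDemo are hypothetical names used only for this illustration; Delegate.CreateDelegate and WeakReference are the same framework types the class above uses.

```csharp
using System;
using System.Reflection;

// Hypothetical class whose instance method will be wrapped in a delegate.
public class Greeter
{
    public string Last;

    public void Greet(string name)
    {
        Last = "Hello, " + name;
    }
}

public static class WeakDelegateDemo
{
    public static string Run()
    {
        var greeter = new Greeter();
        Action<string> original = greeter.Greet;

        // Split the delegate into its two parts, as the Subscription constructor does.
        MethodInfo method = original.Method;
        var weakTarget = new WeakReference(original.Target);

        // Later: take a strong reference once, then rebuild the delegate from the parts.
        object target = weakTarget.Target;
        if (target == null)
            return null; // the subscriber has been garbage collected

        var rebuilt = (Action<string>)Delegate.CreateDelegate(
            typeof(Action<string>), target, method);
        rebuilt("world");

        // greeter is still referenced here, so it was alive for the whole method.
        return greeter.Last;
    }
}
```

Holding only the MethodInfo and a WeakReference means the hub does not keep a subscriber alive by itself, which a stored Action delegate would do.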
EventAggregator -

public class EventAggregator
    {
        private readonly object lockObj = new object();
        private Dictionary<Type, IList> subscriber;

        public EventAggregator()
        {
            subscriber = new Dictionary<Type, IList>();
        }

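        public void Publish<TMessageType>(TMessageType message)
        {
            Type t = typeof(TMessageType);
            List<Subscription<TMessageType>> sublst;
            lock (lockObj)
            {
                IList registered;
                // Look up inside the lock so a concurrent Subscribe cannot race with this read
                if (!subscriber.TryGetValue(t, out registered))
                    return;
                // Copy the list so that handlers can unsubscribe while the message is delivered
                sublst = new List<Subscription<TMessageType>>(registered.Cast<Subscription<TMessageType>>());
            }

            foreach (Subscription<TMessageType> sub in sublst)
            {
                var action = sub.CreatAction();
                if (action != null)
                    action(message);
            }
        }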

        public Subscription<TMessageType> Subscribe<TMessageType>(Action<TMessageType> action)
        {
            Type t = typeof(TMessageType);
            IList actionlst;
            var actiondetail = new Subscription<TMessageType>(action, this);

            lock (lockObj)
            {
                if (!subscriber.TryGetValue(t, out actionlst))
                {
                    actionlst = new List<Subscription<TMessageType>>();
                    actionlst.Add(actiondetail);
                    subscriber.Add(t, actionlst);
                }
                else
                {
                    actionlst.Add(actiondetail);
                }
            }

            return actiondetail;
        }

        public void UnSbscribe<TMessageType>(Subscription<TMessageType> subscription)
        {
            Type t = typeof(TMessageType);
            lock (lockObj)
            {
                IList sublst;
                // Look up and remove under the same lock used by Subscribe and Publish
                if (subscriber.TryGetValue(t, out sublst))
                    sublst.Remove(subscription);
            }
        }

    }
In the above code:
Dictionary<Type, IList> subscriber - is a dictionary in which Type is the type of the message and IList is a list of Subscription objects, each wrapping an action. So it holds the list of actions mapped to a particular message type.

public void Publish<TMessageType>(TMessageType message) - is the method used to publish a message. As the code shows, this method receives a message as input, then looks up the list of all subscribers for that message type and delivers the message to each of them.

public Subscription<TMessageType> Subscribe<TMessageType>(Action<TMessageType> action) - is the method used to subscribe to a message type of interest. As the code shows, this method receives an Action delegate as input and maps it to the particular message type, i.e. it creates an entry for the message type if one is not already present in the dictionary, and adds a Subscription object (which wraps the Action) to that entry.

public void UnSbscribe<TMessageType>(Subscription<TMessageType> subscription) - is the method used to unsubscribe from a particular message type. It receives a Subscription object as input and removes that object from the dictionary.
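
Publish copies the subscription list before invoking the actions. One reason this matters is that a handler may unsubscribe while the message is being delivered, as the Subscriber shown later in the post does, and a List cannot be modified while it is being enumerated. The sketch below shows the effect with plain Action lists; SnapshotDemo and its method names are made up for this illustration and are not part of the EventAggregator.

```csharp
using System;
using System.Collections.Generic;

public static class SnapshotDemo
{
    // Enumerates the live list while a handler removes itself from it.
    public static bool ThrowsWithoutSnapshot()
    {
        var handlers = new List<Action>();
        handlers.Add(() => handlers.RemoveAt(0)); // the handler "unsubscribes" itself

        try
        {
            foreach (var handler in handlers)
                handler();
            return false;
        }
        catch (InvalidOperationException)
        {
            // The list changed during enumeration.
            return true;
        }
    }

    // Enumerates a copy, so handlers are free to change the original list.
    public static int CallsWithSnapshot()
    {
        var handlers = new List<Action>();
        int calls = 0;
        handlers.Add(() => { calls++; handlers.RemoveAt(0); });
        handlers.Add(() => { calls++; handlers.RemoveAt(0); });

        foreach (var handler in new List<Action>(handlers))
            handler();

        return calls;
    }
}
```

ThrowsWithoutSnapshot returns true because the enumerator detects the modification, while CallsWithSnapshot invokes both handlers even though each one removes an entry from the original list.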

How To Use

      static void Main(string[] args)
        {
            EventAggregator eve = new EventAggregator();
            Publisher pub = new Publisher(eve);
            Subscriber sub = new Subscriber(eve);

            pub.PublishMessage();

            Console.ReadLine();

        }

As the code above shows, it first creates an instance of EventAggregator, which is passed as an argument to the publisher and the subscriber; then the publisher publishes its messages.

Publisher - The code of the Publisher class, which shows how a publisher publishes messages using the EventAggregator.

 public class Publisher
    {
        EventAggregator EventAggregator;
        public Publisher(EventAggregator eventAggregator)
        {
            EventAggregator = eventAggregator;
        }

        public void PublishMessage()
        {
            EventAggregator.Publish(new Mymessage());
            EventAggregator.Publish(10);
        }
    }

Subscriber - The code of the Subscriber class, which shows how a subscriber uses the EventAggregator to subscribe to the messages it is interested in.

    public class Subscriber
    {
        Subscription<Mymessage> myMessageToken;
        Subscription<int> intToken;
        EventAggregator eventAggregator;

        public Subscriber(EventAggregator eve)
        {
            eventAggregator = eve;
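            // Keep the returned tokens so the handlers below can unsubscribe with them
            myMessageToken = eve.Subscribe<Mymessage>(this.Test);
            intToken = eve.Subscribe<int>(this.IntTest);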
        }

        private void IntTest(int obj)
        {
            Console.WriteLine(obj);
            eventAggregator.UnSbscribe(intToken);
        }

        private void Test(Mymessage test)
        {
            Console.WriteLine(test.ToString());
            eventAggregator.UnSbscribe(myMessageToken);
        }
    }
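
The Mymessage class used by the Publisher and Subscriber is not shown in the post. To compile the example, any class will do; the stand-in below is purely an assumption made for that purpose, including the text it returns from ToString.

```csharp
using System;

// Hypothetical stand-in for the Mymessage type, which the post does not show.
public class Mymessage
{
    // Without this override, Console.WriteLine(test.ToString()) would print the type name.
    public override string ToString()
    {
        return "Mymessage instance";
    }
}
```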

Output


Note:
In a practical scenario, i.e. in an actual application, you need to create the EventAggregator at the application's start point, i.e. at the entry point of the app, and pass that instance to the publishers and subscribers.

For example, in a Windows application create the EventAggregator in the Main() method as in the example code above; in a web application create it in the Application_Start method of Global.asax; or make use of Dependency Injection, in which you register your EventAggregator and let the container create it when needed.

Event/Delegate Vs. EventAggregator

The differences between Event/Delegate and EventAggregator are:
  1. Publishing different message types
     Event/Delegate: Publishing different types of message requires creating different types of Publisher, for example Publisher<int> for an integer publisher and Publisher<string> for a string publisher. So if a class wants to publish different types of message, it has to consume or create a publisher for each type. To publish integer and string messages, the publisher class's constructor will be Publisher(Publisher<int> intPublisher, Publisher<string> strPublisher).
     EventAggregator: As the EventAggregator works as a hub, there is no need to create more than one instance of EventAggregator, and the publisher just needs to consume it. Example: Publisher(EventAggregator eventAggregator)
  2. Coupling
     Event/Delegate: Tight coupling between Publisher and Subscriber. In this pattern the Subscriber has to know the Publisher, as it subscribes to the publisher's event.
     EventAggregator: Loose coupling between Publisher and Subscriber. In this pattern the EventAggregator is the mediator, so Publisher and Subscriber don't know each other; they just need to know the EventAggregator.

Conclusion

In my view, Event/Delegate is easy to implement and good for a small project or a project with only a few publishers and subscribers. EventAggregator is suitable for a large project or a project with a large number of publishers and subscribers.

But I think it's always good to use EventAggregator, because it offers loose coupling.

NOTE
This is my point of view; please do comment on this and provide your feedback. Thanks to Sacha Barber for helping with the update.
