EventHelix.com: CASE Tools; Real-time and Embedded System Design; Object Oriented Design
  Home  |  EventStudio System Designer 4.0  |  VisualEther Protocol Analyzer 1.0  Real-time Mantra  Contact Us
Home > Real-time Mantra > Design Patterns > Publish-Subscribe Design Patterns

Publish-Subscribe Design Patterns

Click here to open Hierarchical State Machine Documentation in a new window Local Publish-Subscriber Design Pattern Documentation
Click here to open Hierarchical State Machine Documentation in a new window Remote Publish-Subscriber Design Pattern Documentation

While developing embedded system, one frequently encounters a situation where many entities are interested in occurrence of a particular event. This introduces a strong coupling between the publisher and subscriber of this event change notification. Thus whenever a new entity needs the information, code for the publisher of the information also needs to be modified to accommodate the new request.

An Example

Consider the example of a system which manages terminals. Here many entities would be interested in knowing when a terminal's status changes. A few examples are:

  • Call Processing module needs to know when the terminal goes out of service so that calls on the terminal can be cleared.
  • Alarm processing software needs to know of terminal status change, so that alarms can be raised/cleared.
  • Resource manager would be interested in terminal status change as it needs to change the resource counts when terminal status changes.

As the design progresses, more and more applications would be interested in status change. Its easy to see that this complicates the design, as terminal status handling has to be modified on each occasion.

Publish-Subscribe Pattern

The Publish-Subscribe Pattern solves the tight coupling problem. Here the coupling is removed by the publisher of information supporting a generic interface for subscribers. Entities interested in the information subscribe to the publisher by registering for the information. With this interface, the publisher code does not need to be modified every time a subscriber is introduced.

Whenever information needs to be published, the publisher invokes the Publish method to inform all the subscribers.

This pattern comes in two flavors:

Local Publish-Subscribe Pattern

Here we develop the LocalStatusPublisher class. This class can be used as a helper class when this generic subscribe-publish interface is desired. The following important public methods are supported:

  • PublishStatus: Invoke this method to inform all registered subscribers about status change.
  • Register: Invoke this method to register an object for the status change notification.
  • Deregister: Invoke this method to deregister an object. The object will not receive any notifications of status change after it deregisters.

 
Local Status Publisher
00009 #include "Subscriber.h"
00010 #include <stdio.h>
00033 
00034 class LocalStatusPublisher
00035 {
00036     enum {
00038           NOT_FOUND = -1, 
00039 
00041           MAX_SUBSCRIBERS = 100
00042          };
00045 
00046     Subscriber *m_pSubscriber[MAX_SUBSCRIBERS];
00047     
00056 
00057     int Find(Subscriber *pSubscriber) const
00058     {
00059        int index = NOT_FOUND;
00060        for (int i=0; i < MAX_SUBSCRIBERS; i++)
00061        {
00062            if (m_pSubscriber[i] == pSubscriber)
00063            {
00064                index = i;
00065                break;
00066            }
00067        }       
00068        return index;     
00069     }
00070             
00071 public:
00072 
00074     enum OperationStatus {SUCCESS, FAILURE};
00075 
00084 
00085     void PublishStatus(int unitId, int status)
00086     {
00087        for (int i=0; i < MAX_SUBSCRIBERS; i++)
00088        {
00089            // A valid subscriber exists only when the pointer is non NULL
00090            if (m_pSubscriber[i])
00091            {
00092                m_pSubscriber[i]->HandleStatusChange(unitId, status);
00093            }
00094        }
00095     }     
00107 
00108     OperationStatus Register(Subscriber *pSubscriber)
00109     {
00110         OperationStatus status = FAILURE;
00111         // First check if the subscriber is already present
00112         // in the list
00113         int index = Find(pSubscriber);
00114                 
00115         if (index == NOT_FOUND)
00116         {
00117             // Subscriber was not found, so this is not a duplicate request
00118             // Now look for a free entry by finding NULL
00119             index = Find(NULL);
00120             if (index != NOT_FOUND)
00121             {
00122                // A free entry has been found
00123                m_pSubscriber[index] = pSubscriber;
00124                status = SUCCESS;
00125             }
00126         }      
00127         return status;
00128     }
00129     
00138     OperationStatus Deregister(Subscriber *pSubscriber)
00139     {
00140         OperationStatus status = FAILURE;
00141         // Search for the entry
00142         int index = Find(pSubscriber);
00143         if (index != NOT_FOUND)
00144         {
00145            // Free the entry by marking as NULL
00146            m_pSubscriber[index] = NULL;
00147            status = SUCCESS;
00148         }
00149         return SUCCESS;
00150     }
00151     
00152 };    

Remote Publish-Subscribe Pattern

Here we will look at the RemoteStatusPublisher class. This class supports a message based interface. Subscribers send registration request message to register for the status change. The source address of the sender is saved as a part of the registration process. Deregistration request is sent to stop receiving the status change notifications.

Whenever status change is detected, PublishStatus method is invoked. This method sends the status change message to all the registered subscribers using the address that was obtained during registration.

Remote Status Publisher
00009 #include "Subscriber.h"
00010 #include <stdio.h>
00011 
00013 typedef unsigned long SubscriberAddress;
00014 
00016 struct RegisterRequestMessage
00017 {
00019     int opcode;
00020 
00022     SubscriberAddress subscriberAddress;
00023 };
00024 
00026 struct DeregisterRequestMessage
00027 {
00029     int opcode;
00030 
00032     SubscriberAddress subscriberAddress;
00033 };
00034 
00036 enum OperationStatus {SUCCESS, FAILURE};
00037 
00039 enum AckType {REGISTRATION_ACK, DEREGISTRATION_ACK};
00040 
00063 class RemoteStatusPublisher
00064 {
00065     enum {
00066 
00068         FREE_ENTRY = 0,
00069 
00071         NOT_FOUND = -1, 
00072 
00074         MAX_SUBSCRIBERS = 100
00075     };
00076 
00079     SubscriberAddress m_subscriberAddress[MAX_SUBSCRIBERS];    
00086 
00087     int Find(SubscriberAddress subscriberAddress) const
00088     {
00089        int index = NOT_FOUND;
00090        for (int i=0; i < MAX_SUBSCRIBERS; i++)
00091        {
00092            if (m_subscriberAddress[i] == subscriberAddress)
00093            {
00094                index = i;
00095                break;
00096            }
00097        }       
00098        return index;     
00099     } 
00100      
00103     void SendRequestStatus(SubscriberAddress subscriberAddress, AckType ackType, 
00104                         OperationStatus registrationStatus);
00105 
00108     void SendStatusChange(SubscriberAddress subscriberAddress, int unitId, int unitStatus);
00109 
00110 public:
00111 
00118 
00119     void PublishStatus(int unitId, int unitStatus)
00120     {
00121        for (int i=0; i < MAX_SUBSCRIBERS; i++)
00122        {
00123            // A valid subscriber exists only when the pointer is non NULL
00124            if (m_subscriberAddress[i]!= FREE_ENTRY)
00125            {
00126                SendStatusChange(m_subscriberAddress[i], unitId, unitStatus);
00127            }
00128        }
00129     }    
00137 
00138     void HandleRegisterRequest(const RegisterRequestMessage *pMsg)
00139     {
00140         OperationStatus status = FAILURE;
00141         // First check if the subscriber is already present
00142         // in the list
00143         int index = Find(pMsg->subscriberAddress);
00144                 
00145         if (index == NOT_FOUND)
00146         {
00147             // Subscriber was not found, so this is not a duplicate request
00148             // Now look for a free entry by finding FREE_ENTRY
00149             index = Find(FREE_ENTRY);
00150             if (index != NOT_FOUND)
00151             {
00152                // A free entry has been found
00153                m_subscriberAddress[index] = pMsg->subscriberAddress;
00154                status = SUCCESS;
00155                SendRequestStatus(pMsg->subscriberAddress, REGISTRATION_ACK , status);
00156             }
00157         }      
00158     }
00159     
00165   void HandleDeregisterRequest(const DeregisterRequestMessage *pMsg)
00166     {
00167         OperationStatus status = FAILURE;
00168         // Search for the entry
00169         int index = Find(pMsg->subscriberAddress);
00170         if (index != NOT_FOUND)
00171         {
00172            // Free the entry by marking as FREE_ENTRY
00173            m_subscriberAddress[index] = FREE_ENTRY;
00174            status = SUCCESS;
00175         }
00176 
00177         // Inform the requester with an acknowledgement.
00178         SendRequestStatus(pMsg->subscriberAddress, DEREGISTRATION_ACK,status);
00179     }
00180     
00181 };    
 
Explore More ...
Click here to open Hierarchical State Machine Documentation in a new window Local Publish-Subscriber Design Pattern Documentation
Click here to open Hierarchical State Machine Documentation in a new window Remote Publish-Subscriber Design Pattern Documentation
  Home  |  EventStudio System Designer 4.0  |  VisualEther Protocol Analyzer 1.0  Real-time Mantra  Contact Us
Copyright © 2000-2008 EventHelix.com Inc. All Rights Reserved.