|
Home >
Real-time Mantra >
Design Patterns >
Publish-Subscribe Design Patterns
While developing embedded system, one frequently encounters a situation where
many entities are interested in occurrence of a particular event. This
introduces a strong coupling between the publisher and subscriber of this event
change notification. Thus whenever a new entity needs the information, code for
the publisher of the information also needs to be modified to accommodate the
new request.
An Example
Consider the example of a system which manages terminals. Here many entities
would be interested in knowing when a terminal's status changes. A few examples
are:
- Call Processing module needs to know when the terminal goes out of service
so that calls on the terminal can be cleared.
- Alarm processing software needs to know of terminal status change, so that
alarms can be raised/cleared.
- Resource manager would be interested in terminal status change as it needs
to change the resource counts when terminal status changes.
As the design progresses, more and more applications would be interested in
status change. Its easy to see that this complicates the design, as
terminal status handling has to be modified on each occasion.
Publish-Subscribe Pattern
The Publish-Subscribe Pattern solves the tight coupling problem. Here the
coupling is removed by the publisher of information supporting a generic
interface for subscribers. Entities interested in the information subscribe to
the publisher by registering for the information. With this interface, the
publisher code does not need to be modified every time a subscriber is
introduced.
Whenever information needs to be published, the publisher invokes the Publish
method to inform all the subscribers.
This pattern comes in two flavors:
Here we develop the LocalStatusPublisher class.
This class can be used as a helper class when this generic subscribe-publish
interface is desired. The following important public methods are supported:
- PublishStatus: Invoke this method to inform
all registered subscribers about status change.
- Register: Invoke this method to register an
object for the status change notification.
- Deregister:
Invoke this method to deregister an object. The object will not receive any
notifications of status change after it deregisters.
Here we will look at the RemoteStatusPublisher
class. This class supports a message based interface. Subscribers send
registration request message to register for the status change. The source
address of the sender is saved as a part of the registration process.
Deregistration request is sent to stop receiving the status change notifications.
Whenever status change is detected, PublishStatus
method is invoked. This method sends the status change message to all the
registered subscribers using the address that was obtained during registration.
| Remote
Status Publisher |
00009 #include "Subscriber.h"
00010 #include <stdio.h>
00011
00013 typedef unsigned long SubscriberAddress;
00014
00016 struct RegisterRequestMessage
00017 {
00019 int opcode;
00020
00022 SubscriberAddress subscriberAddress;
00023 };
00024
00026 struct DeregisterRequestMessage
00027 {
00029 int opcode;
00030
00032 SubscriberAddress subscriberAddress;
00033 };
00034
00036 enum OperationStatus {SUCCESS, FAILURE};
00037
00039 enum AckType {REGISTRATION_ACK, DEREGISTRATION_ACK};
00040
00063 class RemoteStatusPublisher
00064 {
00065 enum {
00066
00068 FREE_ENTRY = 0,
00069
00071 NOT_FOUND = -1,
00072
00074 MAX_SUBSCRIBERS = 100
00075 };
00076
00079 SubscriberAddress m_subscriberAddress[MAX_SUBSCRIBERS];
00086
00087 int Find(SubscriberAddress subscriberAddress) const
00088 {
00089 int index = NOT_FOUND;
00090 for (int i=0; i < MAX_SUBSCRIBERS; i++)
00091 {
00092 if (m_subscriberAddress[i] == subscriberAddress)
00093 {
00094 index = i;
00095 break;
00096 }
00097 }
00098 return index;
00099 }
00100
00103 void SendRequestStatus(SubscriberAddress subscriberAddress, AckType ackType,
00104 OperationStatus registrationStatus);
00105
00108 void SendStatusChange(SubscriberAddress subscriberAddress, int unitId, int unitStatus);
00109
00110 public:
00111
00118
00119 void PublishStatus(int unitId, int unitStatus)
00120 {
00121 for (int i=0; i < MAX_SUBSCRIBERS; i++)
00122 {
00123
00124 if (m_subscriberAddress[i]!= FREE_ENTRY)
00125 {
00126 SendStatusChange(m_subscriberAddress[i], unitId, unitStatus);
00127 }
00128 }
00129 }
00137
00138 void HandleRegisterRequest(const RegisterRequestMessage *pMsg)
00139 {
00140 OperationStatus status = FAILURE;
00141
00142
00143 int index = Find(pMsg->subscriberAddress);
00144
00145 if (index == NOT_FOUND)
00146 {
00147
00148
00149 index = Find(FREE_ENTRY);
00150 if (index != NOT_FOUND)
00151 {
00152
00153 m_subscriberAddress[index] = pMsg->subscriberAddress;
00154 status = SUCCESS;
00155 SendRequestStatus(pMsg->subscriberAddress, REGISTRATION_ACK , status);
00156 }
00157 }
00158 }
00159
00165 void HandleDeregisterRequest(const DeregisterRequestMessage *pMsg)
00166 {
00167 OperationStatus status = FAILURE;
00168
00169 int index = Find(pMsg->subscriberAddress);
00170 if (index != NOT_FOUND)
00171 {
00172
00173 m_subscriberAddress[index] = FREE_ENTRY;
00174 status = SUCCESS;
00175 }
00176
00177
00178 SendRequestStatus(pMsg->subscriberAddress, DEREGISTRATION_ACK,status);
00179 }
00180
00181 }; |
|