Publish-Subscribe Design Pattern in Embedded Systems

While developing embedded systems, one frequently encounters a situation where many entities are interested in the occurrence of a particular event. If the publisher notifies each interested entity directly, this introduces strong coupling between the publisher and the subscribers of the event notification: whenever a new entity needs the information, the publisher's code must also be modified to accommodate the new request.

An Example

Consider the example of a system which manages terminals. Here many entities would be interested in knowing when a terminal's status changes. A few examples are:

  • The call processing module needs to know when a terminal goes out of service so that calls on the terminal can be cleared.
  • The alarm processing software needs to know about terminal status changes so that alarms can be raised or cleared.
  • The resource manager is interested in terminal status changes because it must update its resource counts whenever a terminal's status changes.

As the design progresses, more and more applications become interested in status changes. It's easy to see that this complicates the design, as the terminal status handling has to be modified on each occasion.
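
The coupling problem can be illustrated with a minimal sketch. All names here (TerminalManager, CallProcessing, AlarmProcessing, ResourceManager, their methods, and the status values) are hypothetical and chosen only to mirror the examples above; they are not taken from any real code base.

```cpp
// Assumed status values, for illustration only.
enum { IN_SERVICE = 0, OUT_OF_SERVICE = 1 };

// Hypothetical modules; each one just counts how often it was notified.
struct CallProcessing
{
    int callsCleared;
    CallProcessing() : callsCleared(0) {}
    void ClearCalls(int /*terminalId*/) { ++callsCleared; }
};

struct AlarmProcessing
{
    int alarmUpdates;
    AlarmProcessing() : alarmUpdates(0) {}
    void UpdateAlarm(int /*terminalId*/, int /*status*/) { ++alarmUpdates; }
};

struct ResourceManager
{
    int countUpdates;
    ResourceManager() : countUpdates(0) {}
    void UpdateCounts(int /*terminalId*/, int /*status*/) { ++countUpdates; }
};

// Tightly coupled status handling: the terminal manager knows every
// interested module by name.
class TerminalManager
{
    CallProcessing  &m_callProcessing;
    AlarmProcessing &m_alarmProcessing;
    ResourceManager &m_resourceManager;

public:
    TerminalManager(CallProcessing &c, AlarmProcessing &a, ResourceManager &r)
        : m_callProcessing(c), m_alarmProcessing(a), m_resourceManager(r) {}

    void SetTerminalStatus(int terminalId, int status)
    {
        // Each new interested module means another member, another
        // constructor argument and another call in this function.
        if (status == OUT_OF_SERVICE)
        {
            m_callProcessing.ClearCalls(terminalId);
        }
        m_alarmProcessing.UpdateAlarm(terminalId, status);
        m_resourceManager.UpdateCounts(terminalId, status);
    }
};
```

Adding a fourth interested module to this sketch would require changes in three places inside TerminalManager, which is the maintenance cost the pattern below is meant to avoid.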

Publish-Subscribe Pattern

The Publish-Subscribe Pattern solves the tight coupling problem. The publisher of the information supports a generic interface for subscribers, which removes the coupling. Entities interested in the information subscribe by registering with the publisher. With this interface, the publisher code does not need to be modified every time a subscriber is introduced.

Whenever information needs to be published, the publisher invokes the Publish method to inform all the subscribers.
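
The generic interface could look roughly like the sketch below. The listings in this article include a Subscriber.h header and call HandleStatusChange(unitId, status) on each subscriber, but the header itself is not shown, so the class definition here is an assumption. AlarmProcessor is a hypothetical subscriber added purely for illustration.

```cpp
// Assumed contents of Subscriber.h: an abstract interface that the
// publisher can call without knowing the concrete subscriber type.
class Subscriber
{
public:
    virtual ~Subscriber() {}

    // Invoked by the publisher on every status change.
    virtual void HandleStatusChange(int unitId, int status) = 0;
};

// Hypothetical subscriber: remembers the most recent notification.
class AlarmProcessor : public Subscriber
{
    int m_lastUnitId;
    int m_lastStatus;

public:
    AlarmProcessor() : m_lastUnitId(-1), m_lastStatus(-1) {}

    virtual void HandleStatusChange(int unitId, int status)
    {
        // A real implementation would raise or clear an alarm here.
        m_lastUnitId = unitId;
        m_lastStatus = status;
    }

    int LastUnitId() const { return m_lastUnitId; }
    int LastStatus() const { return m_lastStatus; }
};
```

Because the publisher only ever sees a Subscriber pointer, a new kind of subscriber can be added by deriving another class, with no change to the publisher.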

This pattern comes in two flavors:

Local Publish-Subscribe Pattern

Here we develop the LocalStatusPublisher class. This class can be used as a helper class when this generic subscribe-publish interface is desired. The following important public methods are supported:

  • PublishStatus: Invoke this method to inform all registered subscribers about status change.
  • Register: Invoke this method to register an object for the status change notification.
  • Deregister: Invoke this method to deregister an object. The object will not receive any notifications of status change after it deregisters.

Local Status Publisher

#include "Subscriber.h"
#include <stddef.h>   // defines NULL

class LocalStatusPublisher
{
    enum {
          NOT_FOUND = -1, 

          MAX_SUBSCRIBERS = 100
         };

    Subscriber *m_pSubscriber[MAX_SUBSCRIBERS];
    

    int Find(Subscriber *pSubscriber) const
    {
       int index = NOT_FOUND;
       for (int i=0; i < MAX_SUBSCRIBERS; i++)
       {
           if (m_pSubscriber[i] == pSubscriber)
           {
               index = i;
               break;
           }
       }       
       return index;     
    }
            
public:

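    enum OperationStatus {SUCCESS, FAILURE};

    // Mark every entry free; Find(NULL) and PublishStatus rely on
    // unused entries being NULL.
    LocalStatusPublisher()
    {
        for (int i=0; i < MAX_SUBSCRIBERS; i++)
        {
            m_pSubscriber[i] = NULL;
        }
    }
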
    void PublishStatus(int unitId, int status)
    {
       for (int i=0; i < MAX_SUBSCRIBERS; i++)
       {
           // A valid subscriber exists only when the pointer is non NULL
           if (m_pSubscriber[i])
           {
               m_pSubscriber[i]->HandleStatusChange(unitId, status);
           }
       }
    }     

    OperationStatus Register(Subscriber *pSubscriber)
    {
        OperationStatus status = FAILURE;
        // First check if the subscriber is already present
        // in the list
        int index = Find(pSubscriber);
                
        if (index == NOT_FOUND)
        {
            // Subscriber was not found, so this is not a duplicate request
            // Now look for a free entry by finding NULL
            index = Find(NULL);
            if (index != NOT_FOUND)
            {
               // A free entry has been found
               m_pSubscriber[index] = pSubscriber;
               status = SUCCESS;
            }
        }      
        return status;
    }
    
    OperationStatus Deregister(Subscriber *pSubscriber)
    {
        OperationStatus status = FAILURE;
        // Search for the entry
        int index = Find(pSubscriber);
        if (index != NOT_FOUND)
        {
           // Free the entry by marking as NULL
           m_pSubscriber[index] = NULL;
           status = SUCCESS;
        }
        return status;
    }
    
};   
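
A typical register, publish, deregister sequence might look like the following sketch. To keep it self-contained, it uses a condensed stand-in for LocalStatusPublisher with the same public methods, a smaller table, and an explicit constructor that clears the table. The Subscriber interface is assumed (Subscriber.h is not shown in this article), and CountingSubscriber is a hypothetical subscriber used only for the demonstration.

```cpp
#include <cstddef>  // NULL

// Assumed shape of the Subscriber interface.
class Subscriber
{
public:
    virtual ~Subscriber() {}
    virtual void HandleStatusChange(int unitId, int status) = 0;
};

// Condensed stand-in for LocalStatusPublisher (same public methods).
class LocalStatusPublisher
{
    enum { NOT_FOUND = -1, MAX_SUBSCRIBERS = 4 };
    Subscriber *m_pSubscriber[MAX_SUBSCRIBERS];

    int Find(Subscriber *pSubscriber) const
    {
        for (int i = 0; i < MAX_SUBSCRIBERS; i++)
        {
            if (m_pSubscriber[i] == pSubscriber) return i;
        }
        return NOT_FOUND;
    }

public:
    enum OperationStatus {SUCCESS, FAILURE};

    LocalStatusPublisher()
    {
        // All entries start out free.
        for (int i = 0; i < MAX_SUBSCRIBERS; i++) m_pSubscriber[i] = NULL;
    }

    void PublishStatus(int unitId, int status)
    {
        for (int i = 0; i < MAX_SUBSCRIBERS; i++)
        {
            if (m_pSubscriber[i]) m_pSubscriber[i]->HandleStatusChange(unitId, status);
        }
    }

    OperationStatus Register(Subscriber *pSubscriber)
    {
        // Reject NULL and duplicate registrations.
        if (pSubscriber == NULL || Find(pSubscriber) != NOT_FOUND) return FAILURE;
        int index = Find(NULL);
        if (index == NOT_FOUND) return FAILURE;  // table is full
        m_pSubscriber[index] = pSubscriber;
        return SUCCESS;
    }

    OperationStatus Deregister(Subscriber *pSubscriber)
    {
        int index = (pSubscriber != NULL) ? Find(pSubscriber) : NOT_FOUND;
        if (index == NOT_FOUND) return FAILURE;
        m_pSubscriber[index] = NULL;
        return SUCCESS;
    }
};

// Hypothetical subscriber: counts notifications and keeps the last status.
class CountingSubscriber : public Subscriber
{
public:
    int count;
    int lastStatus;
    CountingSubscriber() : count(0), lastStatus(-1) {}
    virtual void HandleStatusChange(int /*unitId*/, int status)
    {
        ++count;
        lastStatus = status;
    }
};
```

With this in place, a terminal manager would own one LocalStatusPublisher, each interested module would call Register with a pointer to itself at startup, and the manager would call PublishStatus whenever a terminal's status changes. A module that has deregistered receives no further notifications, and a second Register call for the same object returns FAILURE.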

Remote Publish-Subscribe Pattern

Here we will look at the RemoteStatusPublisher class. This class supports a message-based interface. Subscribers send a registration request message to register for status change notifications. The source address of the sender is saved as part of the registration process. A deregistration request is sent to stop receiving the status change notifications.

Whenever a status change is detected, the PublishStatus method is invoked. This method sends the status change message to all the registered subscribers using the addresses that were obtained during registration.

Remote Status Publisher

#include "Subscriber.h"
#include <stdio.h>

typedef unsigned long SubscriberAddress;

struct RegisterRequestMessage
{
    int opcode;

    SubscriberAddress subscriberAddress;
};

struct DeregisterRequestMessage
{
    int opcode;

    SubscriberAddress subscriberAddress;
};

enum OperationStatus {SUCCESS, FAILURE};

enum AckType {REGISTRATION_ACK, DEREGISTRATION_ACK};

class RemoteStatusPublisher
{
    enum {

        FREE_ENTRY = 0,

        NOT_FOUND = -1, 

        MAX_SUBSCRIBERS = 100
    };

    SubscriberAddress m_subscriberAddress[MAX_SUBSCRIBERS];    

    int Find(SubscriberAddress subscriberAddress) const
    {
       int index = NOT_FOUND;
       for (int i=0; i < MAX_SUBSCRIBERS; i++)
       {
           if (m_subscriberAddress[i] == subscriberAddress)
           {
               index = i;
               break;
           }
       }       
       return index;     
    } 
     
    void SendRequestStatus(SubscriberAddress subscriberAddress, AckType ackType, 
                        OperationStatus registrationStatus);

    void SendStatusChange(SubscriberAddress subscriberAddress, int unitId, int unitStatus);

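public:

    // Mark every entry free; Find(FREE_ENTRY) and PublishStatus rely on
    // unused entries holding FREE_ENTRY.
    RemoteStatusPublisher()
    {
        for (int i=0; i < MAX_SUBSCRIBERS; i++)
        {
            m_subscriberAddress[i] = FREE_ENTRY;
        }
    }
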
    void PublishStatus(int unitId, int unitStatus)
    {
       for (int i=0; i < MAX_SUBSCRIBERS; i++)
       {
           // A valid subscriber exists only when the entry is not FREE_ENTRY
           if (m_subscriberAddress[i] != FREE_ENTRY)
           {
               SendStatusChange(m_subscriberAddress[i], unitId, unitStatus);
           }
       }
    }    

    void HandleRegisterRequest(const RegisterRequestMessage *pMsg)
    {
        OperationStatus status = FAILURE;
        // First check if the subscriber is already present
        // in the list
        int index = Find(pMsg->subscriberAddress);
                
        if (index == NOT_FOUND)
        {
            // Subscriber was not found, so this is not a duplicate request
            // Now look for a free entry by finding FREE_ENTRY
            index = Find(FREE_ENTRY);
            if (index != NOT_FOUND)
            {
               // A free entry has been found
               m_subscriberAddress[index] = pMsg->subscriberAddress;
               status = SUCCESS;
            }
        }

        // Inform the requester with an acknowledgement, whether or not
        // the registration succeeded.
        SendRequestStatus(pMsg->subscriberAddress, REGISTRATION_ACK, status);
    }
    
    void HandleDeregisterRequest(const DeregisterRequestMessage *pMsg)
    {
        OperationStatus status = FAILURE;
        // Search for the entry
        int index = Find(pMsg->subscriberAddress);
        if (index != NOT_FOUND)
        {
           // Free the entry by marking as FREE_ENTRY
           m_subscriberAddress[index] = FREE_ENTRY;
           status = SUCCESS;
        }

        // Inform the requester with an acknowledgement.
        SendRequestStatus(pMsg->subscriberAddress, DEREGISTRATION_ACK,status);
    }
    
};
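
The listing above declares SendRequestStatus and SendStatusChange without defining them, since they depend on the messaging layer of the target system. The sketch below shows one way to exercise the message flow on a host machine. It is a condensed stand-in for RemoteStatusPublisher in which the two send methods append to in-memory vectors instead of transmitting anything. AckRecord, StatusRecord, the acks and statusChanges members, and the smaller table size are all hypothetical additions for illustration. The sketch also assumes that 0 is never a valid subscriber address, because FREE_ENTRY reserves that value.

```cpp
#include <vector>

typedef unsigned long SubscriberAddress;
enum OperationStatus {SUCCESS, FAILURE};
enum AckType {REGISTRATION_ACK, DEREGISTRATION_ACK};

struct RegisterRequestMessage   { int opcode; SubscriberAddress subscriberAddress; };
struct DeregisterRequestMessage { int opcode; SubscriberAddress subscriberAddress; };

// Hypothetical records of outgoing messages; a real system would build
// a message and hand it to its messaging layer instead.
struct AckRecord    { SubscriberAddress to; AckType type; OperationStatus status; };
struct StatusRecord { SubscriberAddress to; int unitId; int unitStatus; };

class RemoteStatusPublisher
{
    enum { FREE_ENTRY = 0, NOT_FOUND = -1, MAX_SUBSCRIBERS = 4 };
    SubscriberAddress m_subscriberAddress[MAX_SUBSCRIBERS];

    int Find(SubscriberAddress address) const
    {
        for (int i = 0; i < MAX_SUBSCRIBERS; i++)
        {
            if (m_subscriberAddress[i] == address) return i;
        }
        return NOT_FOUND;
    }

    void SendRequestStatus(SubscriberAddress to, AckType type, OperationStatus status)
    {
        AckRecord record = { to, type, status };
        acks.push_back(record);
    }

    void SendStatusChange(SubscriberAddress to, int unitId, int unitStatus)
    {
        StatusRecord record = { to, unitId, unitStatus };
        statusChanges.push_back(record);
    }

public:
    std::vector<AckRecord>    acks;           // stand-in for sent acknowledgements
    std::vector<StatusRecord> statusChanges;  // stand-in for sent notifications

    RemoteStatusPublisher()
    {
        for (int i = 0; i < MAX_SUBSCRIBERS; i++) m_subscriberAddress[i] = FREE_ENTRY;
    }

    void PublishStatus(int unitId, int unitStatus)
    {
        for (int i = 0; i < MAX_SUBSCRIBERS; i++)
        {
            if (m_subscriberAddress[i] != FREE_ENTRY)
                SendStatusChange(m_subscriberAddress[i], unitId, unitStatus);
        }
    }

    void HandleRegisterRequest(const RegisterRequestMessage *pMsg)
    {
        OperationStatus status = FAILURE;
        SubscriberAddress address = pMsg->subscriberAddress;
        // Reject the reserved address and duplicate registrations.
        if (address != FREE_ENTRY && Find(address) == NOT_FOUND)
        {
            int index = Find(FREE_ENTRY);
            if (index != NOT_FOUND)
            {
                m_subscriberAddress[index] = address;
                status = SUCCESS;
            }
        }
        SendRequestStatus(address, REGISTRATION_ACK, status);
    }

    void HandleDeregisterRequest(const DeregisterRequestMessage *pMsg)
    {
        OperationStatus status = FAILURE;
        SubscriberAddress address = pMsg->subscriberAddress;
        int index = (address != FREE_ENTRY) ? Find(address) : NOT_FOUND;
        if (index != NOT_FOUND)
        {
            m_subscriberAddress[index] = FREE_ENTRY;
            status = SUCCESS;
        }
        SendRequestStatus(address, DEREGISTRATION_ACK, status);
    }
};
```

In this sketch every request is answered with an acknowledgement, so a subscriber whose registration was rejected (duplicate request or full table) learns about the failure instead of waiting indefinitely. Whether a failed registration should be acknowledged is a protocol decision; the choice made here is an assumption.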