Update Manager Service Example

PART 4. ESF PROTOCOL/APPLICATION EXAMPLES
GPS & Cloud Publishing Example

 

Example

·            Overview

·            Prerequisites

·            Part 1 – Setup for Cloud Publishing

·            Part 2 – Implementing Cloud Publisher

·            Part 3 – Implementing Cloud Subscriber

·            Part 4 – Testing the GPS Publish/Subscribe Example

Example

Overview

This example shows how to use ESF to gather data from a local GPS receiver and to publish data to a Cloud broker using MQTT protocol.

 

MQTT is a lightweight publish/subscribe protocol that can be used to send data from multiple clients devices into a common message broker, allowing a variety of options in creating a data gathering system.  Clients can also subscribe to certain sets of data from the broker, so that all connected clients receive a copy of data immediately when a publication arrives at the broker and matches the subscription topic(s) to which they have subscribed.

 

In this example, the GPS data is used as typical data that might be published regularly by a client.  However, the GPS retrieval code shown here may also be used independently of the MQTT publishing.

 

In this example, you will learn how to

·            Create an application that gathers GPS data using ESF APIs

·            Publish and subscribe to data on a Cloud broker using the MQTT protocol

 

Prerequisites

·            Installing the Eclipse IDE

·            Integrating ESF Tooling and Projects into Eclipse

or

·            Installing ESF Tooling into Wind River Workbench

and

·            Using Working Sets

·            Hello World Using the ESF Logger

 

 

Part 1 – Setup for Cloud Publishing

 

In order to use ESF to publish to a Cloud broker, ESF needs to be configured so that your system can connect to the broker over the network.  You also need a broker that can accept MQTT publish/subscribe messages.  Eurotech provides a public broker that runs in the Everyware™ Cloud.  If you have not already set up an account for Cloud publishing, contact your regional Eurotech sales manager or email sales.us@eurotech.com to request an Everyware Cloud account.  When your account is created, you will receive your account credentials (Username, Password, and broker URL) in a registration email.  For details about accessing your Everyware Cloud, refer to the Everyware Cloud Developer’s Guide.

 

After setting up a Cloud account, you need to configure the MQTT Client in ESF.  If you are using Eurotech’s Denali, the Web configuration interface provides a Cloud Client tab for this configuration.  If you are using just ESF, you need to submit a configuration for the com.esf.net.mqtt.client component.  See Configuration Deployment for a review of the configuration update mechanism in ESF.  This example illustrates an MQTT Client configuration file, but it must be changed to reflect the specific account credentials of your broker login.  Note that some symbols used in property values, such as   ! # = \ and :   need to be preceded by a backslash  \  character in the mqtt client configuration file.

 

tcd_file_tgz_1_default-mqtt-config.tgz

 

 

Part 2 – Implementing Cloud Publisher

Begin by creating a new Plug-in Project in Eclipse.  For a review of the instructions for this step, see the Hello World example.  The basic information on the project is

 

·            Project (bundle) Name

1.    com.esf.example.cloud.publish

·            Imported Services (to be added to the Automated Management of Dependencies list in the Eclipse Manifest Editor)

1.    org.eclipse.osgi

2.    org.eclipse.soda.sat.core

3.    org.soda.stepstone.core

4.    com.esf.core.logger.service

5.    com.esf.net.mqtt.client.service

6.    client

7.    com.esf.device.gps.service

·            Activator Class Name (with Package Name)

1.    com.esf.example.cloud.publish.bundle.Activator

·            Implementation Class Name (with Package Name)

1.    com.esf.example.cloud.publish.CloudPublishExample

 

The following is the Activator code for this project.

 

package com.esf.example.cloud.publish.bundle;

 

import org.eclipse.soda.sat.core.framework.BaseBundleActivator;

 

import com.esf.core.logger.service.IEsfLoggerService;

import com.esf.device.gps.service.IGPSService;

import com.esf.example.cloud.publish.CloudPublishExample;

import com.esf.net.mqtt.client.service.IEsfMqttService;

 

public class Activator extends BaseBundleActivator {

 

      private CloudPublishExample cloudPublishExample = null;

     

      /*

       * (non-Javadoc)

       * @see org.eclipse.soda.sat.core.framework.BaseBundleActivator#activate()

       */

      protected void activate() {

 

            this.cloudPublishExample = new CloudPublishExample();

            this.cloudPublishExample.bind(this.getIEsfLoggerService(),

                        this.getIEsfMqttService(), this.getIGPSService());

      }

     

      /*

       * (non-Javadoc)

       * @see org.eclipse.soda.sat.core.framework.BaseBundleActivator#deactivate()

       */

      protected void deactivate() {

           

            this.cloudPublishExample.unbind();

            this.cloudPublishExample = null;

      }

     

      /*

       * (non-Javadoc)

       * @see org.eclipse.soda.sat.core.framework.BaseBundleActivator#getImportedServiceNames()

       */

      protected String[] getImportedServiceNames() {

            return new String[] {

                  IEsfLoggerService.SERVICE_NAME,

                  IEsfMqttService.SERVICE_NAME,

                  IGPSService.SERVICE_NAME

            };

      }

     

      /*

       * This method obtains ESF Logger Service.

       */

      private IEsfLoggerService getIEsfLoggerService() {

            return (IEsfLoggerService) getImportedService(IEsfLoggerService.SERVICE_NAME);

      }

 

      /*

       * This method obtains ESF MQtt Service.

       */

      private IEsfMqttService getIEsfMqttService() {

            return (IEsfMqttService) getImportedService(IEsfMqttService.SERVICE_NAME);

      }

 

      /*

       * This method obtains GPS Service.

       */

      private IGPSService getIGPSService() {

            return (IGPSService) getImportedService(IGPSService.SERVICE_NAME);

      }

}

 

Notice that this bundle imports three services:

1.    IEsfLoggerService – as required for logging

2.    IEsfMqttService – to obtain ESF MQtt Master Client service

3.    IGPSService – if we want to publish GPS data

 

Now we can look at the main implementation class, which is shown in the following code.

 

package com.esf.example.cloud.publish;

 

import java.util.Date;

 

import org.soda.stepstone.core.Delay;

import org.soda.stepstone.core.IWork;

import org.soda.stepstone.core.Worker;

 

import com.esf.core.logger.service.IEsfLoggerService;

import com.esf.device.gps.service.IGPSService;

import com.esf.net.mqtt.client.service.EsfMqttException;

import com.esf.net.mqtt.client.service.IEsfMqttCallbackHandler;

import com.esf.net.mqtt.client.service.IEsfMqttMasterClient;

import com.esf.net.mqtt.client.service.IEsfMqttService;

import com.eurotech.cloud.message.EdcPayload;

 

/**

 * Defines Cloud Publish Example class

 *

 * @author Eurotech

 *

 */

public class CloudPublishExample implements IEsfMqttCallbackHandler {

     

      private static final String LABEL = CloudPublishExample.class.getName() + ": ";

 

      private static final String applicationID = "Cloud Publisher Demo";

      private static final String publishTopicSystime = "cloud/example/systime";

      private static final String publishTopicGps = "cloud/example/gps";

      private static final String payloadDelimeter = ",";

      private static final int qos = 2;

      private static final boolean brokerRetainsMessage = true;

     

      private IEsfMqttMasterClient mqttMasterClient = null;

      private Worker publisher = null;

 

      private IEsfLoggerService esfLoggerService = null;

      private IEsfMqttService esfMqttService = null;

      private IGPSService gpsService = null;

     

      /**

       * This method binds services and allocates resources.

       *

       * @param esfLoggerService - ESF logger service as <code>IEsfLoggerService</code>

       * @param esfMqttService - ESF MQtt service as <code>IEsfMqttService</code>

       * @param gpsService - GPS service as <code>IGPSService</code>

       */

      public void bind(IEsfLoggerService esfLoggerService,

                  IEsfMqttService esfMqttService, IGPSService gpsService) {

 

            this.esfLoggerService = esfLoggerService;

            this.esfMqttService = esfMqttService;

            this.gpsService = gpsService;

           

            this.publisher = new Worker(

                        "Publisher Thread", new IWork() {

                              public boolean doWork() throws InterruptedException {

                                    return publisherThread();

                              }

                        });

            this.publisher.start();

      }

     

      /**

       * This method releases resources.

       */

      public void unbind () {

           

            while (this.publisher.isRunning()) {

                  this.publisher.stop();

                  Delay.milliseconds(200);

            }

            this.publisher = null;

           

            if (this.mqttMasterClient != null) {

                  this.mqttMasterClient.release();

            }

           

            this.esfLoggerService = null;

            this.esfMqttService = null;

            this.gpsService = null;

      }

 

      private boolean publisherThread () {

           

            if (this.mqttMasterClient == null) {

                  try {

                        // Instantiate the ESF MQtt Client and register the callback implementation

                        this.esfLoggerService.logDebug(LABEL + "Getting MQtt Master Client ...");

                        this.mqttMasterClient = this.esfMqttService.getEsfMqttMasterClient(applicationID);

                        this.mqttMasterClient.registerCallback(this);

                  } catch (EsfMqttException e) {

                        this.esfLoggerService.logDump(LABEL + "Master client is not yet available ...");

                        Delay.seconds(2);

                        return true;

                  }

            }

           

            if (!this.mqttMasterClient.isConnected()) {

                 

                  this.esfLoggerService.logDebug(LABEL + "Waiting for MQtt Master Client to connect ...");

                  Delay.seconds(2);

                  return true;

            }

           

            StringBuffer systimePayload = new StringBuffer();

            systimePayload.append("SystemTime=");

            systimePayload.append(new Date().toString());

           

            boolean gpsLock = this.gpsService.getGpsLock();

            float latitude = this.gpsService.getLat();

            float longitude = this.gpsService.getLon();

            float speed = this.gpsService.getSpeedMPH();

           

            StringBuffer gpsPayload = new StringBuffer ();

            gpsPayload.append("GpsLock=");

            gpsPayload.append(gpsLock);

            gpsPayload.append(payloadDelimeter);

           

            gpsPayload.append("latitude=");

            gpsPayload.append(latitude);

            gpsPayload.append(payloadDelimeter);

           

            gpsPayload.append("longitude=");

            gpsPayload.append(longitude);

            gpsPayload.append(payloadDelimeter);

           

            gpsPayload.append("speedMPH=");

            gpsPayload.append(speed);

           

            try {

                 

                  int publishMsgID = this.mqttMasterClient.publish(publishTopicSystime, systimePayload

                              .toString().getBytes(), qos, brokerRetainsMessage);

 

                  this.esfLoggerService.logDebug(LABEL + "Published "

                              + systimePayload.toString() + " on topic " + publishTopicSystime

                              + " -- msgID=" + publishMsgID);

                 

                 

                  publishMsgID = this.mqttMasterClient.publish(publishTopicGps, gpsPayload

                              .toString().getBytes(), qos, brokerRetainsMessage);

 

                  this.esfLoggerService.logDebug(LABEL + "Published "

                              + gpsPayload.toString() + " on topic " + publishTopicGps

                              + " -- msgID=" + publishMsgID);

                 

            } catch (Exception e) {

                  e.printStackTrace();

            }

            Delay.seconds(5);

            return true;

      }

 

      /*

       * (non-Javadoc)

       * @see com.eurotech.cloud.client.EdcCallbackHandler#

controlArrived(java.lang.String, java.lang.String, com.eurotech.cloud.message.EdcPayload, int, boolean)

       */

      public void controlArrived(String assetId, String topic, EdcPayload msg,

                  int qos, boolean retain) {

 

            this.esfLoggerService

                        .logDebug(LABEL

                                    + "Received 'Control Arrived' notification from EDC Client -- topic="

                                    + topic);

      }

 

      /*

       * (non-Javadoc)

       * @see com.eurotech.cloud.client.EdcCallbackHandler#

publishArrived(java.lang.String, java.lang.String, com.eurotech.cloud.message.EdcPayload, int, boolean)

       */

      public void publishArrived(String assetId, String topic, EdcPayload msg,

                  int qos, boolean retain) {

 

            this.esfLoggerService

                        .logDebug(LABEL

                                    + "Received 'Publish Arrived' notification from EDC Client -- topic="

                                    + topic);

      }

 

      /*

       * (non-Javadoc)

       * @see com.eurotech.cloud.client.EdcCallbackHandler#connectionLost()

       */

      public void connectionLost() {

           

            this.esfLoggerService.logDebug(LABEL

                        + "Received 'Connection Lost' notification from EDC Client");

      }

 

      /*

       * (non-Javadoc)

       * @see com.eurotech.cloud.client.EdcCallbackHandler#connectionRestored()

       */

      public void connectionRestored() {

           

            this.esfLoggerService.logDebug(LABEL

                        + "Received 'Connection Restored' notification from EDC Client");

      }

 

      /*

       * (non-Javadoc)

       * @see com.eurotech.cloud.client.EdcCallbackHandler#published(int)

       */

      public void published(int messageId) {

           

            this.esfLoggerService.logDebug(LABEL

                        + "publish has been fully acknowledged by the broker - msgID="

                        + messageId);

      }

 

      /*

       * (non-Javadoc)

       * @see com.eurotech.cloud.client.EdcCallbackHandler#subscribed(int)

       */

      public void subscribed(int messageId) {

     

            this.esfLoggerService.logDebug(LABEL

                        + "subscribe has been fully acknowledged by the broker - msgID="

                        + messageId);

      }

 

      /*

       * (non-Javadoc)

       * @see com.eurotech.cloud.client.EdcCallbackHandler#unsubscribed(int)

       */

      public void unsubscribed(int messageId) {

           

            this.esfLoggerService.logInfo(LABEL + "Unsubscribed (callback) -- msgID " + messageId);

      }

 

      /*

       * (non-Javadoc)

       * @see com.esf.net.mqtt.client.service.IEsfMqttCallbackHandler#

controlArrived(java.lang.String, java.lang.String, byte[], int, boolean)

       */

      public void controlArrived(String assetId, String topic, byte[] payload,

                  int qos, boolean retain) {

           

            this.esfLoggerService.logInfo(LABEL + "Control Arrived on topic=" + topic);

      }

 

      /*

       * (non-Javadoc)

       * @see com.esf.net.mqtt.client.service.IEsfMqttCallbackHandler#

publishArrived(java.lang.String, java.lang.String, byte[], int, boolean)

       */

      public void publishArrived(String assetId, String topic, byte[] payload,

                  int qos, boolean retain) {

           

            esfLoggerService.logInfo(LABEL + "Publish Arrived on topic: " + topic);

            esfLoggerService.logInfo(LABEL + "                 payload: " + new String(payload));

      }

}

 

As usual, this class has bind() and unbind() methods.  In the bind() method, we set all three imported services, instantiate a new Worker, and launch the publisherThread that will do the job.

 

Note that we avoid using the IEsfMqttService to obtain the MQTT Master Client in the bind() method.  The reason is that this operation may result in the EsfMqttException, causing the bind() not to finish, and we want the bind method to finish cleanly.  We instead obtain the MQTT Master Client at the beginning of the publisherThread().   If the ESF MQTT Master Client can’t be obtained, the publisherThread() returns ‘true’, meaning (after a two second delay) the worker will reinvoke the publisherThread().  On the other hand, if the MQTT Master Client is obtained and Cloud connection is established, the publisherThread() obtains the system time and GPS information and publishes on two separate topics: cloud/example/systime and cloud/example/gps, respectively.  Finally, the publisherThread() is suspended for five seconds and is reinvoked by the worker.

 

The GPS service is obtained in the Activator and passed to the bind() routine of the CloudPublishExample.  Then in the publisherThread(), the getGpsLock(), getLat(), getLong(), and getSpeedMPH() methods of the IGPSService are called to obtain current GPS data and added to the gpsPayload for publishing.

 

 

Part 3 – Implementing Cloud Subscriber

 

In addition to publishing, ESF can also subscribe to the Cloud broker using MQTT.  In this example, we will simply show how the subscription is configured, but we will be using the subscriptions to receive our own published messages back from the broker for demonstration purposes only.

 

Begin by creating a new Plug-in Project in Eclipse.  The basic information on the project is

 

·            Project (bundle) Name

1.    com.esf.example.cloud.subscribe

·            Imported Services (to be added to the Automated Management of Dependencies list in the Eclipse Manifest Editor)

1.    org.eclipse.osgi

2.    org.eclipse.soda.sat.core

3.    org.soda.stepstone.core

4.    com.esf.core.logger.service

5.    com.esf.net.mqtt.client.service

6.    client

·            Activator Class Name (with Package Name)

1.    com.esf.example.cloud.subscribe.bundle.Activator

·            Implementation Class Name (with Package Name)

1.    com.esf.example.cloud.subscribe.CloudSubscribeExample

 

 

The following is the Activator code for this project.

 

package com.esf.example.cloud.subscribe.bundle;

 

import org.eclipse.soda.sat.core.framework.BaseBundleActivator;

import com.esf.core.logger.service.IEsfLoggerService;

import com.esf.example.cloud.subscribe.CloudSubscribeExample;

import com.esf.net.mqtt.client.service.IEsfMqttService;

 

public class Activator extends BaseBundleActivator {

 

      private CloudSubscribeExample cloudSubscribeExample = null;

     

      /*

       * (non-Javadoc)

       * @see org.eclipse.soda.sat.core.framework.BaseBundleActivator#activate()

       */

      protected void activate() {

 

            this.cloudSubscribeExample = new CloudSubscribeExample();

            this.cloudSubscribeExample.bind(this.getIEsfLoggerService(),

                        this.getIEsfMqttService());

      }

     

      /*

       * (non-Javadoc)

       * @see org.eclipse.soda.sat.core.framework.BaseBundleActivator#deactivate()

       */

      protected void deactivate() {

           

            this.cloudSubscribeExample.unbind();

            this.cloudSubscribeExample = null;

      }

     

      /*

       * (non-Javadoc)

       * @see org.eclipse.soda.sat.core.framework.BaseBundleActivator#getImportedServiceNames()

       */

      protected String[] getImportedServiceNames() {

            return new String[] {

                  IEsfLoggerService.SERVICE_NAME,

                  IEsfMqttService.SERVICE_NAME

            };

      }

     

      /*

       * This method obtains ESF Logger Service.

       */

      private IEsfLoggerService getIEsfLoggerService() {

            return (IEsfLoggerService) getImportedService(IEsfLoggerService.SERVICE_NAME);

      }

 

      /*

       * This method obtains ESF MQtt Service.

       */

      private IEsfMqttService getIEsfMqttService() {

            return (IEsfMqttService) getImportedService(IEsfMqttService.SERVICE_NAME);

      }

}

 

 

Note that this bundle imports two services:

1.    IEsfLoggerService – as required for logging

2.    IEsfMqttService – to obtain ESF MQtt Master Client service

 

Now we can look at the main implementation class, which is shown in the following code.

 

package com.esf.example.cloud.subscribe;

 

import org.soda.stepstone.core.Delay;

import org.soda.stepstone.core.IWork;

import org.soda.stepstone.core.Worker;

 

import com.esf.core.logger.service.IEsfLoggerService;

import com.esf.net.mqtt.client.service.EsfMqttException;

import com.esf.net.mqtt.client.service.IEsfMqttCallbackHandler;

import com.esf.net.mqtt.client.service.IEsfMqttMasterClient;

import com.esf.net.mqtt.client.service.IEsfMqttService;

import com.eurotech.cloud.message.EdcPayload;

 

/**

 * Defines Cloud Subscribe Example class

 *

 * @author Eurotech

 *

 */

public class CloudSubscribeExample implements IEsfMqttCallbackHandler {

     

      private static final String LABEL = CloudSubscribeExample.class.getName() + ": ";

 

      private static final String applicationID = "Cloud Subscriber Demo";

      private static final String subscribeTopic = "cloud/example/#";

     

      private IEsfMqttMasterClient mqttMasterClient = null;

      private Worker subscriber = null;

      private static final int qos = 2;

     

      private IEsfLoggerService esfLoggerService = null;

      private IEsfMqttService esfMqttService = null;

     

      /**

       * This method binds services and allocates resources.

       *

       * @param esfLoggerService - ESF logger service as <code>IEsfLoggerService</code>

       * @param esfMqttService - ESF MQtt service as <code>IEsfMqttService</code>

       */

      public void bind(IEsfLoggerService esfLoggerService,

                  IEsfMqttService esfMqttService) {

           

            this.esfLoggerService = esfLoggerService;

            this.esfMqttService = esfMqttService;

           

            this.subscriber = new Worker(

                        "Publisher Thread", new IWork() {

                              public boolean doWork() throws InterruptedException {

                                    return subscriberThread();

                              }

                        });

            this.subscriber.start();

      }

     

      /**

       * This method releases resources.

       */

      public void unbind () {

           

            while (this.subscriber.isRunning()) {

                  this.subscriber.stop();

                  Delay.milliseconds(200);

            }

            this.subscriber = null;

           

            if (this.mqttMasterClient != null) {

                  this.mqttMasterClient.release();

            }

           

            this.esfLoggerService = null;

            this.esfMqttService = null;

      }

     

      private boolean subscriberThread () {

           

            if (this.mqttMasterClient == null) {

                  try {

                        // Instantiate the ESF MQtt Client and register the callback implementation

                        this.esfLoggerService.logDebug(LABEL + "Getting MQtt Master Client ...");

                        this.mqttMasterClient = this.esfMqttService.getEsfMqttMasterClient(applicationID);

                        this.mqttMasterClient.registerCallback(this);

                  } catch (EsfMqttException e) {

                        this.esfLoggerService.logDump(LABEL + "Master client is not yet available ...");

                        Delay.seconds(2);

                        return true;

                  }

            }

           

            if (!this.mqttMasterClient.isConnected()) {

                 

                  this.esfLoggerService.logDebug(LABEL + "Waiting for MQtt Master Client to connect ...");

                  Delay.seconds(2);

                  return true;

            }

           

            try {

                  int subscribeMsgID = this.mqttMasterClient.subscribe(subscribeTopic, qos);

                 

                  this.esfLoggerService.logDebug(LABEL + "Subscribed on topic "

                              + subscribeTopic + " -- msgID=" + subscribeMsgID);

            } catch (EsfMqttException e) {

                  e.printStackTrace();

                  Delay.seconds(5);

                  return true;

            }

 

            return false;

      }

 

      /*

       * (non-Javadoc)

       * @see com.eurotech.cloud.client.EdcCallbackHandler#

controlArrived(java.lang.String, java.lang.String, com.eurotech.cloud.message.EdcPayload, int, boolean)

       */

      public void controlArrived(String assetId, String topic, EdcPayload msg,

                  int qos, boolean retain) {

 

            this.esfLoggerService

                        .logDebug(LABEL

                                    + "Received 'Control Arrived' notification from EDC Client -- topic="

                                    + topic);

      }

 

      /*

       * (non-Javadoc)

       * @see com.eurotech.cloud.client.EdcCallbackHandler#

publishArrived(java.lang.String, java.lang.String, com.eurotech.cloud.message.EdcPayload, int, boolean)

       */

      public void publishArrived(String assetId, String topic, EdcPayload msg,

                  int qos, boolean retain) {

 

            this.esfLoggerService

                        .logDebug(LABEL

                                    + "Received 'Publish Arrived' notification from EDC Client -- topic="

                                    + topic);

      }

 

      /*

       * (non-Javadoc)

       * @see com.eurotech.cloud.client.EdcCallbackHandler#connectionLost()

       */

      public void connectionLost() {

 

            this.esfLoggerService.logDebug(LABEL

                        + "Received 'Connection Lost' notification from EDC Client");

      }

 

      /*

       * (non-Javadoc)

       * @see com.eurotech.cloud.client.EdcCallbackHandler#connectionRestored()

       */

      public void connectionRestored() {

 

            this.esfLoggerService

                        .logDebug(LABEL

                                    + "Received 'Connection Restored' notification from EDC Client");

      }

 

      /*

       * (non-Javadoc)

       * @see com.eurotech.cloud.client.EdcCallbackHandler#published(int)

       */

      public void published(int messageId) {

 

            this.esfLoggerService.logDebug(LABEL

                        + "publish has been fully acknowledged by the broker - msgID="

                        + messageId);

      }

 

      /*

       * (non-Javadoc)

       * @see com.eurotech.cloud.client.EdcCallbackHandler#subscribed(int)

       */

      public void subscribed(int messageId) {

           

            this.esfLoggerService

                        .logDebug(LABEL

                                    + "subscribe has been fully acknowledged by the broker - msgID="

                                    + messageId);

      }

 

      /*

       * (non-Javadoc)

       * @see com.eurotech.cloud.client.EdcCallbackHandler#unsubscribed(int)

       */

      public void unsubscribed(int messageId) {

           

            this.esfLoggerService

                        .logDebug(LABEL

                                    + "unsubscribe has been fully acknowledged by the broker - msgID="

                                    + messageId);

      }

 

      /*

       * (non-Javadoc)

       * @see com.esf.net.mqtt.client.service.IEsfMqttCallbackHandler#

controlArrived(java.lang.String, java.lang.String, byte[], int, boolean)

       */

      public void controlArrived(String assetId, String topic, byte[] payload,

                  int qos, boolean retain) {

           

            this.esfLoggerService.logInfo(LABEL + "Control Arrived on topic=" + topic);

      }

 

      /*

       * (non-Javadoc)

       * @see com.esf.net.mqtt.client.service.IEsfMqttCallbackHandler#

publishArrived(java.lang.String, java.lang.String, byte[], int, boolean)

       */

      public void publishArrived(String assetId, String topic, byte[] payload,

                  int qos, boolean retain) {

           

            this.esfLoggerService.logInfo(LABEL + "Publish Arrived on topic: " + topic);

            this.esfLoggerService.logInfo(LABEL + "                 payload: " + new String(payload));

      }

}

 

 

As can be seen from the above example, the CloudSubscribeExample class is implemented similarly to the CloudPublishExample.  The only important difference is how the subscribeThread() is implemented.  As with the publishThread() in the CloudPublishExample, the subscribeThread() gets hold of the ESF MQTT Master Client and verifies that Cloud connection is established.  But afterwards, it subscribes to the topic cloud/example/# and returns ‘false’.  Note that the “#” sign in the topic means that we are subscribing to all topics that start with the “cloud/example/”.  By returning ‘false’, the subscriberThread() notifies the worker that its job is done and there is no need to reinvoke. 

 

From this point on, every time a published message arrives at the broker on the cloud/example/systime or cloud/example/gps topic, the message is forwarded back to this device because of the subscription that we issued.  When the message is received at our device, the publishArrived() method will be invoked (as previously shown).  In this example code, the publishArrived() method merely displays the payload in a log entry, but it could be used instead to parse through and act upon the data contained in the subscribed message.

 

 

Part 4 – Testing the GPS Publish/Subscribe Example

Before exporting both plug-in projects, don’t forget to set the Activator and dependencies in the Manifest.  The runtime output from both CloudPublish and CloudSubscribe examples is shown in this section.  In this example, you can see the GPS and system time being published by CloudPublishExample, and then the same messages are received back from the broker on subscription by the CloudSubscribeExample.

 

 

[DEBUG] 2012-04-27 19:56:24.035 - com.esf.example.cloud.publish.CloudPublishExample: Published SystemTime=Fri Apr 27 19:56:24 GMT 2012 on topic cloud/example/systime -- msgID=7

[DEBUG] 2012-04-27 19:56:24.035 - com.esf.example.cloud.publish.CloudPublishExample: Published GpsLock=true,latitude=38.836155,longitude=-94.671486,speedMPH=0.2758887 on topic cloud/example/gps -- msgID=8

[DEBUG] 2012-04-27 19:56:24.157 - com.esf.example.cloud.publish.CloudPublishExample: publish has been fully acknowledged by the broker - msgID=7

[DEBUG] 2012-04-27 19:56:24.390 - com.esf.example.cloud.publish.CloudPublishExample: publish has been fully acknowledged by the broker - msgID=8

[INFO] 2012-04-27 19:56:24.549 - com.esf.example.cloud.subscribe.CloudSubscribeExample: Publish Arrived on topic: cloud/example/systime

[INFO] 2012-04-27 19:56:24.550 - com.esf.example.cloud.subscribe.CloudSubscribeExample:                  payload: SystemTime=Fri Apr 27 19:56:24 GMT 2012

[INFO] 2012-04-27 19:56:24.712 - com.esf.example.cloud.subscribe.CloudSubscribeExample: Publish Arrived on topic: cloud/example/gps

[INFO] 2012-04-27 19:56:24.712 - com.esf.example.cloud.subscribe.CloudSubscribeExample:                  payload: GpsLock=true,latitude=38.836155,longitude=-94.671486,speedMPH=0.2758887

 

 

The code to create this update can be found here.  Follow this procedure to import the projects into your workspace.