Playing around with MQTT and Java with Moquette and Eclipse Paho

June 1st, 2016 by

MQTT (MQ Telemetry Transport) is a lightweight publish/subscribe messaging protocol developed in 1999. Its popularity is growing with trends like the Internet of Things and the need to exchange information between low-powered devices while keeping CPU and bandwidth usage low.

In the following tutorial I’d like to demonstrate how to set up a broker for this protocol with the Moquette library, and how to create a client with Eclipse Paho that publishes messages for a specific topic to this broker.

MQTT Sample Application

Dependencies

We need to add two repositories and the two dependencies for Eclipse Paho and Moquette to our project’s pom.xml:

<repositories>
	<repository>
		<id>bintray</id>
		<url>http://dl.bintray.com/andsel/maven/</url>
		<releases>
			<enabled>true</enabled>
		</releases>
		<snapshots>
			<enabled>false</enabled>
		</snapshots>
	</repository>
	<repository>
		<id>Eclipse Paho Repo</id>
		<url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
	</repository>
</repositories>
<dependencies>
	<dependency>
		<groupId>io.moquette</groupId>
		<artifactId>moquette-broker</artifactId>
		<version>0.8</version>
	</dependency>
	<dependency>
		<groupId>org.eclipse.paho</groupId>
		<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
		<version>1.0.2</version>
	</dependency>
</dependencies>

MQTT in Action

Our sample application first starts an MQTT broker and then initializes an MQTT client that publishes a message.

MQTT Broker using Moquette

Here we’re using Moquette as a lightweight, MQTT-compliant broker that supports QoS levels 0 to 2.

The following configuration file named moquette.conf specifies port, host and security properties for the broker:

port 1883
host 0.0.0.0
allow_anonymous true
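
To sanity-check these values before starting the broker, the file can be read with plain JDK classes. The following is a minimal sketch and not Moquette's own loader: it relies on the fact that java.util.Properties accepts whitespace as a key/value separator, and the class name ConfigPreview is made up for this illustration.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper, not part of Moquette: previews a moquette.conf-style file.
final class ConfigPreview {

    // Parses "key value" lines; java.util.Properties treats whitespace as a separator.
    static Properties parse(Reader source) throws IOException {
        Properties props = new Properties();
        props.load(source);
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Same three entries as in the moquette.conf shown above
        String conf = "port 1883\nhost 0.0.0.0\nallow_anonymous true\n";
        Properties props = parse(new StringReader(conf));

        int port = Integer.parseInt(props.getProperty("port"));
        boolean anonymous = Boolean.parseBoolean(props.getProperty("allow_anonymous"));
        System.out.println("port=" + port + ", host=" + props.getProperty("host")
                + ", allow_anonymous=" + anonymous);
    }
}
```

Parsing the port as an int and the flag as a boolean catches simple typos in the file early, before the broker tries to bind.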

To intercept and print out incoming messages, we’re adding the following listener class, which extends io.moquette.interception.AbstractInterceptHandler:

class PublisherListener extends AbstractInterceptHandler {
	@Override
	public void onPublish(InterceptPublishMessage message) {
		System.out.println("moquette mqtt broker message intercepted, topic: " + message.getTopicName()
				+ ", content: " + new String(message.getPayload().array()));
	}
}
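
The listener above turns the payload into text with array() and the platform default charset. In general, ByteBuffer.array() returns the whole backing array regardless of the buffer's position and limit, and it fails for buffers that are not array-backed. Whether this matters for the buffers Moquette hands out is not verified here. The following JDK-only sketch shows a more defensive way to decode a buffer, assuming the payload is UTF-8 text; the class name PayloadText is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Moquette or Paho.
final class PayloadText {

    // Decodes only the readable bytes (position..limit) as UTF-8.
    // duplicate() keeps the caller's position and limit untouched.
    static String decodeUtf8(ByteBuffer payload) {
        return StandardCharsets.UTF_8.decode(payload.duplicate()).toString();
    }

    public static void main(String[] args) {
        byte[] raw = "xxVisit www.hascode.com! :Dyy".getBytes(StandardCharsets.UTF_8);
        // A buffer that exposes only the middle part of a larger backing array
        ByteBuffer window = ByteBuffer.wrap(raw, 2, raw.length - 4);

        // Only the bytes inside the window
        System.out.println(decodeUtf8(window));
        // The whole backing array, including the surrounding bytes
        System.out.println(new String(window.array(), StandardCharsets.UTF_8));
    }
}
```

In the listener, this would mean calling PayloadText.decodeUtf8(message.getPayload()) instead of new String(message.getPayload().array()).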

Now we just need to instantiate the configuration, start the broker, add the interceptor-handler and register a shutdown-hook:

final IConfig classPathConfig = new ClasspathConfig();
 
final Server mqttBroker = new Server();
final List<? extends InterceptHandler> userHandlers = Arrays.asList(new PublisherListener());
mqttBroker.startServer(classPathConfig, userHandlers);
 
System.out.println("moquette mqtt broker started, press ctrl-c to shutdown..");
Runtime.getRuntime().addShutdownHook(new Thread() {
	@Override
	public void run() {
		System.out.println("stopping moquette mqtt broker..");
		mqttBroker.stopServer();
		System.out.println("moquette mqtt broker stopped");
	}
});

MQTT Client using Eclipse Paho

Setting up a client and sending a message takes only a few steps. Having created a client instance with specific connection options, we publish a message for the topic named news to our broker:

String topic = "news";
String content = "Visit www.hascode.com! :D";
int qos = 2;
String broker = "tcp://0.0.0.0:1883";
String clientId = "paho-java-client";
 
try {
	MqttClient sampleClient = new MqttClient(broker, clientId, new MemoryPersistence());
	MqttConnectOptions connOpts = new MqttConnectOptions();
	connOpts.setCleanSession(true);
	System.out.println("paho-client connecting to broker: " + broker);
	sampleClient.connect(connOpts);
	System.out.println("paho-client connected to broker");
	System.out.println("paho-client publishing message: " + content);
	MqttMessage message = new MqttMessage(content.getBytes());
	message.setQos(qos);
	sampleClient.publish(topic, message);
	System.out.println("paho-client message published");
	sampleClient.disconnect();
	System.out.println("paho-client disconnected");
} catch (MqttException me) {
	me.printStackTrace();
}
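
The qos value of 2 used above is one of the three MQTT quality-of-service levels: 0 (at most once), 1 (at least once) and 2 (exactly once). Paho's setQos takes a plain int, so a small enum can make the intent more readable and reject invalid values early. This is only a sketch; QosLevel is a hypothetical type and not part of Paho.

```java
// Hypothetical enum, not part of Eclipse Paho: names the three MQTT QoS levels.
enum QosLevel {
    AT_MOST_ONCE(0),   // fire and forget
    AT_LEAST_ONCE(1),  // delivery is acknowledged, duplicates are possible
    EXACTLY_ONCE(2);   // delivered once via a four-step handshake

    private final int level;

    QosLevel(int level) {
        this.level = level;
    }

    // The numeric value as expected by an int-based API.
    int level() {
        return level;
    }

    // Maps 0, 1 or 2 to the matching constant and rejects everything else.
    static QosLevel fromLevel(int level) {
        for (QosLevel candidate : values()) {
            if (candidate.level == level) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("MQTT QoS must be 0, 1 or 2 but was " + level);
    }
}
```

With this in place, the client code could call message.setQos(QosLevel.EXACTLY_ONCE.level()) instead of passing the bare number.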

Final Application

Putting everything together, this is our simple example application, which runs both the broker and the client when executed:

package com.hascode.tutorial.mqtt;
 
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
 
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
import io.moquette.interception.AbstractInterceptHandler;
import io.moquette.interception.InterceptHandler;
import io.moquette.interception.messages.InterceptPublishMessage;
import io.moquette.server.Server;
import io.moquette.server.config.ClasspathConfig;
import io.moquette.server.config.IConfig;
 
public class Main {
 
	static class PublisherListener extends AbstractInterceptHandler {
		@Override
		public void onPublish(InterceptPublishMessage message) {
			System.out.println("moquette mqtt broker message intercepted, topic: " + message.getTopicName()
					+ ", content: " + new String(message.getPayload().array()));
		}
	}
 
	public static void main(String[] args) throws InterruptedException, IOException {
		// Creating a MQTT Broker using Moquette
		final IConfig classPathConfig = new ClasspathConfig();
 
		final Server mqttBroker = new Server();
		final List<? extends InterceptHandler> userHandlers = Arrays.asList(new PublisherListener());
		mqttBroker.startServer(classPathConfig, userHandlers);
 
		System.out.println("moquette mqtt broker started, press ctrl-c to shutdown..");
		Runtime.getRuntime().addShutdownHook(new Thread() {
			@Override
			public void run() {
				System.out.println("stopping moquette mqtt broker..");
				mqttBroker.stopServer();
				System.out.println("moquette mqtt broker stopped");
			}
		});
 
		Thread.sleep(4000);
 
		// Creating a MQTT Client using Eclipse Paho
		String topic = "news";
		String content = "Visit www.hascode.com! :D";
		int qos = 2;
		String broker = "tcp://0.0.0.0:1883";
		String clientId = "paho-java-client";
 
		try {
			MqttClient sampleClient = new MqttClient(broker, clientId, new MemoryPersistence());
			MqttConnectOptions connOpts = new MqttConnectOptions();
			connOpts.setCleanSession(true);
			System.out.println("paho-client connecting to broker: " + broker);
			sampleClient.connect(connOpts);
			System.out.println("paho-client connected to broker");
			System.out.println("paho-client publishing message: " + content);
			MqttMessage message = new MqttMessage(content.getBytes());
			message.setQos(qos);
			sampleClient.publish(topic, message);
			System.out.println("paho-client message published");
			sampleClient.disconnect();
			System.out.println("paho-client disconnected");
		} catch (MqttException me) {
			me.printStackTrace();
		}
	}
}
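
The Thread.sleep(4000) call gives the broker a fixed pause before the client connects. Whether that pause is needed at all is not verified here. If it is, polling the broker port is one alternative to a fixed delay. The following JDK-only sketch illustrates the idea; the class PortProbe, its method and its timeout values are hypothetical, and it probes 127.0.0.1 on the assumption that the broker runs on the same machine.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Moquette or Paho.
final class PortProbe {

    // Returns true as soon as a TCP connection to host:port succeeds,
    // or false once roughly timeoutMillis have elapsed without success.
    static boolean awaitPort(String host, int port, long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        do {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), 250);
                return true;
            } catch (IOException notYetListening) {
                // Nothing is accepting connections yet, retry after a short pause
                Thread.sleep(100);
            }
        } while (System.nanoTime() < deadline);
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean up = awaitPort("127.0.0.1", 1883, 4000);
        System.out.println(up ? "broker port is accepting connections" : "broker port not reachable");
    }
}
```

In the application above, a call like PortProbe.awaitPort("127.0.0.1", 1883, 4000) could take the place of the sleep, so the client starts as soon as the port is open.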

Running the Sample Application

We’re now ready to run our application, either from our IDE of choice or from the command line using Maven:

$ mvn exec:java -Dexec.mainClass=com.hascode.tutorial.mqtt.Main
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building mqtt-java-tutorial 1.0.0
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- exec-maven-plugin:1.3.1:java (default-cli) @ mqtt-java-tutorial ---
moquette mqtt broker started, press ctrl-c to shutdown..
paho-client connecting to broker: tcp://0.0.0.0:1883
paho-client connected to broker
paho-client publishing message: Visit www.hascode.com! :D
moquette mqtt broker message intercepted, topic: news, content: Visit www.hascode.com! :D
paho-client message published
paho-client disconnected
^Cstopping moquette mqtt broker..
moquette mqtt broker stopped

Tutorial Sources

Please feel free to download the tutorial sources from my Bitbucket repository, fork it there or clone it using Git:

git clone https://bitbucket.org/hascode/mqtt-java-tutorial.git
