The MQ Telemetry Transport Protocol (MQTT) is a lightweight publish/subscribe messaging protocol developed in 1999 that experiences a growing popularity due to trends like the Internet-of-Things and the need to exchange information between low powered devices with aspects as CPU and bandwidth usage in mind.
In the following tutorial I’d like to demonstrate how to set-up a broker for this protocol with the help of the Moquette library and how to create a client and publish messages for a specific topic using this broker and Eclipse Paho as client library.
Dependencies
We need to add two repositories and the two dependencies for Eclipse Paho and Moquette to our project’s pom.xml:
<repositories>
<repository>
<id>bintray</id>
<url>http://dl.bintray.com/andsel/maven/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>Eclipse Paho Repo</id>
<url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>io.moquette</groupId>
<artifactId>moquette-broker</artifactId>
<version>0.8</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.0.2</version>
</dependency>
</dependencies>
MQTT in Action
In our sample application we’re creating an MQTT broker first and we’re initializing and using an MQTT client to publish a message afterwards.
MQTT Broker using Moquette
We’re using Moquette as a lightweight, MQTT compliant broker supporting QoS 0-2 here.
The following configuration file named moquette.conf specifies port, host and security properties for the broker:
port 1883
host 0.0.0.0
allow_anonymous true
To intercept and print out incoming messages, we’re adding the following listener class, implementing io.moquette.interception.AbstractInterceptHandler.
class PublisherListener extends AbstractInterceptHandler {
@Override
public void onPublish(InterceptPublishMessage message){
System.out.println("moquette mqtt broker message intercepted, topic: "+ message.getTopicName()+", content: "+newString(message.getPayload().array()));
}
}
Now we just need to instantiate the configuration, start the broker, add the interceptor-handler and register a shutdown-hook:
final IConfig classPathConfig = new ClasspathConfig();
final Server mqttBroker =new Server();
final List<?extends InterceptHandler> userHandlers = Arrays.asList(new PublisherListener());
mqttBroker.startServer(classPathConfig, userHandlers);
System.out.println("moquette mqtt broker started, press ctrl-c to shutdown..");
Runtime.getRuntime().addShutdownHook(newThread(){
@Override
public void run(){
System.out.println("stopping moquette mqtt broker..");
mqttBroker.stopServer();
System.out.println("moquette mqtt broker stopped");
}
});
MQTT Client using Eclipse Paho
Setting up a client and sending a message is done within a few steps – having created a client instance with specific connection-options, we’re publishing a message to out broker and for the topic named news:
String topic ="news";
String content ="Visit www.hascode.com! :D";
int qos =2;
String broker ="tcp://0.0.0.0:1883";
String clientId ="paho-java-client";
try{
MqttClient sampleClient =new MqttClient(broker, clientId, new MemoryPersistence());
MqttConnectOptions connOpts =new MqttConnectOptions();
connOpts.setCleanSession(true);System.out.println("paho-client connecting to broker: "+ broker);
sampleClient.connect(connOpts);System.out.println("paho-client connected to broker");System.out.println("paho-client publishing message: "+ content);
MqttMessage message =new MqttMessage(content.getBytes());
message.setQos(qos);
sampleClient.publish(topic, message);
System.out.println("paho-client message published");
sampleClient.disconnect();
System.out.println("paho-client disconnected");
} catch(MqttException me){
me.printStackTrace();
}
Final Application
Putting everything together, this is our simple example application running broker and client when executed:
package com.hascode.tutorial.mqtt;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import io.moquette.interception.AbstractInterceptHandler;
import io.moquette.interception.InterceptHandler;
import io.moquette.interception.messages.InterceptPublishMessage;
import io.moquette.server.Server;
import io.moquette.server.config.ClasspathConfig;
import io.moquette.server.config.IConfig;
public class Main {
static class PublisherListener extends AbstractInterceptHandler {
@Override
public void onPublish(InterceptPublishMessage message) {
System.out.println("moquette mqtt broker message intercepted, topic: " + message.getTopicName()
+ ", content: " + new String(message.getPayload().array()));
}
}
public static void main(String[] args) throws InterruptedException, IOException {
// Creating a MQTT Broker using Moquette
final IConfig classPathConfig = new ClasspathConfig();
final Server mqttBroker = new Server();
final List<? extends InterceptHandler> userHandlers = Arrays.asList(new PublisherListener());
mqttBroker.startServer(classPathConfig, userHandlers);
System.out.println("moquette mqtt broker started, press ctrl-c to shutdown..");
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.out.println("stopping moquette mqtt broker..");
mqttBroker.stopServer();
System.out.println("moquette mqtt broker stopped");
}
});
Thread.sleep(4000);
// Creating a MQTT Client using Eclipse Paho
String topic = "news";
String content = "Visit www.hascode.com! :D";
int qos = 2;
String broker = "tcp://0.0.0.0:1883";
String clientId = "paho-java-client";
try {
MqttClient sampleClient = new MqttClient(broker, clientId, new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("paho-client connecting to broker: " + broker);
sampleClient.connect(connOpts);
System.out.println("paho-client connected to broker");
System.out.println("paho-client publishing message: " + content);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
sampleClient.publish(topic, message);
System.out.println("paho-client message published");
sampleClient.disconnect();
System.out.println("paho-client disconnected");
} catch (MqttException me) {
me.printStackTrace();
}
}
}
Running the Sample Application
We’re now ready to run our application using our IDE of choice or using command line and Maven:
$ mvn compile exec:java -Dexec.mainClass=com.hascode.tutorial.mqtt.Main
[INFO] Scanning for projects...
[INFO][INFO]------------------------------------------------------------------------[INFO] Building mqtt-java-tutorial 1.0.0
[INFO]------------------------------------------------------------------------[INFO][INFO]--- exec-maven-plugin:1.3.1:java (default-cli)@ mqtt-java-tutorial ---
moquette mqtt broker started, press ctrl-c to shutdown..
paho-client connecting to broker: tcp://0.0.0.0:1883
paho-client connected to broker
paho-client publishing message: Visit www.hascode.com! :D
moquette mqtt broker message intercepted, topic: news, content: Visit www.hascode.com! :D
paho-client message published
paho-client disconnected
^Cstopping moquette mqtt broker..
moquette mqtt broker stopped
Tutorial Sources
Please feel free to download the tutorial sources from my GitHub repository, fork it there or clone it using Git:
git clone https://github.com/hascode/mqtt-java-tutorial.git
Resources
Article Updates
-
2019-05-11: Maven compile step added to build example.