How to create a simple Messaging Application using RabbitMQ 2 and Maven
September 5th, 2010 by Micha KopsHaving read an interesting comparison by Lindenlabs evaluating modern message broker systems like ActiveMQ, ApacheQpid and amongst others – RabbitMQ – I wanted to take a quick look at the last one and built a small application producing and consuming some sample messages.
If you need some lecture on getting started with RabbitMQ or the key concepts of messaging I strongly recommend reading this list of introductional articles from the RabbitMQ homepage.
Contents
Prerequisites
- JDK >= 5
- Maven > 2
RabbitMQ Server
- First we need the message broker server – if you’re using Debian or Ubuntu you may install the server using
sudo apt-get install rabbitmq-server
- Otherwise take a look at the detailed download and installation docs on the RabbitMQ website
- Having completed the installation ensure that the server is running
user@localhost:/tmp$ sudo /etc/init.d/rabbitmq-server status Status of all running nodes... Node 'rabbit@localhost' with Pid 4835: running done. user@localhost:/tmp$ sudo rabbitmqctl -n rabbit@localhost status Status of node rabbit@localhost ... [{running_applications,[{rabbit,"RabbitMQ","2.0.0"}, {os_mon,"CPO CXC 138 46","2.2.4"}, {sasl,"SASL CXC 138 11","2.1.8"}, {mnesia,"MNESIA CXC 138 12","4.4.12"}, {stdlib,"ERTS CXC 138 10","1.16.4"}, {kernel,"ERTS CXC 138 10","2.13.4"}]}, {nodes,[{disc,[rabbit@localhost]}]}, {running_nodes,[rabbit@localhost]}] ...done.
- If you’re searching for logs – in *nix like systems they can be found in /var/log/rabbitmq
- There is a standard user created in the installation process with name: guest and password:guest – you might want to change the password if you’re testing/running on a public or nonsecured system. Get help using man rabbitmqctl
- If you’re too lazy to install the message queueing server there is a public accessible demonstration broker service – I haven’t tested this server but if you want to give it a try – read this article providing the access information needed
Creating the Producer/Publisher Client
- Create a new simple maven project
mvn archetype:create // or mvn archetype:generate // or just use your IDE with Maven plugin installed
- Add the dependencies for amqp-client to your pom.xml
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>2.0.0</version> </dependency>
- My pom.xml looks like this
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hascode.tutorial</groupId> <artifactId>rabbitmq-sample</artifactId> <version>0.0.1-SNAPSHOT</version> <name>hasCode.com - RabbitMQ Sample</name> <properties> <rabbitmq.amqpclient.version>2.0.0</rabbitmq.amqpclient.version> </properties> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>${rabbitmq.amqpclient.version}</version> </dependency> </dependencies> </project>
- Create an ugly for-tutorials-use-only configuration class that delivers the username, password and other information
package com.hascode.tutorial.config; public class Configuration { public static final String USERNAME = "guest"; public static final String PASSWORD = "guest"; public static final String HOSTNAME = "localhost"; public static final int PORT = 5672; }
- Create the producer class named SampleProducer – we’re going to run this class in separate threads – implementing Runnable
package com.hascode.tutorial.client; import java.io.IOException; import com.hascode.tutorial.config.Configuration; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class SampleProducer implements Runnable { private final String message; private static final String EXCHANGE_NAME = "test"; private static final String ROUTING_KEY = "test"; public SampleProducer(final String message) { this.message = message; } @Override public void run() { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(Configuration.USERNAME); factory.setPassword(Configuration.PASSWORD); factory.setHost(Configuration.HOSTNAME); factory.setPort(Configuration.PORT); Connection conn; try { conn = factory.newConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct", true); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY); System.out.println("Producing message: " + message + " in thread: " + Thread.currentThread().getName()); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes()); channel.close(); conn.close(); } catch (IOException e) { e.printStackTrace(); } } }
Creating the Consumer/Subscriber Client
- Create a new class named SampleConsumer
package com.hascode.tutorial.client; import java.io.IOException; import com.hascode.tutorial.config.Configuration; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class SampleConsumer implements Runnable { private static final String EXCHANGE_NAME = "test"; private static final String ROUTING_KEY = "test"; private static final boolean NO_ACK = false; public void run() { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(Configuration.USERNAME); factory.setPassword(Configuration.PASSWORD); factory.setHost(Configuration.HOSTNAME); factory.setPort(Configuration.PORT); Connection conn; try { conn = factory.newConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct", true); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, NO_ACK, consumer); while (true) { // you might want to implement some loop-finishing // logic here ;) QueueingConsumer.Delivery delivery; try { delivery = consumer.nextDelivery(); System.out.println("received message: " + new String(delivery.getBody()) + " in thread: " + Thread.currentThread().getName()); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } catch (InterruptedException ie) { continue; } } } catch (IOException e) { e.printStackTrace(); } } }
Putting everything together
- Create the Main class that loads the consumer and produces some sample messages using the SampleProducer
package com.hascode.tutorial.client; public class Main { public static void main(String[] args) { // producing some messages for (int i = 1; i < 6; i++) { final String message = "This is message numero " + i; SampleProducer producer = new SampleProducer(message); new Thread(producer).start(); } Thread consumerThread = new Thread(new SampleConsumer()); consumerThread.start(); } }
Running the Application
- Ensure your RabbitMQ server is running and run the Main class .. you should receive some output like this
Producing message: This is message numero 4 in thread: Thread-3 Producing message: This is message numero 1 in thread: Thread-0 Producing message: This is message numero 3 in thread: Thread-2 Producing message: This is message numero 5 in thread: Thread-4 Producing message: This is message numero 2 in thread: Thread-1 received message: This is message numero 4 in thread: Thread-5 received message: This is message numero 1 in thread: Thread-5 received message: This is message numero 3 in thread: Thread-5 received message: This is message numero 5 in thread: Thread-5 received message: This is message numero 2 in thread: Thread-5
Troubleshooting
- “AMQP protocol version mismatch; we are version 0-9-1, server is 0-8” > – Ensure to have RabbitMQ version 2.0 installed. If you’re using Ubuntu or Debian you might have installed version 1.7.x and need to add the RabbitMQ site to your sources as described in this RabbitMQ installation docs
- “I don’t receive all messages sent” – if you’re testing around be sure to reset the queues on your messaging server or change the exchange name/routing key
Resources
- RabbitMQ Website
- RabbitMQ Examples Site
- RabbitMQ: Message exchange concepts
- Administration Guide for RabbitMQ Server
- RabbitMQ API Guide
- Examples and information about the Java Client API
- Lindenlabs Evaluation of Message Broker Systems
- Getting started with RabbitMQ
- Advanced Message Queueing Protocol Website
- RabbitMQ Demonstration Broker Service
Article Updates
- 2015-03-03: Table of contents added.
Tags: amqp, consumer, jms, maven, messaging, rabbitmq, springsource
January 26th, 2011 at 12:44 am
What sort of logic is required for loop-finishing? I wanted to use the same exercise to do some kind of syncronization for waiting for all messages sent. is it possible this?
January 26th, 2011 at 7:18 am
If you really want to create a temporal listening client you could put the information in a message and process this information in the client part to exit the loop .. e.g. “i am message 2 of total 56″. a bad approach would be sth like “i am the last message” due to the messaging asynchronism
July 15th, 2011 at 12:57 pm
I am trying to achieve 15000 msg/sec. in RabbitMQ in java. I have tried a lot but haven’t got that much success. Can you please provide me the source code for this implementation of 15000 msg/sec?
Thank you so much!!!