How to create a simple Messaging Application using RabbitMQ 2 and Maven

September 5th, 2010

Having read an interesting comparison by Linden Lab evaluating modern message broker systems such as ActiveMQ, Apache Qpid and, among others, RabbitMQ, I wanted to take a quick look at the last one and built a small application that produces and consumes some sample messages.

If you need some reading on getting started with RabbitMQ or on the key concepts of messaging, I strongly recommend this list of introductory articles from the RabbitMQ homepage.

Prerequisites

  • JDK >= 5
  • Maven >= 2

RabbitMQ Server

  • First we need the message broker server – if you’re using Debian or Ubuntu you may install the server using
    sudo apt-get install rabbitmq-server
  • Otherwise take a look at the detailed download and installation docs on the RabbitMQ website
  • Having completed the installation, ensure that the server is running
    user@localhost:/tmp$ sudo /etc/init.d/rabbitmq-server status
    Status of all running nodes...
    Node 'rabbit@localhost' with Pid 4835: running
    done.
     
    user@localhost:/tmp$ sudo rabbitmqctl -n rabbit@localhost status
    Status of node rabbit@localhost ...
    [{running_applications,[{rabbit,"RabbitMQ","2.0.0"},
     {os_mon,"CPO  CXC 138 46","2.2.4"},
     {sasl,"SASL  CXC 138 11","2.1.8"},
     {mnesia,"MNESIA  CXC 138 12","4.4.12"},
     {stdlib,"ERTS  CXC 138 10","1.16.4"},
     {kernel,"ERTS  CXC 138 10","2.13.4"}]},
     {nodes,[{disc,[rabbit@localhost]}]},
     {running_nodes,[rabbit@localhost]}]
    ...done.
  • If you’re looking for the logs: on *nix-like systems they can be found in /var/log/rabbitmq
  • The installation creates a default user with name: guest and password: guest. You might want to change the password if you’re testing or running on a public or unsecured system. Get help using man rabbitmqctl
  • If you’re too lazy to install the message queueing server, there is a publicly accessible demonstration broker service. I haven’t tested this server, but if you want to give it a try, read this article providing the access information needed

Creating the producer/publisher client

  • Create a new simple maven project
    mvn archetype:create    # or
    mvn archetype:generate  # or just use your IDE with a Maven plugin installed
  • Add the dependencies for amqp-client to your pom.xml
    <dependency>
     <groupId>com.rabbitmq</groupId>
     <artifactId>amqp-client</artifactId>
     <version>2.0.0</version>
    </dependency>
  • My pom.xml looks like this
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.hascode.tutorial</groupId>
        <artifactId>rabbitmq-sample</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>hasCode.com - RabbitMQ Sample</name>
        <properties>
            <rabbitmq.amqpclient.version>2.0.0</rabbitmq.amqpclient.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>${rabbitmq.amqpclient.version}</version>
            </dependency>
        </dependencies>
    </project>
  • Create an ugly for-tutorials-use-only configuration class that delivers the username, password and other information
    package com.hascode.tutorial.config;

    public class Configuration {
        public static final String USERNAME = "guest";

        public static final String PASSWORD = "guest";

        public static final String HOSTNAME = "localhost";

        public static final int PORT = 5672;
    }
  • Create the producer class named SampleProducer. It implements Runnable because we’re going to run it in separate threads
    package com.hascode.tutorial.client;

    import java.io.IOException;

    import com.hascode.tutorial.config.Configuration;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class SampleProducer implements Runnable {
        private final String message;

        private static final String EXCHANGE_NAME = "test";

        private static final String ROUTING_KEY = "test";

        public SampleProducer(final String message) {
            this.message = message;
        }

        // no @Override here: a Java 5 compiler rejects it on interface methods
        public void run() {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername(Configuration.USERNAME);
            factory.setPassword(Configuration.PASSWORD);
            factory.setHost(Configuration.HOSTNAME);
            factory.setPort(Configuration.PORT);
            Connection conn;
            try {
                conn = factory.newConnection();
                Channel channel = conn.createChannel();

                // durable direct exchange
                channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
                // server-named queue; per the client javadoc it is exclusive and
                // auto-deleted, so it disappears when this connection is closed
                String queueName = channel.queueDeclare().getQueue();
                channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
                System.out.println("Producing message: " + message + " in thread: " + Thread.currentThread().getName());
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());

                channel.close();
                conn.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
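
The hard-coded Configuration class above is fine for a tutorial, but you may not want credentials in your source. The following is only a minimal sketch of one alternative: reading the same four values from JVM system properties and falling back to the tutorial's defaults. The class name BrokerSettings and the property names (rabbitmq.username and so on) are my own assumptions, they are not part of the RabbitMQ client.

```java
import java.util.Properties;

// Hypothetical replacement for the tutorial's Configuration class.
// The property names below are made up for this sketch.
final class BrokerSettings {
    final String username;
    final String password;
    final String hostname;
    final int port;

    private BrokerSettings(String username, String password, String hostname, int port) {
        this.username = username;
        this.password = password;
        this.hostname = hostname;
        this.port = port;
    }

    // Reads the settings from the given properties and falls back to the
    // values used in the tutorial (guest/guest on localhost:5672).
    static BrokerSettings from(Properties props) {
        return new BrokerSettings(
                props.getProperty("rabbitmq.username", "guest"),
                props.getProperty("rabbitmq.password", "guest"),
                props.getProperty("rabbitmq.hostname", "localhost"),
                Integer.parseInt(props.getProperty("rabbitmq.port", "5672")));
    }

    public static void main(String[] args) {
        BrokerSettings settings = BrokerSettings.from(System.getProperties());
        System.out.println(settings.username + "@" + settings.hostname + ":" + settings.port);
    }
}
```

With something like this in place you could start the JVM with -Drabbitmq.hostname=... and pass settings.hostname to factory.setHost(...) instead of Configuration.HOSTNAME.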

Creating the consumer/subscriber client

  • Create a new class named SampleConsumer
    package com.hascode.tutorial.client;

    import java.io.IOException;

    import com.hascode.tutorial.config.Configuration;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;

    public class SampleConsumer implements Runnable {
        private static final String EXCHANGE_NAME = "test";

        private static final String ROUTING_KEY = "test";

        // false = every message has to be acknowledged explicitly via basicAck
        private static final boolean NO_ACK = false;

        public void run() {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername(Configuration.USERNAME);
            factory.setPassword(Configuration.PASSWORD);
            factory.setHost(Configuration.HOSTNAME);
            factory.setPort(Configuration.PORT);
            Connection conn;
            try {
                conn = factory.newConnection();
                Channel channel = conn.createChannel();
                channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
                String queueName = channel.queueDeclare().getQueue();
                channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
                QueueingConsumer consumer = new QueueingConsumer(channel);
                channel.basicConsume(queueName, NO_ACK, consumer);
                while (true) { // you might want to implement some loop-finishing
                               // logic here ;)
                    QueueingConsumer.Delivery delivery;
                    try {
                        delivery = consumer.nextDelivery();
                        System.out.println("received message: " + new String(delivery.getBody()) + " in thread: " + Thread.currentThread().getName());
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    } catch (InterruptedException ie) {
                        // stop consuming when the thread is interrupted
                        // instead of silently looping on
                        break;
                    }
                }
                channel.close();
                conn.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
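
The while loop above leaves the loop-finishing logic open. One possible approach, sketched here under my own assumptions, is to let every message say which one it is and how many there are in total, and to stop once all of them have been seen, regardless of the order of arrival. The body format "index/total:payload" and the class names CompletionTracker and LoopFinishingDemo are invented for this sketch, and a plain BlockingQueue stands in for the QueueingConsumer so that the example runs without a broker.

```java
import java.util.BitSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper: remembers which of the numbered messages have arrived.
final class CompletionTracker {
    private BitSet seen;
    private int total = -1;

    // Expects well-formed bodies like "3/5:payload" (assumed format).
    // Returns true once every message 1..total has been seen at least once.
    boolean record(String body) {
        String header = body.substring(0, body.indexOf(':'));
        String[] parts = header.split("/");
        int index = Integer.parseInt(parts[0]);
        if (total < 0) {
            total = Integer.parseInt(parts[1]);
            seen = new BitSet(total + 1);
        }
        seen.set(index); // setting the same bit twice ignores duplicates
        return seen.cardinality() == total;
    }
}

final class LoopFinishingDemo {
    // Consumes until the tracker reports completion or the thread is
    // interrupted; returns the number of deliveries taken from the queue.
    static int consumeAll(BlockingQueue<String> deliveries) {
        CompletionTracker tracker = new CompletionTracker();
        int consumed = 0;
        try {
            boolean done = false;
            while (!done) {
                String body = deliveries.take(); // stands in for consumer.nextDelivery()
                consumed++;
                done = tracker.record(body);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // treat interruption as a stop request
        }
        return consumed;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        // messages arrive out of order, as in the sample output of this tutorial
        for (String m : new String[] {"4/5:d", "1/5:a", "3/5:c", "5/5:e", "2/5:b"}) {
            queue.add(m);
        }
        System.out.println("consumed " + consumeAll(queue) + " messages");
    }
}
```

Counting the distinct indices instead of waiting for a message marked as "the last one" matters because the last message sent is not necessarily the last message received.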

Put it all together

  • Create the Main class that loads the consumer and produces some sample messages using the SampleProducer
    package com.hascode.tutorial.client;

    public class Main {
        public static void main(String[] args) {
            // producing some messages
            for (int i = 1; i < 6; i++) {
                final String message = "This is message numero " + i;
                SampleProducer producer = new SampleProducer(message);
                new Thread(producer).start();
            }

            Thread consumerThread = new Thread(new SampleConsumer());
            consumerThread.start();
        }
    }
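
Note that Main starts the producer threads before the consumer thread, and the consumer only binds its server-named queue once it is running, so whether a message arrives depends on thread timing. A possible way to make the order explicit is a CountDownLatch that the producers wait on until the consumer has bound its queue. The sketch below only simulates this with an in-memory event list; StartupOrderDemo and the event strings are made up, and in the real classes the latch would have to be passed to SampleConsumer and SampleProducer, which is an assumption on my side and not something the tutorial code does.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: producers wait until the consumer signals that it is ready.
final class StartupOrderDemo {
    // Starts the given number of producer threads plus one consumer thread
    // and returns the recorded events in the order they happened.
    static List<String> run(int producers) {
        final List<String> events = new CopyOnWriteArrayList<String>();
        final CountDownLatch consumerReady = new CountDownLatch(1);
        final CountDownLatch producersDone = new CountDownLatch(producers);

        // producers are started first, exactly as in the tutorial's Main class
        for (int i = 1; i <= producers; i++) {
            final int n = i;
            new Thread(new Runnable() {
                public void run() {
                    try {
                        consumerReady.await(); // block until the queue is bound
                        events.add("published " + n); // stands in for channel.basicPublish(...)
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        producersDone.countDown();
                    }
                }
            }).start();
        }

        new Thread(new Runnable() {
            public void run() {
                events.add("consumer bound"); // stands in for channel.queueBind(...)
                consumerReady.countDown(); // release all waiting producers
            }
        }).start();

        try {
            producersDone.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```

Whatever order the producers run in, the first recorded event is always the consumer's binding, because no producer can get past await() before countDown() has been called.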

Running

  • Ensure your RabbitMQ server is running and run the Main class. You should receive some output like this
    Producing message: This is message numero 4 in thread: Thread-3
    Producing message: This is message numero 1 in thread: Thread-0
    Producing message: This is message numero 3 in thread: Thread-2
    Producing message: This is message numero 5 in thread: Thread-4
    Producing message: This is message numero 2 in thread: Thread-1
    received message: This is message numero 4 in thread: Thread-5
    received message: This is message numero 1 in thread: Thread-5
    received message: This is message numero 3 in thread: Thread-5
    received message: This is message numero 5 in thread: Thread-5
    received message: This is message numero 2 in thread: Thread-5

Troubleshooting

  • “AMQP protocol version mismatch; we are version 0-9-1, server is 0-8” – Make sure that RabbitMQ version 2.0 is installed. If you’re using Ubuntu or Debian you might have installed version 1.7.x and need to add the RabbitMQ site to your sources as described in the RabbitMQ installation docs
  • “I don’t receive all messages sent” – If you’re testing around, be sure to reset the queues on your messaging server or change the exchange name/routing key

3 Responses to “How to create a simple Messaging Application using RabbitMQ 2 and Maven”

  1. Gabriel Huerta Araujo Says:

    What sort of logic is required for loop-finishing? I wanted to use the same exercise to do some kind of synchronization, waiting for all messages sent. Is this possible?

  2. micha kops Says:

    If you really want to create a client that listens only temporarily, you could put the information in a message and process this information in the client part to exit the loop, e.g. “i am message 2 of total 56”. A bad approach would be something like “i am the last message”, due to the asynchronous nature of messaging

  3. Anand Says:

    I am trying to achieve 15000 msg/sec. in RabbitMQ in java. I have tried a lot but haven’t got that much success. Can you please provide me the source code for this implementation of 15000 msg/sec?

    Thank you so much!!!
