Setting up Kafka Brokers for Testing with Kafka-Unit

March 28th, 2018

When writing tests for applications that interact with Kafka brokers, we often need to set up a suitable environment that includes an instance of Kafka and ZooKeeper.

Though Kafka ships with some classes for testing, setting up a simple testing environment is considerably easier with the kafka-unit library, which offers JUnit test rule support as well as a quick programmatic setup.

In the following short example, I’d like to show how to create a simple setup using Maven, Surefire and kafka-unit.

Running JUnit tests with kafka-unit.


Maven Setup

Using Maven, we need to add the following two dependencies to our project’s pom.xml:

  • kafka-unit: starts Kafka and ZooKeeper for our tests
  • junit: our testing framework of choice here
<dependencies>
  <dependency>
    <groupId>info.batey.kafka</groupId>
    <artifactId>kafka-unit</artifactId>
    <version>1.0</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
  </dependency>
</dependencies>

That’s all we need to begin writing tests.

Programmatic Test Setup

In the first example, we set up our Kafka environment programmatically with the help of the KafkaUnit utility class. The test performs the following steps:

  • we’re starting a Kafka broker running on a random port (we might also have specified a port)
  • we’re creating a new Kafka topic named “MyTestTopic”
  • we’re sending a message to the topic
  • we’re fetching all messages from the topic
  • we’re verifying that only one message exists and that it matches our published message
  • we’re stopping the broker
package it;
 
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
 
import info.batey.kafka.unit.KafkaUnit;
import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
 
public class KafkaTest {
 
  KafkaUnit kafkaUnitServer;
 
  @Before
  public void setup() throws Exception {
    kafkaUnitServer = new KafkaUnit();
    kafkaUnitServer.startup();
  }
 
  @After
  public void tearDown() {
    kafkaUnitServer.shutdown();
  }
 
  @Test
  public void readMessagesFromTopic() throws Exception {
    final String topicName = "MyTestTopic";
    kafkaUnitServer.createTopic(topicName);
 
    ProducerRecord<String, String> keyedMessage = new ProducerRecord<>(topicName, "greeting",
        "Hello world from hascode.com :)");
    kafkaUnitServer.sendMessages(keyedMessage);
 
    List<String> allMessages = kafkaUnitServer.readAllMessages(topicName);
    assertThat("topic should contain only one message", allMessages.size(), equalTo(1));
    assertThat("the message should match the published message", allMessages.get(0), equalTo("Hello world from hascode.com :)"));
  }
}
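
The broker above listens on a random port. If a test needs to know its ports up front, one option is to ask the operating system for free ones first. The following is a minimal sketch using only the JDK. The class name FreePorts is hypothetical, and handing the two ports to a KafkaUnit constructor that takes a ZooKeeper port and a broker port is an assumption that should be checked against the kafka-unit version in use.

```java
import java.io.IOException;
import java.net.ServerSocket;

final class FreePorts {

  private FreePorts() {
  }

  // Binds 'count' server sockets to port 0 so that the operating system picks
  // unused ports. All sockets stay open until every port is known, which
  // guarantees that the returned ports differ from each other.
  static int[] findFreePorts(int count) throws IOException {
    ServerSocket[] sockets = new ServerSocket[count];
    int[] ports = new int[count];
    try {
      for (int i = 0; i < count; i++) {
        sockets[i] = new ServerSocket(0);
        ports[i] = sockets[i].getLocalPort();
      }
      return ports;
    } finally {
      // Release the ports again. Another process could grab them before we
      // use them, which is a small race that is usually fine for local tests.
      for (ServerSocket socket : sockets) {
        if (socket != null) {
          socket.close();
        }
      }
    }
  }

  public static void main(String[] args) throws IOException {
    int[] ports = findFreePorts(2);
    // Assumption: these could be passed on, e.g. new KafkaUnit(ports[0], ports[1]).
    System.out.println("ZooKeeper port: " + ports[0] + ", broker port: " + ports[1]);
  }
}
```

Fixed ports are mostly useful when the application under test reads its broker address from static configuration. Otherwise the random-port default keeps parallel builds from colliding.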

We can now run KafkaTest from the command line using Maven:

$ mvn test -Dtest=KafkaTest
[..]
-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running it.KafkaTest
Created topic "MyTestTopic".
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 8.731 sec - in it.KafkaTest
 
Results :
 
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0

Running the test in an IDE such as IntelliJ IDEA or Eclipse should produce a similar result:

Using kafka-unit programmatically


Setup using Test Rules

In the second example, we choose the more convenient approach using JUnit rules. The KafkaUnitRule, used as a @Rule, handles starting and shutting down the broker for us. The test itself follows the same steps as before:

  • we’re creating a new Kafka topic named “MyTestTopic”
  • we’re sending a message to the topic
  • we’re fetching all messages from the topic
  • we’re verifying that only one message exists and that it matches our published message
package it;
 
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
 
import info.batey.kafka.unit.KafkaUnit;
import info.batey.kafka.unit.KafkaUnitRule;
import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Rule;
import org.junit.Test;
 
public class KafkaWithRulesTest {
 
  @Rule
  public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
 
  @Test
  public void readMessagesFromTopic() throws Exception {
    final String topicName = "MyTestTopic";
    KafkaUnit kafkaUnitServer = kafkaUnitRule.getKafkaUnit();
    kafkaUnitServer.createTopic(topicName);
 
    ProducerRecord<String, String> keyedMessage = new ProducerRecord<>(topicName, "greeting",
        "Hello world from hascode.com :)");
    kafkaUnitServer.sendMessages(keyedMessage);
 
    List<String> allMessages = kafkaUnitServer.readAllMessages(topicName);
    assertThat("topic should contain only one message", allMessages.size(), equalTo(1));
    assertThat("the message should match the published message", allMessages.get(0), equalTo("Hello world from hascode.com :)"));
  }
}
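
To make clearer what the rule takes off our hands: conceptually, a JUnit rule wraps each test so that a resource is started before the test body runs and is shut down afterwards, even when the test fails. The following standalone sketch models that idea in plain Java. The names LifecycleSketch, TestBody and runWithBroker are hypothetical, no real broker is started, and this is not kafka-unit's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

final class LifecycleSketch {

  // Stand-in for a test method body that may throw.
  interface TestBody {
    void run() throws Exception;
  }

  // Records what happened, in order, so the lifecycle becomes visible.
  final List<String> events = new ArrayList<>();

  // Pretends to start a broker, runs the test body and always records the
  // shutdown afterwards, whether the body succeeded or threw.
  void runWithBroker(TestBody body) throws Exception {
    events.add("startup");
    try {
      body.run();
    } finally {
      events.add("shutdown");
    }
  }

  public static void main(String[] args) throws Exception {
    LifecycleSketch sketch = new LifecycleSketch();
    sketch.runWithBroker(() -> sketch.events.add("test"));
    System.out.println(sketch.events); // prints [startup, test, shutdown]
  }
}
```

This is the reason the rule-based test needs no @Before and @After methods: the start and stop calls live in the rule instead of in the test class.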

Running KafkaWithRulesTest on the command line produces output similar to this:

$ mvn test -Dtest=KafkaWithRulesTest
[..]
-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running it.KafkaWithRulesTest
Created topic "MyTestTopic".
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 8.783 sec - in it.KafkaWithRulesTest
 
Results :
 
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0

Running the test in an IDE such as IntelliJ IDEA or Eclipse should produce a similar result:

Using kafka-unit with JUnit rules


Tutorial Sources

Please feel free to download the tutorial sources from my Bitbucket repository, fork it there or clone it using Git:

git clone https://bitbucket.org/hascode/kafka-testing-tutorial.git

Appendix A: Kafka and Docker

The Docker image spotify/kafka lets us set up Kafka and ZooKeeper with a single command.

Running Kafka and ZooKeeper

The following command starts both Kafka and ZooKeeper and publishes their ports 9092 and 2181 on localhost:

docker run -p 2181:2181 -p 9092:9092 --rm --env ADVERTISED_HOST=localhost --env ADVERTISED_PORT=9092 spotify/kafka
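
Before pointing a test at the container, it can help to check that the published ports are reachable. Here is a small sketch using only the JDK. The class name PortCheck and the one-second timeout are arbitrary choices, and the ports 2181 and 9092 are simply the ones from the docker command above. Note that a successful TCP connect only shows that something is listening, not that Kafka is fully ready to serve requests.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortCheck {

  private PortCheck() {
  }

  // Returns true if a TCP connection to host:port can be opened within
  // timeoutMillis, and false if the connection is refused or times out.
  static boolean isReachable(String host, int port, int timeoutMillis) {
    try (Socket socket = new Socket()) {
      socket.connect(new InetSocketAddress(host, port), timeoutMillis);
      return true;
    } catch (IOException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("ZooKeeper reachable: " + isReachable("localhost", 2181, 1000));
    System.out.println("Kafka reachable: " + isReachable("localhost", 9092, 1000));
  }
}
```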

Creating a Topic

The following commands create a new topic named “stream-plaintext-input”. ID stands for the ID of the running container, as listed by docker ps:

docker exec -ti ID bash
$(find -name 'kafka-topics.sh') --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic stream-plaintext-input

Article Updates

  • 2018-05-02: Snippet for starting Kafka with Docker added.
  • 2018-03-28: Unused imports removed.

One Response to “Setting up Kafka Brokers for Testing with Kafka-Unit”

  1. Anonymous Says:

    java.lang.NoClassDefFoundError: scala/collection/TraversableOnce$class
