Goals
  1. Set up Kafka and ZooKeeper with Docker and docker-compose

  2. Create a message consumer and producer in Java

  3. Send and receive Protobuf-serialized messages using the Schema Registry

  4. Simplify the setup with Spring Boot and its Kafka starter

Kafka Setup

We’re using docker-compose to set up our Kafka broker, ZooKeeper and the other Confluent Platform components (Schema Registry, Kafka Connect, ksqlDB and the REST Proxy).

This is our docker-compose.yaml config file, based on Confluent’s example setup from their GitHub repository:

docker-compose.yaml
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost

  schema-registry:
    image: confluentinc/cp-schema-registry:7.0.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: cnfldemos/kafka-connect-datagen:0.5.0-6.2.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.0.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.0.1
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.0.1
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.0.1
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

Now we’re ready to start everything like this:

docker-compose up -d
Creating network "confluent-platform-docker_default" with the default driver
Creating zookeeper ... done
Creating broker    ... done
Creating schema-registry ... done
Creating rest-proxy      ... done
Creating connect         ... done
Creating ksqldb-server   ... done
Creating ksql-datagen    ... done
Creating ksqldb-cli      ... done

Use docker-compose ps to verify that everything is running fine:

docker-compose ps
Name                    Command                       State                                                                        Ports
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
broker            /etc/confluent/docker/run        Up                      0.0.0.0:29092->29092/tcp,:::29092->29092/tcp, 0.0.0.0:9092->9092/tcp,:::9092->9092/tcp, 0.0.0.0:9101->9101/tcp,:::9101->9101/tcp
connect           /etc/confluent/docker/run        Up (health: starting)   0.0.0.0:8083->8083/tcp,:::8083->8083/tcp, 9092/tcp
ksql-datagen      bash -c echo Waiting for K ...   Up
ksqldb-cli        /bin/sh                          Up
ksqldb-server     /etc/confluent/docker/run        Up                      0.0.0.0:8088->8088/tcp,:::8088->8088/tcp
rest-proxy        /etc/confluent/docker/run        Up                      0.0.0.0:8082->8082/tcp,:::8082->8082/tcp
schema-registry   /etc/confluent/docker/run        Up                      0.0.0.0:8081->8081/tcp,:::8081->8081/tcp
zookeeper         /etc/confluent/docker/run        Up                      0.0.0.0:2181->2181/tcp,:::2181->2181/tcp, 2888/tcp, 3888/tcp

We’re now creating a new topic named hascode-articles:

docker-compose exec broker kafka-topics --create --bootstrap-server \
localhost:9092 --replication-factor 1 --partitions 1 --topic hascode-articles
Created topic hascode-articles.

We may list existing topics like this:

docker-compose exec broker kafka-topics --list --bootstrap-server \
localhost:9092
__consumer_offsets
__transaction_state
_confluent-ksql-default__command_topic
_schemas
default_ksql_processing_log
docker-connect-configs
docker-connect-offsets
docker-connect-status
hascode-articles
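
Topics can also be managed programmatically via the AdminClient that ships with the Kafka Java client we’ll add to our project below. A minimal sketch (the class name is just a placeholder, and createTopics fails if the topic already exists):

TopicAdminSample.java
package com.hascode.kafka;

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicAdminSample {

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    try (AdminClient admin = AdminClient.create(props)) {
      // one partition, replication factor 1 - enough for our single-broker setup
      admin.createTopics(List.of(new NewTopic("hascode-articles", 1, (short) 1))).all().get();
      // list the topics on the broker (internal topics are excluded by default)
      admin.listTopics().names().get().forEach(System.out::println);
    }
  }
}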

Java Consumer / Producer

Now that we’ve got Kafka up and running and a fresh topic to write to, it’s time for our Java-based message producer and consumer.

Maven Setup

We just need to add the following dependency to our Mavenized project’s pom.xml (kafka-streams transitively pulls in kafka-clients, which provides the plain producer and consumer APIs used below):

pom.xml
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>3.1.0</version>
</dependency>

Message Producer

The following snippet sends 10 messages to the topic hascode-articles:

KafkaProducerSample.java
package com.hascode.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerSample {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    // close() flushes all buffered records before the producer shuts down
    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
      for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<>("hascode-articles",
            Integer.toString(i), Integer.toString(i)));
      }
    }
    System.out.println("Messages sent successfully");
  }
}
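
Note that send() is asynchronous: it only hands the record to the producer’s internal buffer. To confirm delivery or log failures we can either block on the returned Future with get() or pass a callback; a minimal sketch of the callback variant, reusing the producer and topic from above:

producer.send(new ProducerRecord<>("hascode-articles", "10", "10"),
    (metadata, exception) -> {
      if (exception != null) {
        // the record could not be delivered to the broker
        exception.printStackTrace();
      } else {
        System.out.printf("written to partition %d at offset %d%n",
            metadata.partition(), metadata.offset());
      }
    });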

Message Consumer

The following sample application uses the Kafka Streams API to read messages from the topic hascode-articles and print them to STDOUT:

KafkaConsumerSample.java
package com.hascode.kafka;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class KafkaConsumerSample {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hascode-sample");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    final StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> source = builder.stream("hascode-articles");
    source.foreach((k, v) -> System.out.printf("key: %s, value: %s%n", k, v));
    final Topology topology = builder.build();
    System.out.println(topology.describe());

    final KafkaStreams streams = new KafkaStreams(topology, props);
    final CountDownLatch latch = new CountDownLatch(1);

    // close the streams client and release the latch when the JVM shuts down (e.g. on CTRL-C)
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      streams.close();
      latch.countDown();
    }));

    try {
      streams.start();
      latch.await();
    } catch (Throwable e) {
      System.exit(1);
    }
    System.exit(0);
  }
}

Starting the consumer and then running the message producer above produces the following output:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [hascode-articles])
      --> KSTREAM-FOREACH-0000000001
    Processor: KSTREAM-FOREACH-0000000001 (stores: [])
      --> none
      <-- KSTREAM-SOURCE-0000000000


key: 0, value: 0
key: 1, value: 1
key: 2, value: 2
key: 3, value: 3
key: 4, value: 4
key: 5, value: 5
key: 6, value: 6
key: 7, value: 7
key: 8, value: 8
key: 9, value: 9
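
Since the consumer above is built on the Kafka Streams DSL, it can do more than print: further operators can simply be chained onto the stream. A small sketch that uppercases every value and forwards it to a second topic (hascode-articles-uppercase is just an example name and would have to exist or be auto-created):

KStream<String, String> source = builder.stream("hascode-articles");
source.mapValues(value -> value.toUpperCase()) // transform each record value
    .to("hascode-articles-uppercase");         // write the results to another topic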

Using Protobuf for Schema Registry

Avro was long the default serialization format for the Schema Registry, but newer versions of Confluent Platform also support other formats such as Protocol Buffers.

Maven Setup

We’re adding a few new dependencies to our project:

  • protobuf-java: For our protocol buffer integration and auxiliary types

  • kafka-protobuf-serializer: Support for serializing and deserializing our Kafka messages via protocol buffers

  • protoc-jar-maven-plugin: Generates Java classes from our proto-schema files

Therefore we need to add the following lines to our pom.xml:

pom.xml
<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>3.12.2</version>
</dependency>

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-protobuf-serializer</artifactId>
  <version>7.0.1</version>
</dependency>

<plugin>
  <groupId>com.github.os72</groupId>
  <artifactId>protoc-jar-maven-plugin</artifactId>
  <version>3.11.4</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>run</goal>
      </goals>
      <configuration>
        <addProtoSources>all</addProtoSources>
        <includeMavenTypes>direct</includeMavenTypes>
        <inputDirectories>
          <include>src/main/protobuf</include>
        </inputDirectories>
      </configuration>
    </execution>
  </executions>
</plugin>

<!-- needed for kafka-protobuf-serializer -->
<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>

Protocol Buffer Message Definition

We’re creating a new file named messages.proto in the directory src/main/protobuf, specifying a simple message type for newly created blog articles (with a title and a URL):

messages.proto
syntax = "proto3";

package com.hascode.sample;

message ArticleCreated {
  string url = 1;
  string title = 2;
}

Now we’re able to generate Java classes from this schema using the following command:

./mvnw generate-sources
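
The generated sources end up under target/generated-sources by default and are compiled together with the rest of the project. The generated ArticleCreated class is an immutable message type with a builder and the usual protobuf serialization helpers; a quick round-trip sketch, independent of Kafka (run e.g. from a main method declaring throws Exception, since parseFrom throws a checked exception):

import com.hascode.sample.Messages.ArticleCreated;

ArticleCreated article = ArticleCreated.newBuilder()
    .setTitle("Hello Protobuf")
    .setUrl("https://www.hascode.com/")
    .build();

// serialize to the compact protobuf wire format and parse it back
byte[] bytes = article.toByteArray();
ArticleCreated parsed = ArticleCreated.parseFrom(bytes);
System.out.println(parsed.getTitle()); // prints: Hello Protobuf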

Producing Protobuf Messages

The following app sends protobuf serialized ArticleCreated messages to the Kafka broker:

ProtoSendingProducer.java
package com.hascode.kafkaproto;

import com.hascode.sample.Messages.ArticleCreated;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializerConfig;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProtoSendingProducer {

  public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
    properties.put(KafkaProtobufSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://localhost:8081");

    Producer<String, ArticleCreated> producer = new KafkaProducer<>(properties);
    ArticleCreated message1 = ArticleCreated.newBuilder()
        .setTitle("Neo4j Graph Database Tutorial: How to build a Route Planner and other Examples")
        .setUrl(
            "https://www.hascode.com/2012/01/neo4j-graph-database-tutorial-how-to-build-a-route-planner-and-other-examples/")
        .build();

    ArticleCreated message2 = ArticleCreated.newBuilder().setTitle(
            "Testing Java Applications for Resilience by Simulating Network Problems with Toxiproxy, JUnit and the Docker Maven Plugin")
        .setUrl(
            "https://www.hascode.com/2018/07/testing-java-applications-for-resilience-by-simulating-network-problems-with-toxiproxy-junit-and-the-docker-maven-plugin/")
        .build();

    producer.send(new ProducerRecord<>("hascode-articles", "article1", message1));
    producer.send(new ProducerRecord<>("hascode-articles", "article2", message2));
    producer.close();

  }
}
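
When the first record is sent, the KafkaProtobufSerializer registers the message schema with the Schema Registry; with the default subject naming strategy the subject is hascode-articles-value. We can verify the registration by querying the registry’s REST API, sketched here with Java’s built-in HttpClient (again from a main declaring throws Exception):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

HttpClient http = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
    .uri(URI.create("http://localhost:8081/subjects/hascode-articles-value/versions/latest"))
    .build();
// prints the latest registered schema version as JSON
System.out.println(http.send(request, HttpResponse.BodyHandlers.ofString()).body());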

Consuming Protobuf Messages

The following class connects to the schema registry and consumes our ArticleCreated messages:

ProtoReceivingConsumer.java
package com.hascode.kafkaproto;

import com.hascode.sample.Messages.ArticleCreated;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ProtoReceivingConsumer {

  public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "articles-consumer-group");
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
    properties.put(KafkaProtobufDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://localhost:8081");
    properties.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE,
        ArticleCreated.class.getName());

    KafkaConsumer<String, ArticleCreated> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singleton("hascode-articles"));

    while (true) {
      ConsumerRecords<String, ArticleCreated> records = consumer.poll(Duration.ofMillis(100));
      for (ConsumerRecord<String, ArticleCreated> record : records) {
        System.out.printf("Article title: %s, url: %s%n", record.value().getTitle(),
            record.value().getUrl());
      }
      consumer.commitAsync();
    }
  }

}

Running the consumer now produces the following output:

Article title: Neo4j Graph Database Tutorial: How to build a Route Planner and other Examples, url: https://www.hascode.com/2012/01/neo4j-graph-database-tutorial-how-to-build-a-route-planner-and-other-examples/
Article title: Testing Java Applications for Resilience by Simulating Network Problems with Toxiproxy, JUnit and the Docker Maven Plugin, url: https://www.hascode.com/2018/07/testing-java-applications-for-resilience-by-simulating-network-problems-with-toxiproxy-junit-and-the-docker-maven-plugin/
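
The while (true) loop above never terminates and the consumer is never closed. A common pattern for a clean shutdown is to call consumer.wakeup() (the only thread-safe KafkaConsumer method) from a shutdown hook, catch the resulting WakeupException (from org.apache.kafka.common.errors) in the poll loop and close the consumer there; a minimal sketch of that pattern:

KafkaConsumer<String, ArticleCreated> consumer = new KafkaConsumer<>(properties);
// wakeup() makes a blocked poll() throw a WakeupException
Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

try {
  consumer.subscribe(Collections.singleton("hascode-articles"));
  while (true) {
    ConsumerRecords<String, ArticleCreated> records = consumer.poll(Duration.ofMillis(100));
    records.forEach(record -> System.out.printf("Article title: %s, url: %s%n",
        record.value().getTitle(), record.value().getUrl()));
    consumer.commitAsync();
  }
} catch (WakeupException e) {
  // expected during shutdown, nothing to do
} finally {
  consumer.commitSync(); // synchronously commit the final offsets
  consumer.close();
}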

Spring Boot Kafka Starter

Spring Boot and its Kafka starter make everything way easier.

This is our new pom.xml:

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.6.3</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.hascode.tutorial</groupId>
	<artifactId>spring-kafka-tutorial</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>spring-kafka-tutorial</name>
	<description>Demo project for Spring Boot</description>
	<properties>
		<java.version>17</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-streams</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

And this is our application, combining producer and consumer capabilities in one class, as well as the ability to create a new topic if it does not exist yet:

SpringKafkaTutorialApplication.java
package com.hascode.tutorial.kspring;

import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class SpringKafkaTutorialApplication {

	private static final String TOPIC_NAME = "hascode-articles";

	public static void main(String[] args) {
		SpringApplication.run(SpringKafkaTutorialApplication.class, args);
	}

	@Bean
	public NewTopic topic() {
		return TopicBuilder.name(TOPIC_NAME)
				.partitions(10)
				.replicas(1)
				.build();
	}

	@KafkaListener(id = "articlesInTopic", topics = TOPIC_NAME)
	public void listen(String in) {
		System.out.printf("message from topic %s: %s%n",TOPIC_NAME, in);
	}

	@Bean
	public ApplicationRunner runner(KafkaTemplate<String, String> template) {
		return args -> {
			IntStream.range(1,21).forEach(num -> template.send(TOPIC_NAME, "hello world #"+num));
		};
	}

}
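
There is no extra configuration needed here because Spring Boot’s Kafka auto-configuration defaults to localhost:9092, which matches our Docker setup. If the broker address or the consumer defaults ever need to change, a few lines in application.properties are enough (the values below are just examples):

application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=hascode-spring-group
spring.kafka.consumer.auto-offset-reset=earliest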