Kafka Setup
We’re using docker-compose to set up our message broker, ZooKeeper and the remaining Confluent Platform components. This is our docker-compose.yaml config file, based on the examples in Confluent’s GitHub repository.
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost

  schema-registry:
    image: confluentinc/cp-schema-registry:7.0.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: cnfldemos/kafka-connect-datagen:0.5.0-6.2.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.0.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.0.1
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.0.1
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.0.1
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
Now we’re ready to start everything like this:
docker-compose up -d
Creating network "confluent-platform-docker_default" with the default driver
Creating zookeeper ... done
Creating broker ... done
Creating schema-registry ... done
Creating rest-proxy ... done
Creating connect ... done
Creating ksqldb-server ... done
Creating ksql-datagen ... done
Creating ksqldb-cli ... done
Use docker-compose ps to verify that everything is running fine:
docker-compose ps
Name Command State Ports
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
broker /etc/confluent/docker/run Up 0.0.0.0:29092->29092/tcp,:::29092->29092/tcp, 0.0.0.0:9092->9092/tcp,:::9092->9092/tcp, 0.0.0.0:9101->9101/tcp,:::9101->9101/tcp
connect /etc/confluent/docker/run Up (health: starting) 0.0.0.0:8083->8083/tcp,:::8083->8083/tcp, 9092/tcp
ksql-datagen bash -c echo Waiting for K ... Up
ksqldb-cli /bin/sh Up
ksqldb-server /etc/confluent/docker/run Up 0.0.0.0:8088->8088/tcp,:::8088->8088/tcp
rest-proxy /etc/confluent/docker/run Up 0.0.0.0:8082->8082/tcp,:::8082->8082/tcp
schema-registry /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp,:::8081->8081/tcp
zookeeper /etc/confluent/docker/run Up 0.0.0.0:2181->2181/tcp,:::2181->2181/tcp, 2888/tcp, 3888/tcp
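Besides the broker itself, the stack exposes a few HTTP endpoints that make for a quick smoke test (a minimal sketch, assuming the default ports from the compose file above):
curl http://localhost:8081/subjects   # Schema Registry: list registered subjects
curl http://localhost:8082/topics     # REST Proxy: list topics
curl http://localhost:8088/info       # ksqlDB server metadata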
We’re now creating a new topic named hascode-articles:
docker-compose exec broker kafka-topics --create --bootstrap-server \
localhost:9092 --replication-factor 1 --partitions 1 --topic hascode-articles
Created topic hascode-articles.
We may list existing topics like this:
docker-compose exec broker kafka-topics --list --bootstrap-server \
localhost:9092
__consumer_offsets
__transaction_state
_confluent-ksql-default__command_topic
_schemas
default_ksql_processing_log
docker-connect-configs
docker-connect-offsets
docker-connect-status
hascode-articles
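Before writing any Java code, we can do a quick round trip with the console clients that ship with the broker image (a minimal sketch; the message text is arbitrary):
# produce a test message (type a line, then Ctrl-D to exit)
docker-compose exec broker kafka-console-producer --bootstrap-server \
localhost:9092 --topic hascode-articles
# consume everything from the beginning of the topic (Ctrl-C to exit)
docker-compose exec broker kafka-console-consumer --bootstrap-server \
localhost:9092 --topic hascode-articles --from-beginning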
Java Consumer / Producer
Now that we’ve got Kafka up and running and a fresh topic to write to, it’s time for our Java-based message consumer and producer.
Maven Setup
We just need to add a single dependency to our Mavenized project’s pom.xml (kafka-streams pulls in kafka-clients transitively, so the plain producer and consumer APIs are available, too):
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>3.1.0</version>
</dependency>
Message Producer
The following snippet sends 10 messages to the topic hascode-articles:
package com.hascode.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerSample {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // try-with-resources flushes and closes the producer when we're done
    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
      for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<>("hascode-articles",
            Integer.toString(i), Integer.toString(i)));
      }
    }
    System.out.println("Messages sent successfully");
  }
}
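Note that producer.send(..) is asynchronous and only enqueues the record; the call returns before the broker has acknowledged anything. To react to the acknowledgement we may pass a callback as the second argument (a minimal sketch; key and value are placeholders):
producer.send(new ProducerRecord<>("hascode-articles", "some-key", "some-value"),
    (metadata, exception) -> {
      if (exception != null) {
        exception.printStackTrace(); // delivery failed
      } else {
        System.out.printf("stored in partition %d at offset %d%n",
            metadata.partition(), metadata.offset());
      }
    });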
Message Consumer
The following sample application reads messages from the topic hascode-articles and prints them to STDOUT:
package com.hascode.kafka;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class KafkaConsumerSample {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hascode-sample");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    final StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> source = builder.stream("hascode-articles");
    source.foreach((k, v) -> System.out.printf("key: %s, value: %s%n", k, v));

    final Topology topology = builder.build();
    System.out.println(topology.describe());

    final KafkaStreams streams = new KafkaStreams(topology, props);
    final CountDownLatch latch = new CountDownLatch(1);

    // close the streams application cleanly on Ctrl-C and release the latch
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      streams.close();
      latch.countDown();
    }));

    try {
      streams.start();
      latch.await();
    } catch (Throwable e) {
      System.exit(1);
    }
    System.exit(0);
  }
}
Running the consumer application and then the message producer above yields the following output:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [hascode-articles])
      --> KSTREAM-FOREACH-0000000001
    Processor: KSTREAM-FOREACH-0000000001 (stores: [])
      --> none
      <-- KSTREAM-SOURCE-0000000000
key: 0, value: 0
key: 1, value: 1
key: 2, value: 2
key: 3, value: 3
key: 4, value: 4
key: 5, value: 5
key: 6, value: 6
key: 7, value: 7
key: 8, value: 8
key: 9, value: 9
Using Protobuf for Schema Registry
Avro has long been the default serialization format, but newer versions of Confluent Platform also support other formats such as Protocol Buffers.
Maven Setup
We’re adding a few new dependencies and a build plugin to our project:
- protobuf-java: protocol buffer runtime classes and auxiliary types
- kafka-protobuf-serializer: support for serializing and deserializing our Kafka messages via protocol buffers
- protoc-jar-maven-plugin: generates Java classes from our proto schema files
Therefore we need to add the following lines to our pom.xml:
<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>3.12.2</version>
</dependency>
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-protobuf-serializer</artifactId>
  <version>7.0.1</version>
</dependency>

<plugin>
  <groupId>com.github.os72</groupId>
  <artifactId>protoc-jar-maven-plugin</artifactId>
  <version>3.11.4</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>run</goal>
      </goals>
      <configuration>
        <addProtoSources>all</addProtoSources>
        <includeMavenTypes>direct</includeMavenTypes>
        <inputDirectories>
          <include>src/main/protobuf</include>
        </inputDirectories>
      </configuration>
    </execution>
  </executions>
</plugin>

<!-- needed for kafka-protobuf-serializer -->
<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>
Protocol Buffer Message Definition
We’re creating a new file named messages.proto in the directory src/main/protobuf, specifying a simple message type for newly created blog articles (having a title and a URL):
syntax = "proto3";

package com.hascode.sample;

message ArticleCreated {
  string url = 1;
  string title = 2;
}
Now we’re able to generate Java classes from this schema using the following command:
./mvnw generate-sources
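The generated sources end up below target/generated-sources (the plugin’s default output directory), with our message available as the inner class com.hascode.sample.Messages.ArticleCreated. A quick way to verify the generated code before involving Kafka is a plain serialize/parse round trip (a minimal sketch; class name, title and URL are placeholders):
import com.google.protobuf.InvalidProtocolBufferException;
import com.hascode.sample.Messages.ArticleCreated;

public class ProtoRoundTrip {

  public static void main(String[] args) throws InvalidProtocolBufferException {
    ArticleCreated original = ArticleCreated.newBuilder()
        .setUrl("https://www.hascode.com/")
        .setTitle("Hello Protobuf")
        .build();
    byte[] bytes = original.toByteArray();                   // protobuf wire format
    ArticleCreated parsed = ArticleCreated.parseFrom(bytes); // deserialize again
    System.out.println(parsed.getTitle());                   // -> Hello Protobuf
  }
}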
Producing Protobuf Messages
The following app sends protobuf-serialized ArticleCreated messages to the Kafka broker:
package com.hascode.kafkaproto;

import com.hascode.sample.Messages.ArticleCreated;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializerConfig;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProtoSendingProducer {

  public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
    properties.put(KafkaProtobufSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://localhost:8081");

    Producer<String, ArticleCreated> producer = new KafkaProducer<>(properties);

    ArticleCreated message1 = ArticleCreated.newBuilder()
        .setTitle("Neo4j Graph Database Tutorial: How to build a Route Planner and other Examples")
        .setUrl(
            "https://www.hascode.com/2012/01/neo4j-graph-database-tutorial-how-to-build-a-route-planner-and-other-examples/")
        .build();
    ArticleCreated message2 = ArticleCreated.newBuilder()
        .setTitle(
            "Testing Java Applications for Resilience by Simulating Network Problems with Toxiproxy, JUnit and the Docker Maven Plugin")
        .setUrl(
            "https://www.hascode.com/2018/07/testing-java-applications-for-resilience-by-simulating-network-problems-with-toxiproxy-junit-and-the-docker-maven-plugin/")
        .build();

    producer.send(new ProducerRecord<>("hascode-articles", "article1", message1));
    producer.send(new ProducerRecord<>("hascode-articles", "article2", message2));
    producer.close();
  }
}
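When the first record goes out, the serializer auto-registers the message schema with the Schema Registry; the subject name is derived from the topic by default (TopicNameStrategy), so we should be able to inspect the registered schema like this (a sketch, assuming the defaults above):
curl http://localhost:8081/subjects
curl http://localhost:8081/subjects/hascode-articles-value/versions/latest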
Consuming Protobuf Messages
The following class connects to the schema registry and consumes our ArticleCreated messages:
package com.hascode.kafkaproto;

import com.hascode.sample.Messages.ArticleCreated;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ProtoReceivingConsumer {

  public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "articles-consumer-group");
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
    properties.put(KafkaProtobufDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://localhost:8081");
    // deserialize values into our generated class instead of a DynamicMessage
    properties.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE,
        ArticleCreated.class.getName());

    KafkaConsumer<String, ArticleCreated> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singleton("hascode-articles"));

    while (true) {
      ConsumerRecords<String, ArticleCreated> records = consumer.poll(Duration.ofMillis(100));
      for (ConsumerRecord<String, ArticleCreated> record : records) {
        System.out.printf("Article title: %s, url: %s%n", record.value().getTitle(),
            record.value().getUrl());
      }
      consumer.commitAsync();
    }
  }
}
Running the consumer now produces the following output:
Article title: Neo4j Graph Database Tutorial: How to build a Route Planner and other Examples, url: https://www.hascode.com/2012/01/neo4j-graph-database-tutorial-how-to-build-a-route-planner-and-other-examples/
Article title: Testing Java Applications for Resilience by Simulating Network Problems with Toxiproxy, JUnit and the Docker Maven Plugin, url: https://www.hascode.com/2018/07/testing-java-applications-for-resilience-by-simulating-network-problems-with-toxiproxy-junit-and-the-docker-maven-plugin/
Spring Boot Kafka Starter
Spring Boot and its Kafka starter make everything way easier.
This is our new pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.3</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <groupId>com.hascode.tutorial</groupId>
  <artifactId>spring-kafka-tutorial</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>spring-kafka-tutorial</name>
  <description>Demo project for Spring Boot</description>
  <properties>
    <java.version>17</java.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>
And this is our application, combining producer and consumer capabilities in one class, as well as the ability to create the topic if it does not exist yet:
package com.hascode.tutorial.kspring;

import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class SpringKafkaTutorialApplication {

  private static final String TOPIC_NAME = "hascode-articles";

  public static void main(String[] args) {
    SpringApplication.run(SpringKafkaTutorialApplication.class, args);
  }

  // registering a NewTopic bean lets Spring's KafkaAdmin create the topic on startup
  @Bean
  public NewTopic topic() {
    return TopicBuilder.name(TOPIC_NAME)
        .partitions(10)
        .replicas(1)
        .build();
  }

  @KafkaListener(id = "articlesInTopic", topics = TOPIC_NAME)
  public void listen(String in) {
    System.out.printf("message from topic %s: %s%n", TOPIC_NAME, in);
  }

  // sends 20 messages once the application has started
  @Bean
  public ApplicationRunner runner(KafkaTemplate<String, String> template) {
    return args -> IntStream.range(1, 21)
        .forEach(num -> template.send(TOPIC_NAME, "hello world #" + num));
  }
}
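Spring Boot’s auto-configuration defaults to a broker at localhost:9092, which matches our compose setup; to make the connection explicit (or to point at another broker), we can set the usual properties in src/main/resources/application.properties (a minimal sketch):
# where to find the broker (the Spring Boot default is localhost:9092)
spring.kafka.bootstrap-servers=localhost:9092
# start reading at the beginning of the topic for new consumer groups
spring.kafka.consumer.auto-offset-reset=earliest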