Messaging with NATS and Java

Figure 1. NATS Architecture Component Diagram NATS is a high-performance messaging system that offers simplicity, speed, and scalability. It is particularly suited for building distributed systems and microservices. This article demonstrates how to integrate NATS with Java, showcasing the essential steps to set up, connect, and publish/subscribe to messages. Prerequisites Before diving in, we should ensure to have the following installed: Java Development Kit (JDK) 11 or later. Maven or Gradle for dependency management. Docker (optional). ...

November 26, 2024 · 13 min · 2646 words · Micha Kops

Spring Boot Kafka Increase Message Size Limit

Let’s say we would like to increase the limit t 10MB …​ Broker Configuration Apply the new limit either by modifying the server.properties like this…​ max.message.bytes=10485760 or apply it to a specific topic using kafka-configs.sh --bootstrap-server localhost:9092 \ --entity-type topics \ --entity-name thetopic \ --alter \ --add-config max.message.bytes=10485760 Producer Configuration for Spring Boot We simply need to add the following line to our application.properties: spring.kafka.producer.properties.max.request.size=spring.kafka.producer.properties.max.request.size The following proof-of-concept demonstrates that without the property, sending a large message fails, with the property it succeeds: ...

June 9, 2022 · 2 min · 270 words · Micha Kops

Embedded Kafka for Spring Boot Testing without using Docker

Sometimes it is nice to set up an embedded Kafka broker for testing without the need to have Docker installed (e.g. for using testcontainers-lib). The following snippet shows, how to set up an embedded Kafka instance for testing for a Spring Boot project. Setup Using Maven, this is our Spring Boot project with dependencies needed: pom.xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId>(1) <version>2.6.4</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.hascode.tutorial</groupId> <artifactId>kafka-testing</artifactId> <version>1.0.0-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>17</java.version> <maven.compiler.source>${java.version}</maven.compiler.source> <maven.compiler.target>${java.version}</maven.compiler.target> <kafka.version>3.1.0</kafka.version>(2) </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId>(3) </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId>(4) <scope>test</scope> </dependency> <dependency>(5) <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.platform</groupId> <artifactId>junit-platform-launcher</artifactId> <scope>test</scope> </dependency> [..] </project> ...

May 25, 2022 · 4 min · 839 words · Micha Kops

Kafka Java Quickstart with Docker

Goals Setup Kafka and Zookeeper with Docker and docker-compose Create a message consumer and producer in Java Kafka Setup We’re using docker-compose to set up our message broker, zookeper and other stuff using confluent-platform. This is our docker-compose.yaml config file from Confluent’s following GitHub repository. docker-compose.yaml --- version: '2' services: zookeeper: image: confluentinc/cp-zookeeper:7.0.1 hostname: zookeeper container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 broker: image: confluentinc/cp-kafka:7.0.1 hostname: broker container_name: broker depends_on: - zookeeper ports: - "29092:29092" - "9092:9092" - "9101:9101" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_JMX_PORT: 9101 KAFKA_JMX_HOSTNAME: localhost schema-registry: image: confluentinc/cp-schema-registry:7.0.1 hostname: schema-registry container_name: schema-registry depends_on: - broker ports: - "8081:8081" environment: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092' SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 connect: image: cnfldemos/kafka-connect-datagen:0.5.0-6.2.0 hostname: connect container_name: connect depends_on: - broker - schema-registry ports: - "8083:8083" environment: CONNECT_BOOTSTRAP_SERVERS: 'broker:29092' CONNECT_REST_ADVERTISED_HOST_NAME: connect CONNECT_GROUP_ID: compose-connect-group CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000 CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081 CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR ksqldb-server: image: confluentinc/cp-ksqldb-server:7.0.1 hostname: ksqldb-server container_name: ksqldb-server depends_on: - broker - connect ports: - "8088:8088" environment: KSQL_CONFIG_DIR: "/etc/ksql" KSQL_BOOTSTRAP_SERVERS: "broker:29092" KSQL_HOST_NAME: ksqldb-server KSQL_LISTENERS: "http://0.0.0.0:8088" KSQL_CACHE_MAX_BYTES_BUFFERING: 0 KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor" KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" KSQL_KSQL_CONNECT_URL: "http://connect:8083" KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1 KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true' KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true' ksqldb-cli: image: confluentinc/cp-ksqldb-cli:7.0.1 container_name: ksqldb-cli depends_on: - broker - connect - ksqldb-server entrypoint: /bin/sh tty: true ksql-datagen: image: confluentinc/ksqldb-examples:7.0.1 hostname: ksql-datagen container_name: ksql-datagen depends_on: - ksqldb-server - broker - schema-registry - connect command: "bash -c 'echo Waiting for Kafka to be ready... && \ cub kafka-ready -b broker:29092 1 40 && \ echo Waiting for Confluent Schema Registry to be ready... && \ cub sr-ready schema-registry 8081 40 && \ echo Waiting a few seconds for topic creation to finish... && \ sleep 11 && \ tail -f /dev/null'" environment: KSQL_CONFIG_DIR: "/etc/ksql" STREAMS_BOOTSTRAP_SERVERS: broker:29092 STREAMS_SCHEMA_REGISTRY_HOST: schema-registry STREAMS_SCHEMA_REGISTRY_PORT: 8081 rest-proxy: image: confluentinc/cp-kafka-rest:7.0.1 depends_on: - broker - schema-registry ports: - 8082:8082 hostname: rest-proxy container_name: rest-proxy environment: KAFKA_REST_HOST_NAME: rest-proxy KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092' KAFKA_REST_LISTENERS: "http://0.0.0.0:8082" KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081' ...

January 29, 2022 · 8 min · 1500 words · Micha Kops

Playing around with MQTT and Java with Moquette and Eclipse Paho

The MQ Telemetry Transport Protocol (MQTT) is a lightweight publish/subscribe messaging protocol developed in 1999 that experiences a growing popularity due to trends like the Internet-of-Things and the need to exchange information between low powered devices with aspects as CPU and bandwidth usage in mind. In the following tutorial I’d like to demonstrate how to set-up a broker for this protocol with the help of the Moquette library and how to create a client and publish messages for a specific topic using this broker and Eclipse Paho as client library. ...

June 1, 2016 · 4 min · 784 words · Micha Kops

Using Apache Camel with Scala and the Camel Scala DSL

Whenever I encounter a situation where I have to mix a blend of different services and endpoints and apply one or more of the traditional enterprise integration patterns then Apache Camel often is my weapon of choice. I simply love how easy it is to set up some datasources, add some routing magic, data transformers, load balancers, content enrichers and enjoy the result. Another thing that I’m beginning to love is Scala and so this is the perfect time to write an article about using Scala and Apache Camel together. ...

February 13, 2013 · 15 min · 3186 words · Micha Kops

Message Driven Beans in Java EE 6

Message Driven Beans are no new concept due to the fact that they exist since EJB 2.0 but in Java EE 6 and the EJB 3.0 specification it is even more fun to use them. In this tutorial we’re going to take a look at the specification and create an example application that transfers some objects via the Java Message Service to a Message-Driven Bean deployed on a GlassFish application server. ...

June 5, 2011 · 12 min · 2430 words · Micha Kops

How to create a simple Messaging Application using RabbitMQ 2 and Maven

Having read an interesting comparison by Lindenlabs evaluating modern message broker systems like ActiveMQ, ApacheQpid and amongst others – RabbitMQ – I wanted to take a quick look at the last one and built a small application producing and consuming some sample messages. If you need some lecture on getting started with RabbitMQ or the key concepts of messaging I strongly recommend reading this list of introductional articles from the RabbitMQ homepage. ...

September 5, 2010 · 5 min · 990 words · Micha Kops

AMQP and RabbitMQ Snippets

rabbitmqctl Create admin user with full host permissions # create new user named 'theadmin' rabbitmqctl add_user theadmin thepassword # make 'theadmin' admin rabbitmqctl set_user_tags theadmin administrator # give 'theadmin' permissions for all hosts rabbitmqctl set_permissions -p / theadmin ".*" ".*" ".*" AMQP Exchange Types 1. Fanout Exchange Description: A fanout exchange routes messages to all of the queues that are bound to it. It doesn’t take the routing key into consideration. Instead, it simply broadcasts the message to all bound queues. ...

March 1, 2010 · 3 min · 439 words · Micha Kops

Kafka Snippets

Start an Image with kcat / kafka-cat for Debugging kubectl -n NAMESPACE run "$(whoami)-debug" -it --rm \ --image=confluentinc/cp-kafkacat:6.1.9 \ --restart=Never \ -- bash Dockerfile for Kafka Analysis Container with different Tools With jq, kafka console tools, schema registry tools and kafkacat installed …​. Dockerfile FROM confluentinc/cp-kafka:6.2.1 as cp-kafka FROM confluentinc/cp-schema-registry:6.2.1 as cp-schema-registry FROM debian:10-slim ARG DEBIAN_FRONTEND=noninteractive # Install necessary tools RUN apt-get update && apt-get install -y \ curl \ jq \ yq \ && rm -rf /var/lib/apt/lists/* # Install kafkacat binary RUN apt-get update && apt-get install -y kafkacat && rm -rf /var/lib/apt/lists/* # Copy Kafka binaries COPY --from=cp-kafka /usr/bin/kafka-* /usr/bin/ COPY --from=cp-schema-registry /usr/bin/schema-registry* /usr/bin/ # Copy entrypoint script COPY entrypoint.sh /usr/bin/entrypoint.sh RUN chmod +x /usr/bin/entrypoint.sh ENTRYPOINT ["/usr/bin/entrypoint.sh"] ...

March 1, 2010 · 7 min · 1333 words · Micha Kops