Spring Boot Kafka Increase Message Size Limit

Let’s say we would like to increase the limit t 10MB …​ Broker Configuration Apply the new limit either by modifying the server.properties like this…​ max.message.bytes=10485760 or apply it to a specific topic using kafka-configs.sh --bootstrap-server localhost:9092 \ --entity-type topics \ --entity-name thetopic \ --alter \ --add-config max.message.bytes=10485760 Producer Configuration for Spring Boot We simply need to add the following line to our application.properties: spring.kafka.producer.properties.max.request.size=spring.kafka.producer.properties.max.request.size The following proof-of-concept demonstrates that without the property, sending a large message fails, with the property it succeeds: ...

June 9, 2022 · 2 min · 270 words · Micha Kops

Kafka Java Quickstart with Docker

Goals Setup Kafka and Zookeeper with Docker and docker-compose Create a message consumer and producer in Java Kafka Setup We’re using docker-compose to set up our message broker, zookeper and other stuff using confluent-platform. This is our docker-compose.yaml config file from Confluent’s following GitHub repository. docker-compose.yaml --- version: '2' services: zookeeper: image: confluentinc/cp-zookeeper:7.0.1 hostname: zookeeper container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 broker: image: confluentinc/cp-kafka:7.0.1 hostname: broker container_name: broker depends_on: - zookeeper ports: - "29092:29092" - "9092:9092" - "9101:9101" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_JMX_PORT: 9101 KAFKA_JMX_HOSTNAME: localhost schema-registry: image: confluentinc/cp-schema-registry:7.0.1 hostname: schema-registry container_name: schema-registry depends_on: - broker ports: - "8081:8081" environment: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092' SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 connect: image: cnfldemos/kafka-connect-datagen:0.5.0-6.2.0 hostname: connect container_name: connect depends_on: - broker - schema-registry ports: - "8083:8083" environment: CONNECT_BOOTSTRAP_SERVERS: 'broker:29092' CONNECT_REST_ADVERTISED_HOST_NAME: connect CONNECT_GROUP_ID: compose-connect-group CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000 CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081 CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR ksqldb-server: image: confluentinc/cp-ksqldb-server:7.0.1 hostname: ksqldb-server container_name: ksqldb-server depends_on: - broker - connect ports: - "8088:8088" environment: KSQL_CONFIG_DIR: "/etc/ksql" KSQL_BOOTSTRAP_SERVERS: "broker:29092" KSQL_HOST_NAME: ksqldb-server KSQL_LISTENERS: "http://0.0.0.0:8088" KSQL_CACHE_MAX_BYTES_BUFFERING: 0 KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor" KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" KSQL_KSQL_CONNECT_URL: "http://connect:8083" KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1 KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true' KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true' ksqldb-cli: image: confluentinc/cp-ksqldb-cli:7.0.1 container_name: ksqldb-cli depends_on: - broker - connect - ksqldb-server entrypoint: /bin/sh tty: true ksql-datagen: image: confluentinc/ksqldb-examples:7.0.1 hostname: ksql-datagen container_name: ksql-datagen depends_on: - ksqldb-server - broker - schema-registry - connect command: "bash -c 'echo Waiting for Kafka to be ready... && \ cub kafka-ready -b broker:29092 1 40 && \ echo Waiting for Confluent Schema Registry to be ready... && \ cub sr-ready schema-registry 8081 40 && \ echo Waiting a few seconds for topic creation to finish... && \ sleep 11 && \ tail -f /dev/null'" environment: KSQL_CONFIG_DIR: "/etc/ksql" STREAMS_BOOTSTRAP_SERVERS: broker:29092 STREAMS_SCHEMA_REGISTRY_HOST: schema-registry STREAMS_SCHEMA_REGISTRY_PORT: 8081 rest-proxy: image: confluentinc/cp-kafka-rest:7.0.1 depends_on: - broker - schema-registry ports: - 8082:8082 hostname: rest-proxy container_name: rest-proxy environment: KAFKA_REST_HOST_NAME: rest-proxy KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092' KAFKA_REST_LISTENERS: "http://0.0.0.0:8082" KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081' ...

January 29, 2022 · 8 min · 1500 words · Micha Kops

Reactive Streams – Java 9 Flow API, RxJava, Akka and Reactor Examples

Reactive Streams is an initiative trying to standardize asynchronous stream processing with non-blocking back-pressure. With Java 9, new classes in the java.util.concurrent.flow package offer a semantically equivalent counterpart to this standard that may be adopted by other frameworks. In the following short tutorial we’re implementing examples for reactive streams with Java 9 and the Flow API, with RxJava2, with Akka, with Reactor and finally there is an example in RxJava1, too though it does not follow the standard. ...

January 14, 2018 · 8 min · 1550 words · Micha Kops

Java EE 7 JMX Reports with Yammer Metrics

There are several ways to aggregate and report application performance indicators in a Java application. One common way here is to use Java Management Extensions (JMX) and MBeans. The Yammer Metrics Library eases this task for us and simplifies the aggregation of different reports. In the following tutorial, we’re going to set up a full Java EE 7 web application by the help of Maven archetypes and we’re running the application on WildFly application server that is downloaded and configured completely by the WildFly Maven Plugin. ...

August 26, 2014 · 10 min · 2117 words · Micha Kops