From 992d9faa7dfe298ce0a3d7fa502e7d43347dc0fa Mon Sep 17 00:00:00 2001
From: dilipsundarraj
Date: Sat, 15 Apr 2023 11:56:38 -0500
Subject: [PATCH 01/15] updated the code to Spring Boot 3.0

---
 library-events-producer/build.gradle          | 10 ++--
 .../gradle/wrapper/gradle-wrapper.properties  |  2 +-
 .../controller/LibraryEventsController.java   |  3 +-
 .../main/java/com/learnkafka/domain/Book.java |  4 +-
 .../com/learnkafka/domain/LibraryEvent.java   |  4 +-
 .../producer/LibraryEventProducer.java        | 59 ++++++++-----------
 .../LibraryEventProducerUnitTest.java         | 17 ++----
 7 files changed, 44 insertions(+), 55 deletions(-)

diff --git a/library-events-producer/build.gradle b/library-events-producer/build.gradle
index f161eb9..8314808 100644
--- a/library-events-producer/build.gradle
+++ b/library-events-producer/build.gradle
@@ -1,12 +1,12 @@
 plugins {
-    id 'org.springframework.boot' version '2.6.5'
-    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
+    id 'org.springframework.boot' version '3.0.5'
+    id 'io.spring.dependency-management' version '1.1.0'
     id 'java'
 }
 
 group = 'com.learnkafka'
 version = '0.0.1-SNAPSHOT'
-sourceCompatibility = '11'
+sourceCompatibility = '17'
 
 configurations {
     compileOnly {
@@ -22,6 +22,7 @@ dependencies {
     implementation 'org.springframework.boot:spring-boot-starter-web'
     implementation 'org.springframework.kafka:spring-kafka'
     implementation 'org.springframework.boot:spring-boot-starter-validation'
+
     compileOnly 'org.projectlombok:lombok'
     annotationProcessor 'org.projectlombok:lombok'
     testImplementation('org.springframework.boot:spring-boot-starter-test') {
@@ -36,6 +37,7 @@ sourceSets{
     }
 }
 
-test {
+tasks.named('test') {
     useJUnitPlatform()
 }
+
diff --git a/library-events-producer/gradle/wrapper/gradle-wrapper.properties b/library-events-producer/gradle/wrapper/gradle-wrapper.properties
index 68a9d70..4c6d24e 100644
--- a/library-events-producer/gradle/wrapper/gradle-wrapper.properties
+++ b/library-events-producer/gradle/wrapper/gradle-wrapper.properties
@@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-7.0-all.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.1-bin.zip
\ No newline at end of file
diff --git a/library-events-producer/src/main/java/com/learnkafka/controller/LibraryEventsController.java b/library-events-producer/src/main/java/com/learnkafka/controller/LibraryEventsController.java
index 85b1897..93136ed 100644
--- a/library-events-producer/src/main/java/com/learnkafka/controller/LibraryEventsController.java
+++ b/library-events-producer/src/main/java/com/learnkafka/controller/LibraryEventsController.java
@@ -4,17 +4,16 @@
 import com.learnkafka.domain.LibraryEvent;
 import com.learnkafka.domain.LibraryEventType;
 import com.learnkafka.producer.LibraryEventProducer;
+import jakarta.validation.Valid;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
-import org.springframework.kafka.support.SendResult;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.PutMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RestController;
 
-import javax.validation.Valid;
 import java.util.concurrent.ExecutionException;
 
 @RestController
diff --git
a/library-events-producer/src/main/java/com/learnkafka/domain/Book.java b/library-events-producer/src/main/java/com/learnkafka/domain/Book.java index fcaad45..e5cb115 100644 --- a/library-events-producer/src/main/java/com/learnkafka/domain/Book.java +++ b/library-events-producer/src/main/java/com/learnkafka/domain/Book.java @@ -1,13 +1,13 @@ package com.learnkafka.domain; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; -import javax.validation.constraints.NotBlank; -import javax.validation.constraints.NotNull; @AllArgsConstructor @NoArgsConstructor diff --git a/library-events-producer/src/main/java/com/learnkafka/domain/LibraryEvent.java b/library-events-producer/src/main/java/com/learnkafka/domain/LibraryEvent.java index 2252fd0..25f7c25 100644 --- a/library-events-producer/src/main/java/com/learnkafka/domain/LibraryEvent.java +++ b/library-events-producer/src/main/java/com/learnkafka/domain/LibraryEvent.java @@ -1,13 +1,13 @@ package com.learnkafka.domain; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; -import javax.validation.Valid; -import javax.validation.constraints.NotNull; @AllArgsConstructor @NoArgsConstructor diff --git a/library-events-producer/src/main/java/com/learnkafka/producer/LibraryEventProducer.java b/library-events-producer/src/main/java/com/learnkafka/producer/LibraryEventProducer.java index 11e9347..96ba37e 100644 --- a/library-events-producer/src/main/java/com/learnkafka/producer/LibraryEventProducer.java +++ b/library-events-producer/src/main/java/com/learnkafka/producer/LibraryEventProducer.java @@ -11,10 +11,9 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -24,53 +23,47 @@ public class LibraryEventProducer { @Autowired - KafkaTemplate kafkaTemplate; + KafkaTemplate kafkaTemplate; String topic = "library-events"; @Autowired ObjectMapper objectMapper; - public void sendLibraryEvent(LibraryEvent libraryEvent) throws JsonProcessingException { + public CompletableFuture> sendLibraryEvent(LibraryEvent libraryEvent) throws JsonProcessingException { Integer key = libraryEvent.getLibraryEventId(); String value = objectMapper.writeValueAsString(libraryEvent); - ListenableFuture> listenableFuture = kafkaTemplate.sendDefault(key,value); - listenableFuture.addCallback(new ListenableFutureCallback>() { - @Override - public void onFailure(Throwable ex) { - handleFailure(key, value, ex); - } - - @Override - public void onSuccess(SendResult result) { - handleSuccess(key, value, result); - } - }); + var completableFuture = kafkaTemplate.sendDefault(key, value); + return completableFuture + .whenComplete((sendResult, throwable) -> { + if (throwable != null) { + handleFailure(key, value, throwable); + } else { + handleSuccess(key, value, sendResult); + + } + }); } - public ListenableFuture> sendLibraryEvent_Approach2(LibraryEvent libraryEvent) 
throws JsonProcessingException {
+    public CompletableFuture> sendLibraryEvent_Approach2(LibraryEvent libraryEvent) throws JsonProcessingException {
 
         Integer key = libraryEvent.getLibraryEventId();
         String value = objectMapper.writeValueAsString(libraryEvent);
 
-        ProducerRecord producerRecord = buildProducerRecord(key, value, topic);
-
-        ListenableFuture> listenableFuture = kafkaTemplate.send(producerRecord);
+        ProducerRecord producerRecord = buildProducerRecord(key, value, topic);
 
-        listenableFuture.addCallback(new ListenableFutureCallback>() {
-            @Override
-            public void onFailure(Throwable ex) {
-                handleFailure(key, value, ex);
-            }
+        var completableFuture = kafkaTemplate.send(producerRecord);
 
-            @Override
-            public void onSuccess(SendResult result) {
-                handleSuccess(key, value, result);
-            }
-        });
+        return completableFuture
+                .whenComplete((sendResult, throwable) -> {
+                    if (throwable != null) {
+                        handleFailure(key, value, throwable);
+                    } else {
+                        handleSuccess(key, value, sendResult);
 
-        return listenableFuture;
+                    }
+                });
     }
 
     private ProducerRecord buildProducerRecord(Integer key, String value, String topic) {
@@ -86,9 +79,9 @@ public SendResult sendLibraryEventSynchronous(LibraryEvent libr
         Integer key = libraryEvent.getLibraryEventId();
         String value = objectMapper.writeValueAsString(libraryEvent);
-        SendResult sendResult=null;
+        SendResult sendResult = null;
         try {
-            sendResult = kafkaTemplate.sendDefault(key,value).get(1, TimeUnit.SECONDS);
+            sendResult = kafkaTemplate.sendDefault(key, value).get(1, TimeUnit.SECONDS);
         } catch (ExecutionException | InterruptedException e) {
             log.error("ExecutionException/InterruptedException Sending the Message and the exception is {}", e.getMessage());
             throw e;
diff --git a/library-events-producer/src/test/java/unit/com/learnkafka/producer/LibraryEventProducerUnitTest.java b/library-events-producer/src/test/java/unit/com/learnkafka/producer/LibraryEventProducerUnitTest.java
index e38c47f..a0c2a01 100644
--- a/library-events-producer/src/test/java/unit/com/learnkafka/producer/LibraryEventProducerUnitTest.java
+++ b/library-events-producer/src/test/java/unit/com/learnkafka/producer/LibraryEventProducerUnitTest.java
@@ -7,7 +7,6 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.types.Field;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.InjectMocks;
@@ -16,10 +15,8 @@
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
-import org.springframework.util.concurrent.ListenableFuture;
-import org.springframework.util.concurrent.SettableListenableFuture;
-import scala.Int;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -51,10 +48,8 @@ void sendLibraryEvent_Approach2_failure() throws JsonProcessingException, Execut
                 .libraryEventId(null)
                 .book(book)
                 .build();
-        SettableListenableFuture future = new SettableListenableFuture();
-        future.setException(new RuntimeException("Exception Calling Kafka"));
 
+        when(kafkaTemplate.send(isA(ProducerRecord.class))).thenReturn(CompletableFuture.failedFuture(new RuntimeException("Exception Calling Kafka")));
         //when
         assertThrows(Exception.class, ()->eventProducer.sendLibraryEvent_Approach2(libraryEvent).get());
@@ -75,21 +70,21 @@ void sendLibraryEvent_Approach2_success() throws JsonProcessingException, Execut
                 .book(book)
                 .build();
         String record = objectMapper.writeValueAsString(libraryEvent);
-        SettableListenableFuture future = new SettableListenableFuture();
+
         ProducerRecord producerRecord = new ProducerRecord("library-events", libraryEvent.getLibraryEventId(),record );
         RecordMetadata recordMetadata = new RecordMetadata(new TopicPartition("library-events", 1), 1,1,System.currentTimeMillis(), 1, 2);
         SendResult sendResult = new SendResult(producerRecord,recordMetadata);
-        future.set(sendResult);
+        var future = CompletableFuture.supplyAsync(()-> sendResult);
 
         when(kafkaTemplate.send(isA(ProducerRecord.class))).thenReturn(future);
         //when
-        ListenableFuture> listenableFuture = eventProducer.sendLibraryEvent_Approach2(libraryEvent);
+        var completableFuture = eventProducer.sendLibraryEvent_Approach2(libraryEvent);
 
         //then
-        SendResult sendResult1 = listenableFuture.get();
+        SendResult sendResult1 = completableFuture.get();
         assert sendResult1.getRecordMetadata().partition()==1;
 
     }

From b01bee86f6e1205a7dd3c6bf891ab44d4ed0d5af Mon Sep 17 00:00:00 2001
From: Dilip Sundarraj
Date: Sun, 23 Apr 2023 16:00:35 -0500
Subject: [PATCH 02/15] added the docker commands until key

---
 SetUpKafkaDocker.md             | 63 +++++++++++++++++++++++++++++
 docker-compose-multi-broker.yml | 72 +++++++++++++++++++++++++++++++++
 docker-compose.yml              | 33 +++++++++++++++
 3 files changed, 168 insertions(+)
 create mode 100644 SetUpKafkaDocker.md
 create mode 100644 docker-compose-multi-broker.yml
 create mode 100644 docker-compose.yml

diff --git a/SetUpKafkaDocker.md b/SetUpKafkaDocker.md
new file mode 100644
index 0000000..3689fb1
--- /dev/null
+++ b/SetUpKafkaDocker.md
@@ -0,0 +1,63 @@
+# Set Up Kafka Locally using Docker
+
+## Set up broker and zookeeper
+
+- Navigate to the path where the **docker-compose.yml** is located and then run the below command.
+
+```
+docker-compose up
+```
+
+## Produce and Consume Messages
+
+- Log into the container by running the below command.
+
+```
+docker exec -it kafka1 bash
+```
+
+- Create a Kafka topic using the **kafka-topics** command.
+  - **kafka1:19092** refers to the **KAFKA_ADVERTISED_LISTENERS** in the docker-compose.yml file.
+
+```
+kafka-topics --bootstrap-server kafka1:19092 \
+             --create \
+             --topic test-topic \
+             --replication-factor 1 --partitions 1
+```
+
+- Produce Messages to the topic.
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-console-producer --bootstrap-server kafka1:19092 \
+                       --topic test-topic
+```
+
+- Consume Messages from the topic.
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-console-consumer --bootstrap-server kafka1:19092 \
+                       --topic test-topic \
+                       --from-beginning
+```
+
+## Produce and Consume Messages With Key and Value
+
+- Produce Messages with Key and Value to the topic.
+ +``` +docker exec --interactive --tty kafka1 \ +kafka-console-producer --bootstrap-server kafka1:19092 \ + --topic test-topic \ + --property "key.separator=-" --property "parse.key=true" +``` + +``` +docker exec --interactive --tty kafka1 \ +kafka-console-consumer --bootstrap-server kafka1:19092 \ + --topic test-topic \ + --from-beginning \ + --property "key.separator= - " --property "print.key=true" +``` diff --git a/docker-compose-multi-broker.yml b/docker-compose-multi-broker.yml new file mode 100644 index 0000000..b5e0b73 --- /dev/null +++ b/docker-compose-multi-broker.yml @@ -0,0 +1,72 @@ +version: '2.1' + +services: + zoo1: + image: confluentinc/cp-zookeeper:7.3.2 + hostname: zoo1 + container_name: zoo1 + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_SERVER_ID: 1 + ZOOKEEPER_SERVERS: zoo1:2888:3888 + + + kafka1: + image: confluentinc/cp-kafka:7.3.2 + hostname: kafka1 + container_name: kafka1 + ports: + - "9092:9092" + - "29092:29092" + environment: + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181" + KAFKA_BROKER_ID: 1 + KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" + KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" + depends_on: + - zoo1 + + kafka2: + image: confluentinc/cp-kafka:7.3.2 + hostname: kafka2 + container_name: kafka2 + ports: + - "9093:9093" + - "29093:29093" + environment: + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181" + KAFKA_BROKER_ID: 2 + KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" + KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" + depends_on: + - zoo1 + + + kafka3: + image: confluentinc/cp-kafka:7.3.2 + hostname: kafka3 + container_name: kafka3 + ports: + - "9094:9094" + - "29094:29094" + environment: + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181" + KAFKA_BROKER_ID: 3 + KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" + KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" + depends_on: + - zoo1 diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..abca084 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,33 @@ +version: '2.1' + +services: + zoo1: + image: confluentinc/cp-zookeeper:7.3.2 + hostname: zoo1 + container_name: zoo1 + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_SERVER_ID: 1 + ZOOKEEPER_SERVERS: zoo1:2888:3888 + + + kafka1: + image: confluentinc/cp-kafka:7.3.2 + 
hostname: kafka1
+    container_name: kafka1
+    ports:
+      - "9092:9092"
+      - "29092:29092"
+    environment:
+      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
+      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
+      KAFKA_BROKER_ID: 1
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+    depends_on:
+      - zoo1

From 695228c4e02012a4b0b2f11b870932bf7a96af6a Mon Sep 17 00:00:00 2001
From: Dilip Sundarraj
Date: Sun, 23 Apr 2023 16:03:01 -0500
Subject: [PATCH 03/15] updated the readme

---
 README.md | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/README.md b/README.md
index 178d82e..e13e23a 100644
--- a/README.md
+++ b/README.md
@@ -3,6 +3,9 @@
 This repository has the complete code related to kafka producers/consumers using spring boot.
 
+## Kafka SetUp
+- [Setup-Kafka-Using-Docker](SetUpKafkaDocker.md)
+
 - [Setup-Kafka](https://github.com/dilipsundarraj1/kafka-for-developers-using-spring-boot/blob/master/SetUpKafka.md)

From c5c365ac2725f6a85bfd8e215145895840126c7c Mon Sep 17 00:00:00 2001
From: Dilip Sundarraj
Date: Sun, 23 Apr 2023 16:12:24 -0500
Subject: [PATCH 04/15] completed until descriptive commands

---
 SetUpKafkaDocker.md | 28 ++++++++++++++++++++++++++++
 1 file changed, 28 insertions(+)

diff --git a/SetUpKafkaDocker.md b/SetUpKafkaDocker.md
index 3689fb1..0355498 100644
--- a/SetUpKafkaDocker.md
+++ b/SetUpKafkaDocker.md
@@ -54,6 +54,8 @@ kafka-console-producer --bootstrap-server kafka1:19092 \
                        --property "key.separator=-" --property "parse.key=true"
 ```
 
+- Consuming messages with Key and Value from a topic.
+
 ```
 docker exec --interactive --tty kafka1 \
 kafka-console-consumer --bootstrap-server kafka1:19092 \
@@ -61,3 +63,29 @@ kafka-console-consumer --bootstrap-server kafka1:19092 \
                        --topic test-topic \
                        --from-beginning \
                        --property "key.separator= - " --property "print.key=true"
 ```
+
+## List the topics in a cluster
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-topics --bootstrap-server kafka1:19092 --list
+
+```
+
+## Describe topic
+
+- Command to describe all the Kafka topics.
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-topics --bootstrap-server kafka1:19092 --describe
+```
+
+- Command to describe all the Kafka topics.
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-topics --bootstrap-server kafka1:19092 --describe \
+--topic test-topic
+
+```

From f1e42ad3a8254d926c90fd4995413466e707ccb0 Mon Sep 17 00:00:00 2001
From: Dilip Sundarraj
Date: Mon, 24 Apr 2023 06:10:20 -0500
Subject: [PATCH 05/15] completed until consumer groups

---
 SetUpKafkaDocker.md | 46 +++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 44 insertions(+), 2 deletions(-)

diff --git a/SetUpKafkaDocker.md b/SetUpKafkaDocker.md
index 0355498..eb5be40 100644
--- a/SetUpKafkaDocker.md
+++ b/SetUpKafkaDocker.md
@@ -64,7 +64,26 @@ kafka-console-consumer --bootstrap-server kafka1:19092 \
                        --property "key.separator= - " --property "print.key=true"
 ```
 
-## List the topics in a cluster
+### Consume Messages using Consumer Groups
+
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-console-consumer --bootstrap-server kafka1:19092 \
+                       --topic test-topic --group console-consumer-41911\
+                       --property "key.separator= - " --property "print.key=true"
+```
+
+- Example Messages:
+
+```
+a-abc
+b-bus
+```
+
+## Advanced Kafka Commands
+
+### List the topics in a cluster
 
 ```
 docker exec --interactive --tty kafka1 \
 kafka-topics --bootstrap-server kafka1:19092 --list
 
@@ -72,7 +91,7 @@ kafka-topics --bootstrap-server kafka1:19092 --list
 ```
 
-## Describe topic
+### Describe topic
 
 - Command to describe all the Kafka topics.
 
@@ -89,3 +108,26 @@ kafka-topics --bootstrap-server kafka1:19092 --describe \
 --topic test-topic
 
 ```
+
+### Alter topic Partitions
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-topics --bootstrap-server kafka1:19092 \
+--alter --topic test-topic --partitions 40
+```
+
+### How to view consumer groups
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-consumer-groups --bootstrap-server kafka1:19092 --list
+```
+
+#### Consumer Groups and their Offset
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-consumer-groups --bootstrap-server kafka1:19092 \
+--describe --group console-consumer-41911
+```

From 17c50dbcc2bc2ec896652868619b4961a958a851 Mon Sep 17 00:00:00 2001
From: Dilip Sundarraj
Date: Mon, 24 Apr 2023 09:05:02 -0500
Subject: [PATCH 06/15] added the commands until commit log

---
 SetUpKafkaDocker.md | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/SetUpKafkaDocker.md b/SetUpKafkaDocker.md
index eb5be40..2809bda 100644
--- a/SetUpKafkaDocker.md
+++ b/SetUpKafkaDocker.md
@@ -131,3 +131,27 @@ docker exec --interactive --tty kafka1 \
 kafka-consumer-groups --bootstrap-server kafka1:19092 \
 --describe --group console-consumer-41911
 ```
+
+## Log file
+
+- The config file is present in the below path.
+
+```
+/etc/kafka/server.properties
+```
+
+- The log file is present in the below path.
+
+```
+/var/lib/kafka/data/
+```
+
+### How to view the commit log?
+ +``` +docker exec --interactive --tty kafka1 \ +kafka-run-class kafka.tools.DumpLogSegments \ +--deep-iteration \ +--files /var/lib/kafka/data/test-topic-0/00000000000000000000.log + +``` From d3ebf7297d58a30d7f0c0cdbe58fe6c9aea207e5 Mon Sep 17 00:00:00 2001 From: Dilip Sundarraj Date: Tue, 25 Apr 2023 15:15:00 -0500 Subject: [PATCH 07/15] added the correct commands --- SetUpKafkaDocker.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/SetUpKafkaDocker.md b/SetUpKafkaDocker.md index 2809bda..290c27e 100644 --- a/SetUpKafkaDocker.md +++ b/SetUpKafkaDocker.md @@ -132,7 +132,13 @@ kafka-consumer-groups --bootstrap-server kafka1:19092 \ --describe --group console-consumer-41911 ``` -## Log file +## Log file and related config + +- Log into the container. + +``` +docker exec --interactive --tty kafka1 +``` - The config file is present in the below path. From ff9f28d2e729bfbb1d01e70f4b62385b3080b64d Mon Sep 17 00:00:00 2001 From: Dilip Sundarraj Date: Wed, 26 Apr 2023 08:38:53 -0500 Subject: [PATCH 08/15] updated the code to use the latest kafka consumer; --- library-events-consumer/build.gradle | 6 +- .../gradle/wrapper/gradle-wrapper.properties | 6 +- .../LibraryEventsConsumerConfigLegacy.java | 132 ------------------ .../main/java/com/learnkafka/entity/Book.java | 10 +- .../com/learnkafka/entity/FailureRecord.java | 6 +- .../com/learnkafka/entity/LibraryEvent.java | 5 +- .../learnkafka/scheduler/RetryScheduler.java | 7 +- .../service/LibraryEventsService.java | 15 +- 8 files changed, 22 insertions(+), 165 deletions(-) delete mode 100644 library-events-consumer/src/main/java/com/learnkafka/config/LibraryEventsConsumerConfigLegacy.java diff --git a/library-events-consumer/build.gradle b/library-events-consumer/build.gradle index cb5b270..d901fe3 100644 --- a/library-events-consumer/build.gradle +++ b/library-events-consumer/build.gradle @@ -1,12 +1,12 @@ plugins { - id 'org.springframework.boot' version '2.6.5' - id 'io.spring.dependency-management' version '1.0.11.RELEASE' + id 'org.springframework.boot' version '3.0.5' + id 'io.spring.dependency-management' version '1.1.0' id 'java' } group = 'com.learnkafka' version = '0.0.1-SNAPSHOT' -sourceCompatibility = '11' +sourceCompatibility = '17' configurations { compileOnly { diff --git a/library-events-consumer/gradle/wrapper/gradle-wrapper.properties b/library-events-consumer/gradle/wrapper/gradle-wrapper.properties index e62e484..4c6d24e 100644 --- a/library-events-consumer/gradle/wrapper/gradle-wrapper.properties +++ b/library-events-consumer/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Wed Jan 08 05:18:41 CST 2020 -distributionUrl=https\://services.gradle.org/distributions/gradle-7.0-all.zip +#Fri Dec 27 05:51:26 CST 2019 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -zipStorePath=wrapper/dists zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.1-bin.zip \ No newline at end of file diff --git a/library-events-consumer/src/main/java/com/learnkafka/config/LibraryEventsConsumerConfigLegacy.java b/library-events-consumer/src/main/java/com/learnkafka/config/LibraryEventsConsumerConfigLegacy.java deleted file mode 100644 index d68f03b..0000000 --- a/library-events-consumer/src/main/java/com/learnkafka/config/LibraryEventsConsumerConfigLegacy.java +++ /dev/null @@ -1,132 +0,0 @@ -package com.learnkafka.config; - -import com.learnkafka.service.LibraryEventsService; -import lombok.extern.slf4j.Slf4j; -import 
org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.beans.factory.ObjectProvider; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; -import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.dao.RecoverableDataAccessException; -import org.springframework.kafka.annotation.EnableKafka; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.retry.RetryPolicy; -import org.springframework.retry.backoff.FixedBackOffPolicy; -import org.springframework.retry.policy.SimpleRetryPolicy; -import org.springframework.retry.support.RetryTemplate; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -//@Configuration -//@EnableKafka -@Slf4j -public class LibraryEventsConsumerConfigLegacy { - - @Autowired - LibraryEventsService libraryEventsService; - - @Autowired - KafkaProperties kafkaProperties; - - @Bean - @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory") - ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( - ConcurrentKafkaListenerContainerFactoryConfigurer configurer, - ObjectProvider> kafkaConsumerFactory) { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - configurer.configure(factory, kafkaConsumerFactory - .getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.kafkaProperties.buildConsumerProperties()))); - factory.setConcurrency(3); - // factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); - factory.setErrorHandler(((thrownException, data) -> { - log.info("Exception in consumerConfig is {} and the record is {}", thrownException.getMessage(), data); - //persist - })); - factory.setRetryTemplate(retryTemplate()); - factory.setRecoveryCallback((context -> { - if(context.getLastThrowable().getCause() instanceof RecoverableDataAccessException){ - //invoke recovery logic - log.info("Inside the recoverable logic"); - Arrays.asList(context.attributeNames()) - .forEach(attributeName -> { - log.info("Attribute name is : {} ", attributeName); - log.info("Attribute Value is : {} ", context.getAttribute(attributeName)); - }); - - ConsumerRecord consumerRecord = (ConsumerRecord) context.getAttribute("record"); - libraryEventsService.handleRecovery(consumerRecord); - }else{ - log.info("Inside the non recoverable logic"); - throw new RuntimeException(context.getLastThrowable().getMessage()); - } - - - return null; - })); - return factory; - } - - /* @Bean - ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( - ConcurrentKafkaListenerContainerFactoryConfigurer configurer, - ConsumerFactory kafkaConsumerFactory) { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - configurer.configure(factory, kafkaConsumerFactory); - factory.setConcurrency(3); - // factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); - factory.setErrorHandler(((thrownException, data) -> { - log.info("Exception in consumerConfig is {} and the record is 
{}", thrownException.getMessage(), data); - //persist - })); - factory.setRetryTemplate(retryTemplate()); - factory.setRecoveryCallback((context -> { - if(context.getLastThrowable().getCause() instanceof RecoverableDataAccessException){ - //invoke recovery logic - log.info("Inside the recoverable logic"); - Arrays.asList(context.attributeNames()) - .forEach(attributeName -> { - log.info("Attribute name is : {} ", attributeName); - log.info("Attribute Value is : {} ", context.getAttribute(attributeName)); - }); - - ConsumerRecord consumerRecord = (ConsumerRecord) context.getAttribute("record"); - libraryEventsService.handleRecovery(consumerRecord); - }else{ - log.info("Inside the non recoverable logic"); - throw new RuntimeException(context.getLastThrowable().getMessage()); - } - - - return null; - })); - return factory; - }*/ - - private RetryTemplate retryTemplate() { - - FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy(); - fixedBackOffPolicy.setBackOffPeriod(1000); - RetryTemplate retryTemplate = new RetryTemplate(); - retryTemplate.setRetryPolicy(simpleRetryPolicy()); - retryTemplate.setBackOffPolicy(fixedBackOffPolicy); - return retryTemplate; - } - - private RetryPolicy simpleRetryPolicy() { - - /*SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(); - simpleRetryPolicy.setMaxAttempts(3);*/ - Map, Boolean> exceptionsMap = new HashMap<>(); - exceptionsMap.put(IllegalArgumentException.class, false); - exceptionsMap.put(RecoverableDataAccessException.class, true); - SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3,exceptionsMap,true); - return simpleRetryPolicy; - } -} \ No newline at end of file diff --git a/library-events-consumer/src/main/java/com/learnkafka/entity/Book.java b/library-events-consumer/src/main/java/com/learnkafka/entity/Book.java index 99847cb..732b9ec 100644 --- a/library-events-consumer/src/main/java/com/learnkafka/entity/Book.java +++ b/library-events-consumer/src/main/java/com/learnkafka/entity/Book.java @@ -1,17 +1,15 @@ package com.learnkafka.entity; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; +import jakarta.persistence.JoinColumn; +import jakarta.persistence.OneToOne; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; -import javax.persistence.Entity; -import javax.persistence.Id; -import javax.persistence.JoinColumn; -import javax.persistence.OneToOne; -import javax.validation.constraints.NotBlank; -import javax.validation.constraints.NotNull; @AllArgsConstructor @NoArgsConstructor diff --git a/library-events-consumer/src/main/java/com/learnkafka/entity/FailureRecord.java b/library-events-consumer/src/main/java/com/learnkafka/entity/FailureRecord.java index 77292b4..9ff24ee 100644 --- a/library-events-consumer/src/main/java/com/learnkafka/entity/FailureRecord.java +++ b/library-events-consumer/src/main/java/com/learnkafka/entity/FailureRecord.java @@ -1,12 +1,14 @@ package com.learnkafka.entity; +import jakarta.persistence.Entity; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.Id; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; -import javax.persistence.*; @AllArgsConstructor @NoArgsConstructor @@ -18,7 +20,7 @@ public class FailureRecord { @GeneratedValue private Integer bookId; private String topic; - private Integer key; + private Integer key_value; private String errorRecord; private Integer partition; private Long offset_value; diff --git 
a/library-events-consumer/src/main/java/com/learnkafka/entity/LibraryEvent.java b/library-events-consumer/src/main/java/com/learnkafka/entity/LibraryEvent.java index 3ba650f..6e20182 100644 --- a/library-events-consumer/src/main/java/com/learnkafka/entity/LibraryEvent.java +++ b/library-events-consumer/src/main/java/com/learnkafka/entity/LibraryEvent.java @@ -1,12 +1,9 @@ package com.learnkafka.entity; +import jakarta.persistence.*; import lombok.*; -import javax.persistence.*; -import javax.validation.Valid; -import javax.validation.constraints.NotNull; - @AllArgsConstructor @NoArgsConstructor @Data diff --git a/library-events-consumer/src/main/java/com/learnkafka/scheduler/RetryScheduler.java b/library-events-consumer/src/main/java/com/learnkafka/scheduler/RetryScheduler.java index dad6ac8..17f394b 100644 --- a/library-events-consumer/src/main/java/com/learnkafka/scheduler/RetryScheduler.java +++ b/library-events-consumer/src/main/java/com/learnkafka/scheduler/RetryScheduler.java @@ -1,18 +1,13 @@ package com.learnkafka.scheduler; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.learnkafka.config.LibraryEventsConsumerConfig; -import com.learnkafka.consumer.LibraryEventsConsumer; import com.learnkafka.entity.FailureRecord; -import com.learnkafka.entity.LibraryEvent; import com.learnkafka.jpa.FailureRecordRepository; import com.learnkafka.service.LibraryEventsService; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; //@Component @Slf4j @@ -49,7 +44,7 @@ public void retryFailedRecords(){ private ConsumerRecord buildConsumerRecord(FailureRecord failureRecord) { return new ConsumerRecord<>(failureRecord.getTopic(), - failureRecord.getPartition(), failureRecord.getOffset_value(), failureRecord.getKey(), + failureRecord.getPartition(), failureRecord.getOffset_value(), failureRecord.getKey_value(), failureRecord.getErrorRecord()); } diff --git a/library-events-consumer/src/main/java/com/learnkafka/service/LibraryEventsService.java b/library-events-consumer/src/main/java/com/learnkafka/service/LibraryEventsService.java index 2845286..5fb5f55 100644 --- a/library-events-consumer/src/main/java/com/learnkafka/service/LibraryEventsService.java +++ b/library-events-consumer/src/main/java/com/learnkafka/service/LibraryEventsService.java @@ -75,16 +75,13 @@ public void handleRecovery(ConsumerRecord record){ Integer key = record.key(); String message = record.value(); - ListenableFuture> listenableFuture = kafkaTemplate.sendDefault(key, message); - listenableFuture.addCallback(new ListenableFutureCallback>() { - @Override - public void onFailure(Throwable ex) { - handleFailure(key, message, ex); - } + var listenableFuture = kafkaTemplate.sendDefault(key, message); + listenableFuture.whenComplete((sendResult, throwable) -> { + if (throwable != null) { + handleFailure(key, message, throwable); + } else { + handleSuccess(key, message, sendResult); - @Override - public void onSuccess(SendResult result) { - handleSuccess(key, message, result); } }); } From d60311becdf697516d260d4c0d2cd902de41b458 Mon Sep 17 00:00:00 2001 From: dilip sundarraj Date: Mon, 1 May 2023 14:46:36 -0500 Subject: [PATCH 09/15] added the working ssl code --- .../src/main/resources/application.yml | 32 +++++++++++++++++-- 1 file changed, 30 
insertions(+), 2 deletions(-)

diff --git a/library-events-producer/src/main/resources/application.yml b/library-events-producer/src/main/resources/application.yml
index 4789455..ae87ab1 100644
--- a/library-events-producer/src/main/resources/application.yml
+++ b/library-events-producer/src/main/resources/application.yml
@@ -21,6 +21,34 @@ spring:
     properties:
       bootstrap.servers: localhost:9092,localhost:9093,localhost:9094
 ---
+spring:
+  config:
+    activate:
+      on-profile: ssl
+  kafka:
+    template:
+      default-topic: library-events
+    producer:
+      bootstrap-servers: localhost:9092
+      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
+      value-serializer: org.apache.kafka.common.serialization.StringSerializer
+      ssl:
+        trust-store-location: file:/System/Volumes/Data/Dilip/udemy/kafka-for-developers-using-spring-boot/ssl/kafka.producer.truststore.jks
+        trust-store-password: confluent
+        key-store-location: file:/System/Volumes/Data/Dilip/udemy/kafka-for-developers-using-spring-boot/ssl/kafka.producer.keystore.jks
+        key-store-password: confluent
+      properties:
+        acks: all
+        retries: 10
+        retry.backoff.ms: 1000
+        security:
+          protocol: SSL
+        ssl.endpoint.identification.algorithm:
+
+  admin:
+    properties:
+      bootstrap.servers: localhost:9092
+---
 spring:
   config:
     activate:
@@ -33,9 +61,9 @@ spring:
       key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
       value-serializer: org.apache.kafka.common.serialization.StringSerializer
       ssl:
-        trust-store-location: file:/Dilip/udemy/kafka-for-developers-using-spring-boot/ssl/client.truststore.jks
+        trust-store-location: file:/System/Volumes/Data/Dilip/udemy/kafka-for-developers-using-spring-boot/ssl/client.truststore.jks
         trust-store-password: password
-        key-store-location: file:/Dilip/udemy/kafka-for-developers-using-spring-boot/ssl/client.keystore.jks
+        key-store-location: file:/System/Volumes/Data/Dilip/udemy/kafka-for-developers-using-spring-boot/ssl/client.keystore.jks
         key-store-password: password
       properties:
         acks: all

From 2ee2a37c06a95fa14e5456d4919b40efe7d0d734 Mon Sep 17 00:00:00 2001
From: dilip sundarraj
Date: Mon, 1 May 2023 14:46:53 -0500
Subject: [PATCH 10/15] added the kafka docker compose

---
 Undersrading-Kafka-DockerCompose.md | 32 +++++++++++++++++++++++++++
 1 file changed, 32 insertions(+)
 create mode 100644 Undersrading-Kafka-DockerCompose.md

diff --git a/Undersrading-Kafka-DockerCompose.md b/Undersrading-Kafka-DockerCompose.md
new file mode 100644
index 0000000..3c399a7
--- /dev/null
+++ b/Undersrading-Kafka-DockerCompose.md
@@ -0,0 +1,32 @@
+# Understanding how the Kafka Docker Compose setup works
+
+- More info is available in this link - https://rmoff.net/2018/08/02/kafka-listeners-explained/
+
+## Kafka broker docker-compose config:
+
+```
+kafka1:
+  image: confluentinc/cp-kafka:7.3.2
+  hostname: kafka1
+  container_name: kafka1
+  ports:
+    - "9092:9092"
+    - "29092:29092"
+  environment:
+    KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
+    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
+    KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
+    KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
+    KAFKA_BROKER_ID: 1
+    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+  depends_on:
+    - zoo1
+```
+- KAFKA_INTER_BROKER_LISTENER_NAME
+  - Kafka brokers communicate among themselves, usually on the internal network. This is where the **KAFKA_INTER_BROKER_LISTENER_NAME** property comes in handy.
+- KAFKA_ADVERTISED_LISTENERS
+  - The value of this property is the connection data that's shared with clients when they connect.
+  - Kafka clients may not be in the same network where the kafka broker is running.
+  - For a broker that's running in the docker network, the client is very likely outside the docker network.

From 45a392561a1894ab217ba483b06a510ccd88ae86 Mon Sep 17 00:00:00 2001
From: dilip sundarraj
Date: Tue, 2 May 2023 08:43:16 -0500
Subject: [PATCH 11/15] configure the kafka docker with min insync replica

---
 SetUpKafkaDocker.md             | 72 ++++++++++++++++++++++++++++++++++
 docker-compose-multi-broker.yml | 10 ++---
 2 files changed, 74 insertions(+), 8 deletions(-)

diff --git a/SetUpKafkaDocker.md b/SetUpKafkaDocker.md
index 290c27e..97781fe 100644
--- a/SetUpKafkaDocker.md
+++ b/SetUpKafkaDocker.md
@@ -81,6 +81,75 @@
 a-abc
 b-bus
 ```
 
+### Set up a Kafka Cluster with 3 brokers
+
+- Run this command to spin up a kafka cluster with 3 brokers.
+
+```
+docker-compose -f docker-compose-multi-broker.yml up
+```
+
+- Create a topic with a replication factor of 3.
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-topics --bootstrap-server kafka1:19092 \
+             --create \
+             --topic test-topic \
+             --replication-factor 3 --partitions 3
+```
+
+- Produce Messages to the topic.
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-console-producer --bootstrap-server localhost:9092,kafka2:19093,kafka3:19094 \
+                       --topic test-topic
+```
+
+- Consume Messages from the topic.
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-console-consumer --bootstrap-server localhost:9092,kafka2:19093,kafka3:19094 \
+                       --topic test-topic \
+                       --from-beginning
+```
+#### Log files in a Multi-Broker Kafka Cluster
+
+- Log files will be created for each partition in each broker instance of the Kafka cluster.
+  - Log into the container **kafka1**.
+    ```
+    docker exec -it kafka1 bash
+    ```
+  - Log into the container **kafka2**.
+    ```
+    docker exec -it kafka2 bash
+    ```
+
+- Shut down the kafka cluster.
+
+```
+docker-compose -f docker-compose-multi-broker.yml down
+```
+
+### Setting up min.insync.replicas
+
+- Topic - test-topic
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name test-topic \
+--alter --add-config min.insync.replicas=2
+```
+
+- Topic - library-events
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name library-events \
+--alter --add-config min.insync.replicas=2
+```
 ## Advanced Kafka Commands
 
 ### List the topics in a cluster
@@ -100,13 +169,12 @@ docker exec --interactive --tty kafka1 \
 kafka-topics --bootstrap-server kafka1:19092 --describe
 ```
 
-- Command to describe all the Kafka topics.
+- Command to describe a specific Kafka topic.
``` docker exec --interactive --tty kafka1 \ kafka-topics --bootstrap-server kafka1:19092 --describe \ --topic test-topic - ``` ### Alter topic Partitions diff --git a/docker-compose-multi-broker.yml b/docker-compose-multi-broker.yml index b5e0b73..c581f27 100644 --- a/docker-compose-multi-broker.yml +++ b/docker-compose-multi-broker.yml @@ -3,6 +3,7 @@ version: '2.1' services: zoo1: image: confluentinc/cp-zookeeper:7.3.2 + platform: linux/amd64 hostname: zoo1 container_name: zoo1 ports: @@ -15,6 +16,7 @@ services: kafka1: image: confluentinc/cp-kafka:7.3.2 + platform: linux/amd64 hostname: kafka1 container_name: kafka1 ports: @@ -27,13 +29,12 @@ services: KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181" KAFKA_BROKER_ID: 1 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" - KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer - KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" depends_on: - zoo1 kafka2: image: confluentinc/cp-kafka:7.3.2 + platform: linux/amd64 hostname: kafka2 container_name: kafka2 ports: @@ -46,14 +47,13 @@ services: KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181" KAFKA_BROKER_ID: 2 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" - KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer - KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" depends_on: - zoo1 kafka3: image: confluentinc/cp-kafka:7.3.2 + platform: linux/amd64 hostname: kafka3 container_name: kafka3 ports: @@ -66,7 +66,5 @@ services: KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181" KAFKA_BROKER_ID: 3 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" - KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer - KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" depends_on: - zoo1 From ed5a6365791e706b63cf75e4304e3830fd8dcd7a Mon Sep 17 00:00:00 2001 From: dilip sundarraj Date: Wed, 3 May 2023 08:32:34 -0500 Subject: [PATCH 12/15] updated the command --- SetUpKafkaDocker.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/SetUpKafkaDocker.md b/SetUpKafkaDocker.md index 97781fe..bcb9bf3 100644 --- a/SetUpKafkaDocker.md +++ b/SetUpKafkaDocker.md @@ -205,7 +205,7 @@ kafka-consumer-groups --bootstrap-server kafka1:19092 \ - Log into the container. ``` -docker exec --interactive --tty kafka1 +docker exec --it kafka1 ``` - The config file is present in the below path. From e6358a6ed94d5e731d8d34a851b235bffa49a165 Mon Sep 17 00:00:00 2001 From: dilip sundarraj Date: Wed, 3 May 2023 08:33:09 -0500 Subject: [PATCH 13/15] updated the command --- SetUpKafkaDocker.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/SetUpKafkaDocker.md b/SetUpKafkaDocker.md index bcb9bf3..c7a0a0b 100644 --- a/SetUpKafkaDocker.md +++ b/SetUpKafkaDocker.md @@ -205,7 +205,7 @@ kafka-consumer-groups --bootstrap-server kafka1:19092 \ - Log into the container. ``` -docker exec --it kafka1 +docker exec -it kafka1 ``` - The config file is present in the below path. 
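
An aside on the listener setup explained in Undersrading-Kafka-DockerCompose.md above: which bootstrap address a client must use depends on where the client runs, because the broker answers the bootstrap request with its advertised listener. A minimal sketch with the plain Java client; the topic name and the choice of running the client on the host machine (outside the docker network) are assumptions for illustration:

```
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AdvertisedListenerDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        // On the host machine, connect through the EXTERNAL listener (localhost:9092).
        // From another container, host.docker.internal:29092 (the DOCKER listener) applies;
        // inside the docker network itself, kafka1:19092 (the INTERNAL listener) applies.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // After bootstrap, the client talks to whatever address the broker advertises
            // via KAFKA_ADVERTISED_LISTENERS, so that address must be resolvable from here.
            producer.send(new ProducerRecord<>("test-topic", "key", "value"));
            producer.flush();
        }
    }
}
```

If the advertised address is not reachable from the client's network, the initial bootstrap connection can still succeed while every later produce or fetch request fails; that is the failure mode the rmoff.net post linked in Undersrading-Kafka-DockerCompose.md walks through.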
From c4e3ff3b5f9050b9dea99c0da50873218117fb5a Mon Sep 17 00:00:00 2001 From: dilip sundarraj Date: Wed, 3 May 2023 08:34:12 -0500 Subject: [PATCH 14/15] updated the command --- SetUpKafkaDocker.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/SetUpKafkaDocker.md b/SetUpKafkaDocker.md index c7a0a0b..bf18c2d 100644 --- a/SetUpKafkaDocker.md +++ b/SetUpKafkaDocker.md @@ -205,7 +205,7 @@ kafka-consumer-groups --bootstrap-server kafka1:19092 \ - Log into the container. ``` -docker exec -it kafka1 +docker exec -it kafka1 bash ``` - The config file is present in the below path. From ee5fae6e73b051abde1286a7121e81e1322af2d6 Mon Sep 17 00:00:00 2001 From: dilip sundarraj Date: Fri, 5 May 2023 15:58:01 -0500 Subject: [PATCH 15/15] fixed the tests --- .../controller/LibraryEventsController.java | 13 ++-- .../com/learnkafka/domain/BookRecord.java | 13 ++++ .../com/learnkafka/domain/LibraryEvent.java | 2 +- .../learnkafka/domain/LibraryEventRecord.java | 13 ++++ .../producer/LibraryEventProducer.java | 5 +- ...ibraryEventsControllerIntegrationTest.java | 57 ++++++--------- .../LibraryEventControllerUnitTest.java | 69 +++++-------------- .../LibraryEventProducerUnitTest.java | 33 ++------- .../unit/com/learnkafka/util/TestUtil.java | 62 +++++++++++++++++ 9 files changed, 139 insertions(+), 128 deletions(-) create mode 100644 library-events-producer/src/main/java/com/learnkafka/domain/BookRecord.java create mode 100644 library-events-producer/src/main/java/com/learnkafka/domain/LibraryEventRecord.java create mode 100644 library-events-producer/src/test/java/unit/com/learnkafka/util/TestUtil.java diff --git a/library-events-producer/src/main/java/com/learnkafka/controller/LibraryEventsController.java b/library-events-producer/src/main/java/com/learnkafka/controller/LibraryEventsController.java index 93136ed..75f4e71 100644 --- a/library-events-producer/src/main/java/com/learnkafka/controller/LibraryEventsController.java +++ b/library-events-producer/src/main/java/com/learnkafka/controller/LibraryEventsController.java @@ -1,8 +1,7 @@ package com.learnkafka.controller; import com.fasterxml.jackson.core.JsonProcessingException; -import com.learnkafka.domain.LibraryEvent; -import com.learnkafka.domain.LibraryEventType; +import com.learnkafka.domain.LibraryEventRecord; import com.learnkafka.producer.LibraryEventProducer; import jakarta.validation.Valid; import lombok.extern.slf4j.Slf4j; @@ -14,8 +13,6 @@ import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; -import java.util.concurrent.ExecutionException; - @RestController @Slf4j public class LibraryEventsController { @@ -24,10 +21,9 @@ public class LibraryEventsController { LibraryEventProducer libraryEventProducer; @PostMapping("/v1/libraryevent") - public ResponseEntity postLibraryEvent(@RequestBody @Valid LibraryEvent libraryEvent) throws JsonProcessingException, ExecutionException, InterruptedException { + public ResponseEntity postLibraryEvent(@RequestBody @Valid LibraryEventRecord libraryEvent) throws JsonProcessingException { //invoke kafka producer - libraryEvent.setLibraryEventType(LibraryEventType.NEW); libraryEventProducer.sendLibraryEvent_Approach2(libraryEvent); //libraryEventProducer.sendLibraryEvent(libraryEvent); return ResponseEntity.status(HttpStatus.CREATED).body(libraryEvent); @@ -35,14 +31,13 @@ public ResponseEntity postLibraryEvent(@RequestBody @Valid Library //PUT @PutMapping("/v1/libraryevent") - public ResponseEntity putLibraryEvent(@RequestBody 
@Valid LibraryEvent libraryEvent) throws JsonProcessingException, ExecutionException, InterruptedException { + public ResponseEntity putLibraryEvent(@RequestBody @Valid LibraryEventRecord libraryEvent) throws JsonProcessingException { log.info("LibraryEvent : {} ",libraryEvent ); - if(libraryEvent.getLibraryEventId()==null){ + if(libraryEvent.libraryEventId()==null){ return ResponseEntity.status(HttpStatus.BAD_REQUEST).body("Please pass the LibraryEventId"); } - libraryEvent.setLibraryEventType(LibraryEventType.UPDATE); libraryEventProducer.sendLibraryEvent_Approach2(libraryEvent); return ResponseEntity.status(HttpStatus.OK).body(libraryEvent); } diff --git a/library-events-producer/src/main/java/com/learnkafka/domain/BookRecord.java b/library-events-producer/src/main/java/com/learnkafka/domain/BookRecord.java new file mode 100644 index 0000000..53892d1 --- /dev/null +++ b/library-events-producer/src/main/java/com/learnkafka/domain/BookRecord.java @@ -0,0 +1,13 @@ +package com.learnkafka.domain; + +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; + +public record BookRecord( + @NotNull + Integer bookId, + @NotBlank + String bookName, + @NotBlank + String bookAuthor) { +} diff --git a/library-events-producer/src/main/java/com/learnkafka/domain/LibraryEvent.java b/library-events-producer/src/main/java/com/learnkafka/domain/LibraryEvent.java index 25f7c25..2d97de6 100644 --- a/library-events-producer/src/main/java/com/learnkafka/domain/LibraryEvent.java +++ b/library-events-producer/src/main/java/com/learnkafka/domain/LibraryEvent.java @@ -19,6 +19,6 @@ public class LibraryEvent { private LibraryEventType libraryEventType; @NotNull @Valid - private Book book; + private BookRecord book; } diff --git a/library-events-producer/src/main/java/com/learnkafka/domain/LibraryEventRecord.java b/library-events-producer/src/main/java/com/learnkafka/domain/LibraryEventRecord.java new file mode 100644 index 0000000..bdf49fe --- /dev/null +++ b/library-events-producer/src/main/java/com/learnkafka/domain/LibraryEventRecord.java @@ -0,0 +1,13 @@ +package com.learnkafka.domain; + +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; + +public record LibraryEventRecord( + Integer libraryEventId, + LibraryEventType libraryEventType, + @NotNull + @Valid + BookRecord book +) { +} diff --git a/library-events-producer/src/main/java/com/learnkafka/producer/LibraryEventProducer.java b/library-events-producer/src/main/java/com/learnkafka/producer/LibraryEventProducer.java index 96ba37e..a741722 100644 --- a/library-events-producer/src/main/java/com/learnkafka/producer/LibraryEventProducer.java +++ b/library-events-producer/src/main/java/com/learnkafka/producer/LibraryEventProducer.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.learnkafka.domain.LibraryEvent; +import com.learnkafka.domain.LibraryEventRecord; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Header; @@ -46,9 +47,9 @@ public CompletableFuture> sendLibraryEvent(LibraryEv }); } - public CompletableFuture> sendLibraryEvent_Approach2(LibraryEvent libraryEvent) throws JsonProcessingException { + public CompletableFuture> sendLibraryEvent_Approach2(LibraryEventRecord libraryEvent) throws JsonProcessingException { - Integer key = libraryEvent.getLibraryEventId(); + Integer key = libraryEvent.libraryEventId(); String 
value = objectMapper.writeValueAsString(libraryEvent); ProducerRecord producerRecord = buildProducerRecord(key, value, topic); diff --git a/library-events-producer/src/test/java/intg/com/learnkafka/controller/LibraryEventsControllerIntegrationTest.java b/library-events-producer/src/test/java/intg/com/learnkafka/controller/LibraryEventsControllerIntegrationTest.java index a305b4b..8d1d869 100644 --- a/library-events-producer/src/test/java/intg/com/learnkafka/controller/LibraryEventsControllerIntegrationTest.java +++ b/library-events-producer/src/test/java/intg/com/learnkafka/controller/LibraryEventsControllerIntegrationTest.java @@ -1,10 +1,10 @@ package com.learnkafka.controller; -import com.learnkafka.domain.Book; -import com.learnkafka.domain.LibraryEvent; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.learnkafka.domain.LibraryEventRecord; +import com.learnkafka.util.TestUtil; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -39,6 +39,9 @@ public class LibraryEventsControllerIntegrationTest { @Autowired EmbeddedKafkaBroker embeddedKafkaBroker; + @Autowired + ObjectMapper objectMapper; + private Consumer consumer; @BeforeEach @@ -58,35 +61,25 @@ void tearDown() { @Timeout(5) void postLibraryEvent() throws InterruptedException { //given - Book book = Book.builder() - .bookId(123) - .bookAuthor("Dilip") - .bookName("Kafka using Spring Boot") - .build(); - - LibraryEvent libraryEvent = LibraryEvent.builder() - .libraryEventId(null) - .book(book) - .build(); + LibraryEventRecord libraryEventRecord = TestUtil.libraryEventRecord(); HttpHeaders headers = new HttpHeaders(); headers.set("content-type", MediaType.APPLICATION_JSON.toString()); - HttpEntity request = new HttpEntity<>(libraryEvent, headers); + HttpEntity request = new HttpEntity<>(libraryEventRecord, headers); //when - ResponseEntity responseEntity = restTemplate.exchange("/v1/libraryevent", HttpMethod.POST, request, LibraryEvent.class); + ResponseEntity responseEntity = restTemplate.exchange("/v1/libraryevent", HttpMethod.POST, request, LibraryEventRecord.class); //then assertEquals(HttpStatus.CREATED, responseEntity.getStatusCode()); - ConsumerRecords consumerRecords = KafkaTestUtils.getRecords(consumer); //Thread.sleep(3000); assert consumerRecords.count() == 1; - consumerRecords.forEach(record-> { - String expectedRecord = "{\"libraryEventId\":null,\"libraryEventType\":\"NEW\",\"book\":{\"bookId\":123,\"bookName\":\"Kafka using Spring Boot\",\"bookAuthor\":\"Dilip\"}}"; - String value = record.value(); - assertEquals(expectedRecord, value); + consumerRecords.forEach(record -> { + var libraryEventActual = TestUtil.parseLibraryEventRecord(objectMapper, record.value()); + assertEquals(libraryEventRecord, libraryEventActual); + }); @@ -96,23 +89,14 @@ void postLibraryEvent() throws InterruptedException { @Timeout(5) void putLibraryEvent() throws InterruptedException { //given - Book book = Book.builder() - .bookId(456) - .bookAuthor("Dilip") - .bookName("Kafka using Spring Boot") - .build(); - - LibraryEvent libraryEvent = LibraryEvent.builder() - .libraryEventId(123) - .book(book) - .build(); + var libraryEventUpdate = TestUtil.libraryEventRecordUpdate(); HttpHeaders headers = new HttpHeaders(); 
headers.set("content-type", MediaType.APPLICATION_JSON.toString()); - HttpEntity request = new HttpEntity<>(libraryEvent, headers); + HttpEntity request = new HttpEntity<>(libraryEventUpdate, headers); //when - ResponseEntity responseEntity = restTemplate.exchange("/v1/libraryevent", HttpMethod.PUT, request, LibraryEvent.class); + ResponseEntity responseEntity = restTemplate.exchange("/v1/libraryevent", HttpMethod.PUT, request, LibraryEventRecord.class); //then assertEquals(HttpStatus.OK, responseEntity.getStatusCode()); @@ -121,11 +105,10 @@ void putLibraryEvent() throws InterruptedException { ConsumerRecords consumerRecords = KafkaTestUtils.getRecords(consumer); //Thread.sleep(3000); assert consumerRecords.count() == 2; - consumerRecords.forEach(record-> { - if(record.key()!=null){ - String expectedRecord = "{\"libraryEventId\":123,\"libraryEventType\":\"UPDATE\",\"book\":{\"bookId\":456,\"bookName\":\"Kafka using Spring Boot\",\"bookAuthor\":\"Dilip\"}}"; - String value = record.value(); - assertEquals(expectedRecord, value); + consumerRecords.forEach(record -> { + if (record.key() != null) { + var libraryEventActual = TestUtil.parseLibraryEventRecord(objectMapper, record.value()); + assertEquals(libraryEventUpdate, libraryEventActual); } }); diff --git a/library-events-producer/src/test/java/unit/com/learnkafka/controller/LibraryEventControllerUnitTest.java b/library-events-producer/src/test/java/unit/com/learnkafka/controller/LibraryEventControllerUnitTest.java index 3cb0a26..4390d92 100644 --- a/library-events-producer/src/test/java/unit/com/learnkafka/controller/LibraryEventControllerUnitTest.java +++ b/library-events-producer/src/test/java/unit/com/learnkafka/controller/LibraryEventControllerUnitTest.java @@ -1,10 +1,9 @@ package com.learnkafka.controller; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.learnkafka.domain.Book; -import com.learnkafka.domain.LibraryEvent; +import com.learnkafka.domain.LibraryEventRecord; import com.learnkafka.producer.LibraryEventProducer; +import com.learnkafka.util.TestUtil; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; @@ -13,9 +12,7 @@ import org.springframework.http.MediaType; import org.springframework.test.web.servlet.MockMvc; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.when; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put; @@ -37,19 +34,11 @@ public class LibraryEventControllerUnitTest { @Test void postLibraryEvent() throws Exception { //given - Book book = Book.builder() - .bookId(123) - .bookAuthor("Dilip") - .bookName("Kafka using Spring Boot") - .build(); - LibraryEvent libraryEvent = LibraryEvent.builder() - .libraryEventId(null) - .book(book) - .build(); + LibraryEventRecord libraryEventRecord = TestUtil.libraryEventRecord(); - String json = objectMapper.writeValueAsString(libraryEvent); - when(libraryEventProducer.sendLibraryEvent_Approach2(isA(LibraryEvent.class))).thenReturn(null); + String json = objectMapper.writeValueAsString(libraryEventRecord); + when(libraryEventProducer.sendLibraryEvent_Approach2(isA(LibraryEventRecord.class))).thenReturn(null); //expect 
diff --git a/library-events-producer/src/test/java/unit/com/learnkafka/producer/LibraryEventProducerUnitTest.java b/library-events-producer/src/test/java/unit/com/learnkafka/producer/LibraryEventProducerUnitTest.java
index a0c2a01..18ea71d 100644
--- a/library-events-producer/src/test/java/unit/com/learnkafka/producer/LibraryEventProducerUnitTest.java
+++ b/library-events-producer/src/test/java/unit/com/learnkafka/producer/LibraryEventProducerUnitTest.java
@@ -2,8 +2,8 @@
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.learnkafka.domain.Book;
-import com.learnkafka.domain.LibraryEvent;
+import com.learnkafka.domain.LibraryEventRecord;
+import com.learnkafka.util.TestUtil;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -38,41 +38,22 @@ public class LibraryEventProducerUnitTest {
 
     @Test
     void sendLibraryEvent_Approach2_failure() throws JsonProcessingException, ExecutionException, InterruptedException {
         //given
-        Book book = Book.builder()
-                .bookId(123)
-                .bookAuthor("Dilip")
-                .bookName("Kafka using Spring Boot")
-                .build();
-
-        LibraryEvent libraryEvent = LibraryEvent.builder()
-                .libraryEventId(null)
-                .book(book)
-                .build();
         when(kafkaTemplate.send(isA(ProducerRecord.class))).thenReturn(CompletableFuture.supplyAsync(()-> new RuntimeException("Exception Calling Kafka")));
 
         //when
-        assertThrows(Exception.class, ()->eventProducer.sendLibraryEvent_Approach2(libraryEvent).get());
+        assertThrows(Exception.class, ()->eventProducer.sendLibraryEvent_Approach2(TestUtil.libraryEventRecord()).get());
 
     }
 
     @Test
     void sendLibraryEvent_Approach2_success() throws JsonProcessingException, ExecutionException, InterruptedException {
         //given
-        Book book = Book.builder()
-                .bookId(123)
-                .bookAuthor("Dilip")
-                .bookName("Kafka using Spring Boot")
-                .build();
-
-        LibraryEvent libraryEvent = LibraryEvent.builder()
-                .libraryEventId(null)
-                .book(book)
-                .build();
-        String record = objectMapper.writeValueAsString(libraryEvent);
+        LibraryEventRecord libraryEventRecord = TestUtil.libraryEventRecord();
+        String record = objectMapper.writeValueAsString(libraryEventRecord);
 
-        ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>("library-events", libraryEvent.getLibraryEventId(), record);
+        ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>("library-events", libraryEventRecord.libraryEventId(), record);
         RecordMetadata recordMetadata = new RecordMetadata(new TopicPartition("library-events", 1), 1, 1, System.currentTimeMillis(), 1, 2);
         SendResult<Integer, String> sendResult = new SendResult<>(producerRecord, recordMetadata);
 
@@ -81,7 +62,7 @@ void sendLibraryEvent_Approach2_success() throws JsonProcessingException, Execut
         when(kafkaTemplate.send(isA(ProducerRecord.class))).thenReturn(future);
 
         //when
-        var completableFuture = eventProducer.sendLibraryEvent_Approach2(libraryEvent);
+        var completableFuture = eventProducer.sendLibraryEvent_Approach2(libraryEventRecord);
 
         //then
         SendResult<Integer, String> sendResult1 = completableFuture.get();
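[Note: CompletableFuture.supplyAsync(() -> new RuntimeException(...)) in the failure test above completes the future normally, with the exception object as its value; the assertion still passes only because the success path then fails on that bogus result. If the intent is a future that completes exceptionally, Java 9+ offers a more direct construction; a sketch, not what this patch does:]

    // Simulate the broker rejecting the send: the returned future is already
    // completed exceptionally, so whenComplete sees a non-null throwable and
    // the producer's handleFailure path runs.
    when(kafkaTemplate.send(isA(ProducerRecord.class)))
            .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Exception Calling Kafka")));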
diff --git a/library-events-producer/src/test/java/unit/com/learnkafka/util/TestUtil.java b/library-events-producer/src/test/java/unit/com/learnkafka/util/TestUtil.java
new file mode 100644
index 0000000..d35a4cd
--- /dev/null
+++ b/library-events-producer/src/test/java/unit/com/learnkafka/util/TestUtil.java
@@ -0,0 +1,62 @@
+package com.learnkafka.util;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.learnkafka.domain.BookRecord;
+import com.learnkafka.domain.LibraryEventRecord;
+import com.learnkafka.domain.LibraryEventType;
+
+public class TestUtil {
+
+    public static BookRecord bookRecord() {
+        return new BookRecord(123, "Dilip", "Kafka Using Spring Boot");
+    }
+
+    public static BookRecord bookRecordWithInvalidValues() {
+        return new BookRecord(null, "", "Kafka Using Spring Boot");
+    }
+
+    public static LibraryEventRecord libraryEventRecord() {
+        return new LibraryEventRecord(null,
+                LibraryEventType.NEW,
+                bookRecord());
+    }
+
+    public static LibraryEventRecord libraryEventRecordUpdate() {
+        return new LibraryEventRecord(123,
+                LibraryEventType.UPDATE,
+                bookRecord());
+    }
+
+    public static LibraryEventRecord libraryEventRecordUpdateWithNullLibraryEventId() {
+        return new LibraryEventRecord(null,
+                LibraryEventType.UPDATE,
+                bookRecord());
+    }
+
+    public static LibraryEventRecord libraryEventRecordWithInvalidBook() {
+        return new LibraryEventRecord(null,
+                LibraryEventType.NEW,
+                bookRecordWithInvalidValues());
+    }
+
+    public static LibraryEventRecord parseLibraryEventRecord(ObjectMapper objectMapper, String json) {
+        try {
+            return objectMapper.readValue(json, LibraryEventRecord.class);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
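[Note: BookRecord and LibraryEventRecord are introduced elsewhere in this PR, so their definitions do not appear in this patch. A minimal sketch consistent with TestUtil's constructor calls, the libraryEventId() accessor used in the producer test, and the 4xx assertion ("book.bookId - must not be null, book.bookName - must not be blank"); the component order and annotations are inferences, not confirmed by this diff:]

    // (assumes imports: jakarta.validation.Valid, jakarta.validation.constraints.*)
    public record BookRecord(
            @NotNull Integer bookId,    // null in bookRecordWithInvalidValues()
            @NotBlank String bookName,  // "" in bookRecordWithInvalidValues()
            String bookAuthor) {
    }

    public record LibraryEventRecord(
            Integer libraryEventId,     // null for NEW, 123 for UPDATE in TestUtil
            LibraryEventType libraryEventType,
            @NotNull @Valid BookRecord book) {  // "book" matches the error-message path
    }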