
Commit 9b30d3b

Authored by dvaseekara, yasiribmcon, dongjinleekr, akatona84, and wonjong-yoo
[CORE-123098] - Upgrade to Kafka 3.9.0 (#17)
* Upgrade simplekdc to 2.1.0 (linkedin#2186)

  This PR resolves linkedin#2178. Upgrading the simplekdc version to "2.1.0", which supports a change that correctly selects security classes based on the version of IBM Semeru JDK (if applicable) being used. No regression was observed with the Semeru, OpenJDK and Temurin JDKs. This newer version (released on 14 August 2024) also addresses the dependency vulnerability mentioned in linkedin#2179, as **org.jboss.xnio:xnio-api** is updated to **3.8.16**[^1]

  [^1]: https://github.com/apache/directory-kerby/releases/tag/kerby-all-2.1.0#:~:text=Bump%20org.jboss.xnio%3Axnio%2Dapi%20from%203.8.15.Final%20to%203.8.16.Final).

* Remove unused KafkaSampleStore#_skipSampleStoreTopicRackAwarenessCheck (linkedin#2183), left over from linkedin#1572 (6ae3f41)

* Test logging fix: by default log4j2 looks for a log4j2.properties file (linkedin#2181)

  `log4j.properties` files are ignored in the test resources; after renaming them, I was finally able to change the log levels while unit/integration testing. I'm not sure if it was the cause of linkedin#2152, but this is the fix for tests. Prod should keep working with the log4j.properties file, as that is passed with the -Dlog4j.configurationFile java option.

* Fix typo in comment (linkedin#2189): fix 'the the' in the comments

* New PR template (linkedin#2191)

  ## Summary
  Why: improve PR quality and reviewability.
  What: modifies the current PR template to be structured and require more details when submitting PRs.

  ## Expected Behavior
  A PR must come with sufficient details to address or explain the issue.

  ## Actual Behavior
  The PR template only requires a link to the issue:
  ```
  This PR resolves #<Replace-Me-With-The-Issue-Number-Addressed-By-This-PR>.
  ```

  ## Steps to reproduce
  1. either create a new PR, or
  2. see [the current template](https://github.com/linkedin/cruise-control/blob/c5545ef04618b5b42290edda2ee63eb6bfa2e1a6/docs/pull_request_template.md)

  ## Known Workarounds
  People voluntarily provide additional details.

  ## Additional Evidence
  - n/a

  ## Categorization
  - [x] refactor

* CI workflow with GitHub Actions (linkedin#2192)

  ## Summary
  ### Why
  1. GitHub Actions workflows are native GH workflows
  2. GitHub Actions do not require additional non-GitHub accounts, unlike CircleCI
  3. plenty of compute resources[^0] available for OSS projects
  4. unlike CircleCI resource limits (don't have details)

  [^0]: https://docs.github.com/en/actions/administering-github-actions/usage-limits-billing-and-administration#availability

  ### What
  1. creates the CI workflow `ci.yaml`
  2. creates the Artifactory workflow `artifactory.yaml`

  The workflow structure is documented in the spec[^1]

  [^1]: https://docs.github.com/en/actions/writing-workflows/workflow-syntax-for-github-actions

  ## Expected Behavior
  CI is expected to
  1. execute unit tests
  1. execute integration tests
  1. execute hw platform unit tests
  1. publish artifacts to the artifactory when a tag is published
  1. provide the ability to re-run tests on failures
  1. report results to the corresponding PR/branch, which is to be used as a quality gate for PR merging

  ## Actual Behavior
  1. the current CircleCI integration provides [1] [2] [3] [4] from the expected behavior
  4. but re-running checks requires additional effort like logging in to CircleCI
  5. which slows the PR feedback loop, as users may not have CircleCI credentials or knowledge of the system

  [1]: https://github.com/linkedin/cruise-control/blob/a298df86095532264f13ca7490cfabb8ff68839f/.circleci/config.yml#L51-L53
  [2]: https://github.com/linkedin/cruise-control/blob/a298df86095532264f13ca7490cfabb8ff68839f/.circleci/config.yml#L51-L53
  [3]: https://github.com/linkedin/cruise-control/blob/a298df86095532264f13ca7490cfabb8ff68839f/.circleci/config.yml#L5-L34
  [4]: https://github.com/linkedin/cruise-control/blob/a298df86095532264f13ca7490cfabb8ff68839f/.circleci/config.yml#L94-L103

  ## Steps to reproduce
  1. see failed PR checks, i.e. linkedin#2133

  ## Known Workarounds
  1. asking PR authors to trigger the build

  ## Migration Plan
  1. add the GH Actions integration alongside CircleCI
  2. confirm GH Actions provide equivalent or better functionality
  3. remove the CircleCI integration
  4. ensure publishing via GH Actions works

  ## Categorization
  - [x] refactor

* Update README.md

* Set embedded Zookeeper to listen on 127.0.0.1 (linkedin#2196)

  ## Summary
  1. Why: when on VPN, I can't run Cruise Control tests as ZK is binding to the local real IP address and the local network is restricted.
  2. What: changing it to bind to 127.0.0.1 fixes it (got the idea from the Kafka embedded ZK setup). I think it won't make any difference whether automation or a human runs the tests; please correct me if I'm wrong.

* Add "documentation" category to PR template (linkedin#2195)

  ## Summary
  1. Why: to categorize documentation PRs
  2. What: adds a "documentation" category to the PR template

  ## Expected Behavior
  - when users make documentation changes
  - they should be able to specify documentation as a change category

  ## Actual Behavior
  - no documentation category to specify

* Add missing documentation for minNumBrokersViolateMetricLimit

  ## Summary
  1. Why: documentation for **min.num.brokers.violate.metric.limit.to.decrease.cluster.concurrency** is missing.
  2. What: document the setting

* Add more logging to help debug the time spent on goal based operations (linkedin#2202)
  * Add more logging to help debug the time spent on goal based operations
  * Update cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/async/progress/OperationProgress.java
    Co-authored-by: Maryan Hratson <[email protected]>
  * Update cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/GoalBasedOperationRunnable.java
    Co-authored-by: Maryan Hratson <[email protected]>
  * Update cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/GoalBasedOperationRunnable.java
    Co-authored-by: Maryan Hratson <[email protected]>
  ---------
  Co-authored-by: Maryan Hratson <[email protected]>

* Fix the CircleCI error by removing the test-multi-arch job (linkedin#2203)
  * Remove the test-multi-arch job in CircleCI
  * Remove the test-multi-arch definitions

* Improve per-task observability through additional logging (linkedin#2204)
  * Fix the issue that the uuid is null in the log after execution
  * Add more logging to track the task with its UUID
  * Rename "task" to "User task" in logging
  * Reformat logging

* Fixing "Unexpected method calls: HttpSession.invalidate" (linkedin#2201)

  ## Summary
  1. Why: the test sometimes failed with unexpected method calls.
  2. What: the fix prepares the test to accept the invalidate method call too (a sketch of this kind of expectation appears after this message).

  ## Expected Behavior
  Tests run without failure.

  ## Actual Behavior
  Tests sometimes fail with an unexpected method call.

  ## Steps to Reproduce
  1. set up a repeated run of e.g. `testCreateUserTask` in the IDE
  2. observe a failure after multiple successful runs (for me it was failing after around 250 successful runs)

  ## Additional evidence
  ```
  java.lang.AssertionError: On mock #2 (zero indexed): Unexpected method calls: HttpSession.invalidate()
      at org.easymock.EasyMock.getAssertionError(EasyMock.java:2230)
      at org.easymock.EasyMock.verify(EasyMock.java:2058)
      at com.linkedin.kafka.cruisecontrol.servlet.UserTaskManagerTest.testCreateUserTask(UserTaskManagerTest.java:59)
      at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.base/java.lang.reflect.Method.invoke(Method.java:566)
      at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
      at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
      at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
      at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
      at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
      at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
      at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
      at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
      at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
      at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
      at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
      at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
      at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
      at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
      at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112)
      at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
      at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
      at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
      at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
      at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.base/java.lang.reflect.Method.invoke(Method.java:566)
      at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
      at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
      at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
      at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
      at com.sun.proxy.$Proxy5.processTestClass(Unknown Source)
      at org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
      at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
      at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
      at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
      at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
      at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:113)
      at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65)
      at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
      at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
  ```

  ## Categorization
  - [x] bugfix
  - [ ] new feature
  - [ ] refactor
  - [ ] CVE
  - [ ] other

* Update the README (linkedin#2216)

  This is a minor improvement to the README.md.

* fix: Fix CVEs (linkedin#2220)

  Update dependencies to fix CVEs: Zookeeper, Netty, Jetty, Nimbus JOSE+JWT

* Fix: intra.broker.goals cannot be configured as default.goals (linkedin#2221)

* Kerberos auth to local rules support (linkedin#2043)

* Expose AdminClient exception when failing to describe the cluster (linkedin#2222)

* Fix PartitionSizeAnomalyFinder, to be able to handle custom SELF_HEALING_PARTITION_SIZE_THRESHOLD_MB values (linkedin#2212)

* Upgrade Kafka to 3.8.0 (linkedin#2180)
  * Upgrading kafka to 3.8.0 - config properties rewriting and adding necessary dependencies
    # Conflicts: # gradle.properties
  * Upgrading kafka to 3.8.0 - using alternative for removed getAllTopicConfigs zk admin client method
  * Upgrading kafka to 3.8.0 - adding 3.8 zk client creation way
  * Upgrading kafka to 3.8.0 - adding 3.8 network client creation way
  * replication/quota/topic log constants moved in 3.8; their values haven't changed, only where they are stored, so this stays backward compatible
  * Update usages of Metadata to conform to the kafka 3.7 interface
  ---------
  Co-authored-by: David Simon <[email protected]>

* Rectify docker run command for s390x (linkedin#2249)

* Make startup more robust and prevent auto topic creation when using CruiseControlMetricsReporterSampler (linkedin#2211)

* Update license to reflect the latest status (linkedin#2256)

* Catch NoSuchFileException on load failed brokers list (linkedin#2255)

* Replace deprecated methods to support Kafka 4.0.0 (linkedin#2254)

* Upgrade Kafka to 3.8.0 (linkedin#2180)
  * Upgrading kafka to 3.8.0 - config properties rewriting and adding necessary dependencies
  * Upgrading kafka to 3.8.0 - using alternative for removed getAllTopicConfigs zk admin client method
  * Upgrading kafka to 3.8.0 - adding 3.8 zk client creation way
  * Upgrading kafka to 3.8.0 - adding 3.8 network client creation way
  * replication/quota/topic log constants moved in 3.8; their values haven't changed, only where they are stored, so this stays backward compatible
  * Update usages of Metadata to conform to the kafka 3.7 interface
  ---------
  Co-authored-by: David Simon <[email protected]>

* Use literal config name for listeners and broker.id config (linkedin#2169)

* Disabling test and integration-test in github-workflows

---------

Co-authored-by: yasiribmcon <[email protected]>
Co-authored-by: Lee Dongjin <[email protected]>
Co-authored-by: Andras Katona <[email protected]>
Co-authored-by: wonjong-yoo <[email protected]>
Co-authored-by: Maryan Hratson <[email protected]>
Co-authored-by: ik <[email protected]>
Co-authored-by: Allen Wang <[email protected]>
Co-authored-by: Hao Geng <[email protected]>
Co-authored-by: Aswin A <[email protected]>
Co-authored-by: Kondrat Bertalan <[email protected]>
Co-authored-by: Tamas Barnabas Egyed <[email protected]>
Co-authored-by: harmadasg <[email protected]>
Co-authored-by: David Simon <[email protected]>
Co-authored-by: Paolo Patierno <[email protected]>
Co-authored-by: Shubham Rawat <[email protected]>
Co-authored-by: Henry Haiying Cai <[email protected]>
Co-authored-by: Daniel Vaseekaran <[email protected]>
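For the HttpSession.invalidate fix listed above, the described change is making the mock tolerate an optional `invalidate()` call. The snippet below is a minimal, hypothetical EasyMock sketch of that kind of expectation, not the actual patch to UserTaskManagerTest; the class name and the `getId()` stub are made up for illustration.

```java
import javax.servlet.http.HttpSession;
import org.easymock.EasyMock;

public class OptionalInvalidateExpectationSketch {
  public static void main(String[] args) {
    HttpSession session = EasyMock.createMock(HttpSession.class);
    EasyMock.expect(session.getId()).andReturn("session-id").anyTimes();
    // Record invalidate() as an optional call so verify() no longer fails if it happens.
    session.invalidate();
    EasyMock.expectLastCall().anyTimes();
    EasyMock.replay(session);

    session.getId();          // the code under test may or may not call invalidate()
    EasyMock.verify(session); // no "Unexpected method calls: HttpSession.invalidate()" failure
  }
}
```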
1 parent 0379c8f commit 9b30d3b

70 files changed (+1695, -471 lines)


.circleci/config.yml

Lines changed: 8 additions & 0 deletions
@@ -77,3 +77,11 @@ workflows:
             ignore: /.*/
           tags:
             only: /^[0-9]+\.[0-9]+\.[0-9]+(?:-[a-zA-Z0-9_]+)?$/
+    - publish:
+        requires:
+          - integration-test
+        filters:
+          branches:
+            ignore: /.*/
+          tags:
+            only: /^[0-9]+\.[0-9]+\.[0-9]+(?:-[a-zA-Z0-9_]+)?$/
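The `tags: only:` filter above decides which Git tags reach the new publish job. A quick, hedged way to sanity-check a candidate tag against that pattern is a few lines of Java; the tag names below are made up for illustration.

```java
import java.util.List;
import java.util.regex.Pattern;

public class ReleaseTagFilterCheck {
  // Same pattern as the CircleCI "tags: only:" filter in the hunk above.
  private static final Pattern RELEASE_TAG =
      Pattern.compile("^[0-9]+\\.[0-9]+\\.[0-9]+(?:-[a-zA-Z0-9_]+)?$");

  public static void main(String[] args) {
    // Hypothetical tags: the first two match and would trigger publish, the last two would not.
    for (String tag : List.of("2.5.142", "2.5.142-rc1", "v2.5.142", "2.5")) {
      System.out.println(tag + " -> " + RELEASE_TAG.matcher(tag).matches());
    }
  }
}
```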

.github/workflows/artifactory.yaml

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+name: Artifactory
+
+on:
+  workflow_dispatch: # manual trigger
+  #release:
+  #  types: [published]
+
+jobs:
+  publish:
+    # if: startsWith(github.event.ref, 'release/')
+    name: publish
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          fetch-depth: 0 # so gradle doesn't fail traversing the history
+      - uses: actions/setup-java@v4
+        with:
+          java-version: 11
+          distribution: microsoft
+          cache: gradle
+      - uses: gradle/actions/setup-gradle@v4 # v4.0.0
+      - name: publish
+        run: |
+          ./gradlew :artifactoryPublish :cruise-control:artifactoryPublish :cruise-control-core:artifactoryPublish :cruise-control-metrics-reporter:artifactoryPublish

.github/workflows/ci.yaml

Lines changed: 84 additions & 0 deletions
@@ -0,0 +1,84 @@
+name: CI
+
+on:
+  push:
+    branches: ['main']
+  pull_request:
+    types: [ opened, synchronize, reopened ]
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
+  cancel-in-progress: ${{ github.ref != 'refs/heads/main' }}
+
+jobs:
+  test:
+    if: false ## update to true to run this workflow
+    name: "test with JDK=${{matrix.java-dist}}:${{matrix.java-ver}}"
+    runs-on: [ubuntu-latest]
+    strategy:
+      fail-fast: false
+      matrix:
+        java-ver: [11]
+        java-dist: ['microsoft', 'temurin']
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          fetch-depth: 0 # so gradle doesn't fail traversing the history
+      - uses: actions/setup-java@v4
+        with:
+          java-version: ${{ matrix.java-ver }}
+          distribution: ${{ matrix.java-dist }}
+          cache: gradle
+      # see: https://github.com/gradle/actions/blob/main/setup-gradle/README.md
+      - uses: gradle/actions/setup-gradle@v4 # v4.0.0
+      - name: gradle build
+        run: ./gradlew --no-daemon -PmaxParallelForks=1 build
+
+  integration-test:
+    if: false ## update to true to run this workflow
+    name: "integration-test with JDK=${{matrix.java-dist}}:${{matrix.java-ver}}"
+    runs-on: [ubuntu-latest]
+    strategy:
+      fail-fast: false
+      matrix:
+        java-ver: [11]
+        java-dist: ['microsoft', 'temurin']
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          fetch-depth: 0 # so gradle doesn't fail traversing the history
+      - uses: actions/setup-java@v4
+        with:
+          java-version: ${{ matrix.java-ver }}
+          distribution: ${{ matrix.java-dist }}
+          cache: gradle
+      # see: https://github.com/gradle/actions/blob/main/setup-gradle/README.md
+      - uses: gradle/actions/setup-gradle@v4 # v4.0.0
+      - name: gradle integration test
+        run: ./gradlew --no-daemon -PmaxParallelForks=1 clean integrationTest
+
+  build-platform:
+    if: false ## update to true to run this workflow
+    name: platform build with JDK=${{matrix.java-dist}}:${{matrix.java-ver}} on ${{matrix.hw_platform}}
+    strategy:
+      fail-fast: false
+      matrix:
+        java-ver: [11]
+        java-dist: ['temurin']
+        hw_platform: ['s390x']
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          fetch-depth: 0 # so gradle doesn't fail traversing the history
+      - continue-on-error: true
+        run: |
+          # install required qemu libraries
+          docker run --rm --privileged tonistiigi/binfmt:latest --install all
+          # run docker container with qemu emulation
+          docker run --rm \
+            --platform ${{ matrix.hw_platform }} \
+            --name qemu-cross-${{ matrix.hw_platform }} \
+            --mount type=bind,source=${PWD},target=/workspace \
+            --workdir /workspace \
+            ${{matrix.hw_platform}}/eclipse-temurin:11-jdk /bin/sh -c "uname -a; ./gradlew --no-daemon -PmaxParallelForks=1 build"

LICENSE

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 BSD 2-CLAUSE LICENSE
 
-Copyright 2017, 2018, 2019, 2020, 2021, 2022 LinkedIn Corporation.
+Copyright 2017-2024 LinkedIn Corporation.
 All Rights Reserved.
 
 Redistribution and use in source and binary forms, with or without

README.md

Lines changed: 3 additions & 2 deletions
@@ -5,12 +5,13 @@ Cruise Control for Apache Kafka
 [![Github Actions](https://github.com/adobe/cruise-control/workflows/build-publish-docker-image/badge.svg)](https://github.com/adobe/cruise-control/workflows/build-publish-docker-image/badge.svg)
 [![Docker Hub](https://img.shields.io/docker/v/adobe/cruise-control?sort=date&style=plastic)](https://hub.docker.com/r/adobe/cruise-control/tags)
 
+
 ### Introduction ###
 Cruise Control is a product that helps run Apache Kafka clusters at large scale. Due to the popularity of
-Apache Kafka, many companies have bigger and bigger Kafka clusters. At LinkedIn, we have ~7K+ Kafka brokers,
+Apache Kafka, many companies have increasingly large Kafka clusters with hundreds of brokers. At LinkedIn, we have 10K+ Kafka brokers,
 which means broker deaths are an almost daily occurrence and balancing the workload of Kafka also becomes a big overhead.
 
-Kafka Cruise Control is designed to address this operation scalability issue.
+Kafka Cruise Control is designed to address this operational scalability issue.
 
 ### Features ###
 Kafka Cruise Control provides the following features out of the box:

build.gradle

Lines changed: 8 additions & 3 deletions
@@ -282,6 +282,8 @@ project(':cruise-control') {
     implementation "io.netty:netty-handler:${nettyVersion}"
     implementation "io.netty:netty-transport-native-epoll:${nettyVersion}"
     api "org.apache.kafka:kafka_$scalaBinaryVersion:$kafkaVersion"
+    api "org.apache.kafka:kafka-server:$kafkaVersion"
+    api "org.apache.kafka:kafka-server-common:$kafkaVersion"
     api "org.apache.kafka:kafka-clients:$kafkaVersion"
     // Add following dependency when upgrading to Kafka 3.5
     api "org.apache.kafka:kafka-storage:$kafkaVersion"
@@ -292,7 +294,7 @@ project(':cruise-control') {
     implementation 'com.google.code.gson:gson:2.10.1'
     implementation "org.eclipse.jetty:jetty-server:${jettyVersion}"
     implementation 'io.dropwizard.metrics:metrics-jmx:4.2.18'
-    implementation 'com.nimbusds:nimbus-jose-jwt:9.31'
+    implementation 'com.nimbusds:nimbus-jose-jwt:9.45'
     implementation 'io.swagger.parser.v3:swagger-parser-v3:2.1.16'
     implementation 'io.github.classgraph:classgraph:4.8.158'
     implementation 'com.google.code.findbugs:jsr305:3.0.2'
@@ -320,7 +322,7 @@ project(':cruise-control') {
     testImplementation 'commons-io:commons-io:2.15.1'
     testImplementation 'org.apache.httpcomponents:httpclient:4.5.14:tests'
     testImplementation 'org.bouncycastle:bcpkix-jdk15on:1.70'
-    testImplementation 'org.apache.kerby:kerb-simplekdc:2.0.3'
+    testImplementation 'org.apache.kerby:kerb-simplekdc:2.1.0'
     testImplementation 'com.jayway.jsonpath:json-path:2.8.0'
     testImplementation 'org.powermock:powermock-module-junit4:2.0.9'
    testImplementation 'org.powermock:powermock-api-easymock:2.0.9'
@@ -454,6 +456,7 @@ project(':cruise-control-metrics-reporter') {
     implementation "org.apache.logging.log4j:log4j-slf4j2-impl:2.22.1"
     implementation "org.apache.logging.log4j:log4j-core:2.22.1"
     implementation "org.apache.kafka:kafka_$scalaBinaryVersion:$kafkaVersion"
+    implementation "org.apache.kafka:kafka-server:$kafkaVersion"
     implementation "org.apache.kafka:kafka-clients:$kafkaVersion"
     implementation 'com.google.code.findbugs:jsr305:3.0.2'
     implementation 'com.google.code.findbugs:annotations:3.0.1'
@@ -465,7 +468,9 @@ project(':cruise-control-metrics-reporter') {
     testImplementation 'org.powermock:powermock-module-junit4:2.0.9'
     testImplementation 'org.powermock:powermock-api-easymock:2.0.9'
     testImplementation "org.apache.kafka:kafka-clients:$kafkaVersion:test"
-    testImplementation "org.apache.kafka:kafka-clients:$kafkaVersion"
+    testImplementation "org.apache.kafka:kafka-server-common:$kafkaVersion"
+    testImplementation "org.apache.kafka:kafka-group-coordinator:$kafkaVersion"
+    testImplementation "org.apache.kafka:kafka-storage:$kafkaVersion"
     testImplementation 'commons-io:commons-io:2.15.1'
     testImplementation "org.apache.zookeeper:zookeeper:${zookeeperVersion}"
     testOutput sourceSets.test.output

cruise-control-core/src/main/java/com/linkedin/cruisecontrol/common/Generationed.java

Lines changed: 2 additions & 2 deletions
@@ -27,7 +27,7 @@ public interface Generationed<G> {
    * have a different comparing method along with the generation comparison.
    *
    * @param other another generationed object to compare the generation with.
-   * @return -1 if the the generation of this object is earlier than the other's. 0 when the generations are the same.
+   * @return -1 if the generation of this object is earlier than the other's. 0 when the generations are the same.
    *         1 when the generation of this object is later than the other's.
    */
   int compareGeneration(Generationed<G> other);
@@ -36,7 +36,7 @@ public interface Generationed<G> {
    * Compare the generation of this object with the given generation.
    *
    * @param generation the given generation.
-   * @return -1 if the the generation of this object is earlier than the given generation.
+   * @return -1 if the generation of this object is earlier than the given generation.
    *         0 when the generations are the same.
    *         1 when the generation of this object is later than the given one.
    */

cruise-control-core/src/main/java/com/linkedin/cruisecontrol/monitor/sampling/aggregator/MetricSampleAggregatorState.java

Lines changed: 1 addition & 1 deletion
@@ -124,7 +124,7 @@ synchronized void resetWindowIndices(long startingWindowIndex, int numWindowIndi
 
   /**
    * Get the list of window indices that need to be updated based on the current generation.
-   * This method also removes the windows that are older than the oldestWindowIndex from the the internal state
+   * This method also removes the windows that are older than the oldestWindowIndex from the internal state
    * of this class.
    *
    * @param oldestWindowIndex the index of the oldest window in the MetricSampleAggregator.

cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporter.java

Lines changed: 75 additions & 7 deletions
@@ -5,6 +5,7 @@
 package com.linkedin.kafka.cruisecontrol.metricsreporter;
 
 import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.CruiseControlMetricsReporterException;
+import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.KafkaTopicDescriptionException;
 import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
 import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.MetricsUtils;
 import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.MetricSerde;
@@ -33,6 +34,7 @@
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
 import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
@@ -42,6 +44,7 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
@@ -370,16 +373,15 @@ protected void maybeUpdateTopicConfig() {
 
   protected void maybeIncreaseTopicPartitionCount() {
     String cruiseControlMetricsTopic = _metricsTopic.name();
+
     try {
-      // Retrieve topic partition count to check and update.
-      TopicDescription topicDescription =
-          _adminClient.describeTopics(Collections.singletonList(cruiseControlMetricsTopic)).values()
-                      .get(cruiseControlMetricsTopic).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+      // For compatibility with Kafka 4.0 and beyond we must use new API methods.
+      TopicDescription topicDescription = getTopicDescription(_adminClient, cruiseControlMetricsTopic);
+
       if (topicDescription.partitions().size() < _metricsTopic.numPartitions()) {
-        _adminClient.createPartitions(Collections.singletonMap(cruiseControlMetricsTopic,
-                                                                NewPartitions.increaseTo(_metricsTopic.numPartitions())));
+        _adminClient.createPartitions(Collections.singletonMap(cruiseControlMetricsTopic, NewPartitions.increaseTo(_metricsTopic.numPartitions())));
       }
-    } catch (InterruptedException | ExecutionException | TimeoutException e) {
+    } catch (KafkaTopicDescriptionException e) {
       LOG.warn("Partition count increase to {} for topic {} failed{}.", _metricsTopic.numPartitions(), cruiseControlMetricsTopic,
                (e.getCause() instanceof ReassignmentInProgressException) ? " due to ongoing reassignment" : "", e);
     }
@@ -511,4 +513,70 @@ private void setIfAbsent(Properties props, String key, String value) {
     }
   }
 
+  /**
+   * Attempts to retrieve the method for mapping topic names to futures from the {@link org.apache.kafka.clients.admin.DescribeTopicsResult} class.
+   * This method first tries to get the {@code topicNameValues()} method, which is available in Kafka 3.1.0 and later.
+   * If the method is not found, it falls back to trying to retrieve the {@code values()} method, which is available in Kafka 3.9.0 and earlier.
+   *
+   * If neither of these methods is found, a {@link RuntimeException} is thrown.
+   *
+   * <p>This method is useful for ensuring compatibility with both older and newer versions of Kafka clients.</p>
+   *
+   * @return the {@link Method} object representing the {@code topicNameValues()} or {@code values()} method.
+   * @throws RuntimeException if neither the {@code values()} nor {@code topicNameValues()} methods are found.
+   */
+  /* test */ static Method topicNameValuesMethod() {
+    //
+    Method topicDescriptionMethod = null;
+    try {
+      // First we try to get the topicNameValues() method
+      topicDescriptionMethod = DescribeTopicsResult.class.getMethod("topicNameValues");
+    } catch (NoSuchMethodException exception) {
+      LOG.info("Failed to get method topicNameValues() from DescribeTopicsResult class since we are probably on kafka 3.0.0 or older: ", exception);
+    }
+
+    if (topicDescriptionMethod == null) {
+      try {
+        // Second we try to get the values() method
+        topicDescriptionMethod = DescribeTopicsResult.class.getMethod("values");
+      } catch (NoSuchMethodException exception) {
+        LOG.info("Failed to get method values() from DescribeTopicsResult class: ", exception);
+      }
+    }
+
+    if (topicDescriptionMethod != null) {
+      return topicDescriptionMethod;
+    } else {
+      throw new RuntimeException("Unable to find both values() and topicNameValues() method in the DescribeTopicsResult class ");
+    }
+  }
+
+  /**
+   * Retrieves the {@link TopicDescription} for the specified Kafka topic, handling compatibility
+   * with Kafka versions 4.0 and above. This method uses reflection to invoke the appropriate method
+   * for retrieving topic description information, depending on the Kafka version.
+   *
+   * @param adminClient The Kafka {@link AdminClient} used to interact with the Kafka cluster.
+   * @param ccMetricsTopic The name of the Kafka topic for which the description is to be retrieved.
+   *
+   * @return The {@link TopicDescription} for the specified Kafka topic.
+   *
+   * @throws KafkaTopicDescriptionException If an error occurs while retrieving the topic description,
+   *         or if the topic name retrieval method cannot be found or invoked properly. This includes
+   *         exceptions related to reflection (e.g., {@link NoSuchMethodException}), invocation issues,
+   *         execution exceptions, timeouts, and interruptions.
+   */
+  /* test */ static TopicDescription getTopicDescription(AdminClient adminClient, String ccMetricsTopic) throws KafkaTopicDescriptionException {
+    try {
+      // For compatibility with Kafka 4.0 and beyond we must use new API methods.
+      Method topicDescriptionMethod = topicNameValuesMethod();
+      DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(ccMetricsTopic));
+      Map<String, KafkaFuture<TopicDescription>> topicDescriptionMap = (Map<String, KafkaFuture<TopicDescription>>) topicDescriptionMethod
+          .invoke(describeTopicsResult);
+      return topicDescriptionMap.get(ccMetricsTopic).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    } catch (InvocationTargetException | IllegalAccessException | ExecutionException | InterruptedException | TimeoutException e) {
+      throw new KafkaTopicDescriptionException(String.format("Unable to retrieve config of Cruise Cruise Control metrics topic {}.",
+          ccMetricsTopic), e);
+    }
+  }
 }
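As a rough illustration of how the reflection-based helper added above might be exercised, the sketch below calls `getTopicDescription` against a broker; the bootstrap address, the metrics topic name, and the wrapper class are assumptions for illustration, not part of this commit (the helper itself is package-private, so the sketch sits in the same package).

```java
package com.linkedin.kafka.cruisecontrol.metricsreporter;

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class TopicDescriptionCompatSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Assumed broker address and topic name, for illustration only.
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (AdminClient adminClient = AdminClient.create(props)) {
      // Resolves topicNameValues() on newer clients and falls back to values() on older ones.
      TopicDescription description =
          CruiseControlMetricsReporter.getTopicDescription(adminClient, "__CruiseControlMetrics");
      System.out.println("Metrics topic has " + description.partitions().size() + " partitions");
    }
  }
}
```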

cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsUtils.java

Lines changed: 20 additions & 2 deletions
@@ -163,10 +163,11 @@ public static void maybeUpdateConfig(Set<AlterConfigOp> configsToAlter,
    * @param scaleMs the scale for computing the delay
    * @param base the base for computing the delay
    * @param maxAttempts the max number of attempts on calling the function
+   * @param maxSleepMs the maximum sleep time between retries
    * @return {@code false} if the function requires a retry, but it cannot be retried, because the max attempts have been exceeded.
    *         {@code true} if the function stopped requiring a retry before exceeding the max attempts.
    */
-  public static boolean retry(Supplier<Boolean> function, long scaleMs, int base, int maxAttempts) {
+  public static boolean retry(Supplier<Boolean> function, long scaleMs, int base, int maxAttempts, int maxSleepMs) {
     if (maxAttempts > 0) {
       int attempts = 0;
       long timeToSleep = scaleMs;
@@ -179,6 +180,9 @@ public static boolean retry(Supplier<Boolean> function, long scaleMs, int base,
             return false;
           }
           timeToSleep *= base;
+          if (maxSleepMs > 0 && timeToSleep > maxSleepMs) {
+            timeToSleep = maxSleepMs;
+          }
           Thread.sleep(timeToSleep);
         } catch (InterruptedException ignored) {
 
@@ -200,7 +204,21 @@
    *         {@code true} if the function stopped requiring a retry before exceeding the max attempts.
    */
   public static boolean retry(Supplier<Boolean> function, int maxAttempts) {
-    return retry(function, DEFAULT_RETRY_BACKOFF_SCALE_MS, DEFAULT_RETRY_BACKOFF_BASE, maxAttempts);
+    return retry(function, DEFAULT_RETRY_BACKOFF_SCALE_MS, DEFAULT_RETRY_BACKOFF_BASE, maxAttempts, -1);
+  }
+
+  /**
+   * Retries the {@code Supplier<Boolean>} function while it returns {@code true} and for the specified max number of attempts.
+   * It uses -1 as maxSleepMs, to not limit the sleep time between retries.
+   * @param function the code to call and retry if needed
+   * @param scaleMs the scale for computing the delay
+   * @param base the base for computing the delay
+   * @param maxAttempts the max number of attempts on calling the function
+   * @return {@code false} if the function requires a retry, but it cannot be retried, because the max attempts have been exceeded.
+   *         {@code true} if the function stopped requiring a retry before exceeding the max attempts.
+   */
+  public static boolean retry(Supplier<Boolean> function, long scaleMs, int base, int maxAttempts) {
+    return retry(function, scaleMs, base, maxAttempts, -1);
   }
 
   /**
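A small usage sketch of the new five-argument `retry` overload, based only on the signature and Javadoc shown in the hunk above; the supplier and the concrete values are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsUtils;

public class RetryBackoffSketch {
  public static void main(String[] args) {
    AtomicInteger calls = new AtomicInteger();

    // The supplier returns true while a retry is still needed and false once the work succeeds,
    // so this pretends the operation succeeds on the fourth call. The trailing maxSleepMs=1500
    // caps the exponential backoff so no single sleep between attempts exceeds 1.5 seconds.
    boolean succeeded = CruiseControlMetricsUtils.retry(
        () -> calls.incrementAndGet() < 4,
        500L, // scaleMs
        2,    // base
        5,    // maxAttempts
        1500  // maxSleepMs
    );

    System.out.println("succeeded=" + succeeded + " after " + calls.get() + " calls");
  }
}
```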
