From 79d77c07663457684f866bb3c6649e5d775b83ba Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sun, 21 Apr 2024 23:04:42 +0900 Subject: [PATCH 01/13] Remove _throttleRate variable from ReplicationThrottleHelper class --- .../cruisecontrol/executor/Executor.java | 11 +- .../executor/ReplicationThrottleHelper.java | 122 ++++++++---------- .../ReplicationThrottleHelperTest.java | 46 ++----- 3 files changed, 74 insertions(+), 105 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index 22f210f0a5..2d95afe122 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -1606,8 +1606,7 @@ private void updateOngoingExecutionState() { private void interBrokerMoveReplicas() throws InterruptedException, ExecutionException, TimeoutException { Set currentDeadBrokersWithReplicas = _loadMonitor.deadBrokersWithReplicas(MAX_METADATA_WAIT_MS); - ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient, _replicationThrottle, - currentDeadBrokersWithReplicas); + ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient, currentDeadBrokersWithReplicas); int numTotalPartitionMovements = _executionTaskManager.numRemainingInterBrokerPartitionMovements(); long totalDataToMoveInMB = _executionTaskManager.remainingInterBrokerDataToMoveInMB(); long startTime = System.currentTimeMillis(); @@ -1622,7 +1621,9 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc AlterPartitionReassignmentsResult result = null; if (!tasksToExecute.isEmpty()) { - throttleHelper.setThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList())); + if (_replicationThrottle != null) { + throttleHelper.setThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()), _replicationThrottle); + } // Execute the tasks. _executionTaskManager.markTasksInProgress(tasksToExecute); result = ExecutionUtils.submitReplicaReassignmentTasks(_adminClient, tasksToExecute); @@ -1645,7 +1646,9 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc .collect(Collectors.toList()); inProgressTasks.addAll(inExecutionTasks()); - throttleHelper.clearThrottles(completedTasks, inProgressTasks); + if (_replicationThrottle != null) { + throttleHelper.clearThrottles(completedTasks, inProgressTasks); + } } // Currently, _executionProgressCheckIntervalMs is only runtime adjusted for inter broker move tasks, not diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java index 359f379d60..d20dc2d9d2 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java @@ -45,45 +45,40 @@ class ReplicationThrottleHelper { static final int RETRIES = 30; private final AdminClient _adminClient; - private final Long _throttleRate; private final int _retries; private final Set _deadBrokers; - ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate) { - this(adminClient, throttleRate, RETRIES); + ReplicationThrottleHelper(AdminClient adminClient) { + this(adminClient, RETRIES); } - ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, Set deadBrokers) { - this(adminClient, throttleRate, RETRIES, deadBrokers); + ReplicationThrottleHelper(AdminClient adminClient, Set deadBrokers) { + this(adminClient, RETRIES, deadBrokers); } // for testing - ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, int retries) { + ReplicationThrottleHelper(AdminClient adminClient, int retries) { this._adminClient = adminClient; - this._throttleRate = throttleRate; this._retries = retries; this._deadBrokers = new HashSet(); } - ReplicationThrottleHelper(AdminClient adminClient, Long throttleRate, int retries, Set deadBrokers) { + ReplicationThrottleHelper(AdminClient adminClient, int retries, Set deadBrokers) { this._adminClient = adminClient; - this._throttleRate = throttleRate; this._retries = retries; this._deadBrokers = deadBrokers; } - void setThrottles(List replicaMovementProposals) + void setThrottles(List replicaMovementProposals, long throttleRate) throws ExecutionException, InterruptedException, TimeoutException { - if (throttlingEnabled()) { - LOG.info("Setting a rebalance throttle of {} bytes/sec", _throttleRate); - Set participatingBrokers = getParticipatingBrokers(replicaMovementProposals); - Map> throttledReplicas = getThrottledReplicasByTopic(replicaMovementProposals); - for (int broker : participatingBrokers) { - setThrottledRateIfNecessary(broker); - } - for (Map.Entry> entry : throttledReplicas.entrySet()) { - setThrottledReplicas(entry.getKey(), entry.getValue()); - } + LOG.info("Setting a rebalance throttle of {} bytes/sec", throttleRate); + Set participatingBrokers = getParticipatingBrokers(replicaMovementProposals); + Map> throttledReplicas = getThrottledReplicasByTopic(replicaMovementProposals); + for (int broker : participatingBrokers) { + setThrottledRateIfNecessary(broker, throttleRate); + } + for (Map.Entry> entry : throttledReplicas.entrySet()) { + setThrottledReplicas(entry.getKey(), entry.getValue()); } } @@ -107,49 +102,43 @@ boolean taskIsInProgress(ExecutionTask task) { // clear throttles for a specific list of execution tasks void clearThrottles(List completedTasks, List inProgressTasks) throws ExecutionException, InterruptedException, TimeoutException { - if (throttlingEnabled()) { - List completedProposals = - completedTasks - .stream() - // Filter for completed tasks related to inter-broker replica movement - .filter(this::shouldRemoveThrottleForTask) - .map(ExecutionTask::proposal) - .collect(Collectors.toList()); - - // These are the brokers which have completed a task with - // inter-broker replica movement - Set participatingBrokers = getParticipatingBrokers(completedProposals); - - List inProgressProposals = - inProgressTasks - .stream() - .filter(this::taskIsInProgress) - .map(ExecutionTask::proposal) - .collect(Collectors.toList()); - - // These are the brokers which currently have in-progress - // inter-broker replica movement - Set brokersWithInProgressTasks = getParticipatingBrokers(inProgressProposals); - - // Remove the brokers with in-progress replica moves from the brokers that have - // completed inter-broker replica moves - Set brokersToRemoveThrottlesFrom = new TreeSet<>(participatingBrokers); - brokersToRemoveThrottlesFrom.removeAll(brokersWithInProgressTasks); - - LOG.info("Removing replica movement throttles from brokers in the cluster: {}", brokersToRemoveThrottlesFrom); - for (int broker : brokersToRemoveThrottlesFrom) { - removeThrottledRateFromBroker(broker); - } - - Map> throttledReplicas = getThrottledReplicasByTopic(completedProposals); - for (Map.Entry> entry : throttledReplicas.entrySet()) { - removeThrottledReplicasFromTopic(entry.getKey(), entry.getValue()); - } + List completedProposals = + completedTasks + .stream() + // Filter for completed tasks related to inter-broker replica movement + .filter(this::shouldRemoveThrottleForTask) + .map(ExecutionTask::proposal) + .collect(Collectors.toList()); + + // These are the brokers which have completed a task with + // inter-broker replica movement + Set participatingBrokers = getParticipatingBrokers(completedProposals); + + List inProgressProposals = + inProgressTasks + .stream() + .filter(this::taskIsInProgress) + .map(ExecutionTask::proposal) + .collect(Collectors.toList()); + + // These are the brokers which currently have in-progress + // inter-broker replica movement + Set brokersWithInProgressTasks = getParticipatingBrokers(inProgressProposals); + + // Remove the brokers with in-progress replica moves from the brokers that have + // completed inter-broker replica moves + Set brokersToRemoveThrottlesFrom = new TreeSet<>(participatingBrokers); + brokersToRemoveThrottlesFrom.removeAll(brokersWithInProgressTasks); + + LOG.info("Removing replica movement throttles from brokers in the cluster: {}", brokersToRemoveThrottlesFrom); + for (int broker : brokersToRemoveThrottlesFrom) { + removeThrottledRateFromBroker(broker); } - } - private boolean throttlingEnabled() { - return _throttleRate != null; + Map> throttledReplicas = getThrottledReplicasByTopic(completedProposals); + for (Map.Entry> entry : throttledReplicas.entrySet()) { + removeThrottledReplicasFromTopic(entry.getKey(), entry.getValue()); + } } private Set getParticipatingBrokers(List replicaMovementProposals) { @@ -177,17 +166,14 @@ private Map> getThrottledReplicasByTopic(List ops = new ArrayList<>(); for (String replicaThrottleRateConfigKey : Arrays.asList(LEADER_THROTTLED_RATE, FOLLOWER_THROTTLED_RATE)) { ConfigEntry currThrottleRate = brokerConfigs.get(replicaThrottleRateConfigKey); - if (currThrottleRate == null || !currThrottleRate.value().equals(String.valueOf(_throttleRate))) { - LOG.debug("Setting {} to {} bytes/second for broker {}", replicaThrottleRateConfigKey, _throttleRate, brokerId); - ops.add(new AlterConfigOp(new ConfigEntry(replicaThrottleRateConfigKey, String.valueOf(_throttleRate)), AlterConfigOp.OpType.SET)); + if (currThrottleRate == null || !currThrottleRate.value().equals(String.valueOf(throttleRate))) { + LOG.debug("Setting {} to {} bytes/second for broker {}", replicaThrottleRateConfigKey, throttleRate, brokerId); + ops.add(new AlterConfigOp(new ConfigEntry(replicaThrottleRateConfigKey, String.valueOf(throttleRate)), AlterConfigOp.OpType.SET)); } } if (!ops.isEmpty()) { diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java index b1f94014dd..5cc8c9aadd 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java @@ -110,26 +110,6 @@ private ExecutionTask completedTaskForProposal(long id, ExecutionProposal propos return task; } - @Test - public void testIsNoOpWhenThrottleIsNull() throws Exception { - AdminClient mockAdminClient = EasyMock.strictMock(AdminClient.class); - EasyMock.replay(mockAdminClient); - - // Test would fail on any unexpected interactions with the kafkaZkClient - ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(mockAdminClient, null); - ExecutionProposal proposal = new ExecutionProposal(new TopicPartition("topic", 0), - 100, - new ReplicaPlacementInfo(0), - Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)), - Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2))); - - ExecutionTask task = completedTaskForProposal(0, proposal); - - throttleHelper.setThrottles(Collections.singletonList(proposal)); - throttleHelper.clearThrottles(Collections.singletonList(task), Collections.emptyList()); - EasyMock.verify(mockAdminClient); - } - @Test public void testClearThrottleOnNonExistentTopic() throws Exception { final long throttleRate = 100L; @@ -146,7 +126,7 @@ public void testClearThrottleOnNonExistentTopic() throws Exception { Arrays.asList(new ReplicaPlacementInfo(brokerId0), new ReplicaPlacementInfo(brokerId2))); AdminClient mockAdminClient = EasyMock.mock(AdminClient.class); - ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(mockAdminClient, throttleRate); + ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(mockAdminClient); // Case 1: a situation where Topic0 does not exist. Hence no property is returned upon read. @@ -206,7 +186,7 @@ public void testSetThrottleOnNonExistentTopic() throws Exception { Arrays.asList(new ReplicaPlacementInfo(brokerId0), new ReplicaPlacementInfo(brokerId2))); AdminClient mockAdminClient = EasyMock.strictMock(AdminClient.class); - ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(mockAdminClient, throttleRate); + ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(mockAdminClient); // Case 1: a situation where Topic0 does not exist. Hence no property is returned upon read. expectDescribeBrokerConfigs(mockAdminClient, brokers); @@ -216,7 +196,7 @@ public void testSetThrottleOnNonExistentTopic() throws Exception { expectListTopics(mockAdminClient, Collections.emptySet()); EasyMock.replay(mockAdminClient); // Expect no exception - throttleHelper.setThrottles(Collections.singletonList(proposal)); + throttleHelper.setThrottles(Collections.singletonList(proposal), throttleRate); EasyMock.verify(mockAdminClient); // Case 2: a situation where Topic0 gets deleted after its configs were read. Change configs should not fail. @@ -231,7 +211,7 @@ public void testSetThrottleOnNonExistentTopic() throws Exception { expectListTopics(mockAdminClient, Collections.emptySet()); EasyMock.replay(mockAdminClient); // Expect no exception - throttleHelper.setThrottles(Collections.singletonList(proposal)); + throttleHelper.setThrottles(Collections.singletonList(proposal), throttleRate); EasyMock.verify(mockAdminClient); } @@ -240,7 +220,7 @@ public void testAddingThrottlesWithNoPreExistingThrottles() throws Exception { createTopics(); final long throttleRate = 100L; - ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient, throttleRate); + ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient); ExecutionProposal proposal = new ExecutionProposal(new TopicPartition(TOPIC0, 0), 100, new ReplicaPlacementInfo(0), @@ -249,7 +229,7 @@ public void testAddingThrottlesWithNoPreExistingThrottles() throws Exception { ExecutionTask task = completedTaskForProposal(0, proposal); - throttleHelper.setThrottles(Collections.singletonList(proposal)); + throttleHelper.setThrottles(Collections.singletonList(proposal), throttleRate); assertExpectedThrottledRateForBroker(0, throttleRate); assertExpectedThrottledRateForBroker(1, throttleRate); @@ -272,7 +252,7 @@ public void testAddingThrottlesWithPreExistingThrottles() throws Exception { createTopics(); final long throttleRate = 100L; - ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient, throttleRate); + ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient); ExecutionProposal proposal = new ExecutionProposal( new TopicPartition(TOPIC0, 0), 100, @@ -309,7 +289,7 @@ public void testAddingThrottlesWithPreExistingThrottles() throws Exception { new AlterConfigOp(new ConfigEntry(ReplicationThrottleHelper.FOLLOWER_THROTTLED_REPLICAS, "1:1"), AlterConfigOp.OpType.SET)); throttleHelper.changeTopicConfigs(TOPIC1, topic1Config); - throttleHelper.setThrottles(Collections.singletonList(proposal)); + throttleHelper.setThrottles(Collections.singletonList(proposal), throttleRate); assertExpectedThrottledRateForBroker(0, throttleRate); assertExpectedThrottledRateForBroker(1, throttleRate); @@ -340,7 +320,7 @@ public void testDoNotModifyExistingWildcardReplicaThrottles() throws Exception { final long throttleRate = 100L; - ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient, throttleRate); + ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient); // Set replica throttle config values for both topics setWildcardThrottleReplicaForTopic(throttleHelper, TOPIC0); @@ -357,7 +337,7 @@ public void testDoNotModifyExistingWildcardReplicaThrottles() throws Exception { Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(3)), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2))); - throttleHelper.setThrottles(Arrays.asList(proposal, proposal2)); + throttleHelper.setThrottles(Arrays.asList(proposal, proposal2), throttleRate); ExecutionTask completedTask = completedTaskForProposal(0, proposal); ExecutionTask inProgressTask = inProgressTaskForProposal(1, proposal2); @@ -411,7 +391,7 @@ public void testDoNotRemoveThrottlesForInProgressTasks() throws Exception { final long throttleRate = 100L; - ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient, throttleRate); + ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient); ExecutionProposal proposal = new ExecutionProposal(new TopicPartition(TOPIC0, 0), 100, new ReplicaPlacementInfo(0), @@ -424,7 +404,7 @@ public void testDoNotRemoveThrottlesForInProgressTasks() throws Exception { Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(3)), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2))); - throttleHelper.setThrottles(Arrays.asList(proposal, proposal2)); + throttleHelper.setThrottles(Arrays.asList(proposal, proposal2), throttleRate); ExecutionTask completedTask = completedTaskForProposal(0, proposal); ExecutionTask inProgressTask = inProgressTaskForProposal(1, proposal2); @@ -484,7 +464,7 @@ public void testWaitForConfigs() throws Exception { expectDescribeTopicConfigs(mockAdminClient, TOPIC0, EMPTY_CONFIG, true); } EasyMock.replay(mockAdminClient); - ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(mockAdminClient, 100L, retries); + ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(mockAdminClient, retries); ConfigResource cf = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC0); assertThrows(IllegalStateException.class, () -> throttleHelper.waitForConfigs(cf, Collections.singletonList( new AlterConfigOp(new ConfigEntry("k", "v"), AlterConfigOp.OpType.SET) From a6db859e34f9697cd9bf77b109c41425d1dd26ad Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sun, 21 Apr 2024 23:08:27 +0900 Subject: [PATCH 02/13] =?UTF-8?q?Rename=20ReplicationThrottleHelper=20meth?= =?UTF-8?q?od=20names:=20ThrottledRate=20=E2=86=92=20ReplicationThrottledR?= =?UTF-8?q?ate?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cruisecontrol/executor/ReplicationThrottleHelper.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java index d20dc2d9d2..a24e60cf09 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java @@ -75,7 +75,7 @@ void setThrottles(List replicaMovementProposals, long throttl Set participatingBrokers = getParticipatingBrokers(replicaMovementProposals); Map> throttledReplicas = getThrottledReplicasByTopic(replicaMovementProposals); for (int broker : participatingBrokers) { - setThrottledRateIfNecessary(broker, throttleRate); + setReplicationThrottledRateIfNecessary(broker, throttleRate); } for (Map.Entry> entry : throttledReplicas.entrySet()) { setThrottledReplicas(entry.getKey(), entry.getValue()); @@ -132,7 +132,7 @@ void clearThrottles(List completedTasks, List inPr LOG.info("Removing replica movement throttles from brokers in the cluster: {}", brokersToRemoveThrottlesFrom); for (int broker : brokersToRemoveThrottlesFrom) { - removeThrottledRateFromBroker(broker); + removeReplicationThrottledRateFromBroker(broker); } Map> throttledReplicas = getThrottledReplicasByTopic(completedProposals); @@ -166,7 +166,7 @@ private Map> getThrottledReplicasByTopic(List ops = new ArrayList<>(); for (String replicaThrottleRateConfigKey : Arrays.asList(LEADER_THROTTLED_RATE, FOLLOWER_THROTTLED_RATE)) { @@ -316,7 +316,7 @@ private void removeThrottledReplicasFromTopic(String topic, Set replicas } } - private void removeThrottledRateFromBroker(Integer brokerId) + private void removeReplicationThrottledRateFromBroker(Integer brokerId) throws ExecutionException, InterruptedException, TimeoutException { Config brokerConfigs = getBrokerConfigs(brokerId); ConfigEntry currLeaderThrottle = brokerConfigs.get(LEADER_THROTTLED_RATE); From 6879b582a2c2f42719574dbcd6b00b68c97486cf Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sun, 21 Apr 2024 23:10:23 +0900 Subject: [PATCH 03/13] =?UTF-8?q?Rename=20ReplicationThrottleHelper=20meth?= =?UTF-8?q?od=20name:=20setThrottles=20=E2=86=92=20setReplicationThrottles?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kafka/cruisecontrol/executor/Executor.java | 2 +- .../executor/ReplicationThrottleHelper.java | 2 +- .../executor/ReplicationThrottleHelperTest.java | 12 ++++++------ 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index 2d95afe122..6cc4349fb8 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -1622,7 +1622,7 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc AlterPartitionReassignmentsResult result = null; if (!tasksToExecute.isEmpty()) { if (_replicationThrottle != null) { - throttleHelper.setThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()), _replicationThrottle); + throttleHelper.setReplicationThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()), _replicationThrottle); } // Execute the tasks. _executionTaskManager.markTasksInProgress(tasksToExecute); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java index a24e60cf09..1a0d35e025 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java @@ -69,7 +69,7 @@ class ReplicationThrottleHelper { this._deadBrokers = deadBrokers; } - void setThrottles(List replicaMovementProposals, long throttleRate) + void setReplicationThrottles(List replicaMovementProposals, long throttleRate) throws ExecutionException, InterruptedException, TimeoutException { LOG.info("Setting a rebalance throttle of {} bytes/sec", throttleRate); Set participatingBrokers = getParticipatingBrokers(replicaMovementProposals); diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java index 5cc8c9aadd..73f7b9bd3f 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java @@ -196,7 +196,7 @@ public void testSetThrottleOnNonExistentTopic() throws Exception { expectListTopics(mockAdminClient, Collections.emptySet()); EasyMock.replay(mockAdminClient); // Expect no exception - throttleHelper.setThrottles(Collections.singletonList(proposal), throttleRate); + throttleHelper.setReplicationThrottles(Collections.singletonList(proposal), throttleRate); EasyMock.verify(mockAdminClient); // Case 2: a situation where Topic0 gets deleted after its configs were read. Change configs should not fail. @@ -211,7 +211,7 @@ public void testSetThrottleOnNonExistentTopic() throws Exception { expectListTopics(mockAdminClient, Collections.emptySet()); EasyMock.replay(mockAdminClient); // Expect no exception - throttleHelper.setThrottles(Collections.singletonList(proposal), throttleRate); + throttleHelper.setReplicationThrottles(Collections.singletonList(proposal), throttleRate); EasyMock.verify(mockAdminClient); } @@ -229,7 +229,7 @@ public void testAddingThrottlesWithNoPreExistingThrottles() throws Exception { ExecutionTask task = completedTaskForProposal(0, proposal); - throttleHelper.setThrottles(Collections.singletonList(proposal), throttleRate); + throttleHelper.setReplicationThrottles(Collections.singletonList(proposal), throttleRate); assertExpectedThrottledRateForBroker(0, throttleRate); assertExpectedThrottledRateForBroker(1, throttleRate); @@ -289,7 +289,7 @@ public void testAddingThrottlesWithPreExistingThrottles() throws Exception { new AlterConfigOp(new ConfigEntry(ReplicationThrottleHelper.FOLLOWER_THROTTLED_REPLICAS, "1:1"), AlterConfigOp.OpType.SET)); throttleHelper.changeTopicConfigs(TOPIC1, topic1Config); - throttleHelper.setThrottles(Collections.singletonList(proposal), throttleRate); + throttleHelper.setReplicationThrottles(Collections.singletonList(proposal), throttleRate); assertExpectedThrottledRateForBroker(0, throttleRate); assertExpectedThrottledRateForBroker(1, throttleRate); @@ -337,7 +337,7 @@ public void testDoNotModifyExistingWildcardReplicaThrottles() throws Exception { Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(3)), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2))); - throttleHelper.setThrottles(Arrays.asList(proposal, proposal2), throttleRate); + throttleHelper.setReplicationThrottles(Arrays.asList(proposal, proposal2), throttleRate); ExecutionTask completedTask = completedTaskForProposal(0, proposal); ExecutionTask inProgressTask = inProgressTaskForProposal(1, proposal2); @@ -404,7 +404,7 @@ public void testDoNotRemoveThrottlesForInProgressTasks() throws Exception { Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(3)), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2))); - throttleHelper.setThrottles(Arrays.asList(proposal, proposal2), throttleRate); + throttleHelper.setReplicationThrottles(Arrays.asList(proposal, proposal2), throttleRate); ExecutionTask completedTask = completedTaskForProposal(0, proposal); ExecutionTask inProgressTask = inProgressTaskForProposal(1, proposal2); From e5f6489fa62c72b1b38ec6552c6636b249218d96 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sun, 21 Apr 2024 23:45:22 +0900 Subject: [PATCH 04/13] Add ReplicationThrottleHelper#LOG_DIR_THROTTLED_RATE support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add: setLogDirThrottles, setLogDirThrottledRateIfNecessary - Rename: removeReplicationThrottledRateFromBroker → removeThrottledRatesFromBroker --- .../cruisecontrol/executor/Executor.java | 3 +- .../executor/ReplicationThrottleHelper.java | 39 +++++++- .../ReplicationThrottleHelperTest.java | 95 +++++++++++-------- 3 files changed, 95 insertions(+), 42 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index 6cc4349fb8..8a2416664d 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -1622,7 +1622,8 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc AlterPartitionReassignmentsResult result = null; if (!tasksToExecute.isEmpty()) { if (_replicationThrottle != null) { - throttleHelper.setReplicationThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()), _replicationThrottle); + throttleHelper.setReplicationThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()), + _replicationThrottle); } // Execute the tasks. _executionTaskManager.markTasksInProgress(tasksToExecute); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java index 1a0d35e025..cb6b21cde4 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java @@ -39,6 +39,7 @@ class ReplicationThrottleHelper { static final String WILDCARD_ASTERISK = "*"; static final String LEADER_THROTTLED_RATE = QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_CONFIG; static final String FOLLOWER_THROTTLED_RATE = QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG; + static final String LOG_DIR_THROTTLED_RATE = QuotaConfigs.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG; static final String LEADER_THROTTLED_REPLICAS = QuotaConfigs.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG; static final String FOLLOWER_THROTTLED_REPLICAS = QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG; public static final long CLIENT_REQUEST_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30); @@ -82,6 +83,15 @@ void setReplicationThrottles(List replicaMovementProposals, l } } + void setLogDirThrottles(List replicaMovementProposals, long throttleRate) + throws ExecutionException, InterruptedException, TimeoutException { + LOG.info("Setting a log dir throttle of {} bytes/sec", throttleRate); + Set participatingBrokers = getParticipatingBrokers(replicaMovementProposals); + for (Integer broker: participatingBrokers) { + setLogDirThrottledRateIfNecessary(broker, throttleRate); + } + } + // Determines if a candidate task is ready to have its throttles removed. boolean shouldRemoveThrottleForTask(ExecutionTask task) { return @@ -132,7 +142,7 @@ void clearThrottles(List completedTasks, List inPr LOG.info("Removing replica movement throttles from brokers in the cluster: {}", brokersToRemoveThrottlesFrom); for (int broker : brokersToRemoveThrottlesFrom) { - removeReplicationThrottledRateFromBroker(broker); + removeThrottledRatesFromBroker(broker); } Map> throttledReplicas = getThrottledReplicasByTopic(completedProposals); @@ -166,7 +176,8 @@ private Map> getThrottledReplicasByTopic(List ops = new ArrayList<>(); for (String replicaThrottleRateConfigKey : Arrays.asList(LEADER_THROTTLED_RATE, FOLLOWER_THROTTLED_RATE)) { @@ -181,6 +192,19 @@ private void setReplicationThrottledRateIfNecessary(int brokerId, long throttleR } } + private void setLogDirThrottledRateIfNecessary(int brokerId, long throttleRate) throws ExecutionException, InterruptedException, TimeoutException { + Config brokerConfigs = getBrokerConfigs(brokerId); + List ops = new ArrayList<>(); + ConfigEntry currThrottleRate = brokerConfigs.get(LOG_DIR_THROTTLED_RATE); + if (currThrottleRate == null || !currThrottleRate.value().equals(String.valueOf(throttleRate))) { + LOG.debug("Setting {} to {} bytes/second for broker {}", LOG_DIR_THROTTLED_RATE, throttleRate, brokerId); + ops.add(new AlterConfigOp(new ConfigEntry(LOG_DIR_THROTTLED_RATE, String.valueOf(throttleRate)), AlterConfigOp.OpType.SET)); + } + if (!ops.isEmpty()) { + changeBrokerConfigs(brokerId, ops); + } + } + private Config getTopicConfigs(String topic) throws ExecutionException, InterruptedException, TimeoutException { try { return getEntityConfigs(new ConfigResource(ConfigResource.Type.TOPIC, topic)); @@ -316,11 +340,12 @@ private void removeThrottledReplicasFromTopic(String topic, Set replicas } } - private void removeReplicationThrottledRateFromBroker(Integer brokerId) + private void removeThrottledRatesFromBroker(Integer brokerId) throws ExecutionException, InterruptedException, TimeoutException { Config brokerConfigs = getBrokerConfigs(brokerId); ConfigEntry currLeaderThrottle = brokerConfigs.get(LEADER_THROTTLED_RATE); ConfigEntry currFollowerThrottle = brokerConfigs.get(FOLLOWER_THROTTLED_RATE); + ConfigEntry currLogDirThrottle = brokerConfigs.get(LOG_DIR_THROTTLED_RATE); List ops = new ArrayList<>(); if (currLeaderThrottle != null) { if (currLeaderThrottle.source().equals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)) { @@ -338,6 +363,14 @@ private void removeReplicationThrottledRateFromBroker(Integer brokerId) ops.add(new AlterConfigOp(new ConfigEntry(FOLLOWER_THROTTLED_RATE, null), AlterConfigOp.OpType.DELETE)); } } + if (currLogDirThrottle != null) { + if (currLogDirThrottle.source().equals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)) { + LOG.debug("Skipping removal for static log dir throttle rate: {}", currLogDirThrottle); + } else { + LOG.debug("Removing log dir throttle rate: {} on broker {}", currLogDirThrottle, brokerId); + ops.add(new AlterConfigOp(new ConfigEntry(LOG_DIR_THROTTLED_RATE, null), AlterConfigOp.OpType.DELETE)); + } + } if (!ops.isEmpty()) { changeBrokerConfigs(brokerId, ops); } diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java index 73f7b9bd3f..49fb2de4de 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java @@ -112,7 +112,6 @@ private ExecutionTask completedTaskForProposal(long id, ExecutionProposal propos @Test public void testClearThrottleOnNonExistentTopic() throws Exception { - final long throttleRate = 100L; final int brokerId0 = 0; final int brokerId1 = 1; final int brokerId2 = 2; @@ -230,21 +229,29 @@ public void testAddingThrottlesWithNoPreExistingThrottles() throws Exception { ExecutionTask task = completedTaskForProposal(0, proposal); throttleHelper.setReplicationThrottles(Collections.singletonList(proposal), throttleRate); + throttleHelper.setLogDirThrottles(Collections.singletonList(proposal), throttleRate); - assertExpectedThrottledRateForBroker(0, throttleRate); - assertExpectedThrottledRateForBroker(1, throttleRate); - assertExpectedThrottledRateForBroker(2, throttleRate); + assertExpectedReplicationThrottledRateForBroker(0, throttleRate); + assertExpectedReplicationThrottledRateForBroker(1, throttleRate); + assertExpectedReplicationThrottledRateForBroker(2, throttleRate); // No throttle on broker 3 because it's not involved in any of the execution proposals: - assertExpectedThrottledRateForBroker(3, null); + assertExpectedReplicationThrottledRateForBroker(3, null); assertExpectedThrottledReplicas(TOPIC0, "0:0,0:1,0:2"); + assertExpectedLogDirThrottledRateForBroker(0, throttleRate); + assertExpectedLogDirThrottledRateForBroker(1, throttleRate); + assertExpectedLogDirThrottledRateForBroker(2, throttleRate); + // We expect all throttles to be cleaned up throttleHelper.clearThrottles(Collections.singletonList(task), Collections.emptyList()); for (int i = 0; i < clusterSize(); i++) { - assertExpectedThrottledRateForBroker(i, null); + assertExpectedReplicationThrottledRateForBroker(i, null); } assertExpectedThrottledReplicas(TOPIC0, ""); + for (int i = 0; i < clusterSize(); i++) { + assertExpectedLogDirThrottledRateForBroker(i, null); + } } @Test @@ -291,11 +298,11 @@ public void testAddingThrottlesWithPreExistingThrottles() throws Exception { throttleHelper.setReplicationThrottles(Collections.singletonList(proposal), throttleRate); - assertExpectedThrottledRateForBroker(0, throttleRate); - assertExpectedThrottledRateForBroker(1, throttleRate); - assertExpectedThrottledRateForBroker(2, throttleRate); + assertExpectedReplicationThrottledRateForBroker(0, throttleRate); + assertExpectedReplicationThrottledRateForBroker(1, throttleRate); + assertExpectedReplicationThrottledRateForBroker(2, throttleRate); // No throttle on broker 3 because it's not involved in any of the execution proposals: - assertExpectedThrottledRateForBroker(3, null); + assertExpectedReplicationThrottledRateForBroker(3, null); // Existing throttled replicas are merged with new throttled replicas for topic 0: assertExpectedThrottledReplicas(TOPIC0, "0:0,0:1,0:2,1:0,1:1"); // Existing throttled replicas are unchanged for topic 1: @@ -308,7 +315,7 @@ public void testAddingThrottlesWithPreExistingThrottles() throws Exception { // However, we do expect the broker throttles to be removed. throttleHelper.clearThrottles(Collections.singletonList(task), Collections.emptyList()); for (int i = 0; i < clusterSize(); i++) { - assertExpectedThrottledRateForBroker(i, null); + assertExpectedReplicationThrottledRateForBroker(i, null); } assertExpectedThrottledReplicas(TOPIC0, "1:0,1:1"); assertExpectedThrottledReplicas(TOPIC1, "1:1"); @@ -342,33 +349,33 @@ public void testDoNotModifyExistingWildcardReplicaThrottles() throws Exception { ExecutionTask completedTask = completedTaskForProposal(0, proposal); ExecutionTask inProgressTask = inProgressTaskForProposal(1, proposal2); - assertExpectedThrottledRateForBroker(0, throttleRate); - assertExpectedThrottledRateForBroker(1, throttleRate); - assertExpectedThrottledRateForBroker(2, throttleRate); - assertExpectedThrottledRateForBroker(3, throttleRate); + assertExpectedReplicationThrottledRateForBroker(0, throttleRate); + assertExpectedReplicationThrottledRateForBroker(1, throttleRate); + assertExpectedReplicationThrottledRateForBroker(2, throttleRate); + assertExpectedReplicationThrottledRateForBroker(3, throttleRate); // Topic-level throttled replica config value should remain as "*" assertExpectedThrottledReplicas(TOPIC0, ReplicationThrottleHelper.WILDCARD_ASTERISK); assertExpectedThrottledReplicas(TOPIC1, ReplicationThrottleHelper.WILDCARD_ASTERISK); throttleHelper.clearThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask)); - assertExpectedThrottledRateForBroker(0, throttleRate); + assertExpectedReplicationThrottledRateForBroker(0, throttleRate); // we expect broker 1 to be null since all replica movement related to it has completed. - assertExpectedThrottledRateForBroker(1, null); - assertExpectedThrottledRateForBroker(2, throttleRate); + assertExpectedReplicationThrottledRateForBroker(1, null); + assertExpectedReplicationThrottledRateForBroker(2, throttleRate); // We expect broker 3 to have a throttle on it because there is an in-progress replica being moved - assertExpectedThrottledRateForBroker(3, throttleRate); + assertExpectedReplicationThrottledRateForBroker(3, throttleRate); // Topic-level throttled replica config value should remain as "*" assertExpectedThrottledReplicas(TOPIC0, ReplicationThrottleHelper.WILDCARD_ASTERISK); assertExpectedThrottledReplicas(TOPIC1, ReplicationThrottleHelper.WILDCARD_ASTERISK); // passing an inProgress task that is not complete should have no effect. throttleHelper.clearThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask)); - assertExpectedThrottledRateForBroker(0, throttleRate); + assertExpectedReplicationThrottledRateForBroker(0, throttleRate); // we expect broker 1 to be null since all replica movement related to it has completed. - assertExpectedThrottledRateForBroker(1, null); - assertExpectedThrottledRateForBroker(2, throttleRate); + assertExpectedReplicationThrottledRateForBroker(1, null); + assertExpectedReplicationThrottledRateForBroker(2, throttleRate); // We expect broker 3 to have a throttle on it because there is an in-progress replica being moved - assertExpectedThrottledRateForBroker(3, throttleRate); + assertExpectedReplicationThrottledRateForBroker(3, throttleRate); // Topic-level throttled replica config value should remain as "*" assertExpectedThrottledReplicas(TOPIC0, ReplicationThrottleHelper.WILDCARD_ASTERISK); assertExpectedThrottledReplicas(TOPIC1, ReplicationThrottleHelper.WILDCARD_ASTERISK); @@ -378,7 +385,7 @@ public void testDoNotModifyExistingWildcardReplicaThrottles() throws Exception { throttleHelper.clearThrottles(Arrays.asList(completedTask, inProgressTask), Collections.emptyList()); for (int i = 0; i < clusterSize(); i++) { - assertExpectedThrottledRateForBroker(i, null); + assertExpectedReplicationThrottledRateForBroker(i, null); } // Topic-level throttled replica config value should remain as "*" assertExpectedThrottledReplicas(TOPIC0, ReplicationThrottleHelper.WILDCARD_ASTERISK); @@ -409,29 +416,29 @@ public void testDoNotRemoveThrottlesForInProgressTasks() throws Exception { ExecutionTask completedTask = completedTaskForProposal(0, proposal); ExecutionTask inProgressTask = inProgressTaskForProposal(1, proposal2); - assertExpectedThrottledRateForBroker(0, throttleRate); - assertExpectedThrottledRateForBroker(1, throttleRate); - assertExpectedThrottledRateForBroker(2, throttleRate); - assertExpectedThrottledRateForBroker(3, throttleRate); + assertExpectedReplicationThrottledRateForBroker(0, throttleRate); + assertExpectedReplicationThrottledRateForBroker(1, throttleRate); + assertExpectedReplicationThrottledRateForBroker(2, throttleRate); + assertExpectedReplicationThrottledRateForBroker(3, throttleRate); assertExpectedThrottledReplicas(TOPIC0, "0:0,0:1,0:2,1:0,1:2,1:3"); throttleHelper.clearThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask)); - assertExpectedThrottledRateForBroker(0, throttleRate); + assertExpectedReplicationThrottledRateForBroker(0, throttleRate); // we expect broker 1 to be null since all replica movement related to it has completed. - assertExpectedThrottledRateForBroker(1, null); - assertExpectedThrottledRateForBroker(2, throttleRate); + assertExpectedReplicationThrottledRateForBroker(1, null); + assertExpectedReplicationThrottledRateForBroker(2, throttleRate); // We expect broker 3 to have a throttle on it because there is an in-progress replica being moved - assertExpectedThrottledRateForBroker(3, throttleRate); + assertExpectedReplicationThrottledRateForBroker(3, throttleRate); assertExpectedThrottledReplicas(TOPIC0, "1:0,1:2,1:3"); // passing an inProgress task that is not complete should have no effect. throttleHelper.clearThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask)); - assertExpectedThrottledRateForBroker(0, throttleRate); + assertExpectedReplicationThrottledRateForBroker(0, throttleRate); // we expect broker 1 to be null since all replica movement related to it has completed. - assertExpectedThrottledRateForBroker(1, null); - assertExpectedThrottledRateForBroker(2, throttleRate); + assertExpectedReplicationThrottledRateForBroker(1, null); + assertExpectedReplicationThrottledRateForBroker(2, throttleRate); // We expect broker 3 to have a throttle on it because there is an in-progress replica being moved - assertExpectedThrottledRateForBroker(3, throttleRate); + assertExpectedReplicationThrottledRateForBroker(3, throttleRate); assertExpectedThrottledReplicas(TOPIC0, "1:0,1:2,1:3"); // Completing the in-progress task and clearing the throttles should clean everything up. @@ -439,7 +446,7 @@ public void testDoNotRemoveThrottlesForInProgressTasks() throws Exception { throttleHelper.clearThrottles(Arrays.asList(completedTask, inProgressTask), Collections.emptyList()); for (int i = 0; i < clusterSize(); i++) { - assertExpectedThrottledRateForBroker(i, null); + assertExpectedReplicationThrottledRateForBroker(i, null); } assertExpectedThrottledReplicas(TOPIC0, ""); } @@ -607,7 +614,7 @@ private void expectIncrementalBrokerConfigs(AdminClient adminClient, List brokerConfig = _adminClient.describeConfigs(Collections.singletonList(cf)).all().get(); String expectedString = expectedRate == null ? null : String.valueOf(expectedRate); @@ -621,6 +628,18 @@ private void assertExpectedThrottledRateForBroker(int brokerId, Long expectedRat } } + private void assertExpectedLogDirThrottledRateForBroker(int brokerId, Long expectedRate) throws ExecutionException, InterruptedException { + ConfigResource cf = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId)); + Map brokerConfig = _adminClient.describeConfigs(Collections.singletonList(cf)).all().get(); + String expectedString = expectedRate == null ? null : String.valueOf(expectedRate); + assertNotNull(brokerConfig.get(cf)); + if (expectedRate == null) { + assertNull(brokerConfig.get(cf).get(ReplicationThrottleHelper.LOG_DIR_THROTTLED_RATE)); + } else { + assertEquals(expectedString, brokerConfig.get(cf).get(ReplicationThrottleHelper.LOG_DIR_THROTTLED_RATE).value()); + } + } + private void assertExpectedThrottledReplicas(String topic, String expectedReplicas) throws ExecutionException, InterruptedException { ConfigResource cf = new ConfigResource(ConfigResource.Type.TOPIC, topic); Map topicConfig = _adminClient.describeConfigs(Collections.singletonList(cf)).all().get(); From a6c27ccd8b5d9a08c3d0b568ce23c608e7a3a91d Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sun, 21 Apr 2024 20:34:07 +0900 Subject: [PATCH 05/13] Add (default.)log.dir.throttle parameter --- .../cruisecontrol/KafkaCruiseControl.java | 10 ++++- .../config/constants/ExecutorConfig.java | 13 +++++++ .../cruisecontrol/executor/Executor.java | 21 ++++++++-- .../async/runnable/AddBrokersRunnable.java | 4 ++ .../runnable/FixOfflineReplicasRunnable.java | 5 +++ .../async/runnable/RebalanceRunnable.java | 5 ++- .../async/runnable/RemoveBrokersRunnable.java | 5 ++- .../async/runnable/RemoveDisksRunnable.java | 1 + .../UpdateTopicConfigurationRunnable.java | 8 +++- .../AddedOrRemovedBrokerParameters.java | 6 +++ .../FixOfflineReplicasParameters.java | 6 +++ .../servlet/parameters/ParameterUtils.java | 9 +++++ .../parameters/RebalanceParameters.java | 8 +++- ...opicReplicationFactorChangeParameters.java | 6 +++ .../detector/AnomalyDetectorManagerTest.java | 5 ++- .../detector/IdempotenceCacheTest.java | 4 +- .../detector/MaintenanceEventTest.java | 17 ++++++--- .../cruisecontrol/executor/ExecutorTest.java | 13 +++++-- .../parameters/ParameterUtilsTest.java | 38 +++++++++++++++++++ 19 files changed, 160 insertions(+), 24 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java index 921e84e042..6d7025c087 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java @@ -651,6 +651,8 @@ private static boolean hasProposalsToExecute(Collection propo * (if null, use default.replica.movement.strategies). * @param replicationThrottle The replication throttle (bytes/second) to apply to both leaders and followers * when executing proposals (if null, no throttling is applied). + * @param logDirThrottle The throttle (bytes/second) to apply to replicas being moved between the log dirs + * when executing proposals (if null, no throttling is applied). * @param isTriggeredByUserRequest Whether the execution is triggered by a user request. * @param uuid UUID of the execution. * @param skipInterBrokerReplicaConcurrencyAdjustment {@code true} to skip auto adjusting concurrency of inter-broker @@ -667,6 +669,7 @@ public void executeProposals(Set proposals, Long executionProgressCheckIntervalMs, ReplicaMovementStrategy replicaMovementStrategy, Long replicationThrottle, + Long logDirThrottle, boolean isTriggeredByUserRequest, String uuid, boolean skipInterBrokerReplicaConcurrencyAdjustment) throws OngoingExecutionException { @@ -674,7 +677,7 @@ public void executeProposals(Set proposals, _executor.executeProposals(proposals, unthrottledBrokers, null, _loadMonitor, concurrentInterBrokerPartitionMovements, maxInterBrokerPartitionMovements, concurrentIntraBrokerPartitionMovements, clusterConcurrentLeaderMovements, brokerConcurrentLeaderMovements, executionProgressCheckIntervalMs, replicaMovementStrategy, replicationThrottle, - isTriggeredByUserRequest, uuid, isKafkaAssignerMode, skipInterBrokerReplicaConcurrencyAdjustment); + logDirThrottle, isTriggeredByUserRequest, uuid, isKafkaAssignerMode, skipInterBrokerReplicaConcurrencyAdjustment); } else { failGeneratingProposalsForExecution(uuid); } @@ -700,6 +703,8 @@ public void executeProposals(Set proposals, * (if null, use default.replica.movement.strategies). * @param replicationThrottle The replication throttle (bytes/second) to apply to both leaders and followers * when executing remove operations (if null, no throttling is applied). + * @param logDirThrottle The throttle (bytes/second) to apply to replicas being moved between the log dirs + * when executing remove operations (if null, no throttling is applied). * @param isTriggeredByUserRequest Whether the execution is triggered by a user request. * @param uuid UUID of the execution. */ @@ -714,13 +719,14 @@ public void executeRemoval(Set proposals, Long executionProgressCheckIntervalMs, ReplicaMovementStrategy replicaMovementStrategy, Long replicationThrottle, + Long logDirThrottle, boolean isTriggeredByUserRequest, String uuid) throws OngoingExecutionException { if (hasProposalsToExecute(proposals, uuid)) { _executor.executeProposals(proposals, throttleDecommissionedBroker ? Collections.emptySet() : removedBrokers, removedBrokers, _loadMonitor, concurrentInterBrokerPartitionMovements, maxInterBrokerPartitionMovements, 0, clusterLeaderMovementConcurrency, brokerLeaderMovementConcurrency, - executionProgressCheckIntervalMs, replicaMovementStrategy, replicationThrottle, + executionProgressCheckIntervalMs, replicaMovementStrategy, replicationThrottle, logDirThrottle, isTriggeredByUserRequest, uuid, isKafkaAssignerMode, false); } else { failGeneratingProposalsForExecution(uuid); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java index 99a2b4cd87..ad3103562f 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java @@ -206,6 +206,14 @@ public final class ExecutorConfig { public static final String DEFAULT_REPLICATION_THROTTLE_DOC = "The replication throttle applied to replicas being " + "moved, in bytes per second."; + /** + * default.log.dir.throttle + */ + public static final String DEFAULT_LOG_DIR_THROTTLE_CONFIG = "default.log.dir.throttle"; + public static final Long DEFAULT_DEFAULT_LOG_DIR_THROTTLE = null; + public static final String DEFAULT_LOG_DIR_THROTTLE_DOC = "The throttle applied to replicas being moved between " + + "the log dirs, in bytes per second."; + /** * replica.movement.strategies */ @@ -741,6 +749,11 @@ public static ConfigDef define(ConfigDef configDef) { DEFAULT_DEFAULT_REPLICATION_THROTTLE, ConfigDef.Importance.MEDIUM, DEFAULT_REPLICATION_THROTTLE_DOC) + .define(DEFAULT_LOG_DIR_THROTTLE_CONFIG, + ConfigDef.Type.LONG, + DEFAULT_DEFAULT_LOG_DIR_THROTTLE, + ConfigDef.Importance.MEDIUM, + DEFAULT_LOG_DIR_THROTTLE_DOC) .define(REPLICA_MOVEMENT_STRATEGIES_CONFIG, ConfigDef.Type.LIST, DEFAULT_REPLICA_MOVEMENT_STRATEGIES, diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index 8a2416664d..a74ad691e3 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -801,6 +801,8 @@ public boolean setConcurrencyAdjusterMinIsrCheck(boolean isMinIsrBasedConcurrenc * @param replicaMovementStrategy The strategy used to determine the execution order of generated replica movement tasks. * @param replicationThrottle The replication throttle (bytes/second) to apply to both leaders and followers * when executing a proposal (if null, no throttling is applied). + * @param logDirThrottle The throttle (bytes/second) to apply to replicas being moved between the log dirs + * when executing a proposal (if null, no throttling is applied). * @param isTriggeredByUserRequest Whether the execution is triggered by a user request. * @param uuid UUID of the execution. * @param isKafkaAssignerMode {@code true} if kafka assigner mode, {@code false} otherwise. @@ -819,6 +821,7 @@ public synchronized void executeProposals(Collection proposal Long requestedExecutionProgressCheckIntervalMs, ReplicaMovementStrategy replicaMovementStrategy, Long replicationThrottle, + Long logDirThrottle, boolean isTriggeredByUserRequest, String uuid, boolean isKafkaAssignerMode, @@ -831,7 +834,7 @@ public synchronized void executeProposals(Collection proposal requestedIntraBrokerPartitionMovementConcurrency, requestedClusterLeadershipMovementConcurrency, requestedBrokerLeadershipMovementConcurrency, requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy, isTriggeredByUserRequest, loadMonitor); - startExecution(loadMonitor, null, removedBrokers, replicationThrottle, isTriggeredByUserRequest); + startExecution(loadMonitor, null, removedBrokers, replicationThrottle, logDirThrottle, isTriggeredByUserRequest); } catch (Exception e) { if (e instanceof OngoingExecutionException) { LOG.info("User task {}: Broker removal operation aborted due to ongoing execution", uuid); @@ -924,7 +927,7 @@ public synchronized void executeDemoteProposals(Collection pr initProposalExecution(proposals, demotedBrokers, concurrentSwaps, null, 0, requestedClusterLeadershipMovementConcurrency, requestedBrokerLeadershipMovementConcurrency, requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy, isTriggeredByUserRequest, loadMonitor); - startExecution(loadMonitor, demotedBrokers, null, replicationThrottle, isTriggeredByUserRequest); + startExecution(loadMonitor, demotedBrokers, null, replicationThrottle, replicationThrottle, isTriggeredByUserRequest); } catch (Exception e) { processExecuteProposalsFailure(); throw e; @@ -1005,12 +1008,15 @@ private int numExecutionStartedInNonKafkaAssignerMode() { * @param removedBrokers Brokers to be removed, null if no broker has been removed. * @param replicationThrottle The replication throttle (bytes/second) to apply to both leaders and followers * while moving partitions (if null, no throttling is applied). + * @param logDirThrottle The throttle (bytes/second) to apply to replicas being moved between the log dirs + * while moving partitions (if null, no throttling is applied). * @param isTriggeredByUserRequest Whether the execution is triggered by a user request. */ private void startExecution(LoadMonitor loadMonitor, Collection demotedBrokers, Collection removedBrokers, Long replicationThrottle, + Long logDirThrottle, boolean isTriggeredByUserRequest) throws OngoingExecutionException { _executionStoppedByUser.set(false); sanityCheckOngoingMovement(); @@ -1046,7 +1052,7 @@ private void startExecution(LoadMonitor loadMonitor, _numExecutionStartedInNonKafkaAssignerMode.incrementAndGet(); } _proposalExecutor.execute( - new ProposalExecutionRunnable(loadMonitor, demotedBrokers, removedBrokers, replicationThrottle, isTriggeredByUserRequest)); + new ProposalExecutionRunnable(loadMonitor, demotedBrokers, removedBrokers, replicationThrottle, logDirThrottle, isTriggeredByUserRequest)); } /** @@ -1300,6 +1306,7 @@ private class ProposalExecutionRunnable implements Runnable { private final Set _recentlyDemotedBrokers; private final Set _recentlyRemovedBrokers; private final Long _replicationThrottle; + private final Long _logDirThrottle; private Throwable _executionException; private final boolean _isTriggeredByUserRequest; private long _lastSlowTaskReportingTimeMs; @@ -1314,6 +1321,7 @@ private class ProposalExecutionRunnable implements Runnable { Collection demotedBrokers, Collection removedBrokers, Long replicationThrottle, + Long logDirThrottle, boolean isTriggeredByUserRequest) { _loadMonitor = loadMonitor; _demotedBrokers = demotedBrokers; @@ -1349,6 +1357,7 @@ private class ProposalExecutionRunnable implements Runnable { _recentlyDemotedBrokers = recentlyDemotedBrokers(); _recentlyRemovedBrokers = recentlyRemovedBrokers(); _replicationThrottle = replicationThrottle; + _logDirThrottle = logDirThrottle; _isTriggeredByUserRequest = isTriggeredByUserRequest; _lastSlowTaskReportingTimeMs = -1L; if (_removedBrokers != null && !_removedBrokers.isEmpty()) { @@ -1625,6 +1634,10 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc throttleHelper.setReplicationThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()), _replicationThrottle); } + if (_logDirThrottle != null) { + throttleHelper.setLogDirThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()), + _logDirThrottle); + } // Execute the tasks. _executionTaskManager.markTasksInProgress(tasksToExecute); result = ExecutionUtils.submitReplicaReassignmentTasks(_adminClient, tasksToExecute); @@ -1647,7 +1660,7 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc .collect(Collectors.toList()); inProgressTasks.addAll(inExecutionTasks()); - if (_replicationThrottle != null) { + if (_replicationThrottle != null || _logDirThrottle != null) { throttleHelper.clearThrottles(completedTasks, inProgressTasks); } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/AddBrokersRunnable.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/AddBrokersRunnable.java index eb57c05dbf..9617091cf8 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/AddBrokersRunnable.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/AddBrokersRunnable.java @@ -42,6 +42,7 @@ public class AddBrokersRunnable extends GoalBasedOperationRunnable { protected final Long _executionProgressCheckIntervalMs; protected final ReplicaMovementStrategy _replicaMovementStrategy; protected final Long _replicationThrottle; + protected final Long _logDirThrottle; protected static final boolean SKIP_AUTO_REFRESHING_CONCURRENCY = false; /** @@ -67,6 +68,7 @@ public AddBrokersRunnable(KafkaCruiseControl kafkaCruiseControl, _executionProgressCheckIntervalMs = SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS; _replicaMovementStrategy = SELF_HEALING_REPLICA_MOVEMENT_STRATEGY; _replicationThrottle = kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_REPLICATION_THROTTLE_CONFIG); + _logDirThrottle = kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_LOG_DIR_THROTTLE_CONFIG); } public AddBrokersRunnable(KafkaCruiseControl kafkaCruiseControl, @@ -84,6 +86,7 @@ public AddBrokersRunnable(KafkaCruiseControl kafkaCruiseControl, _executionProgressCheckIntervalMs = parameters.executionProgressCheckIntervalMs(); _replicaMovementStrategy = parameters.replicaMovementStrategy(); _replicationThrottle = parameters.replicationThrottle(); + _logDirThrottle = parameters.logDirThrottle(); } @Override @@ -127,6 +130,7 @@ protected OptimizerResult workWithClusterModel() throws KafkaCruiseControlExcept _executionProgressCheckIntervalMs, _replicaMovementStrategy, _replicationThrottle, + _logDirThrottle, _isTriggeredByUserRequest, _uuid, SKIP_AUTO_REFRESHING_CONCURRENCY); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/FixOfflineReplicasRunnable.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/FixOfflineReplicasRunnable.java index 167a4b3b00..ca185b1a5a 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/FixOfflineReplicasRunnable.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/FixOfflineReplicasRunnable.java @@ -18,6 +18,7 @@ import java.util.concurrent.TimeoutException; import java.util.function.Supplier; +import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.DEFAULT_LOG_DIR_THROTTLE_CONFIG; import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.DEFAULT_REPLICATION_THROTTLE_CONFIG; import static com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable.RunnableUtils.*; @@ -33,6 +34,7 @@ public class FixOfflineReplicasRunnable extends GoalBasedOperationRunnable { protected final Long _executionProgressCheckIntervalMs; protected final ReplicaMovementStrategy _replicaMovementStrategy; protected final Long _replicationThrottle; + protected final Long _logDirThrottle; protected static final boolean SKIP_AUTO_REFRESHING_CONCURRENCY = false; /** @@ -55,6 +57,7 @@ public FixOfflineReplicasRunnable(KafkaCruiseControl kafkaCruiseControl, _executionProgressCheckIntervalMs = SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS; _replicaMovementStrategy = SELF_HEALING_REPLICA_MOVEMENT_STRATEGY; _replicationThrottle = kafkaCruiseControl.config().getLong(DEFAULT_REPLICATION_THROTTLE_CONFIG); + _logDirThrottle = kafkaCruiseControl.config().getLong(DEFAULT_LOG_DIR_THROTTLE_CONFIG); } public FixOfflineReplicasRunnable(KafkaCruiseControl kafkaCruiseControl, @@ -70,6 +73,7 @@ public FixOfflineReplicasRunnable(KafkaCruiseControl kafkaCruiseControl, _executionProgressCheckIntervalMs = parameters.executionProgressCheckIntervalMs(); _replicaMovementStrategy = parameters.replicaMovementStrategy(); _replicationThrottle = parameters.replicationThrottle(); + _logDirThrottle = parameters.logDirThrottle(); } @Override @@ -111,6 +115,7 @@ protected OptimizerResult workWithClusterModel() throws KafkaCruiseControlExcept _executionProgressCheckIntervalMs, _replicaMovementStrategy, _replicationThrottle, + _logDirThrottle, _isTriggeredByUserRequest, _uuid, SKIP_AUTO_REFRESHING_CONCURRENCY); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RebalanceRunnable.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RebalanceRunnable.java index 74fb117d1d..7bbe253e36 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RebalanceRunnable.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RebalanceRunnable.java @@ -38,6 +38,7 @@ public class RebalanceRunnable extends GoalBasedOperationRunnable { protected final Long _executionProgressCheckIntervalMs; protected final ReplicaMovementStrategy _replicaMovementStrategy; protected final Long _replicationThrottle; + protected final Long _logDirThrottle; protected final boolean _ignoreProposalCache; protected final Set _destinationBrokerIds; protected final boolean _isRebalanceDiskMode; @@ -64,6 +65,7 @@ public RebalanceRunnable(KafkaCruiseControl kafkaCruiseControl, _executionProgressCheckIntervalMs = SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS; _replicaMovementStrategy = SELF_HEALING_REPLICA_MOVEMENT_STRATEGY; _replicationThrottle = kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_REPLICATION_THROTTLE_CONFIG); + _logDirThrottle = kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_LOG_DIR_THROTTLE_CONFIG); _ignoreProposalCache = SELF_HEALING_IGNORE_PROPOSAL_CACHE; _destinationBrokerIds = SELF_HEALING_DESTINATION_BROKER_IDS; _isRebalanceDiskMode = SELF_HEALING_IS_REBALANCE_DISK_MODE; @@ -83,6 +85,7 @@ public RebalanceRunnable(KafkaCruiseControl kafkaCruiseControl, _executionProgressCheckIntervalMs = parameters.executionProgressCheckIntervalMs(); _replicaMovementStrategy = parameters.replicaMovementStrategy(); _replicationThrottle = parameters.replicationThrottle(); + _logDirThrottle = parameters.logDirThrottle(); _ignoreProposalCache = parameters.ignoreProposalCache(); _destinationBrokerIds = parameters.destinationBrokerIds(); _isRebalanceDiskMode = parameters.isRebalanceDiskMode(); @@ -124,7 +127,7 @@ protected OptimizerResult workWithoutClusterModel() throws KafkaCruiseControlExc _concurrentInterBrokerPartitionMovements, _maxInterBrokerPartitionMovements, _concurrentIntraBrokerPartitionMovements, _clusterLeaderMovementConcurrency, _brokerLeaderMovementConcurrency, _executionProgressCheckIntervalMs, _replicaMovementStrategy, - _replicationThrottle, _isTriggeredByUserRequest, _uuid, SKIP_AUTO_REFRESHING_CONCURRENCY); + _replicationThrottle, _logDirThrottle, _isTriggeredByUserRequest, _uuid, SKIP_AUTO_REFRESHING_CONCURRENCY); } return result; } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java index 16e05b96e5..7abf42ed40 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java @@ -49,6 +49,7 @@ public class RemoveBrokersRunnable extends GoalBasedOperationRunnable { protected final Long _executionProgressCheckIntervalMs; protected final ReplicaMovementStrategy _replicaMovementStrategy; protected final Long _replicationThrottle; + protected final Long _logDirThrottle; /** * Constructor to be used for creating a runnable for self-healing. @@ -74,6 +75,7 @@ public RemoveBrokersRunnable(KafkaCruiseControl kafkaCruiseControl, _executionProgressCheckIntervalMs = SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS; _replicaMovementStrategy = SELF_HEALING_REPLICA_MOVEMENT_STRATEGY; _replicationThrottle = kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_REPLICATION_THROTTLE_CONFIG); + _logDirThrottle = kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_LOG_DIR_THROTTLE_CONFIG); } public RemoveBrokersRunnable(KafkaCruiseControl kafkaCruiseControl, @@ -92,6 +94,7 @@ public RemoveBrokersRunnable(KafkaCruiseControl kafkaCruiseControl, _executionProgressCheckIntervalMs = parameters.executionProgressCheckIntervalMs(); _replicaMovementStrategy = parameters.replicaMovementStrategy(); _replicationThrottle = parameters.replicationThrottle(); + _logDirThrottle = parameters.logDirThrottle(); } @Override @@ -155,7 +158,7 @@ protected OptimizerResult workWithClusterModel() throws KafkaCruiseControlExcept _concurrentInterBrokerPartitionMovements, _maxInterBrokerPartitionMovements, _clusterLeaderMovementConcurrency, _brokerLeaderMovementConcurrency, _executionProgressCheckIntervalMs, _replicaMovementStrategy, _replicationThrottle, - _isTriggeredByUserRequest, _uuid); + _logDirThrottle, _isTriggeredByUserRequest, _uuid); } return result; diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveDisksRunnable.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveDisksRunnable.java index 8e4710e766..0aedc0aaf1 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveDisksRunnable.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveDisksRunnable.java @@ -119,6 +119,7 @@ protected OptimizerResult workWithClusterModel() throws KafkaCruiseControlExcept SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS, SELF_HEALING_REPLICA_MOVEMENT_STRATEGY, _kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_REPLICATION_THROTTLE_CONFIG), + _kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_LOG_DIR_THROTTLE_CONFIG), _isTriggeredByUserRequest, _uuid, false diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/UpdateTopicConfigurationRunnable.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/UpdateTopicConfigurationRunnable.java index f10202b48d..dabd5dab4e 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/UpdateTopicConfigurationRunnable.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/UpdateTopicConfigurationRunnable.java @@ -80,6 +80,7 @@ public class UpdateTopicConfigurationRunnable extends GoalBasedOperationRunnable protected final Long _executionProgressCheckIntervalMs; protected final ReplicaMovementStrategy _replicaMovementStrategy; protected final Long _replicationThrottle; + protected final Long _logDirThrottle; protected Cluster _cluster; protected Map> _topicsToChangeByReplicationFactor; protected static final boolean SKIP_AUTO_REFRESHING_CONCURRENCY = true; @@ -107,6 +108,7 @@ public UpdateTopicConfigurationRunnable(KafkaCruiseControl kafkaCruiseControl, _executionProgressCheckIntervalMs = topicReplicationFactorChangeParameters.executionProgressCheckIntervalMs(); _replicaMovementStrategy = topicReplicationFactorChangeParameters.replicaMovementStrategy(); _replicationThrottle = topicReplicationFactorChangeParameters.replicationThrottle(); + _logDirThrottle = topicReplicationFactorChangeParameters.logDirThrottle(); } else { _topicPatternByReplicationFactor = null; _skipRackAwarenessCheck = false; @@ -117,6 +119,7 @@ public UpdateTopicConfigurationRunnable(KafkaCruiseControl kafkaCruiseControl, _executionProgressCheckIntervalMs = null; _replicaMovementStrategy = null; _replicationThrottle = null; + _logDirThrottle = null; } } @@ -149,6 +152,7 @@ public UpdateTopicConfigurationRunnable(KafkaCruiseControl kafkaCruiseControl, _executionProgressCheckIntervalMs = SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS; _replicaMovementStrategy = SELF_HEALING_REPLICA_MOVEMENT_STRATEGY; _replicationThrottle = kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_REPLICATION_THROTTLE_CONFIG); + _logDirThrottle = kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_LOG_DIR_THROTTLE_CONFIG); } @Override @@ -209,8 +213,8 @@ protected OptimizerResult workWithClusterModel() throws KafkaCruiseControlExcept _kafkaCruiseControl.executeProposals(result.goalProposals(), Collections.emptySet(), false, _concurrentInterBrokerPartitionMovements, _maxInterBrokerPartitionMovements, 0, _clusterLeaderMovementConcurrency, _brokerLeaderMovementConcurrency, - _executionProgressCheckIntervalMs, _replicaMovementStrategy, _replicationThrottle, _isTriggeredByUserRequest, _uuid, - SKIP_AUTO_REFRESHING_CONCURRENCY); + _executionProgressCheckIntervalMs, _replicaMovementStrategy, _replicationThrottle, _logDirThrottle, + _isTriggeredByUserRequest, _uuid, SKIP_AUTO_REFRESHING_CONCURRENCY); } return result; } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/AddedOrRemovedBrokerParameters.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/AddedOrRemovedBrokerParameters.java index e326828223..464edc8f3a 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/AddedOrRemovedBrokerParameters.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/AddedOrRemovedBrokerParameters.java @@ -60,6 +60,7 @@ public abstract class AddedOrRemovedBrokerParameters extends GoalBasedOptimizati protected Long _executionProgressCheckIntervalMs; protected boolean _dryRun; protected Long _replicationThrottle; + protected Long _logDirThrottle; protected boolean _skipHardGoalCheck; protected ReplicaMovementStrategy _replicaMovementStrategy; protected Integer _reviewId; @@ -81,6 +82,7 @@ protected void initParameters() throws UnsupportedEncodingException { _brokerLeaderMovementConcurrency = ParameterUtils.concurrentMovements(_requestContext, ConcurrencyType.LEADERSHIP_BROKER); _executionProgressCheckIntervalMs = ParameterUtils.executionProgressCheckIntervalMs(_requestContext); _replicationThrottle = ParameterUtils.replicationThrottle(_requestContext, _config); + _logDirThrottle = ParameterUtils.logDirThrottle(_requestContext, _config); _skipHardGoalCheck = ParameterUtils.skipHardGoalCheck(_requestContext); _replicaMovementStrategy = ParameterUtils.getReplicaMovementStrategy(_requestContext, _config); boolean twoStepVerificationEnabled = _config.getBoolean(WebServerConfig.TWO_STEP_VERIFICATION_ENABLED_CONFIG); @@ -134,6 +136,10 @@ public Long replicationThrottle() { return _replicationThrottle; } + public Long logDirThrottle() { + return _logDirThrottle; + } + public boolean skipHardGoalCheck() { return _skipHardGoalCheck; } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/FixOfflineReplicasParameters.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/FixOfflineReplicasParameters.java index ac3ca6f50e..4a3fa2f81e 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/FixOfflineReplicasParameters.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/FixOfflineReplicasParameters.java @@ -79,6 +79,7 @@ public class FixOfflineReplicasParameters extends GoalBasedOptimizationParameter protected boolean _skipHardGoalCheck; protected ReplicaMovementStrategy _replicaMovementStrategy; protected Long _replicationThrottle; + protected Long _logDirThrottle; protected Integer _reviewId; protected String _reason; protected boolean _stopOngoingExecution; @@ -99,6 +100,7 @@ protected void initParameters() throws UnsupportedEncodingException { _skipHardGoalCheck = ParameterUtils.skipHardGoalCheck(_requestContext); _replicaMovementStrategy = ParameterUtils.getReplicaMovementStrategy(_requestContext, _config); _replicationThrottle = ParameterUtils.replicationThrottle(_requestContext, _config); + _logDirThrottle = ParameterUtils.logDirThrottle(_requestContext, _config); boolean twoStepVerificationEnabled = _config.getBoolean(WebServerConfig.TWO_STEP_VERIFICATION_ENABLED_CONFIG); _reviewId = ParameterUtils.reviewId(_requestContext, twoStepVerificationEnabled); boolean requestReasonRequired = _config.getBoolean(ExecutorConfig.REQUEST_REASON_REQUIRED_CONFIG); @@ -154,6 +156,10 @@ public Long replicationThrottle() { return _replicationThrottle; } + public Long logDirThrottle() { + return _logDirThrottle; + } + public String reason() { return _reason; } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ParameterUtils.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ParameterUtils.java index 839e00f541..c234d3efa8 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ParameterUtils.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ParameterUtils.java @@ -92,6 +92,7 @@ public final class ParameterUtils { public static final String THROTTLE_ADDED_BROKER_PARAM = "throttle_added_broker"; public static final String THROTTLE_REMOVED_BROKER_PARAM = "throttle_removed_broker"; public static final String REPLICATION_THROTTLE_PARAM = "replication_throttle"; + public static final String LOG_DIR_THROTTLE_PARAM = "log_dir_throttle"; public static final String IGNORE_PROPOSAL_CACHE_PARAM = "ignore_proposal_cache"; public static final String USE_READY_DEFAULT_GOALS_PARAM = "use_ready_default_goals"; public static final String EXECUTION_PROGRESS_CHECK_INTERVAL_MS_PARAM = "execution_progress_check_interval_ms"; @@ -432,6 +433,14 @@ static Long replicationThrottle(CruiseControlRequestContext requestContext, Kafk return value; } + static Long logDirThrottle(CruiseControlRequestContext requestContext, KafkaCruiseControlConfig config) { + Long value = getLongParam(requestContext, LOG_DIR_THROTTLE_PARAM, config.getLong(ExecutorConfig.DEFAULT_LOG_DIR_THROTTLE_CONFIG)); + if (value != null && value < 0) { + throw new UserRequestException(String.format("Requested log dir throttle must be non-negative (Requested: %s).", value)); + } + return value; + } + static Long time(CruiseControlRequestContext requestContext) { String parameterString = caseSensitiveParameterName(requestContext.getParameterMap(), TIME_PARAM); if (parameterString == null) { diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/RebalanceParameters.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/RebalanceParameters.java index 8008d2dec7..f5c555b51f 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/RebalanceParameters.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/RebalanceParameters.java @@ -50,7 +50,7 @@ * &exclude_recently_removed_brokers=[true/false]&replica_movement_strategies=[strategy1,strategy2...] * &ignore_proposal_cache=[true/false]&destination_broker_ids=[id1,id2...]&kafka_assigner=[true/false] * &rebalance_disk=[true/false]&review_id=[id]&get_response_schema=[true/false] - * &replication_throttle=[bytes_per_second]&reason=[reason-for-request] + * &replication_throttle=[bytes_per_second]&log_dir_throttle=[bytes_per_second]&reason=[reason-for-request] * &execution_progress_check_interval_ms=[interval_in_ms]&stop_ongoing_execution=[true/false]&fast_mode=[true/false] * &doAs=[user] * @@ -85,6 +85,7 @@ public class RebalanceParameters extends ProposalsParameters { protected boolean _skipHardGoalCheck; protected ReplicaMovementStrategy _replicaMovementStrategy; protected Long _replicationThrottle; + protected Long _logDirThrottle; protected Integer _reviewId; protected String _reason; protected boolean _stopOngoingExecution; @@ -109,6 +110,7 @@ protected void initParameters() throws UnsupportedEncodingException { _destinationBrokerIds = ParameterUtils.destinationBrokerIds(_requestContext); boolean twoStepVerificationEnabled = _config.getBoolean(WebServerConfig.TWO_STEP_VERIFICATION_ENABLED_CONFIG); _replicationThrottle = ParameterUtils.replicationThrottle(_requestContext, _config); + _logDirThrottle = ParameterUtils.logDirThrottle(_requestContext, _config); _reviewId = ParameterUtils.reviewId(_requestContext, twoStepVerificationEnabled); _isRebalanceDiskMode = ParameterUtils.isRebalanceDiskMode(_requestContext); boolean requestReasonRequired = _config.getBoolean(ExecutorConfig.REQUEST_REASON_REQUIRED_CONFIG); @@ -168,6 +170,10 @@ public Long replicationThrottle() { return _replicationThrottle; } + public Long logDirThrottle() { + return _logDirThrottle; + } + public String reason() { return _reason; } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/TopicReplicationFactorChangeParameters.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/TopicReplicationFactorChangeParameters.java index b235f6e375..fa170dc504 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/TopicReplicationFactorChangeParameters.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/TopicReplicationFactorChangeParameters.java @@ -59,6 +59,7 @@ public class TopicReplicationFactorChangeParameters extends AbstractParameters { protected boolean _skipHardGoalCheck; protected ReplicaMovementStrategy _replicaMovementStrategy; protected Long _replicationThrottle; + protected Long _logDirThrottle; protected TopicReplicationFactorChangeParameters() { super(); @@ -80,6 +81,7 @@ protected void initParameters() throws UnsupportedEncodingException { _skipHardGoalCheck = ParameterUtils.skipHardGoalCheck(_requestContext); _replicaMovementStrategy = ParameterUtils.getReplicaMovementStrategy(_requestContext, _config); _replicationThrottle = ParameterUtils.replicationThrottle(_requestContext, _config); + _logDirThrottle = ParameterUtils.logDirThrottle(_requestContext, _config); } /** @@ -141,6 +143,10 @@ public Long replicationThrottle() { return _replicationThrottle; } + public Long logDirThrottle() { + return _logDirThrottle; + } + @Override public SortedSet caseInsensitiveParameterNames() { return CASE_INSENSITIVE_PARAMETER_NAMES; diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorManagerTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorManagerTest.java index 7e338bd312..306cefc979 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorManagerTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorManagerTest.java @@ -316,6 +316,7 @@ private void testFixAnomaly(AnomalyType anomalyType) EasyMock.eq(SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS), EasyMock.eq(SELF_HEALING_REPLICA_MOVEMENT_STRATEGY), EasyMock.eq(null), + EasyMock.eq(null), EasyMock.eq(false), EasyMock.anyString(), EasyMock.eq(false)); @@ -350,6 +351,7 @@ private void testFixAnomaly(AnomalyType anomalyType) EasyMock.eq(SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS), EasyMock.eq(SELF_HEALING_REPLICA_MOVEMENT_STRATEGY), EasyMock.eq(null), + EasyMock.eq(null), EasyMock.eq(false), EasyMock.anyString(), EasyMock.eq(false)); @@ -416,6 +418,7 @@ private void testFixAnomaly(AnomalyType anomalyType) EasyMock.eq(SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS), EasyMock.eq(SELF_HEALING_REPLICA_MOVEMENT_STRATEGY), EasyMock.eq(null), + EasyMock.eq(null), EasyMock.eq(false), EasyMock.anyString(), EasyMock.eq(true)); @@ -531,7 +534,7 @@ public void testExecutionInProgress() throws InterruptedException { KafkaCruiseControl mockKafkaCruiseControl = EasyMock.mock(KafkaCruiseControl.class); Properties props = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties(); KafkaCruiseControlConfig kafkaCruiseControlConfig = new KafkaCruiseControlConfig(props); - EasyMock.expect(mockKafkaCruiseControl.config()).andReturn(kafkaCruiseControlConfig).times(2); + EasyMock.expect(mockKafkaCruiseControl.config()).andReturn(kafkaCruiseControlConfig).times(3); startRunnableDetectors(mockDetectorScheduler, mockGoalViolationDetector, mockMetricAnomalyDetector, mockDiskFailureDetector, mockBrokerFailureDetector, mockTopicAnomalyDetector, mockMaintenanceEventDetector, executorService); diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/IdempotenceCacheTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/IdempotenceCacheTest.java index a138d42772..d2cda258d5 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/IdempotenceCacheTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/IdempotenceCacheTest.java @@ -74,7 +74,7 @@ public void testIdempotence() { for (MaintenanceEventType eventType : MaintenanceEventType.cachedValues()) { parameterConfigOverrides.put(MAINTENANCE_EVENT_TYPE_CONFIG, eventType); // Expect mocks. - EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).times(4); + EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).anyTimes(); EasyMock.replay(_mockKafkaCruiseControl); parameterConfigOverrides.put(ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, MOCK_INITIAL_ANOMALY_TIME_MS); initialEvents.add(_config.getConfiguredInstance(AnomalyDetectorConfig.MAINTENANCE_EVENT_CLASS_CONFIG, @@ -97,7 +97,7 @@ public void testIdempotence() { for (MaintenanceEventType eventType : MaintenanceEventType.cachedValues()) { parameterConfigOverrides.put(MAINTENANCE_EVENT_TYPE_CONFIG, eventType); // Expect mocks. - EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).times(2); + EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).anyTimes(); EasyMock.replay(_mockKafkaCruiseControl); parameterConfigOverrides.put(ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, MOCK_DISTINCT_ANOMALY_TIME_MS); distinctEvents.add(_config.getConfiguredInstance(AnomalyDetectorConfig.MAINTENANCE_EVENT_CLASS_CONFIG, diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTest.java index 2106f0f8a2..32e2694a14 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTest.java @@ -101,7 +101,7 @@ public void testAddBrokerEvent() ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, MOCK_TIME_MS); // Expect mocks. - EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).times(3); + EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).times(4); _mockKafkaCruiseControl.sanityCheckDryRun(false, true); EasyMock.expect(_mockKafkaCruiseControl.hasOngoingExecution()).andReturn(false).once(); EasyMock.expect(_mockKafkaCruiseControl.modelCompletenessRequirements(EasyMock.anyObject())).andReturn( @@ -139,6 +139,7 @@ public void testAddBrokerEvent() EasyMock.eq(SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS), EasyMock.eq(SELF_HEALING_REPLICA_MOVEMENT_STRATEGY), EasyMock.eq(null), + EasyMock.eq(null), EasyMock.eq(false), EasyMock.anyString(), EasyMock.eq(false)); @@ -175,7 +176,7 @@ public void testRemoveBrokerEvent() ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, MOCK_TIME_MS); // Expect mocks. - EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).times(3); + EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).times(4); _mockKafkaCruiseControl.sanityCheckDryRun(false, true); EasyMock.expect(_mockKafkaCruiseControl.hasOngoingExecution()).andReturn(false).once(); EasyMock.expect(_mockKafkaCruiseControl.modelCompletenessRequirements(EasyMock.anyObject())).andReturn( @@ -212,6 +213,7 @@ public void testRemoveBrokerEvent() EasyMock.eq(SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS), EasyMock.eq(SELF_HEALING_REPLICA_MOVEMENT_STRATEGY), EasyMock.eq(null), + EasyMock.eq(null), EasyMock.eq(false), EasyMock.anyString()); EasyMock.expect(_optimizerResult.getProposalSummaryForJson()).andReturn(Collections.emptyMap()); @@ -246,7 +248,7 @@ public void testFixOfflineReplicasEvent() ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, MOCK_TIME_MS); // Expect mocks. - EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).times(3); + EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).times(4); _mockKafkaCruiseControl.sanityCheckDryRun(false, true); EasyMock.expect(_mockKafkaCruiseControl.hasOngoingExecution()).andReturn(false).once(); EasyMock.expect(_mockKafkaCruiseControl.modelCompletenessRequirements(EasyMock.anyObject())).andReturn( @@ -283,6 +285,7 @@ public void testFixOfflineReplicasEvent() EasyMock.eq(SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS), EasyMock.eq(SELF_HEALING_REPLICA_MOVEMENT_STRATEGY), EasyMock.eq(null), + EasyMock.eq(null), EasyMock.eq(false), EasyMock.anyString(), EasyMock.eq(false)); @@ -318,7 +321,7 @@ public void testRebalanceEvent() ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, MOCK_TIME_MS); // Expect mocks. - EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).times(3); + EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).times(4); // This is for rebalance runnable _mockKafkaCruiseControl.sanityCheckDryRun(false, true); EasyMock.expect(_mockKafkaCruiseControl.hasOngoingExecution()).andReturn(false).once(); @@ -372,6 +375,7 @@ public void testRebalanceEvent() EasyMock.eq(SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS), EasyMock.eq(SELF_HEALING_REPLICA_MOVEMENT_STRATEGY), EasyMock.eq(null), + EasyMock.eq(null), EasyMock.eq(false), EasyMock.anyString(), EasyMock.eq(false)); @@ -477,7 +481,7 @@ public void testTopicReplicationFactorEvent() TOPICS_WITH_RF_UPDATE_CONFIG, MOCK_TOPICS_WITH_RF_UPDATE); // Expect mocks. - EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).times(3); + EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).times(4); _mockKafkaCruiseControl.sanityCheckDryRun(false, true); EasyMock.expect(_mockKafkaCruiseControl.hasOngoingExecution()).andReturn(false).once(); EasyMock.expect(_mockKafkaCruiseControl.modelCompletenessRequirements(EasyMock.anyObject())).andReturn( @@ -517,6 +521,7 @@ public void testTopicReplicationFactorEvent() EasyMock.eq(SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS), EasyMock.eq(SELF_HEALING_REPLICA_MOVEMENT_STRATEGY), EasyMock.eq(null), + EasyMock.eq(null), EasyMock.eq(false), EasyMock.anyString(), EasyMock.eq(true)); @@ -559,7 +564,7 @@ public void testMaintenanceEventEquality() { for (MaintenanceEventType eventType : MaintenanceEventType.cachedValues()) { parameterConfigOverrides.put(MAINTENANCE_EVENT_TYPE_CONFIG, eventType); // Expect mocks. - EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).times(6); + EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).anyTimes(); EasyMock.replay(_mockKafkaCruiseControl); for (int i = 1; i <= 3; i++) { parameterConfigOverrides.put(ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, MOCK_TIME_MS * i); diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java index 5f5f3e3a40..86c4c5ea83 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java @@ -112,7 +112,7 @@ public void testReplicaReassignment() throws InterruptedException, OngoingExecut List proposalsToExecute = new ArrayList<>(); List proposalsToCheck = new ArrayList<>(); populateProposals(proposalsToExecute, proposalsToCheck, 0); - executeAndVerifyProposals(kafkaZkClient, proposalsToExecute, proposalsToCheck, false, null, false, true); + executeAndVerifyProposals(kafkaZkClient, proposalsToExecute, proposalsToCheck, false, null, null, false, true); } finally { KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient); } @@ -130,7 +130,8 @@ public void testReplicaReassignmentProgressWithThrottle() throws InterruptedExce List proposalsToCheck = new ArrayList<>(); populateProposals(proposalsToExecute, proposalsToCheck, PRODUCE_SIZE_IN_BYTES); // Throttle rate is set to the half of the produce size. - executeAndVerifyProposals(kafkaZkClient, proposalsToExecute, proposalsToCheck, false, PRODUCE_SIZE_IN_BYTES / 2, true, true); + executeAndVerifyProposals(kafkaZkClient, proposalsToExecute, proposalsToCheck, false, PRODUCE_SIZE_IN_BYTES / 2, PRODUCE_SIZE_IN_BYTES, + true, true); } finally { KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient); } @@ -162,7 +163,7 @@ public void testBrokerDiesBeforeMovingPartition() throws Exception { new ReplicaPlacementInfo(initialLeader1))); Collection proposalsToExecute = Arrays.asList(proposal0, proposal1); - executeAndVerifyProposals(kafkaZkClient, proposalsToExecute, Collections.emptyList(), true, null, false, false); + executeAndVerifyProposals(kafkaZkClient, proposalsToExecute, Collections.emptyList(), true, null, null, false, false); // We are doing the rollback. -- The leadership should be on the alive broker. assertEquals(initialLeader0, kafkaZkClient.getLeaderForPartition(TP0).get()); @@ -504,6 +505,7 @@ public void testTimeoutAndExecutionStop() throws InterruptedException, OngoingEx null, null, null, + null, true, RANDOM_UUID, false, @@ -539,6 +541,7 @@ public void testTimeoutAndExecutionStop() throws InterruptedException, OngoingEx null, null, null, + null, true, RANDOM_UUID, false, @@ -558,6 +561,7 @@ public void testTimeoutAndExecutionStop() throws InterruptedException, OngoingEx null, null, null, + null, true, RANDOM_UUID, false, @@ -754,6 +758,7 @@ private void executeAndVerifyProposals(KafkaZkClient kafkaZkClient, Collection proposalsToCheck, boolean completeWithError, Long replicationThrottle, + Long logDirThrottle, boolean verifyProgress, boolean isTriggeredByUserRequest) throws OngoingExecutionException { @@ -792,7 +797,7 @@ private void executeAndVerifyProposals(KafkaZkClient kafkaZkClient, executor.setGeneratingProposalsForExecution(RANDOM_UUID, ExecutorTest.class::getSimpleName, isTriggeredByUserRequest); executor.executeProposals(proposalsToExecute, Collections.emptySet(), null, mockLoadMonitor, null, null, null, null, null, null, null, - replicationThrottle, isTriggeredByUserRequest, RANDOM_UUID, false, false); + replicationThrottle, logDirThrottle, isTriggeredByUserRequest, RANDOM_UUID, false, false); if (verifyProgress) { verifyOngoingPartitionReassignments(Collections.singleton(TP0)); diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ParameterUtilsTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ParameterUtilsTest.java index b576fa329f..bc1fe20971 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ParameterUtilsTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ParameterUtilsTest.java @@ -21,6 +21,7 @@ public class ParameterUtilsTest { private static final String START_TIME_STRING = "12345"; private static final String END_TIME_STRING = "23456"; private static final String REPLICATION_THROTTLE_STRING = "1000"; + private static final String LOG_DIR_THROTTLE_STRING = "10000"; private static final String DEFAULT_REPLICATION_THROTTLE_STRING = "2000"; private static final String EXECUTION_PROGRESS_CHECK_INTERVAL_STRING = "1500"; @@ -138,6 +139,43 @@ public void testParseReplicationThrottleWithDefault() { EasyMock.verify(mockRequest, controlConfig); } + @Test + public void testParseLogDirThrottleWithNoDefault() { + CruiseControlRequestContext mockRequest = EasyMock.mock(CruiseControlRequestContext.class); + KafkaCruiseControlConfig controlConfig = EasyMock.mock(KafkaCruiseControlConfig.class); + + Map paramMap = Collections.singletonMap( + ParameterUtils.LOG_DIR_THROTTLE_PARAM, + new String[]{ParameterUtils.LOG_DIR_THROTTLE_PARAM}); + + EasyMock.expect(mockRequest.getParameterMap()).andReturn(paramMap).once(); + EasyMock.expect(mockRequest.getParameter(ParameterUtils.LOG_DIR_THROTTLE_PARAM)).andReturn(LOG_DIR_THROTTLE_STRING).once(); + // No default + EasyMock.expect(controlConfig.getLong(ExecutorConfig.DEFAULT_LOG_DIR_THROTTLE_CONFIG)).andReturn(null); + + EasyMock.replay(mockRequest, controlConfig); + + Long replicationThrottle = ParameterUtils.logDirThrottle(mockRequest, controlConfig); + Assert.assertEquals(Long.valueOf(LOG_DIR_THROTTLE_STRING), replicationThrottle); + EasyMock.verify(mockRequest, controlConfig); + } + + @Test + public void testParseLogDirThrottleWithDefault() { + CruiseControlRequestContext mockRequest = EasyMock.mock(CruiseControlRequestContext.class); + KafkaCruiseControlConfig controlConfig = EasyMock.mock(KafkaCruiseControlConfig.class); + // No parameter string value in the parameter map + EasyMock.expect(mockRequest.getParameterMap()).andReturn(Collections.emptyMap()).once(); + EasyMock.expect(controlConfig.getLong(ExecutorConfig.DEFAULT_REPLICATION_THROTTLE_CONFIG)) + .andReturn(Long.valueOf(DEFAULT_REPLICATION_THROTTLE_STRING)); + + EasyMock.replay(mockRequest, controlConfig); + + Long replicationThrottle = ParameterUtils.replicationThrottle(mockRequest, controlConfig); + Assert.assertEquals(Long.valueOf(DEFAULT_REPLICATION_THROTTLE_STRING), replicationThrottle); + EasyMock.verify(mockRequest, controlConfig); + } + @Test public void testParseExecutionProgressCheckIntervalMsNoValue() { CruiseControlRequestContext mockRequest = EasyMock.mock(CruiseControlRequestContext.class); From 69a82c0328186cdbc4b7d786d2c33b06325a6601 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Fri, 21 Jun 2024 08:50:07 +0900 Subject: [PATCH 06/13] fix: #1851 Separate (un)setting throttling for inter/intra-broker rebalanacing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename: ReplicationThrottleHelper.clearThrottles → clearInterBrokerThrottles - Add ReplicationThrottleHelper.clearIntraBrokerThrottles - Executor.intraBrokerMoveReplicas now calls ReplicationThrottleHelper.setLogDirThrottles, ReplicationThrottleHelper.clearIntraBrokerThrottles --- .../cruisecontrol/executor/Executor.java | 24 ++++++++++---- .../executor/ReplicationThrottleHelper.java | 30 ++++++++++++++--- .../ReplicationThrottleHelperTest.java | 32 ++++++++++++------- 3 files changed, 63 insertions(+), 23 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index a74ad691e3..890f19d248 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -7,6 +7,7 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.AtomicDouble; import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils; import com.linkedin.kafka.cruisecontrol.common.TopicMinIsrCache; @@ -1634,10 +1635,6 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc throttleHelper.setReplicationThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()), _replicationThrottle); } - if (_logDirThrottle != null) { - throttleHelper.setLogDirThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()), - _logDirThrottle); - } // Execute the tasks. _executionTaskManager.markTasksInProgress(tasksToExecute); result = ExecutionUtils.submitReplicaReassignmentTasks(_adminClient, tasksToExecute); @@ -1660,8 +1657,8 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc .collect(Collectors.toList()); inProgressTasks.addAll(inExecutionTasks()); - if (_replicationThrottle != null || _logDirThrottle != null) { - throttleHelper.clearThrottles(completedTasks, inProgressTasks); + if (_replicationThrottle != null) { + throttleHelper.clearInterBrokerThrottles(completedTasks, inProgressTasks); } } @@ -1693,13 +1690,15 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc } } - private void intraBrokerMoveReplicas() { + private void intraBrokerMoveReplicas() throws InterruptedException, ExecutionException, TimeoutException { + ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient); int numTotalPartitionMovements = _executionTaskManager.numRemainingIntraBrokerPartitionMovements(); long totalDataToMoveInMB = _executionTaskManager.remainingIntraBrokerDataToMoveInMB(); long startTime = System.currentTimeMillis(); LOG.info("User task {}: Starting {} intra-broker partition movements.", _uuid, numTotalPartitionMovements); int partitionsToMove = numTotalPartitionMovements; + Set participatingBrokers = Sets.newHashSet(); // Exhaust all the pending partition movements. while ((partitionsToMove > 0 || !inExecutionTasks().isEmpty()) && _stopSignal.get() == NO_STOP_EXECUTION) { // Get tasks to execute. @@ -1707,6 +1706,12 @@ private void intraBrokerMoveReplicas() { LOG.info("User task {}: Executor will execute {} task(s)", _uuid, tasksToExecute.size()); if (!tasksToExecute.isEmpty()) { + if (_logDirThrottle != null) { + participatingBrokers = throttleHelper.setLogDirThrottles( + tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()), + _logDirThrottle + ); + } // Execute the tasks. _executionTaskManager.markTasksInProgress(tasksToExecute); executeIntraBrokerReplicaMovements(tasksToExecute, _adminClient, _executionTaskManager, _config); @@ -1732,6 +1737,11 @@ private void intraBrokerMoveReplicas() { waitForIntraBrokerReplicaTasksToFinish(); inExecutionTasks = inExecutionTasks(); } + + if (_logDirThrottle != null) { + throttleHelper.clearIntraBrokerThrottles(participatingBrokers); + } + if (inExecutionTasks().isEmpty()) { LOG.info("User task {}: Intra-broker partition movements finished.", _uuid); } else if (_stopSignal.get() != NO_STOP_EXECUTION) { diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java index cb6b21cde4..9f0d81efed 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java @@ -83,13 +83,15 @@ void setReplicationThrottles(List replicaMovementProposals, l } } - void setLogDirThrottles(List replicaMovementProposals, long throttleRate) - throws ExecutionException, InterruptedException, TimeoutException { + Set setLogDirThrottles(List replicaMovementProposals, long throttleRate) + throws ExecutionException, InterruptedException, TimeoutException { LOG.info("Setting a log dir throttle of {} bytes/sec", throttleRate); Set participatingBrokers = getParticipatingBrokers(replicaMovementProposals); for (Integer broker: participatingBrokers) { setLogDirThrottledRateIfNecessary(broker, throttleRate); } + + return participatingBrokers; } // Determines if a candidate task is ready to have its throttles removed. @@ -110,7 +112,7 @@ boolean taskIsInProgress(ExecutionTask task) { } // clear throttles for a specific list of execution tasks - void clearThrottles(List completedTasks, List inProgressTasks) + void clearInterBrokerThrottles(List completedTasks, List inProgressTasks) throws ExecutionException, InterruptedException, TimeoutException { List completedProposals = completedTasks @@ -151,6 +153,13 @@ void clearThrottles(List completedTasks, List inPr } } + void clearIntraBrokerThrottles(Set participatingBrokers) + throws ExecutionException, InterruptedException, TimeoutException { + for (int broker : participatingBrokers) { + removeLogDirThrottledRateFromBroker(broker); + } + } + private Set getParticipatingBrokers(List replicaMovementProposals) { Set participatingBrokers = new TreeSet<>(); for (ExecutionProposal proposal : replicaMovementProposals) { @@ -345,7 +354,6 @@ private void removeThrottledRatesFromBroker(Integer brokerId) Config brokerConfigs = getBrokerConfigs(brokerId); ConfigEntry currLeaderThrottle = brokerConfigs.get(LEADER_THROTTLED_RATE); ConfigEntry currFollowerThrottle = brokerConfigs.get(FOLLOWER_THROTTLED_RATE); - ConfigEntry currLogDirThrottle = brokerConfigs.get(LOG_DIR_THROTTLED_RATE); List ops = new ArrayList<>(); if (currLeaderThrottle != null) { if (currLeaderThrottle.source().equals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)) { @@ -363,6 +371,20 @@ private void removeThrottledRatesFromBroker(Integer brokerId) ops.add(new AlterConfigOp(new ConfigEntry(FOLLOWER_THROTTLED_RATE, null), AlterConfigOp.OpType.DELETE)); } } + if (!ops.isEmpty()) { + changeBrokerConfigs(brokerId, ops); + } + } + + /** + * Remove {@value #LOG_DIR_THROTTLED_RATE} on broker {@param brokerId} + * + * @param brokerId broker to remove {@value #LOG_DIR_THROTTLED_RATE} + */ + private void removeLogDirThrottledRateFromBroker(Integer brokerId) throws ExecutionException, InterruptedException, TimeoutException { + Config brokerConfigs = getBrokerConfigs(brokerId); + ConfigEntry currLogDirThrottle = brokerConfigs.get(LOG_DIR_THROTTLED_RATE); + List ops = new ArrayList<>(); if (currLogDirThrottle != null) { if (currLogDirThrottle.source().equals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)) { LOG.debug("Skipping removal for static log dir throttle rate: {}", currLogDirThrottle); diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java index 49fb2de4de..8e7ddf6de8 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.java @@ -4,6 +4,7 @@ package com.linkedin.kafka.cruisecontrol.executor; +import com.google.common.collect.Sets; import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils; import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaIntegrationTestHarness; import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo; @@ -146,7 +147,7 @@ public void testClearThrottleOnNonExistentTopic() throws Exception { ExecutionTask mockCompleteTask = prepareMockCompleteTask(proposal); EasyMock.replay(mockAdminClient); - throttleHelper.clearThrottles(Collections.singletonList(mockCompleteTask), Collections.emptyList()); + throttleHelper.clearInterBrokerThrottles(Collections.singletonList(mockCompleteTask), Collections.emptyList()); EasyMock.verify(mockAdminClient, mockCompleteTask); // Case 2: a situation where Topic0 gets deleted after its configs were read. @@ -165,7 +166,7 @@ public void testClearThrottleOnNonExistentTopic() throws Exception { EasyMock.replay(mockAdminClient); // Expect no exception - throttleHelper.clearThrottles(Collections.singletonList(mockCompleteTask), Collections.emptyList()); + throttleHelper.clearInterBrokerThrottles(Collections.singletonList(mockCompleteTask), Collections.emptyList()); EasyMock.verify(mockAdminClient, mockCompleteTask); } @@ -242,13 +243,20 @@ public void testAddingThrottlesWithNoPreExistingThrottles() throws Exception { assertExpectedLogDirThrottledRateForBroker(1, throttleRate); assertExpectedLogDirThrottledRateForBroker(2, throttleRate); - // We expect all throttles to be cleaned up - throttleHelper.clearThrottles(Collections.singletonList(task), Collections.emptyList()); + // We expect all inter-broker throttles to be cleaned up (not intra-broker throttles) + throttleHelper.clearInterBrokerThrottles(Collections.singletonList(task), Collections.emptyList()); for (int i = 0; i < clusterSize(); i++) { assertExpectedReplicationThrottledRateForBroker(i, null); } assertExpectedThrottledReplicas(TOPIC0, ""); + + assertExpectedLogDirThrottledRateForBroker(0, throttleRate); + assertExpectedLogDirThrottledRateForBroker(1, throttleRate); + assertExpectedLogDirThrottledRateForBroker(2, throttleRate); + + // We expect all intra-broker throttles to be cleaned up + throttleHelper.clearIntraBrokerThrottles(Sets.newHashSet(0, 1, 2)); for (int i = 0; i < clusterSize(); i++) { assertExpectedLogDirThrottledRateForBroker(i, null); } @@ -308,12 +316,12 @@ public void testAddingThrottlesWithPreExistingThrottles() throws Exception { // Existing throttled replicas are unchanged for topic 1: assertExpectedThrottledReplicas(TOPIC1, "1:1"); - throttleHelper.clearThrottles(Collections.singletonList(task), Collections.emptyList()); + throttleHelper.clearInterBrokerThrottles(Collections.singletonList(task), Collections.emptyList()); // We expect all throttles related to replica movement to be removed. Specifically, // any throttles related to partitions which were not moved will remain. // However, we do expect the broker throttles to be removed. - throttleHelper.clearThrottles(Collections.singletonList(task), Collections.emptyList()); + throttleHelper.clearInterBrokerThrottles(Collections.singletonList(task), Collections.emptyList()); for (int i = 0; i < clusterSize(); i++) { assertExpectedReplicationThrottledRateForBroker(i, null); } @@ -357,7 +365,7 @@ public void testDoNotModifyExistingWildcardReplicaThrottles() throws Exception { assertExpectedThrottledReplicas(TOPIC0, ReplicationThrottleHelper.WILDCARD_ASTERISK); assertExpectedThrottledReplicas(TOPIC1, ReplicationThrottleHelper.WILDCARD_ASTERISK); - throttleHelper.clearThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask)); + throttleHelper.clearInterBrokerThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask)); assertExpectedReplicationThrottledRateForBroker(0, throttleRate); // we expect broker 1 to be null since all replica movement related to it has completed. assertExpectedReplicationThrottledRateForBroker(1, null); @@ -369,7 +377,7 @@ public void testDoNotModifyExistingWildcardReplicaThrottles() throws Exception { assertExpectedThrottledReplicas(TOPIC1, ReplicationThrottleHelper.WILDCARD_ASTERISK); // passing an inProgress task that is not complete should have no effect. - throttleHelper.clearThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask)); + throttleHelper.clearInterBrokerThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask)); assertExpectedReplicationThrottledRateForBroker(0, throttleRate); // we expect broker 1 to be null since all replica movement related to it has completed. assertExpectedReplicationThrottledRateForBroker(1, null); @@ -382,7 +390,7 @@ public void testDoNotModifyExistingWildcardReplicaThrottles() throws Exception { // Completing the in-progress task and the "*" should not be cleaned up. inProgressTask.completed(3); - throttleHelper.clearThrottles(Arrays.asList(completedTask, inProgressTask), Collections.emptyList()); + throttleHelper.clearInterBrokerThrottles(Arrays.asList(completedTask, inProgressTask), Collections.emptyList()); for (int i = 0; i < clusterSize(); i++) { assertExpectedReplicationThrottledRateForBroker(i, null); @@ -422,7 +430,7 @@ public void testDoNotRemoveThrottlesForInProgressTasks() throws Exception { assertExpectedReplicationThrottledRateForBroker(3, throttleRate); assertExpectedThrottledReplicas(TOPIC0, "0:0,0:1,0:2,1:0,1:2,1:3"); - throttleHelper.clearThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask)); + throttleHelper.clearInterBrokerThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask)); assertExpectedReplicationThrottledRateForBroker(0, throttleRate); // we expect broker 1 to be null since all replica movement related to it has completed. assertExpectedReplicationThrottledRateForBroker(1, null); @@ -432,7 +440,7 @@ public void testDoNotRemoveThrottlesForInProgressTasks() throws Exception { assertExpectedThrottledReplicas(TOPIC0, "1:0,1:2,1:3"); // passing an inProgress task that is not complete should have no effect. - throttleHelper.clearThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask)); + throttleHelper.clearInterBrokerThrottles(Collections.singletonList(completedTask), Collections.singletonList(inProgressTask)); assertExpectedReplicationThrottledRateForBroker(0, throttleRate); // we expect broker 1 to be null since all replica movement related to it has completed. assertExpectedReplicationThrottledRateForBroker(1, null); @@ -443,7 +451,7 @@ public void testDoNotRemoveThrottlesForInProgressTasks() throws Exception { // Completing the in-progress task and clearing the throttles should clean everything up. inProgressTask.completed(3); - throttleHelper.clearThrottles(Arrays.asList(completedTask, inProgressTask), Collections.emptyList()); + throttleHelper.clearInterBrokerThrottles(Arrays.asList(completedTask, inProgressTask), Collections.emptyList()); for (int i = 0; i < clusterSize(); i++) { assertExpectedReplicationThrottledRateForBroker(i, null); From bea9d53caa14a66e9b7ca8f874c90031ffb04703 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sun, 23 Feb 2025 22:24:24 +0900 Subject: [PATCH 07/13] Add 'default.log.dir.throttle' to Configurations.md --- docs/wiki/User Guide/Configurations.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/wiki/User Guide/Configurations.md b/docs/wiki/User Guide/Configurations.md index 51192f20f3..a2143caaaf 100644 --- a/docs/wiki/User Guide/Configurations.md +++ b/docs/wiki/User Guide/Configurations.md @@ -155,7 +155,8 @@ The following configurations are inherited from the open source Kafka client con | execution.progress.check.interval.ms | Integer | N | 10,000 | The interval in milliseconds that the " +,"executor will check on the execution progress. | | metric.anomaly.analyzer.metrics | String | N | "" | The metric ids that the metric anomaly detector should detect if they are violated. | | topics.excluded.from.partition.movement | String | N | "" | The topics that should be excluded from the partition movement. It is a regex. Notice that this regex will be ignored when decommission a broker is invoked. | -| default.replication.throttle | Long | N | null | The replication throttle applied to replicas being moved, in bytes per second. | +| default.replication.throttle | Long | N | null | The replication throttle applied to replicas being moved, in bytes per second. | +| default.log.dir.throttle | Long | N | null | The replication throttle applied to replicas being moved, in bytes per second. | | replica.movement.strategies | List | N | [com.linkedin.kafka.cruisecontrol.executor.strategy.PostponeUrpReplicaMovementStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeLargeReplicaMovementStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeSmallReplicaMovementStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeMinIsrWithOfflineReplicasStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeOneAboveMinIsrWithOfflineReplicasStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.BaseReplicaMovementStrategy] | A list of supported strategies used to determine execution order for generated partition movement tasks. | | default.replica.movement.strategies | List | N | [com.linkedin.kafka.cruisecontrol.executor.strategy.BaseReplicaMovementStrategy] | The list of replica movement strategies that will be used by default if no replica movement strategy list is provided. | | executor.notifier.class | Class | N | class com.linkedin.kafka.cruisecontrol.executor.ExecutorNoopNotifier | The executor notifier class to trigger an alert when an execution finishes or is stopped (by a user or by Cruise Control). | From 6b168bfefac0c9fd5c02f838cf3db5c394e13bc8 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sun, 23 Feb 2025 22:25:12 +0900 Subject: [PATCH 08/13] Remove logDirThrottle parameter in Executor#executeDemoteProposals --- .../com/linkedin/kafka/cruisecontrol/executor/Executor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index 890f19d248..d447f07af9 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -928,7 +928,7 @@ public synchronized void executeDemoteProposals(Collection pr initProposalExecution(proposals, demotedBrokers, concurrentSwaps, null, 0, requestedClusterLeadershipMovementConcurrency, requestedBrokerLeadershipMovementConcurrency, requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy, isTriggeredByUserRequest, loadMonitor); - startExecution(loadMonitor, demotedBrokers, null, replicationThrottle, replicationThrottle, isTriggeredByUserRequest); + startExecution(loadMonitor, demotedBrokers, null, replicationThrottle, null, isTriggeredByUserRequest); } catch (Exception e) { processExecuteProposalsFailure(); throw e; From 76ca5f7058470537110de9c4fb5457506b6f3f2e Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sun, 23 Feb 2025 22:35:47 +0900 Subject: [PATCH 09/13] =?UTF-8?q?Revert=20method=20name:=20removeThrottled?= =?UTF-8?q?RatesFromBroker=20=E2=86=92=20removeThrottledRateFromBroker?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cruisecontrol/executor/ReplicationThrottleHelper.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java index 9f0d81efed..8caec52ef0 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java @@ -144,7 +144,7 @@ void clearInterBrokerThrottles(List completedTasks, List> throttledReplicas = getThrottledReplicasByTopic(completedProposals); @@ -349,7 +349,7 @@ private void removeThrottledReplicasFromTopic(String topic, Set replicas } } - private void removeThrottledRatesFromBroker(Integer brokerId) + private void removeThrottledRateFromBroker(Integer brokerId) throws ExecutionException, InterruptedException, TimeoutException { Config brokerConfigs = getBrokerConfigs(brokerId); ConfigEntry currLeaderThrottle = brokerConfigs.get(LEADER_THROTTLED_RATE); From 411fbbedf64c6d7acf5927be08c7aa842b933225 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Wed, 2 Apr 2025 23:45:40 +0900 Subject: [PATCH 10/13] Fix documentation on the 'default.log.dir.throttle' configuration. --- docs/wiki/User Guide/Configurations.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/wiki/User Guide/Configurations.md b/docs/wiki/User Guide/Configurations.md index a2143caaaf..70e0442395 100644 --- a/docs/wiki/User Guide/Configurations.md +++ b/docs/wiki/User Guide/Configurations.md @@ -155,8 +155,8 @@ The following configurations are inherited from the open source Kafka client con | execution.progress.check.interval.ms | Integer | N | 10,000 | The interval in milliseconds that the " +,"executor will check on the execution progress. | | metric.anomaly.analyzer.metrics | String | N | "" | The metric ids that the metric anomaly detector should detect if they are violated. | | topics.excluded.from.partition.movement | String | N | "" | The topics that should be excluded from the partition movement. It is a regex. Notice that this regex will be ignored when decommission a broker is invoked. | -| default.replication.throttle | Long | N | null | The replication throttle applied to replicas being moved, in bytes per second. | -| default.log.dir.throttle | Long | N | null | The replication throttle applied to replicas being moved, in bytes per second. | +| default.replication.throttle | Long | N | null | The replication throttle applied to replicas being moved between the brokers (i.e., inter-broker rebalancing), in bytes per second. | +| default.log.dir.throttle | Long | N | null | The replication throttle applied to replicas being moved between the log dirs (i.e., intra-broker rebalancing), in bytes per second. | | replica.movement.strategies | List | N | [com.linkedin.kafka.cruisecontrol.executor.strategy.PostponeUrpReplicaMovementStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeLargeReplicaMovementStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeSmallReplicaMovementStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeMinIsrWithOfflineReplicasStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeOneAboveMinIsrWithOfflineReplicasStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.BaseReplicaMovementStrategy] | A list of supported strategies used to determine execution order for generated partition movement tasks. | | default.replica.movement.strategies | List | N | [com.linkedin.kafka.cruisecontrol.executor.strategy.BaseReplicaMovementStrategy] | The list of replica movement strategies that will be used by default if no replica movement strategy list is provided. | | executor.notifier.class | Class | N | class com.linkedin.kafka.cruisecontrol.executor.ExecutorNoopNotifier | The executor notifier class to trigger an alert when an execution finishes or is stopped (by a user or by Cruise Control). | From 685865da1181c2d8fe296473db2a469044a983d3 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Wed, 2 Apr 2025 23:47:12 +0900 Subject: [PATCH 11/13] Improve Javadoc of KafkaCruiseControl#executeProposals (i.e., inter-broker movements vs. intra-broker movements) --- .../linkedin/kafka/cruisecontrol/KafkaCruiseControl.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java index 6d7025c087..8e8e50c5ef 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java @@ -650,9 +650,11 @@ private static boolean hasProposalsToExecute(Collection propo * @param replicaMovementStrategy The strategy used to determine the execution order of generated replica movement tasks * (if null, use default.replica.movement.strategies). * @param replicationThrottle The replication throttle (bytes/second) to apply to both leaders and followers - * when executing proposals (if null, no throttling is applied). + * when executing proposals; That is, this parameter only affects inter-broker replica + * movements (if null, no throttling is applied). * @param logDirThrottle The throttle (bytes/second) to apply to replicas being moved between the log dirs - * when executing proposals (if null, no throttling is applied). + * when executing proposals; That is, this parameter only affects intra-broker replica + * movements (if null, no throttling is applied). * @param isTriggeredByUserRequest Whether the execution is triggered by a user request. * @param uuid UUID of the execution. * @param skipInterBrokerReplicaConcurrencyAdjustment {@code true} to skip auto adjusting concurrency of inter-broker From fda2ab06dc601fce6f16ed97ec497ebf4e14743c Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Wed, 2 Apr 2025 23:47:47 +0900 Subject: [PATCH 12/13] Remove logDirThrottle parameter from KafkaCruiseControl#executeRemoval --- .../com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java | 5 +---- .../handler/async/runnable/RemoveBrokersRunnable.java | 2 +- .../kafka/cruisecontrol/detector/MaintenanceEventTest.java | 1 - 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java index 8e8e50c5ef..68cbac2c66 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java @@ -705,8 +705,6 @@ public void executeProposals(Set proposals, * (if null, use default.replica.movement.strategies). * @param replicationThrottle The replication throttle (bytes/second) to apply to both leaders and followers * when executing remove operations (if null, no throttling is applied). - * @param logDirThrottle The throttle (bytes/second) to apply to replicas being moved between the log dirs - * when executing remove operations (if null, no throttling is applied). * @param isTriggeredByUserRequest Whether the execution is triggered by a user request. * @param uuid UUID of the execution. */ @@ -721,14 +719,13 @@ public void executeRemoval(Set proposals, Long executionProgressCheckIntervalMs, ReplicaMovementStrategy replicaMovementStrategy, Long replicationThrottle, - Long logDirThrottle, boolean isTriggeredByUserRequest, String uuid) throws OngoingExecutionException { if (hasProposalsToExecute(proposals, uuid)) { _executor.executeProposals(proposals, throttleDecommissionedBroker ? Collections.emptySet() : removedBrokers, removedBrokers, _loadMonitor, concurrentInterBrokerPartitionMovements, maxInterBrokerPartitionMovements, 0, clusterLeaderMovementConcurrency, brokerLeaderMovementConcurrency, - executionProgressCheckIntervalMs, replicaMovementStrategy, replicationThrottle, logDirThrottle, + executionProgressCheckIntervalMs, replicaMovementStrategy, replicationThrottle, null, isTriggeredByUserRequest, uuid, isKafkaAssignerMode, false); } else { failGeneratingProposalsForExecution(uuid); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java index 7abf42ed40..3ef80a09c7 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java @@ -158,7 +158,7 @@ protected OptimizerResult workWithClusterModel() throws KafkaCruiseControlExcept _concurrentInterBrokerPartitionMovements, _maxInterBrokerPartitionMovements, _clusterLeaderMovementConcurrency, _brokerLeaderMovementConcurrency, _executionProgressCheckIntervalMs, _replicaMovementStrategy, _replicationThrottle, - _logDirThrottle, _isTriggeredByUserRequest, _uuid); + _isTriggeredByUserRequest, _uuid); } return result; diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTest.java index 32e2694a14..0e452b9493 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTest.java @@ -213,7 +213,6 @@ public void testRemoveBrokerEvent() EasyMock.eq(SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS), EasyMock.eq(SELF_HEALING_REPLICA_MOVEMENT_STRATEGY), EasyMock.eq(null), - EasyMock.eq(null), EasyMock.eq(false), EasyMock.anyString()); EasyMock.expect(_optimizerResult.getProposalSummaryForJson()).andReturn(Collections.emptyMap()); From 02057a3154f160ac5f760d3b0e4a5c1804e0e70c Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Thu, 3 Apr 2025 00:28:09 +0900 Subject: [PATCH 13/13] Remove _logDirThrottle from AddBrokersRunnable, RemoveBrokersRunnable, and UpdateTopicConfigurationRunnable. (reason: they do not incur any intra-broker replica movements) --- .../servlet/handler/async/runnable/AddBrokersRunnable.java | 5 +---- .../handler/async/runnable/RemoveBrokersRunnable.java | 3 --- .../async/runnable/UpdateTopicConfigurationRunnable.java | 6 +----- .../kafka/cruisecontrol/detector/MaintenanceEventTest.java | 6 +++--- 4 files changed, 5 insertions(+), 15 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/AddBrokersRunnable.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/AddBrokersRunnable.java index 9617091cf8..28ea70404a 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/AddBrokersRunnable.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/AddBrokersRunnable.java @@ -42,7 +42,6 @@ public class AddBrokersRunnable extends GoalBasedOperationRunnable { protected final Long _executionProgressCheckIntervalMs; protected final ReplicaMovementStrategy _replicaMovementStrategy; protected final Long _replicationThrottle; - protected final Long _logDirThrottle; protected static final boolean SKIP_AUTO_REFRESHING_CONCURRENCY = false; /** @@ -68,7 +67,6 @@ public AddBrokersRunnable(KafkaCruiseControl kafkaCruiseControl, _executionProgressCheckIntervalMs = SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS; _replicaMovementStrategy = SELF_HEALING_REPLICA_MOVEMENT_STRATEGY; _replicationThrottle = kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_REPLICATION_THROTTLE_CONFIG); - _logDirThrottle = kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_LOG_DIR_THROTTLE_CONFIG); } public AddBrokersRunnable(KafkaCruiseControl kafkaCruiseControl, @@ -86,7 +84,6 @@ public AddBrokersRunnable(KafkaCruiseControl kafkaCruiseControl, _executionProgressCheckIntervalMs = parameters.executionProgressCheckIntervalMs(); _replicaMovementStrategy = parameters.replicaMovementStrategy(); _replicationThrottle = parameters.replicationThrottle(); - _logDirThrottle = parameters.logDirThrottle(); } @Override @@ -130,7 +127,7 @@ protected OptimizerResult workWithClusterModel() throws KafkaCruiseControlExcept _executionProgressCheckIntervalMs, _replicaMovementStrategy, _replicationThrottle, - _logDirThrottle, + null, _isTriggeredByUserRequest, _uuid, SKIP_AUTO_REFRESHING_CONCURRENCY); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java index 3ef80a09c7..16e05b96e5 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java @@ -49,7 +49,6 @@ public class RemoveBrokersRunnable extends GoalBasedOperationRunnable { protected final Long _executionProgressCheckIntervalMs; protected final ReplicaMovementStrategy _replicaMovementStrategy; protected final Long _replicationThrottle; - protected final Long _logDirThrottle; /** * Constructor to be used for creating a runnable for self-healing. @@ -75,7 +74,6 @@ public RemoveBrokersRunnable(KafkaCruiseControl kafkaCruiseControl, _executionProgressCheckIntervalMs = SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS; _replicaMovementStrategy = SELF_HEALING_REPLICA_MOVEMENT_STRATEGY; _replicationThrottle = kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_REPLICATION_THROTTLE_CONFIG); - _logDirThrottle = kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_LOG_DIR_THROTTLE_CONFIG); } public RemoveBrokersRunnable(KafkaCruiseControl kafkaCruiseControl, @@ -94,7 +92,6 @@ public RemoveBrokersRunnable(KafkaCruiseControl kafkaCruiseControl, _executionProgressCheckIntervalMs = parameters.executionProgressCheckIntervalMs(); _replicaMovementStrategy = parameters.replicaMovementStrategy(); _replicationThrottle = parameters.replicationThrottle(); - _logDirThrottle = parameters.logDirThrottle(); } @Override diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/UpdateTopicConfigurationRunnable.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/UpdateTopicConfigurationRunnable.java index dabd5dab4e..3fdd356981 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/UpdateTopicConfigurationRunnable.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/UpdateTopicConfigurationRunnable.java @@ -80,7 +80,6 @@ public class UpdateTopicConfigurationRunnable extends GoalBasedOperationRunnable protected final Long _executionProgressCheckIntervalMs; protected final ReplicaMovementStrategy _replicaMovementStrategy; protected final Long _replicationThrottle; - protected final Long _logDirThrottle; protected Cluster _cluster; protected Map> _topicsToChangeByReplicationFactor; protected static final boolean SKIP_AUTO_REFRESHING_CONCURRENCY = true; @@ -108,7 +107,6 @@ public UpdateTopicConfigurationRunnable(KafkaCruiseControl kafkaCruiseControl, _executionProgressCheckIntervalMs = topicReplicationFactorChangeParameters.executionProgressCheckIntervalMs(); _replicaMovementStrategy = topicReplicationFactorChangeParameters.replicaMovementStrategy(); _replicationThrottle = topicReplicationFactorChangeParameters.replicationThrottle(); - _logDirThrottle = topicReplicationFactorChangeParameters.logDirThrottle(); } else { _topicPatternByReplicationFactor = null; _skipRackAwarenessCheck = false; @@ -119,7 +117,6 @@ public UpdateTopicConfigurationRunnable(KafkaCruiseControl kafkaCruiseControl, _executionProgressCheckIntervalMs = null; _replicaMovementStrategy = null; _replicationThrottle = null; - _logDirThrottle = null; } } @@ -152,7 +149,6 @@ public UpdateTopicConfigurationRunnable(KafkaCruiseControl kafkaCruiseControl, _executionProgressCheckIntervalMs = SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS; _replicaMovementStrategy = SELF_HEALING_REPLICA_MOVEMENT_STRATEGY; _replicationThrottle = kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_REPLICATION_THROTTLE_CONFIG); - _logDirThrottle = kafkaCruiseControl.config().getLong(ExecutorConfig.DEFAULT_LOG_DIR_THROTTLE_CONFIG); } @Override @@ -213,7 +209,7 @@ protected OptimizerResult workWithClusterModel() throws KafkaCruiseControlExcept _kafkaCruiseControl.executeProposals(result.goalProposals(), Collections.emptySet(), false, _concurrentInterBrokerPartitionMovements, _maxInterBrokerPartitionMovements, 0, _clusterLeaderMovementConcurrency, _brokerLeaderMovementConcurrency, - _executionProgressCheckIntervalMs, _replicaMovementStrategy, _replicationThrottle, _logDirThrottle, + _executionProgressCheckIntervalMs, _replicaMovementStrategy, _replicationThrottle, null, _isTriggeredByUserRequest, _uuid, SKIP_AUTO_REFRESHING_CONCURRENCY); } return result; diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTest.java index 0e452b9493..6ee3747dce 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTest.java @@ -101,7 +101,7 @@ public void testAddBrokerEvent() ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, MOCK_TIME_MS); // Expect mocks. - EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).times(4); + EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).times(3); _mockKafkaCruiseControl.sanityCheckDryRun(false, true); EasyMock.expect(_mockKafkaCruiseControl.hasOngoingExecution()).andReturn(false).once(); EasyMock.expect(_mockKafkaCruiseControl.modelCompletenessRequirements(EasyMock.anyObject())).andReturn( @@ -176,7 +176,7 @@ public void testRemoveBrokerEvent() ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, MOCK_TIME_MS); // Expect mocks. - EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).times(4); + EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).times(3); _mockKafkaCruiseControl.sanityCheckDryRun(false, true); EasyMock.expect(_mockKafkaCruiseControl.hasOngoingExecution()).andReturn(false).once(); EasyMock.expect(_mockKafkaCruiseControl.modelCompletenessRequirements(EasyMock.anyObject())).andReturn( @@ -480,7 +480,7 @@ public void testTopicReplicationFactorEvent() TOPICS_WITH_RF_UPDATE_CONFIG, MOCK_TOPICS_WITH_RF_UPDATE); // Expect mocks. - EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).times(4); + EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).times(3); _mockKafkaCruiseControl.sanityCheckDryRun(false, true); EasyMock.expect(_mockKafkaCruiseControl.hasOngoingExecution()).andReturn(false).once(); EasyMock.expect(_mockKafkaCruiseControl.modelCompletenessRequirements(EasyMock.anyObject())).andReturn(