From 62ef9e49f09c4b3ef42634724e249f013bf4a6f4 Mon Sep 17 00:00:00 2001 From: il-kyun Date: Sun, 17 Aug 2025 15:19:01 +0900 Subject: [PATCH 1/9] Add bulk replication throttle mode (set once before inter-broker, clear once after) --- .../config/constants/ExecutorConfig.java | 16 +- .../cruisecontrol/executor/Executor.java | 67 +++++- .../cruisecontrol/executor/ExecutorTest.java | 203 ++++++++++++++++++ 3 files changed, 281 insertions(+), 5 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java index 99a2b4cd87..5bf3d9f08b 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java @@ -610,6 +610,15 @@ public final class ExecutorConfig { public static final String AUTO_STOP_EXTERNAL_AGENT_DOC = "When starting a new proposal execution while external agent is reassigning partitions," + " automatically stop the external agent and start the execution." + " Set to false to keep the external agent reassignment and skip starting the execution."; + + /** + * bulk.replication.throttle.enabled + */ + public static final String BULK_REPLICATION_THROTTLE_ENABLED_CONFIG = "bulk.replication.throttle.enabled"; + public static final boolean DEFAULT_BULK_REPLICATION_THROTTLE_ENABLED = false; + public static final String BULK_REPLICATION_THROTTLE_ENABLED_DOC = "If true, Cruise Control sets replication throttles once " + + "before starting inter-broker replica movements and clears them once after inter-broker replica movements end. " + + "If false, throttles are set/cleared per batch."; private ExecutorConfig() { } @@ -990,6 +999,11 @@ public static ConfigDef define(ConfigDef configDef) { ConfigDef.Type.BOOLEAN, DEFAULT_AUTO_STOP_EXTERNAL_AGENT, ConfigDef.Importance.MEDIUM, - AUTO_STOP_EXTERNAL_AGENT_DOC); + AUTO_STOP_EXTERNAL_AGENT_DOC) + .define(BULK_REPLICATION_THROTTLE_ENABLED_CONFIG, + ConfigDef.Type.BOOLEAN, + DEFAULT_BULK_REPLICATION_THROTTLE_ENABLED, + ConfigDef.Importance.MEDIUM, + BULK_REPLICATION_THROTTLE_ENABLED_DOC); } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index 22f210f0a5..9ef4ba3fab 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -1606,13 +1606,33 @@ private void updateOngoingExecutionState() { private void interBrokerMoveReplicas() throws InterruptedException, ExecutionException, TimeoutException { Set currentDeadBrokersWithReplicas = _loadMonitor.deadBrokersWithReplicas(MAX_METADATA_WAIT_MS); - ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient, _replicationThrottle, - currentDeadBrokersWithReplicas); + ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient, _replicationThrottle, currentDeadBrokersWithReplicas); + final boolean bulkThrottleEnabled = _config.getBoolean(ExecutorConfig.BULK_REPLICATION_THROTTLE_ENABLED_CONFIG); int numTotalPartitionMovements = _executionTaskManager.numRemainingInterBrokerPartitionMovements(); long totalDataToMoveInMB = _executionTaskManager.remainingInterBrokerDataToMoveInMB(); long startTime = System.currentTimeMillis(); LOG.info("User task {}: Starting {} inter-broker partition movements.", _uuid, numTotalPartitionMovements); + // If bulk throttle is enabled, set throttles once for all pending inter-broker tasks before entering the loop. + if (bulkThrottleEnabled && _replicationThrottle != null && numTotalPartitionMovements > 0) { + ExecutionTasksSummary summaryAtStart = _executionTaskManager.getExecutionTasksSummary( + Collections.singleton(INTER_BROKER_REPLICA_ACTION)); + Map> interBrokerTasksByState = summaryAtStart.filteredTasksByState() + .get(INTER_BROKER_REPLICA_ACTION); + Set pendingAtStart = interBrokerTasksByState.getOrDefault(ExecutionTaskState.PENDING, + Collections.emptySet()); + // Filter out proposals for non-existent topics to avoid admin timeouts on config changes for non-existent topics. + Set existingTopics = _metadataClient.refreshMetadata().cluster().topics(); + List proposalsForExistingTopics = pendingAtStart.stream() + .map(ExecutionTask::proposal) + .filter(p -> existingTopics.contains(p.topic())) + .collect(Collectors.toList()); + long numExistingTopicsToThrottle = proposalsForExistingTopics.stream().map(ExecutionProposal::topic).distinct().count(); + LOG.info("User task {}: Applying bulk replication throttle of {} B/s to {} inter-broker movements " + + "({} pending before filtering) across {} topics.", + _uuid, _replicationThrottle, proposalsForExistingTopics.size(), pendingAtStart.size(), numExistingTopicsToThrottle); + throttleHelper.setThrottles(proposalsForExistingTopics); + } int partitionsToMove = numTotalPartitionMovements; // Exhaust all the pending partition movements. while ((partitionsToMove > 0 || !inExecutionTasks().isEmpty()) && _stopSignal.get() == NO_STOP_EXECUTION) { @@ -1622,7 +1642,9 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc AlterPartitionReassignmentsResult result = null; if (!tasksToExecute.isEmpty()) { - throttleHelper.setThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList())); + if (!bulkThrottleEnabled) { + throttleHelper.setThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList())); + } // Execute the tasks. _executionTaskManager.markTasksInProgress(tasksToExecute); result = ExecutionUtils.submitReplicaReassignmentTasks(_adminClient, tasksToExecute); @@ -1644,7 +1666,44 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc .filter(t -> t.state() == ExecutionTaskState.IN_PROGRESS) .collect(Collectors.toList()); inProgressTasks.addAll(inExecutionTasks()); - + if (!bulkThrottleEnabled) { + throttleHelper.clearThrottles(completedTasks, inProgressTasks); + } + } + if (bulkThrottleEnabled && _replicationThrottle != null) { + ExecutionTasksSummary summaryAtEnd = _executionTaskManager.getExecutionTasksSummary( + Collections.singleton(INTER_BROKER_REPLICA_ACTION)); + Map> interBrokerTasksByState = summaryAtEnd.filteredTasksByState() + .get(INTER_BROKER_REPLICA_ACTION); + List completedTasks = new ArrayList<>(); + if (interBrokerTasksByState != null) { + completedTasks.addAll( + interBrokerTasksByState.getOrDefault(ExecutionTaskState.COMPLETED, Collections.emptySet())); + completedTasks.addAll( + interBrokerTasksByState.getOrDefault(ExecutionTaskState.ABORTED, Collections.emptySet())); + completedTasks.addAll( + interBrokerTasksByState.getOrDefault(ExecutionTaskState.DEAD, Collections.emptySet())); + } + List inProgressTasks = new ArrayList<>(); + if (interBrokerTasksByState != null) { + inProgressTasks.addAll( + interBrokerTasksByState.getOrDefault(ExecutionTaskState.IN_PROGRESS, Collections.emptySet())); + inProgressTasks.addAll( + interBrokerTasksByState.getOrDefault(ExecutionTaskState.ABORTING, Collections.emptySet())); + } + int completedCount = interBrokerTasksByState == null ? 0 + : interBrokerTasksByState.getOrDefault(ExecutionTaskState.COMPLETED, Collections.emptySet()).size(); + int abortedCount = interBrokerTasksByState == null ? 0 + : interBrokerTasksByState.getOrDefault(ExecutionTaskState.ABORTED, Collections.emptySet()).size(); + int deadCount = interBrokerTasksByState == null ? 0 + : interBrokerTasksByState.getOrDefault(ExecutionTaskState.DEAD, Collections.emptySet()).size(); + int inProgressCount = interBrokerTasksByState == null ? 0 + : interBrokerTasksByState.getOrDefault(ExecutionTaskState.IN_PROGRESS, Collections.emptySet()).size(); + int abortingCount = interBrokerTasksByState == null ? 0 + : interBrokerTasksByState.getOrDefault(ExecutionTaskState.ABORTING, Collections.emptySet()).size(); + LOG.info("User task {}: Clearing bulk replication throttles (configured rate: {} B/s). " + + "Completed: {}, Aborted: {}, Dead: {}, InProgress: {}, Aborting: {}.", + _uuid, _replicationThrottle, completedCount, abortedCount, deadCount, inProgressCount, abortingCount); throttleHelper.clearThrottles(completedTasks, inProgressTasks); } diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java index 29cbb4ac0d..47ab6f5626 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java @@ -35,6 +35,7 @@ import java.util.Random; import java.util.Set; import java.util.HashSet; +import java.util.stream.Collectors; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; @@ -42,6 +43,8 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ElectLeadersResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; @@ -52,6 +55,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; @@ -422,6 +426,205 @@ public void testSetExecutionProgressCheckIntervalMsWithNoRequestedValue() { assertEquals(validValue, executor.executionProgressCheckIntervalMs()); } + @Test + public void testInterBrokerMoveReplicasWhenBulkThrottleEnabledThenThrottleApplied() + throws InterruptedException, OngoingExecutionException { + // Prepare topics and proposals + Map topicDescriptions = createTopics((int) PRODUCE_SIZE_IN_BYTES); + int initialLeader0 = topicDescriptions.get(TOPIC0).partitions().get(0).leader().id(); + int initialLeader1 = topicDescriptions.get(TOPIC1).partitions().get(0).leader().id(); + + ExecutionProposal proposal0 = + new ExecutionProposal(TP0, PRODUCE_SIZE_IN_BYTES, new ReplicaPlacementInfo(initialLeader0), + Collections.singletonList(new ReplicaPlacementInfo(initialLeader0)), + Collections.singletonList(new ReplicaPlacementInfo(initialLeader0 == 0 ? 1 : 0))); + ExecutionProposal proposal1 = + new ExecutionProposal(TP1, 0, new ReplicaPlacementInfo(initialLeader1), + Arrays.asList(new ReplicaPlacementInfo(initialLeader1), new ReplicaPlacementInfo(initialLeader1 == 0 ? 1 : 0)), + Arrays.asList(new ReplicaPlacementInfo(initialLeader1 == 0 ? 1 : 0), new ReplicaPlacementInfo(initialLeader1))); + + Collection proposalsToExecute = Arrays.asList(proposal0, proposal1); + + // Enable bulk throttle in config + Properties props = getExecutorProperties(); + props.setProperty(ExecutorConfig.BULK_REPLICATION_THROTTLE_ENABLED_CONFIG, "true"); + KafkaCruiseControlConfig configs = new KafkaCruiseControlConfig(props); + + UserTaskManager.UserTaskInfo mockUserTaskInfo = getMockUserTaskInfo(); + UserTaskManager mockUserTaskManager = getMockUserTaskManager(RANDOM_UUID, mockUserTaskInfo, Collections.singletonList(true)); + LoadMonitor mockLoadMonitor = getMockLoadMonitor(); + AnomalyDetectorManager mockAnomalyDetectorManager = getMockAnomalyDetector(RANDOM_UUID, false); + EasyMock.replay(mockUserTaskInfo, mockUserTaskManager, mockLoadMonitor, mockAnomalyDetectorManager); + + MetricRegistry metricRegistry = new MetricRegistry(); + Executor executor = new Executor(configs, Time.SYSTEM, metricRegistry, mockAnomalyDetectorManager); + executor.setUserTaskManager(mockUserTaskManager); + + // Start execution with replication throttle + final long replicationThrottle = PRODUCE_SIZE_IN_BYTES; + executor.setGeneratingProposalsForExecution(RANDOM_UUID, ExecutorTest.class::getSimpleName, true); + executor.executeProposals(proposalsToExecute, Collections.emptySet(), null, mockLoadMonitor, null, null, null, null, null, + null, null, replicationThrottle, true, RANDOM_UUID, false, false); + + // Wait for inter-broker movement to start + waitUntilTrue(() -> (executor.state().state() == ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS), + "Inter-broker replica movement task did not start within the time limit", EXECUTION_DEADLINE_MS, EXECUTION_SHORT_CHECK_MS); + + try (AdminClient adminClient = KafkaCruiseControlUtils.createAdminClient( + Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker(0).plaintextAddr()))) { + + waitUntilTrue(() -> { + try { + for (int brokerId : List.of(0, 1)) { + ConfigResource cf = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId)); + Map cfg = adminClient.describeConfigs(Collections.singletonList(cf)).all().get(); + Config brokerCfg = cfg.get(cf); + String leaderActual = brokerCfg.get(ReplicationThrottleHelper.LEADER_THROTTLED_RATE) == null + ? null : brokerCfg.get(ReplicationThrottleHelper.LEADER_THROTTLED_RATE).value(); + String followerActual = brokerCfg.get(ReplicationThrottleHelper.FOLLOWER_THROTTLED_RATE) == null + ? null : brokerCfg.get(ReplicationThrottleHelper.FOLLOWER_THROTTLED_RATE).value(); + if (!String.valueOf(replicationThrottle).equals(leaderActual) + || !String.valueOf(replicationThrottle).equals(followerActual)) { + return false; + } + } + return true; + } catch (Exception e) { + return false; + } + }, + "Throttle rate not applied to brokers in time", + EXECUTION_DEADLINE_MS, + EXECUTION_SHORT_CHECK_MS); + + Set topicsSnapshot = proposalsToExecute.stream().map(ExecutionProposal::topic).collect(Collectors.toSet()); + for (String topic : topicsSnapshot) { + waitUntilTrue(() -> { + try { + ConfigResource cf = new ConfigResource(ConfigResource.Type.TOPIC, topic); + Map cfg = adminClient.describeConfigs(Collections.singletonList(cf)).all().get(); + Config topicCfg = cfg.get(cf); + String leaderReplicas = topicCfg.get(ReplicationThrottleHelper.LEADER_THROTTLED_REPLICAS) == null + ? null : topicCfg.get(ReplicationThrottleHelper.LEADER_THROTTLED_REPLICAS).value(); + String followerReplicas = topicCfg.get(ReplicationThrottleHelper.FOLLOWER_THROTTLED_REPLICAS) == null + ? null : topicCfg.get(ReplicationThrottleHelper.FOLLOWER_THROTTLED_REPLICAS).value(); + return leaderReplicas != null && !leaderReplicas.isEmpty() && leaderReplicas.contains("0:") + && followerReplicas != null && !followerReplicas.isEmpty() && followerReplicas.contains("0:"); + } catch (Exception e) { + return false; + } + }, + "Topic throttled replicas not applied for topic " + topic, + EXECUTION_DEADLINE_MS, + EXECUTION_SHORT_CHECK_MS); + } + + // Explicitly stop to avoid long wait, then ensure background execution is drained + executor.userTriggeredStopExecution(false); + waitUntilTrue(() -> (!executor.hasOngoingExecution() && executor.state().state() == ExecutorState.State.NO_TASK_IN_PROGRESS), + "Stopped proposal execution did not finish within the time limit", EXECUTION_DEADLINE_MS, EXECUTION_REGULAR_CHECK_MS); + } + } + + @Test + public void testInterBrokerMoveReplicasWhenBulkThrottleEnabledThenThrottleClearedAtEnd() + throws InterruptedException, OngoingExecutionException { + // Prepare topics and proposals + Map topicDescriptions = createTopics((int) PRODUCE_SIZE_IN_BYTES); + int initialLeader0 = topicDescriptions.get(TOPIC0).partitions().get(0).leader().id(); + int initialLeader1 = topicDescriptions.get(TOPIC1).partitions().get(0).leader().id(); + + ExecutionProposal proposal0 = + new ExecutionProposal(TP0, PRODUCE_SIZE_IN_BYTES, new ReplicaPlacementInfo(initialLeader0), + Collections.singletonList(new ReplicaPlacementInfo(initialLeader0)), + Collections.singletonList(new ReplicaPlacementInfo(initialLeader0 == 0 ? 1 : 0))); + ExecutionProposal proposal1 = + new ExecutionProposal(TP1, 0, new ReplicaPlacementInfo(initialLeader1), + Arrays.asList(new ReplicaPlacementInfo(initialLeader1), new ReplicaPlacementInfo(initialLeader1 == 0 ? 1 : 0)), + Arrays.asList(new ReplicaPlacementInfo(initialLeader1 == 0 ? 1 : 0), new ReplicaPlacementInfo(initialLeader1))); + + Collection proposalsToExecute = Arrays.asList(proposal0, proposal1); + + // Enable bulk throttle in config + Properties props = getExecutorProperties(); + props.setProperty(ExecutorConfig.BULK_REPLICATION_THROTTLE_ENABLED_CONFIG, "true"); + KafkaCruiseControlConfig configs = new KafkaCruiseControlConfig(props); + + UserTaskManager.UserTaskInfo mockUserTaskInfo = getMockUserTaskInfo(); + UserTaskManager mockUserTaskManager = getMockUserTaskManager(RANDOM_UUID, mockUserTaskInfo, Collections.singletonList(true)); + LoadMonitor mockLoadMonitor = getMockLoadMonitor(); + AnomalyDetectorManager mockAnomalyDetectorManager = getMockAnomalyDetector(RANDOM_UUID, false); + EasyMock.replay(mockUserTaskInfo, mockUserTaskManager, mockLoadMonitor, mockAnomalyDetectorManager); + + MetricRegistry metricRegistry = new MetricRegistry(); + Executor executor = new Executor(configs, Time.SYSTEM, metricRegistry, mockAnomalyDetectorManager); + executor.setUserTaskManager(mockUserTaskManager); + + // Start execution with replication throttle + final long replicationThrottle = PRODUCE_SIZE_IN_BYTES; + executor.setGeneratingProposalsForExecution(RANDOM_UUID, ExecutorTest.class::getSimpleName, true); + executor.executeProposals(proposalsToExecute, Collections.emptySet(), null, mockLoadMonitor, null, null, null, null, null, + null, null, replicationThrottle, true, RANDOM_UUID, false, false); + + // Ensure execution started, confirm reassignments began, then stop to expedite teardown and wait for completion + waitUntilTrue(() -> (executor.state().state() == ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS), + "Inter-broker replica movement task did not start within the time limit", EXECUTION_DEADLINE_MS, EXECUTION_SHORT_CHECK_MS); + verifyOngoingPartitionReassignments(Collections.singleton(TP0)); + executor.userTriggeredStopExecution(false); + waitUntilTrue(() -> (!executor.hasOngoingExecution() && executor.state().state() == ExecutorState.State.NO_TASK_IN_PROGRESS), + "Stopped proposal execution did not finish within the time limit", EXECUTION_DEADLINE_MS * 3, EXECUTION_REGULAR_CHECK_MS); + + // Verify throttles are cleared at the end + try (AdminClient adminClient = KafkaCruiseControlUtils.createAdminClient( + Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker(0).plaintextAddr()))) { + + waitUntilTrue(() -> { + try { + for (int brokerId : List.of(0, 1)) { + ConfigResource cf = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId)); + Map cfg = adminClient.describeConfigs(Collections.singletonList(cf)).all().get(); + Config brokerCfg = cfg.get(cf); + String leaderActual = brokerCfg.get(ReplicationThrottleHelper.LEADER_THROTTLED_RATE) == null + ? null : brokerCfg.get(ReplicationThrottleHelper.LEADER_THROTTLED_RATE).value(); + String followerActual = brokerCfg.get(ReplicationThrottleHelper.FOLLOWER_THROTTLED_RATE) == null + ? null : brokerCfg.get(ReplicationThrottleHelper.FOLLOWER_THROTTLED_RATE).value(); + if (leaderActual != null || followerActual != null) { + return false; + } + } + return true; + } catch (Exception e) { + return false; + } + }, + "Throttle not cleared from brokers in time", + EXECUTION_DEADLINE_MS, + EXECUTION_SHORT_CHECK_MS); + + // Topic replicas cleared + Set topicsSnapshot = proposalsToExecute.stream().map(ExecutionProposal::topic).collect(Collectors.toSet()); + for (String topic : topicsSnapshot) { + waitUntilTrue(() -> { + try { + ConfigResource cf = new ConfigResource(ConfigResource.Type.TOPIC, topic); + Map cfg = adminClient.describeConfigs(Collections.singletonList(cf)).all().get(); + Config topicCfg = cfg.get(cf); + ConfigEntry leaderEntry = topicCfg.get(ReplicationThrottleHelper.LEADER_THROTTLED_REPLICAS); + ConfigEntry followerEntry = topicCfg.get(ReplicationThrottleHelper.FOLLOWER_THROTTLED_REPLICAS); + boolean leaderCleared = leaderEntry == null || leaderEntry.value() == null || leaderEntry.value().isEmpty(); + boolean followerCleared = followerEntry == null || followerEntry.value() == null || followerEntry.value().isEmpty(); + return leaderCleared && followerCleared; + } catch (Exception e) { + return false; + } + }, + "Topic throttled replicas not cleared for topic " + topic, + EXECUTION_DEADLINE_MS, + EXECUTION_SHORT_CHECK_MS); + } + } + } + @Test public void testResetExecutionProgressCheckIntervalMs() { KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties()); From bf1451473dd7b49b12ae9d393fca9b5cc8218233 Mon Sep 17 00:00:00 2001 From: il-kyun Date: Sun, 24 Aug 2025 14:03:50 +0900 Subject: [PATCH 2/9] Apply try-catch to recovery failure case --- .../cruisecontrol/executor/Executor.java | 194 +++++++++--------- 1 file changed, 100 insertions(+), 94 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index 9ef4ba3fab..b73496aa15 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -1612,106 +1612,112 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc long totalDataToMoveInMB = _executionTaskManager.remainingInterBrokerDataToMoveInMB(); long startTime = System.currentTimeMillis(); LOG.info("User task {}: Starting {} inter-broker partition movements.", _uuid, numTotalPartitionMovements); - - // If bulk throttle is enabled, set throttles once for all pending inter-broker tasks before entering the loop. - if (bulkThrottleEnabled && _replicationThrottle != null && numTotalPartitionMovements > 0) { - ExecutionTasksSummary summaryAtStart = _executionTaskManager.getExecutionTasksSummary( - Collections.singleton(INTER_BROKER_REPLICA_ACTION)); - Map> interBrokerTasksByState = summaryAtStart.filteredTasksByState() - .get(INTER_BROKER_REPLICA_ACTION); - Set pendingAtStart = interBrokerTasksByState.getOrDefault(ExecutionTaskState.PENDING, - Collections.emptySet()); - // Filter out proposals for non-existent topics to avoid admin timeouts on config changes for non-existent topics. - Set existingTopics = _metadataClient.refreshMetadata().cluster().topics(); - List proposalsForExistingTopics = pendingAtStart.stream() - .map(ExecutionTask::proposal) - .filter(p -> existingTopics.contains(p.topic())) - .collect(Collectors.toList()); - long numExistingTopicsToThrottle = proposalsForExistingTopics.stream().map(ExecutionProposal::topic).distinct().count(); - LOG.info("User task {}: Applying bulk replication throttle of {} B/s to {} inter-broker movements " - + "({} pending before filtering) across {} topics.", - _uuid, _replicationThrottle, proposalsForExistingTopics.size(), pendingAtStart.size(), numExistingTopicsToThrottle); - throttleHelper.setThrottles(proposalsForExistingTopics); - } - int partitionsToMove = numTotalPartitionMovements; - // Exhaust all the pending partition movements. - while ((partitionsToMove > 0 || !inExecutionTasks().isEmpty()) && _stopSignal.get() == NO_STOP_EXECUTION) { - // Get tasks to execute. - List tasksToExecute = _executionTaskManager.getInterBrokerReplicaMovementTasks(); - LOG.info("User task {}: Executor will execute {} task(s)", _uuid, tasksToExecute.size()); - - AlterPartitionReassignmentsResult result = null; - if (!tasksToExecute.isEmpty()) { + try { + // If bulk throttle is enabled, set throttles once for all pending inter-broker tasks before entering the loop. + if (bulkThrottleEnabled && _replicationThrottle != null && numTotalPartitionMovements > 0) { + ExecutionTasksSummary summaryAtStart = _executionTaskManager.getExecutionTasksSummary( + Collections.singleton(INTER_BROKER_REPLICA_ACTION)); + Map> interBrokerTasksByState = summaryAtStart.filteredTasksByState() + .get(INTER_BROKER_REPLICA_ACTION); + Set pendingAtStart = interBrokerTasksByState.getOrDefault(ExecutionTaskState.PENDING, + Collections.emptySet()); + // Filter out proposals for non-existent topics to avoid admin timeouts on config changes for non-existent topics. + Set existingTopics = _metadataClient.refreshMetadata().cluster().topics(); + List proposalsForExistingTopics = pendingAtStart.stream() + .map(ExecutionTask::proposal) + .filter(p -> existingTopics.contains(p.topic())) + .collect(Collectors.toList()); + long numExistingTopicsToThrottle = proposalsForExistingTopics.stream().map(ExecutionProposal::topic).distinct().count(); + LOG.info("User task {}: Applying bulk replication throttle of {} B/s to {} inter-broker movements " + + "({} pending before filtering) across {} topics.", + _uuid, _replicationThrottle, proposalsForExistingTopics.size(), pendingAtStart.size(), numExistingTopicsToThrottle); + throttleHelper.setThrottles(proposalsForExistingTopics); + } + int partitionsToMove = numTotalPartitionMovements; + // Exhaust all the pending partition movements. + while ((partitionsToMove > 0 || !inExecutionTasks().isEmpty()) && _stopSignal.get() == NO_STOP_EXECUTION) { + // Get tasks to execute. + List tasksToExecute = _executionTaskManager.getInterBrokerReplicaMovementTasks(); + LOG.info("User task {}: Executor will execute {} task(s)", _uuid, tasksToExecute.size()); + + AlterPartitionReassignmentsResult result = null; + if (!tasksToExecute.isEmpty()) { + if (!bulkThrottleEnabled) { + throttleHelper.setThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList())); + } + // Execute the tasks. + _executionTaskManager.markTasksInProgress(tasksToExecute); + result = ExecutionUtils.submitReplicaReassignmentTasks(_adminClient, tasksToExecute); + } + // Wait indefinitely for partition movements to finish. + List completedTasks = waitForInterBrokerReplicaTasksToFinish(result); + partitionsToMove = _executionTaskManager.numRemainingInterBrokerPartitionMovements(); + int numFinishedPartitionMovements = _executionTaskManager.numFinishedInterBrokerPartitionMovements(); + long finishedDataMovementInMB = _executionTaskManager.finishedInterBrokerDataMovementInMB(); + updatePartitionMovementMetrics(numFinishedPartitionMovements, finishedDataMovementInMB, System.currentTimeMillis() - startTime); + LOG.info("User task {}: {}/{} ({}%) inter-broker partition movements completed. {}/{} ({}%) MB have been moved.", + _uuid, + numFinishedPartitionMovements, numTotalPartitionMovements, + String.format("%.2f", numFinishedPartitionMovements * UNIT_INTERVAL_TO_PERCENTAGE / numTotalPartitionMovements), + finishedDataMovementInMB, totalDataToMoveInMB, + totalDataToMoveInMB == 0 ? 100 : String.format("%.2f", finishedDataMovementInMB * UNIT_INTERVAL_TO_PERCENTAGE + / totalDataToMoveInMB)); + List inProgressTasks = tasksToExecute.stream() + .filter(t -> t.state() == ExecutionTaskState.IN_PROGRESS) + .collect(Collectors.toList()); + inProgressTasks.addAll(inExecutionTasks()); if (!bulkThrottleEnabled) { - throttleHelper.setThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList())); + throttleHelper.clearThrottles(completedTasks, inProgressTasks); } - // Execute the tasks. - _executionTaskManager.markTasksInProgress(tasksToExecute); - result = ExecutionUtils.submitReplicaReassignmentTasks(_adminClient, tasksToExecute); - } - // Wait indefinitely for partition movements to finish. - List completedTasks = waitForInterBrokerReplicaTasksToFinish(result); - partitionsToMove = _executionTaskManager.numRemainingInterBrokerPartitionMovements(); - int numFinishedPartitionMovements = _executionTaskManager.numFinishedInterBrokerPartitionMovements(); - long finishedDataMovementInMB = _executionTaskManager.finishedInterBrokerDataMovementInMB(); - updatePartitionMovementMetrics(numFinishedPartitionMovements, finishedDataMovementInMB, System.currentTimeMillis() - startTime); - LOG.info("User task {}: {}/{} ({}%) inter-broker partition movements completed. {}/{} ({}%) MB have been moved.", - _uuid, - numFinishedPartitionMovements, numTotalPartitionMovements, - String.format("%.2f", numFinishedPartitionMovements * UNIT_INTERVAL_TO_PERCENTAGE / numTotalPartitionMovements), - finishedDataMovementInMB, totalDataToMoveInMB, - totalDataToMoveInMB == 0 ? 100 : String.format("%.2f", finishedDataMovementInMB * UNIT_INTERVAL_TO_PERCENTAGE - / totalDataToMoveInMB)); - List inProgressTasks = tasksToExecute.stream() - .filter(t -> t.state() == ExecutionTaskState.IN_PROGRESS) - .collect(Collectors.toList()); - inProgressTasks.addAll(inExecutionTasks()); - if (!bulkThrottleEnabled) { - throttleHelper.clearThrottles(completedTasks, inProgressTasks); } - } - if (bulkThrottleEnabled && _replicationThrottle != null) { - ExecutionTasksSummary summaryAtEnd = _executionTaskManager.getExecutionTasksSummary( - Collections.singleton(INTER_BROKER_REPLICA_ACTION)); - Map> interBrokerTasksByState = summaryAtEnd.filteredTasksByState() - .get(INTER_BROKER_REPLICA_ACTION); - List completedTasks = new ArrayList<>(); - if (interBrokerTasksByState != null) { - completedTasks.addAll( - interBrokerTasksByState.getOrDefault(ExecutionTaskState.COMPLETED, Collections.emptySet())); - completedTasks.addAll( - interBrokerTasksByState.getOrDefault(ExecutionTaskState.ABORTED, Collections.emptySet())); - completedTasks.addAll( - interBrokerTasksByState.getOrDefault(ExecutionTaskState.DEAD, Collections.emptySet())); - } - List inProgressTasks = new ArrayList<>(); - if (interBrokerTasksByState != null) { - inProgressTasks.addAll( - interBrokerTasksByState.getOrDefault(ExecutionTaskState.IN_PROGRESS, Collections.emptySet())); - inProgressTasks.addAll( - interBrokerTasksByState.getOrDefault(ExecutionTaskState.ABORTING, Collections.emptySet())); + } finally { + // Ensure bulk throttles are cleared even if an exception occurs during inter-broker movements. + if (bulkThrottleEnabled && _replicationThrottle != null) { + try { + ExecutionTasksSummary summaryAtEnd = _executionTaskManager.getExecutionTasksSummary( + Collections.singleton(INTER_BROKER_REPLICA_ACTION)); + Map> interBrokerTasksByState = summaryAtEnd.filteredTasksByState() + .get(INTER_BROKER_REPLICA_ACTION); + List completedTasks = new ArrayList<>(); + if (interBrokerTasksByState != null) { + completedTasks.addAll( + interBrokerTasksByState.getOrDefault(ExecutionTaskState.COMPLETED, Collections.emptySet())); + completedTasks.addAll( + interBrokerTasksByState.getOrDefault(ExecutionTaskState.ABORTED, Collections.emptySet())); + completedTasks.addAll( + interBrokerTasksByState.getOrDefault(ExecutionTaskState.DEAD, Collections.emptySet())); + } + List inProgressTasks = new ArrayList<>(); + if (interBrokerTasksByState != null) { + inProgressTasks.addAll( + interBrokerTasksByState.getOrDefault(ExecutionTaskState.IN_PROGRESS, Collections.emptySet())); + inProgressTasks.addAll( + interBrokerTasksByState.getOrDefault(ExecutionTaskState.ABORTING, Collections.emptySet())); + } + int completedCount = interBrokerTasksByState == null ? 0 + : interBrokerTasksByState.getOrDefault(ExecutionTaskState.COMPLETED, Collections.emptySet()).size(); + int abortedCount = interBrokerTasksByState == null ? 0 + : interBrokerTasksByState.getOrDefault(ExecutionTaskState.ABORTED, Collections.emptySet()).size(); + int deadCount = interBrokerTasksByState == null ? 0 + : interBrokerTasksByState.getOrDefault(ExecutionTaskState.DEAD, Collections.emptySet()).size(); + int inProgressCount = interBrokerTasksByState == null ? 0 + : interBrokerTasksByState.getOrDefault(ExecutionTaskState.IN_PROGRESS, Collections.emptySet()).size(); + int abortingCount = interBrokerTasksByState == null ? 0 + : interBrokerTasksByState.getOrDefault(ExecutionTaskState.ABORTING, Collections.emptySet()).size(); + LOG.info("User task {}: Clearing bulk replication throttles (configured rate: {} B/s). " + + "Completed: {}, Aborted: {}, Dead: {}, InProgress: {}, Aborting: {}.", + _uuid, _replicationThrottle, completedCount, abortedCount, deadCount, inProgressCount, abortingCount); + throttleHelper.clearThrottles(completedTasks, inProgressTasks); + } catch (ExecutionException | InterruptedException | TimeoutException e) { + LOG.warn("User task {}: Failed to clear bulk replication throttles during cleanup.", _uuid, e); + } } - int completedCount = interBrokerTasksByState == null ? 0 - : interBrokerTasksByState.getOrDefault(ExecutionTaskState.COMPLETED, Collections.emptySet()).size(); - int abortedCount = interBrokerTasksByState == null ? 0 - : interBrokerTasksByState.getOrDefault(ExecutionTaskState.ABORTED, Collections.emptySet()).size(); - int deadCount = interBrokerTasksByState == null ? 0 - : interBrokerTasksByState.getOrDefault(ExecutionTaskState.DEAD, Collections.emptySet()).size(); - int inProgressCount = interBrokerTasksByState == null ? 0 - : interBrokerTasksByState.getOrDefault(ExecutionTaskState.IN_PROGRESS, Collections.emptySet()).size(); - int abortingCount = interBrokerTasksByState == null ? 0 - : interBrokerTasksByState.getOrDefault(ExecutionTaskState.ABORTING, Collections.emptySet()).size(); - LOG.info("User task {}: Clearing bulk replication throttles (configured rate: {} B/s). " - + "Completed: {}, Aborted: {}, Dead: {}, InProgress: {}, Aborting: {}.", - _uuid, _replicationThrottle, completedCount, abortedCount, deadCount, inProgressCount, abortingCount); - throttleHelper.clearThrottles(completedTasks, inProgressTasks); + // Currently, _executionProgressCheckIntervalMs is only runtime adjusted for inter broker move tasks, not + // in leadership move task. Thus reset it to initial value once interBrokerMoveReplicas has stopped to + // have it been safely used in following leadership move tasks. + resetExecutionProgressCheckIntervalMs(); } - // Currently, _executionProgressCheckIntervalMs is only runtime adjusted for inter broker move tasks, not - // in leadership move task. Thus reset it to initial value once interBrokerMoveReplicas has stopped to - // have it been safely used in following leadership move tasks. - resetExecutionProgressCheckIntervalMs(); - // At this point it is guaranteed that there are no in execution tasks to wait -- i.e. all tasks are completed or dead. if (_stopSignal.get() == NO_STOP_EXECUTION) { LOG.info("User task {}: Inter-broker partition movements finished", _uuid); From e8950c0c73964758b4f7df3e21acb7e032c27269 Mon Sep 17 00:00:00 2001 From: il-kyun Date: Sun, 31 Aug 2025 15:18:44 +0900 Subject: [PATCH 3/9] fix spot but test --- .../kafka/cruisecontrol/executor/ExecutorTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java index 47ab6f5626..f6c629c6b0 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java @@ -489,7 +489,7 @@ public void testInterBrokerMoveReplicasWhenBulkThrottleEnabledThenThrottleApplie } } return true; - } catch (Exception e) { + } catch (InterruptedException | ExecutionException e) { return false; } }, @@ -510,7 +510,7 @@ public void testInterBrokerMoveReplicasWhenBulkThrottleEnabledThenThrottleApplie ? null : topicCfg.get(ReplicationThrottleHelper.FOLLOWER_THROTTLED_REPLICAS).value(); return leaderReplicas != null && !leaderReplicas.isEmpty() && leaderReplicas.contains("0:") && followerReplicas != null && !followerReplicas.isEmpty() && followerReplicas.contains("0:"); - } catch (Exception e) { + } catch (InterruptedException | ExecutionException e) { return false; } }, @@ -593,7 +593,7 @@ public void testInterBrokerMoveReplicasWhenBulkThrottleEnabledThenThrottleCleare } } return true; - } catch (Exception e) { + } catch (InterruptedException | ExecutionException e) { return false; } }, @@ -614,7 +614,7 @@ public void testInterBrokerMoveReplicasWhenBulkThrottleEnabledThenThrottleCleare boolean leaderCleared = leaderEntry == null || leaderEntry.value() == null || leaderEntry.value().isEmpty(); boolean followerCleared = followerEntry == null || followerEntry.value() == null || followerEntry.value().isEmpty(); return leaderCleared && followerCleared; - } catch (Exception e) { + } catch (InterruptedException | ExecutionException e) { return false; } }, From 422ec93922cb920c18de2d9a2d4cdafd5eee4650 Mon Sep 17 00:00:00 2001 From: il-kyun Date: Wed, 17 Sep 2025 00:47:47 +0900 Subject: [PATCH 4/9] apply reviews --- .../config/constants/ExecutorConfig.java | 2 +- .../cruisecontrol/executor/Executor.java | 65 ++++++++++--------- docs/wiki/User Guide/Configurations.md | 1 + 3 files changed, 37 insertions(+), 31 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java index 5bf3d9f08b..db78b34fd1 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java @@ -615,7 +615,7 @@ public final class ExecutorConfig { * bulk.replication.throttle.enabled */ public static final String BULK_REPLICATION_THROTTLE_ENABLED_CONFIG = "bulk.replication.throttle.enabled"; - public static final boolean DEFAULT_BULK_REPLICATION_THROTTLE_ENABLED = false; + public static final boolean DEFAULT_BULK_REPLICATION_THROTTLE_ENABLED = true; public static final String BULK_REPLICATION_THROTTLE_ENABLED_DOC = "If true, Cruise Control sets replication throttles once " + "before starting inter-broker replica movements and clears them once after inter-broker replica movements end. " + "If false, throttles are set/cleared per batch."; diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index b73496aa15..7c8cae93bf 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -1678,37 +1678,16 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc Collections.singleton(INTER_BROKER_REPLICA_ACTION)); Map> interBrokerTasksByState = summaryAtEnd.filteredTasksByState() .get(INTER_BROKER_REPLICA_ACTION); - List completedTasks = new ArrayList<>(); - if (interBrokerTasksByState != null) { - completedTasks.addAll( - interBrokerTasksByState.getOrDefault(ExecutionTaskState.COMPLETED, Collections.emptySet())); - completedTasks.addAll( - interBrokerTasksByState.getOrDefault(ExecutionTaskState.ABORTED, Collections.emptySet())); - completedTasks.addAll( - interBrokerTasksByState.getOrDefault(ExecutionTaskState.DEAD, Collections.emptySet())); - } - List inProgressTasks = new ArrayList<>(); - if (interBrokerTasksByState != null) { - inProgressTasks.addAll( - interBrokerTasksByState.getOrDefault(ExecutionTaskState.IN_PROGRESS, Collections.emptySet())); - inProgressTasks.addAll( - interBrokerTasksByState.getOrDefault(ExecutionTaskState.ABORTING, Collections.emptySet())); - } - int completedCount = interBrokerTasksByState == null ? 0 - : interBrokerTasksByState.getOrDefault(ExecutionTaskState.COMPLETED, Collections.emptySet()).size(); - int abortedCount = interBrokerTasksByState == null ? 0 - : interBrokerTasksByState.getOrDefault(ExecutionTaskState.ABORTED, Collections.emptySet()).size(); - int deadCount = interBrokerTasksByState == null ? 0 - : interBrokerTasksByState.getOrDefault(ExecutionTaskState.DEAD, Collections.emptySet()).size(); - int inProgressCount = interBrokerTasksByState == null ? 0 - : interBrokerTasksByState.getOrDefault(ExecutionTaskState.IN_PROGRESS, Collections.emptySet()).size(); - int abortingCount = interBrokerTasksByState == null ? 0 - : interBrokerTasksByState.getOrDefault(ExecutionTaskState.ABORTING, Collections.emptySet()).size(); - LOG.info("User task {}: Clearing bulk replication throttles (configured rate: {} B/s). " - + "Completed: {}, Aborted: {}, Dead: {}, InProgress: {}, Aborting: {}.", - _uuid, _replicationThrottle, completedCount, abortedCount, deadCount, inProgressCount, abortingCount); + Map> tasksByState = + interBrokerTasksByState == null ? Collections.emptyMap() : interBrokerTasksByState; + logBulkThrottleCleanup(tasksByState); + List completedTasks = buildCompletedTasks(tasksByState); + List inProgressTasks = buildInProgressTasks(tasksByState); throttleHelper.clearThrottles(completedTasks, inProgressTasks); - } catch (ExecutionException | InterruptedException | TimeoutException e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("User task {}: Failed to clear bulk replication throttles during cleanup.", _uuid, e); + } catch (ExecutionException | TimeoutException e) { LOG.warn("User task {}: Failed to clear bulk replication throttles during cleanup.", _uuid, e); } } @@ -1849,6 +1828,32 @@ private int moveLeadershipInBatch() { return numLeadershipToMove; } + private void logBulkThrottleCleanup(Map> tasksByState) { + LOG.info("User task {}: Clearing bulk replication throttles (configured rate: {} B/s). " + + "Completed: {}, Aborted: {}, Dead: {}, InProgress: {}, Aborting: {}.", + _uuid, _replicationThrottle, + tasksByState.getOrDefault(ExecutionTaskState.COMPLETED, Collections.emptySet()).size(), + tasksByState.getOrDefault(ExecutionTaskState.ABORTED, Collections.emptySet()).size(), + tasksByState.getOrDefault(ExecutionTaskState.DEAD, Collections.emptySet()).size(), + tasksByState.getOrDefault(ExecutionTaskState.IN_PROGRESS, Collections.emptySet()).size(), + tasksByState.getOrDefault(ExecutionTaskState.ABORTING, Collections.emptySet()).size()); + } + + private List buildCompletedTasks(Map> tasksByState) { + List completedTasks = new ArrayList<>(); + completedTasks.addAll(tasksByState.getOrDefault(ExecutionTaskState.COMPLETED, Collections.emptySet())); + completedTasks.addAll(tasksByState.getOrDefault(ExecutionTaskState.ABORTED, Collections.emptySet())); + completedTasks.addAll(tasksByState.getOrDefault(ExecutionTaskState.DEAD, Collections.emptySet())); + return completedTasks; + } + + private List buildInProgressTasks(Map> tasksByState) { + List inProgressTasks = new ArrayList<>(); + inProgressTasks.addAll(tasksByState.getOrDefault(ExecutionTaskState.IN_PROGRESS, Collections.emptySet())); + inProgressTasks.addAll(tasksByState.getOrDefault(ExecutionTaskState.ABORTING, Collections.emptySet())); + return inProgressTasks; + } + /** * First waits for {@link #executionProgressCheckIntervalMs} for the execution to make progress, then retrieves the * cluster state for the progress check. diff --git a/docs/wiki/User Guide/Configurations.md b/docs/wiki/User Guide/Configurations.md index 51192f20f3..846cc3647e 100644 --- a/docs/wiki/User Guide/Configurations.md +++ b/docs/wiki/User Guide/Configurations.md @@ -156,6 +156,7 @@ The following configurations are inherited from the open source Kafka client con | metric.anomaly.analyzer.metrics | String | N | "" | The metric ids that the metric anomaly detector should detect if they are violated. | | topics.excluded.from.partition.movement | String | N | "" | The topics that should be excluded from the partition movement. It is a regex. Notice that this regex will be ignored when decommission a broker is invoked. | | default.replication.throttle | Long | N | null | The replication throttle applied to replicas being moved, in bytes per second. | +| bulk.replication.throttle.enabled | Boolean | N | true | If true, Cruise Control sets replication throttles once before starting inter-broker replica movements and clears them once after inter-broker replica movements end. If false, throttles are set/cleared per batch. Effective only when a replication throttle is configured via request parameter `replication_throttle` or config `default.replication.throttle`. | | replica.movement.strategies | List | N | [com.linkedin.kafka.cruisecontrol.executor.strategy.PostponeUrpReplicaMovementStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeLargeReplicaMovementStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeSmallReplicaMovementStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeMinIsrWithOfflineReplicasStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeOneAboveMinIsrWithOfflineReplicasStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.BaseReplicaMovementStrategy] | A list of supported strategies used to determine execution order for generated partition movement tasks. | | default.replica.movement.strategies | List | N | [com.linkedin.kafka.cruisecontrol.executor.strategy.BaseReplicaMovementStrategy] | The list of replica movement strategies that will be used by default if no replica movement strategy list is provided. | | executor.notifier.class | Class | N | class com.linkedin.kafka.cruisecontrol.executor.ExecutorNoopNotifier | The executor notifier class to trigger an alert when an execution finishes or is stopped (by a user or by Cruise Control). | From 7c9bfa880b453b6ad9eeec1487844b3e6d31174f Mon Sep 17 00:00:00 2001 From: il-kyun Date: Fri, 19 Sep 2025 13:11:07 +0900 Subject: [PATCH 5/9] consolidate proposal and execution setup in ExecutorTest --- .../cruisecontrol/executor/ExecutorTest.java | 111 +++++++----------- 1 file changed, 43 insertions(+), 68 deletions(-) diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java index f6c629c6b0..9a735c682c 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java @@ -429,42 +429,11 @@ public void testSetExecutionProgressCheckIntervalMsWithNoRequestedValue() { @Test public void testInterBrokerMoveReplicasWhenBulkThrottleEnabledThenThrottleApplied() throws InterruptedException, OngoingExecutionException { - // Prepare topics and proposals - Map topicDescriptions = createTopics((int) PRODUCE_SIZE_IN_BYTES); - int initialLeader0 = topicDescriptions.get(TOPIC0).partitions().get(0).leader().id(); - int initialLeader1 = topicDescriptions.get(TOPIC1).partitions().get(0).leader().id(); - - ExecutionProposal proposal0 = - new ExecutionProposal(TP0, PRODUCE_SIZE_IN_BYTES, new ReplicaPlacementInfo(initialLeader0), - Collections.singletonList(new ReplicaPlacementInfo(initialLeader0)), - Collections.singletonList(new ReplicaPlacementInfo(initialLeader0 == 0 ? 1 : 0))); - ExecutionProposal proposal1 = - new ExecutionProposal(TP1, 0, new ReplicaPlacementInfo(initialLeader1), - Arrays.asList(new ReplicaPlacementInfo(initialLeader1), new ReplicaPlacementInfo(initialLeader1 == 0 ? 1 : 0)), - Arrays.asList(new ReplicaPlacementInfo(initialLeader1 == 0 ? 1 : 0), new ReplicaPlacementInfo(initialLeader1))); - - Collection proposalsToExecute = Arrays.asList(proposal0, proposal1); - - // Enable bulk throttle in config - Properties props = getExecutorProperties(); - props.setProperty(ExecutorConfig.BULK_REPLICATION_THROTTLE_ENABLED_CONFIG, "true"); - KafkaCruiseControlConfig configs = new KafkaCruiseControlConfig(props); - - UserTaskManager.UserTaskInfo mockUserTaskInfo = getMockUserTaskInfo(); - UserTaskManager mockUserTaskManager = getMockUserTaskManager(RANDOM_UUID, mockUserTaskInfo, Collections.singletonList(true)); - LoadMonitor mockLoadMonitor = getMockLoadMonitor(); - AnomalyDetectorManager mockAnomalyDetectorManager = getMockAnomalyDetector(RANDOM_UUID, false); - EasyMock.replay(mockUserTaskInfo, mockUserTaskManager, mockLoadMonitor, mockAnomalyDetectorManager); + Collection proposalsToExecute = createProposalsForTwoTopics(PRODUCE_SIZE_IN_BYTES); - MetricRegistry metricRegistry = new MetricRegistry(); - Executor executor = new Executor(configs, Time.SYSTEM, metricRegistry, mockAnomalyDetectorManager); - executor.setUserTaskManager(mockUserTaskManager); - - // Start execution with replication throttle + // Start execution with replication throttle using helper final long replicationThrottle = PRODUCE_SIZE_IN_BYTES; - executor.setGeneratingProposalsForExecution(RANDOM_UUID, ExecutorTest.class::getSimpleName, true); - executor.executeProposals(proposalsToExecute, Collections.emptySet(), null, mockLoadMonitor, null, null, null, null, null, - null, null, replicationThrottle, true, RANDOM_UUID, false, false); + Executor executor = startExecutionWithBulkThrottle(proposalsToExecute, replicationThrottle); // Wait for inter-broker movement to start waitUntilTrue(() -> (executor.state().state() == ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS), @@ -529,42 +498,11 @@ public void testInterBrokerMoveReplicasWhenBulkThrottleEnabledThenThrottleApplie @Test public void testInterBrokerMoveReplicasWhenBulkThrottleEnabledThenThrottleClearedAtEnd() throws InterruptedException, OngoingExecutionException { - // Prepare topics and proposals - Map topicDescriptions = createTopics((int) PRODUCE_SIZE_IN_BYTES); - int initialLeader0 = topicDescriptions.get(TOPIC0).partitions().get(0).leader().id(); - int initialLeader1 = topicDescriptions.get(TOPIC1).partitions().get(0).leader().id(); - - ExecutionProposal proposal0 = - new ExecutionProposal(TP0, PRODUCE_SIZE_IN_BYTES, new ReplicaPlacementInfo(initialLeader0), - Collections.singletonList(new ReplicaPlacementInfo(initialLeader0)), - Collections.singletonList(new ReplicaPlacementInfo(initialLeader0 == 0 ? 1 : 0))); - ExecutionProposal proposal1 = - new ExecutionProposal(TP1, 0, new ReplicaPlacementInfo(initialLeader1), - Arrays.asList(new ReplicaPlacementInfo(initialLeader1), new ReplicaPlacementInfo(initialLeader1 == 0 ? 1 : 0)), - Arrays.asList(new ReplicaPlacementInfo(initialLeader1 == 0 ? 1 : 0), new ReplicaPlacementInfo(initialLeader1))); - - Collection proposalsToExecute = Arrays.asList(proposal0, proposal1); + Collection proposalsToExecute = createProposalsForTwoTopics(PRODUCE_SIZE_IN_BYTES); - // Enable bulk throttle in config - Properties props = getExecutorProperties(); - props.setProperty(ExecutorConfig.BULK_REPLICATION_THROTTLE_ENABLED_CONFIG, "true"); - KafkaCruiseControlConfig configs = new KafkaCruiseControlConfig(props); - - UserTaskManager.UserTaskInfo mockUserTaskInfo = getMockUserTaskInfo(); - UserTaskManager mockUserTaskManager = getMockUserTaskManager(RANDOM_UUID, mockUserTaskInfo, Collections.singletonList(true)); - LoadMonitor mockLoadMonitor = getMockLoadMonitor(); - AnomalyDetectorManager mockAnomalyDetectorManager = getMockAnomalyDetector(RANDOM_UUID, false); - EasyMock.replay(mockUserTaskInfo, mockUserTaskManager, mockLoadMonitor, mockAnomalyDetectorManager); - - MetricRegistry metricRegistry = new MetricRegistry(); - Executor executor = new Executor(configs, Time.SYSTEM, metricRegistry, mockAnomalyDetectorManager); - executor.setUserTaskManager(mockUserTaskManager); - - // Start execution with replication throttle + // Start execution with replication throttle using helper final long replicationThrottle = PRODUCE_SIZE_IN_BYTES; - executor.setGeneratingProposalsForExecution(RANDOM_UUID, ExecutorTest.class::getSimpleName, true); - executor.executeProposals(proposalsToExecute, Collections.emptySet(), null, mockLoadMonitor, null, null, null, null, null, - null, null, replicationThrottle, true, RANDOM_UUID, false, false); + Executor executor = startExecutionWithBulkThrottle(proposalsToExecute, replicationThrottle); // Ensure execution started, confirm reassignments began, then stop to expedite teardown and wait for completion waitUntilTrue(() -> (executor.state().state() == ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS), @@ -817,6 +755,43 @@ private void populateProposals(List proposalToExecute, proposalToVerify.addAll(Arrays.asList(proposal0, proposal1)); } + private Collection createProposalsForTwoTopics(long produceSizeInBytes) throws InterruptedException { + Map topicDescriptions = createTopics((int) produceSizeInBytes); + int initialLeader0 = topicDescriptions.get(TOPIC0).partitions().get(0).leader().id(); + int initialLeader1 = topicDescriptions.get(TOPIC1).partitions().get(0).leader().id(); + ExecutionProposal proposal0 = + new ExecutionProposal(TP0, produceSizeInBytes, new ReplicaPlacementInfo(initialLeader0), + Collections.singletonList(new ReplicaPlacementInfo(initialLeader0)), + Collections.singletonList(new ReplicaPlacementInfo(initialLeader0 == 0 ? 1 : 0))); + ExecutionProposal proposal1 = + new ExecutionProposal(TP1, 0, new ReplicaPlacementInfo(initialLeader1), + Arrays.asList(new ReplicaPlacementInfo(initialLeader1), new ReplicaPlacementInfo(initialLeader1 == 0 ? 1 : 0)), + Arrays.asList(new ReplicaPlacementInfo(initialLeader1 == 0 ? 1 : 0), new ReplicaPlacementInfo(initialLeader1))); + return Arrays.asList(proposal0, proposal1); + } + + private Executor startExecutionWithBulkThrottle(Collection proposalsToExecute, + long replicationThrottle) throws OngoingExecutionException { + Properties props = getExecutorProperties(); + props.setProperty(ExecutorConfig.BULK_REPLICATION_THROTTLE_ENABLED_CONFIG, "true"); + KafkaCruiseControlConfig configs = new KafkaCruiseControlConfig(props); + + UserTaskManager.UserTaskInfo mockUserTaskInfo = getMockUserTaskInfo(); + UserTaskManager mockUserTaskManager = getMockUserTaskManager(RANDOM_UUID, mockUserTaskInfo, Collections.singletonList(true)); + LoadMonitor mockLoadMonitor = getMockLoadMonitor(); + AnomalyDetectorManager mockAnomalyDetectorManager = getMockAnomalyDetector(RANDOM_UUID, false); + EasyMock.replay(mockUserTaskInfo, mockUserTaskManager, mockLoadMonitor, mockAnomalyDetectorManager); + + MetricRegistry metricRegistry = new MetricRegistry(); + Executor executor = new Executor(configs, Time.SYSTEM, metricRegistry, mockAnomalyDetectorManager); + executor.setUserTaskManager(mockUserTaskManager); + + executor.setGeneratingProposalsForExecution(RANDOM_UUID, ExecutorTest.class::getSimpleName, true); + executor.executeProposals(proposalsToExecute, Collections.emptySet(), null, mockLoadMonitor, null, null, null, null, null, + null, null, replicationThrottle, true, RANDOM_UUID, false, false); + return executor; + } + /** * Creates {@link com.linkedin.kafka.cruisecontrol.common.TestConstants#TOPIC0 topic0} with replication factor 1 and * {@link com.linkedin.kafka.cruisecontrol.common.TestConstants#TOPIC1 topic1} with replication factor 2. Waits until From 4ec2f896f54c0b7d24d7cca0baeee43d2aa14606 Mon Sep 17 00:00:00 2001 From: il-kyun Date: Fri, 19 Sep 2025 23:12:44 +0900 Subject: [PATCH 6/9] change to rerun build --- .../com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java index 9a735c682c..27984da4fd 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java @@ -767,7 +767,7 @@ private Collection createProposalsForTwoTopics(long produceSi new ExecutionProposal(TP1, 0, new ReplicaPlacementInfo(initialLeader1), Arrays.asList(new ReplicaPlacementInfo(initialLeader1), new ReplicaPlacementInfo(initialLeader1 == 0 ? 1 : 0)), Arrays.asList(new ReplicaPlacementInfo(initialLeader1 == 0 ? 1 : 0), new ReplicaPlacementInfo(initialLeader1))); - return Arrays.asList(proposal0, proposal1); + return List.of(proposal0, proposal1); } private Executor startExecutionWithBulkThrottle(Collection proposalsToExecute, From eb7d270543d95229afa45a05fd55f97bc7d86c67 Mon Sep 17 00:00:00 2001 From: il-kyun Date: Wed, 15 Oct 2025 20:38:39 +0900 Subject: [PATCH 7/9] as the default logic --- .../config/constants/ExecutorConfig.java | 15 +- .../cruisecontrol/executor/Executor.java | 20 +- .../cruisecontrol/executor/ExecutorTest.java | 178 ------------------ docs/wiki/User Guide/Configurations.md | 1 - 4 files changed, 5 insertions(+), 209 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java index db78b34fd1..4f833e615b 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java @@ -611,14 +611,6 @@ public final class ExecutorConfig { + " automatically stop the external agent and start the execution." + " Set to false to keep the external agent reassignment and skip starting the execution."; - /** - * bulk.replication.throttle.enabled - */ - public static final String BULK_REPLICATION_THROTTLE_ENABLED_CONFIG = "bulk.replication.throttle.enabled"; - public static final boolean DEFAULT_BULK_REPLICATION_THROTTLE_ENABLED = true; - public static final String BULK_REPLICATION_THROTTLE_ENABLED_DOC = "If true, Cruise Control sets replication throttles once " - + "before starting inter-broker replica movements and clears them once after inter-broker replica movements end. " - + "If false, throttles are set/cleared per batch."; private ExecutorConfig() { } @@ -999,11 +991,6 @@ public static ConfigDef define(ConfigDef configDef) { ConfigDef.Type.BOOLEAN, DEFAULT_AUTO_STOP_EXTERNAL_AGENT, ConfigDef.Importance.MEDIUM, - AUTO_STOP_EXTERNAL_AGENT_DOC) - .define(BULK_REPLICATION_THROTTLE_ENABLED_CONFIG, - ConfigDef.Type.BOOLEAN, - DEFAULT_BULK_REPLICATION_THROTTLE_ENABLED, - ConfigDef.Importance.MEDIUM, - BULK_REPLICATION_THROTTLE_ENABLED_DOC); + AUTO_STOP_EXTERNAL_AGENT_DOC); } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index 7c8cae93bf..f9d9ebb5d2 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -1607,14 +1607,13 @@ private void updateOngoingExecutionState() { private void interBrokerMoveReplicas() throws InterruptedException, ExecutionException, TimeoutException { Set currentDeadBrokersWithReplicas = _loadMonitor.deadBrokersWithReplicas(MAX_METADATA_WAIT_MS); ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient, _replicationThrottle, currentDeadBrokersWithReplicas); - final boolean bulkThrottleEnabled = _config.getBoolean(ExecutorConfig.BULK_REPLICATION_THROTTLE_ENABLED_CONFIG); int numTotalPartitionMovements = _executionTaskManager.numRemainingInterBrokerPartitionMovements(); long totalDataToMoveInMB = _executionTaskManager.remainingInterBrokerDataToMoveInMB(); long startTime = System.currentTimeMillis(); LOG.info("User task {}: Starting {} inter-broker partition movements.", _uuid, numTotalPartitionMovements); try { - // If bulk throttle is enabled, set throttles once for all pending inter-broker tasks before entering the loop. - if (bulkThrottleEnabled && _replicationThrottle != null && numTotalPartitionMovements > 0) { + // Set throttles once for all pending inter-broker tasks before entering the loop. + if (_replicationThrottle != null && numTotalPartitionMovements > 0) { ExecutionTasksSummary summaryAtStart = _executionTaskManager.getExecutionTasksSummary( Collections.singleton(INTER_BROKER_REPLICA_ACTION)); Map> interBrokerTasksByState = summaryAtStart.filteredTasksByState() @@ -1642,15 +1641,11 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc AlterPartitionReassignmentsResult result = null; if (!tasksToExecute.isEmpty()) { - if (!bulkThrottleEnabled) { - throttleHelper.setThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList())); - } // Execute the tasks. _executionTaskManager.markTasksInProgress(tasksToExecute); result = ExecutionUtils.submitReplicaReassignmentTasks(_adminClient, tasksToExecute); } // Wait indefinitely for partition movements to finish. - List completedTasks = waitForInterBrokerReplicaTasksToFinish(result); partitionsToMove = _executionTaskManager.numRemainingInterBrokerPartitionMovements(); int numFinishedPartitionMovements = _executionTaskManager.numFinishedInterBrokerPartitionMovements(); long finishedDataMovementInMB = _executionTaskManager.finishedInterBrokerDataMovementInMB(); @@ -1662,17 +1657,10 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc finishedDataMovementInMB, totalDataToMoveInMB, totalDataToMoveInMB == 0 ? 100 : String.format("%.2f", finishedDataMovementInMB * UNIT_INTERVAL_TO_PERCENTAGE / totalDataToMoveInMB)); - List inProgressTasks = tasksToExecute.stream() - .filter(t -> t.state() == ExecutionTaskState.IN_PROGRESS) - .collect(Collectors.toList()); - inProgressTasks.addAll(inExecutionTasks()); - if (!bulkThrottleEnabled) { - throttleHelper.clearThrottles(completedTasks, inProgressTasks); - } } } finally { - // Ensure bulk throttles are cleared even if an exception occurs during inter-broker movements. - if (bulkThrottleEnabled && _replicationThrottle != null) { + // Ensure throttles are cleared even if an exception occurs during inter-broker movements. + if (_replicationThrottle != null) { try { ExecutionTasksSummary summaryAtEnd = _executionTaskManager.getExecutionTasksSummary( Collections.singleton(INTER_BROKER_REPLICA_ACTION)); diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java index 27984da4fd..29cbb4ac0d 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java @@ -35,7 +35,6 @@ import java.util.Random; import java.util.Set; import java.util.HashSet; -import java.util.stream.Collectors; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; @@ -43,8 +42,6 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult; -import org.apache.kafka.clients.admin.Config; -import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ElectLeadersResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; @@ -55,7 +52,6 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; @@ -426,143 +422,6 @@ public void testSetExecutionProgressCheckIntervalMsWithNoRequestedValue() { assertEquals(validValue, executor.executionProgressCheckIntervalMs()); } - @Test - public void testInterBrokerMoveReplicasWhenBulkThrottleEnabledThenThrottleApplied() - throws InterruptedException, OngoingExecutionException { - Collection proposalsToExecute = createProposalsForTwoTopics(PRODUCE_SIZE_IN_BYTES); - - // Start execution with replication throttle using helper - final long replicationThrottle = PRODUCE_SIZE_IN_BYTES; - Executor executor = startExecutionWithBulkThrottle(proposalsToExecute, replicationThrottle); - - // Wait for inter-broker movement to start - waitUntilTrue(() -> (executor.state().state() == ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS), - "Inter-broker replica movement task did not start within the time limit", EXECUTION_DEADLINE_MS, EXECUTION_SHORT_CHECK_MS); - - try (AdminClient adminClient = KafkaCruiseControlUtils.createAdminClient( - Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker(0).plaintextAddr()))) { - - waitUntilTrue(() -> { - try { - for (int brokerId : List.of(0, 1)) { - ConfigResource cf = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId)); - Map cfg = adminClient.describeConfigs(Collections.singletonList(cf)).all().get(); - Config brokerCfg = cfg.get(cf); - String leaderActual = brokerCfg.get(ReplicationThrottleHelper.LEADER_THROTTLED_RATE) == null - ? null : brokerCfg.get(ReplicationThrottleHelper.LEADER_THROTTLED_RATE).value(); - String followerActual = brokerCfg.get(ReplicationThrottleHelper.FOLLOWER_THROTTLED_RATE) == null - ? null : brokerCfg.get(ReplicationThrottleHelper.FOLLOWER_THROTTLED_RATE).value(); - if (!String.valueOf(replicationThrottle).equals(leaderActual) - || !String.valueOf(replicationThrottle).equals(followerActual)) { - return false; - } - } - return true; - } catch (InterruptedException | ExecutionException e) { - return false; - } - }, - "Throttle rate not applied to brokers in time", - EXECUTION_DEADLINE_MS, - EXECUTION_SHORT_CHECK_MS); - - Set topicsSnapshot = proposalsToExecute.stream().map(ExecutionProposal::topic).collect(Collectors.toSet()); - for (String topic : topicsSnapshot) { - waitUntilTrue(() -> { - try { - ConfigResource cf = new ConfigResource(ConfigResource.Type.TOPIC, topic); - Map cfg = adminClient.describeConfigs(Collections.singletonList(cf)).all().get(); - Config topicCfg = cfg.get(cf); - String leaderReplicas = topicCfg.get(ReplicationThrottleHelper.LEADER_THROTTLED_REPLICAS) == null - ? null : topicCfg.get(ReplicationThrottleHelper.LEADER_THROTTLED_REPLICAS).value(); - String followerReplicas = topicCfg.get(ReplicationThrottleHelper.FOLLOWER_THROTTLED_REPLICAS) == null - ? null : topicCfg.get(ReplicationThrottleHelper.FOLLOWER_THROTTLED_REPLICAS).value(); - return leaderReplicas != null && !leaderReplicas.isEmpty() && leaderReplicas.contains("0:") - && followerReplicas != null && !followerReplicas.isEmpty() && followerReplicas.contains("0:"); - } catch (InterruptedException | ExecutionException e) { - return false; - } - }, - "Topic throttled replicas not applied for topic " + topic, - EXECUTION_DEADLINE_MS, - EXECUTION_SHORT_CHECK_MS); - } - - // Explicitly stop to avoid long wait, then ensure background execution is drained - executor.userTriggeredStopExecution(false); - waitUntilTrue(() -> (!executor.hasOngoingExecution() && executor.state().state() == ExecutorState.State.NO_TASK_IN_PROGRESS), - "Stopped proposal execution did not finish within the time limit", EXECUTION_DEADLINE_MS, EXECUTION_REGULAR_CHECK_MS); - } - } - - @Test - public void testInterBrokerMoveReplicasWhenBulkThrottleEnabledThenThrottleClearedAtEnd() - throws InterruptedException, OngoingExecutionException { - Collection proposalsToExecute = createProposalsForTwoTopics(PRODUCE_SIZE_IN_BYTES); - - // Start execution with replication throttle using helper - final long replicationThrottle = PRODUCE_SIZE_IN_BYTES; - Executor executor = startExecutionWithBulkThrottle(proposalsToExecute, replicationThrottle); - - // Ensure execution started, confirm reassignments began, then stop to expedite teardown and wait for completion - waitUntilTrue(() -> (executor.state().state() == ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS), - "Inter-broker replica movement task did not start within the time limit", EXECUTION_DEADLINE_MS, EXECUTION_SHORT_CHECK_MS); - verifyOngoingPartitionReassignments(Collections.singleton(TP0)); - executor.userTriggeredStopExecution(false); - waitUntilTrue(() -> (!executor.hasOngoingExecution() && executor.state().state() == ExecutorState.State.NO_TASK_IN_PROGRESS), - "Stopped proposal execution did not finish within the time limit", EXECUTION_DEADLINE_MS * 3, EXECUTION_REGULAR_CHECK_MS); - - // Verify throttles are cleared at the end - try (AdminClient adminClient = KafkaCruiseControlUtils.createAdminClient( - Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker(0).plaintextAddr()))) { - - waitUntilTrue(() -> { - try { - for (int brokerId : List.of(0, 1)) { - ConfigResource cf = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId)); - Map cfg = adminClient.describeConfigs(Collections.singletonList(cf)).all().get(); - Config brokerCfg = cfg.get(cf); - String leaderActual = brokerCfg.get(ReplicationThrottleHelper.LEADER_THROTTLED_RATE) == null - ? null : brokerCfg.get(ReplicationThrottleHelper.LEADER_THROTTLED_RATE).value(); - String followerActual = brokerCfg.get(ReplicationThrottleHelper.FOLLOWER_THROTTLED_RATE) == null - ? null : brokerCfg.get(ReplicationThrottleHelper.FOLLOWER_THROTTLED_RATE).value(); - if (leaderActual != null || followerActual != null) { - return false; - } - } - return true; - } catch (InterruptedException | ExecutionException e) { - return false; - } - }, - "Throttle not cleared from brokers in time", - EXECUTION_DEADLINE_MS, - EXECUTION_SHORT_CHECK_MS); - - // Topic replicas cleared - Set topicsSnapshot = proposalsToExecute.stream().map(ExecutionProposal::topic).collect(Collectors.toSet()); - for (String topic : topicsSnapshot) { - waitUntilTrue(() -> { - try { - ConfigResource cf = new ConfigResource(ConfigResource.Type.TOPIC, topic); - Map cfg = adminClient.describeConfigs(Collections.singletonList(cf)).all().get(); - Config topicCfg = cfg.get(cf); - ConfigEntry leaderEntry = topicCfg.get(ReplicationThrottleHelper.LEADER_THROTTLED_REPLICAS); - ConfigEntry followerEntry = topicCfg.get(ReplicationThrottleHelper.FOLLOWER_THROTTLED_REPLICAS); - boolean leaderCleared = leaderEntry == null || leaderEntry.value() == null || leaderEntry.value().isEmpty(); - boolean followerCleared = followerEntry == null || followerEntry.value() == null || followerEntry.value().isEmpty(); - return leaderCleared && followerCleared; - } catch (InterruptedException | ExecutionException e) { - return false; - } - }, - "Topic throttled replicas not cleared for topic " + topic, - EXECUTION_DEADLINE_MS, - EXECUTION_SHORT_CHECK_MS); - } - } - } - @Test public void testResetExecutionProgressCheckIntervalMs() { KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties()); @@ -755,43 +614,6 @@ private void populateProposals(List proposalToExecute, proposalToVerify.addAll(Arrays.asList(proposal0, proposal1)); } - private Collection createProposalsForTwoTopics(long produceSizeInBytes) throws InterruptedException { - Map topicDescriptions = createTopics((int) produceSizeInBytes); - int initialLeader0 = topicDescriptions.get(TOPIC0).partitions().get(0).leader().id(); - int initialLeader1 = topicDescriptions.get(TOPIC1).partitions().get(0).leader().id(); - ExecutionProposal proposal0 = - new ExecutionProposal(TP0, produceSizeInBytes, new ReplicaPlacementInfo(initialLeader0), - Collections.singletonList(new ReplicaPlacementInfo(initialLeader0)), - Collections.singletonList(new ReplicaPlacementInfo(initialLeader0 == 0 ? 1 : 0))); - ExecutionProposal proposal1 = - new ExecutionProposal(TP1, 0, new ReplicaPlacementInfo(initialLeader1), - Arrays.asList(new ReplicaPlacementInfo(initialLeader1), new ReplicaPlacementInfo(initialLeader1 == 0 ? 1 : 0)), - Arrays.asList(new ReplicaPlacementInfo(initialLeader1 == 0 ? 1 : 0), new ReplicaPlacementInfo(initialLeader1))); - return List.of(proposal0, proposal1); - } - - private Executor startExecutionWithBulkThrottle(Collection proposalsToExecute, - long replicationThrottle) throws OngoingExecutionException { - Properties props = getExecutorProperties(); - props.setProperty(ExecutorConfig.BULK_REPLICATION_THROTTLE_ENABLED_CONFIG, "true"); - KafkaCruiseControlConfig configs = new KafkaCruiseControlConfig(props); - - UserTaskManager.UserTaskInfo mockUserTaskInfo = getMockUserTaskInfo(); - UserTaskManager mockUserTaskManager = getMockUserTaskManager(RANDOM_UUID, mockUserTaskInfo, Collections.singletonList(true)); - LoadMonitor mockLoadMonitor = getMockLoadMonitor(); - AnomalyDetectorManager mockAnomalyDetectorManager = getMockAnomalyDetector(RANDOM_UUID, false); - EasyMock.replay(mockUserTaskInfo, mockUserTaskManager, mockLoadMonitor, mockAnomalyDetectorManager); - - MetricRegistry metricRegistry = new MetricRegistry(); - Executor executor = new Executor(configs, Time.SYSTEM, metricRegistry, mockAnomalyDetectorManager); - executor.setUserTaskManager(mockUserTaskManager); - - executor.setGeneratingProposalsForExecution(RANDOM_UUID, ExecutorTest.class::getSimpleName, true); - executor.executeProposals(proposalsToExecute, Collections.emptySet(), null, mockLoadMonitor, null, null, null, null, null, - null, null, replicationThrottle, true, RANDOM_UUID, false, false); - return executor; - } - /** * Creates {@link com.linkedin.kafka.cruisecontrol.common.TestConstants#TOPIC0 topic0} with replication factor 1 and * {@link com.linkedin.kafka.cruisecontrol.common.TestConstants#TOPIC1 topic1} with replication factor 2. Waits until diff --git a/docs/wiki/User Guide/Configurations.md b/docs/wiki/User Guide/Configurations.md index 846cc3647e..51192f20f3 100644 --- a/docs/wiki/User Guide/Configurations.md +++ b/docs/wiki/User Guide/Configurations.md @@ -156,7 +156,6 @@ The following configurations are inherited from the open source Kafka client con | metric.anomaly.analyzer.metrics | String | N | "" | The metric ids that the metric anomaly detector should detect if they are violated. | | topics.excluded.from.partition.movement | String | N | "" | The topics that should be excluded from the partition movement. It is a regex. Notice that this regex will be ignored when decommission a broker is invoked. | | default.replication.throttle | Long | N | null | The replication throttle applied to replicas being moved, in bytes per second. | -| bulk.replication.throttle.enabled | Boolean | N | true | If true, Cruise Control sets replication throttles once before starting inter-broker replica movements and clears them once after inter-broker replica movements end. If false, throttles are set/cleared per batch. Effective only when a replication throttle is configured via request parameter `replication_throttle` or config `default.replication.throttle`. | | replica.movement.strategies | List | N | [com.linkedin.kafka.cruisecontrol.executor.strategy.PostponeUrpReplicaMovementStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeLargeReplicaMovementStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeSmallReplicaMovementStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeMinIsrWithOfflineReplicasStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeOneAboveMinIsrWithOfflineReplicasStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.BaseReplicaMovementStrategy] | A list of supported strategies used to determine execution order for generated partition movement tasks. | | default.replica.movement.strategies | List | N | [com.linkedin.kafka.cruisecontrol.executor.strategy.BaseReplicaMovementStrategy] | The list of replica movement strategies that will be used by default if no replica movement strategy list is provided. | | executor.notifier.class | Class | N | class com.linkedin.kafka.cruisecontrol.executor.ExecutorNoopNotifier | The executor notifier class to trigger an alert when an execution finishes or is stopped (by a user or by Cruise Control). | From b3282b675bbe6b1a2591a9ac23b52470560cd337 Mon Sep 17 00:00:00 2001 From: il-kyun Date: Wed, 15 Oct 2025 20:41:45 +0900 Subject: [PATCH 8/9] remove misc --- .../kafka/cruisecontrol/config/constants/ExecutorConfig.java | 1 - 1 file changed, 1 deletion(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java index 4f833e615b..99a2b4cd87 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java @@ -610,7 +610,6 @@ public final class ExecutorConfig { public static final String AUTO_STOP_EXTERNAL_AGENT_DOC = "When starting a new proposal execution while external agent is reassigning partitions," + " automatically stop the external agent and start the execution." + " Set to false to keep the external agent reassignment and skip starting the execution."; - private ExecutorConfig() { } From 887f1e11320bb8477ec32b0ad6d73dca7a4d371f Mon Sep 17 00:00:00 2001 From: il-kyun Date: Wed, 15 Oct 2025 22:00:55 +0900 Subject: [PATCH 9/9] fix conflicts --- .../cruisecontrol/executor/Executor.java | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index f9d9ebb5d2..7afb901ce8 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -1606,7 +1606,8 @@ private void updateOngoingExecutionState() { private void interBrokerMoveReplicas() throws InterruptedException, ExecutionException, TimeoutException { Set currentDeadBrokersWithReplicas = _loadMonitor.deadBrokersWithReplicas(MAX_METADATA_WAIT_MS); - ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient, _replicationThrottle, currentDeadBrokersWithReplicas); + ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient, _replicationThrottle, + currentDeadBrokersWithReplicas); int numTotalPartitionMovements = _executionTaskManager.numRemainingInterBrokerPartitionMovements(); long totalDataToMoveInMB = _executionTaskManager.remainingInterBrokerDataToMoveInMB(); long startTime = System.currentTimeMillis(); @@ -1627,9 +1628,14 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc .filter(p -> existingTopics.contains(p.topic())) .collect(Collectors.toList()); long numExistingTopicsToThrottle = proposalsForExistingTopics.stream().map(ExecutionProposal::topic).distinct().count(); - LOG.info("User task {}: Applying bulk replication throttle of {} B/s to {} inter-broker movements " + LOG.info( + "User task {}: Applying bulk replication throttle of {} B/s to {} inter-broker movements " + "({} pending before filtering) across {} topics.", - _uuid, _replicationThrottle, proposalsForExistingTopics.size(), pendingAtStart.size(), numExistingTopicsToThrottle); + _uuid, + _replicationThrottle, + proposalsForExistingTopics.size(), + pendingAtStart.size(), + numExistingTopicsToThrottle); throttleHelper.setThrottles(proposalsForExistingTopics); } int partitionsToMove = numTotalPartitionMovements; @@ -1646,17 +1652,22 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc result = ExecutionUtils.submitReplicaReassignmentTasks(_adminClient, tasksToExecute); } // Wait indefinitely for partition movements to finish. + waitForInterBrokerReplicaTasksToFinish(result); partitionsToMove = _executionTaskManager.numRemainingInterBrokerPartitionMovements(); int numFinishedPartitionMovements = _executionTaskManager.numFinishedInterBrokerPartitionMovements(); long finishedDataMovementInMB = _executionTaskManager.finishedInterBrokerDataMovementInMB(); updatePartitionMovementMetrics(numFinishedPartitionMovements, finishedDataMovementInMB, System.currentTimeMillis() - startTime); LOG.info("User task {}: {}/{} ({}%) inter-broker partition movements completed. {}/{} ({}%) MB have been moved.", - _uuid, - numFinishedPartitionMovements, numTotalPartitionMovements, - String.format("%.2f", numFinishedPartitionMovements * UNIT_INTERVAL_TO_PERCENTAGE / numTotalPartitionMovements), - finishedDataMovementInMB, totalDataToMoveInMB, - totalDataToMoveInMB == 0 ? 100 : String.format("%.2f", finishedDataMovementInMB * UNIT_INTERVAL_TO_PERCENTAGE - / totalDataToMoveInMB)); + _uuid, + numFinishedPartitionMovements, numTotalPartitionMovements, + String.format("%.2f", numFinishedPartitionMovements * UNIT_INTERVAL_TO_PERCENTAGE / numTotalPartitionMovements), + finishedDataMovementInMB, totalDataToMoveInMB, + totalDataToMoveInMB == 0 ? 100 + : String.format("%.2f", finishedDataMovementInMB * UNIT_INTERVAL_TO_PERCENTAGE / totalDataToMoveInMB)); + List inProgressTasks = tasksToExecute.stream() + .filter(t -> t.state() == ExecutionTaskState.IN_PROGRESS) + .collect(Collectors.toList()); + inProgressTasks.addAll(inExecutionTasks()); } } finally { // Ensure throttles are cleared even if an exception occurs during inter-broker movements.