diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java
index 22f210f0a5..f9d9ebb5d2 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java
@@ -1606,53 +1606,85 @@ private void updateOngoingExecutionState() {
   private void interBrokerMoveReplicas() throws InterruptedException, ExecutionException, TimeoutException {
     Set<Integer> currentDeadBrokersWithReplicas = _loadMonitor.deadBrokersWithReplicas(MAX_METADATA_WAIT_MS);
-    ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient, _replicationThrottle,
-                                                                             currentDeadBrokersWithReplicas);
+    ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient, _replicationThrottle, currentDeadBrokersWithReplicas);
     int numTotalPartitionMovements = _executionTaskManager.numRemainingInterBrokerPartitionMovements();
     long totalDataToMoveInMB = _executionTaskManager.remainingInterBrokerDataToMoveInMB();
     long startTime = System.currentTimeMillis();
     LOG.info("User task {}: Starting {} inter-broker partition movements.", _uuid, numTotalPartitionMovements);
-
-    int partitionsToMove = numTotalPartitionMovements;
-    // Exhaust all the pending partition movements.
-    while ((partitionsToMove > 0 || !inExecutionTasks().isEmpty()) && _stopSignal.get() == NO_STOP_EXECUTION) {
-      // Get tasks to execute.
-      List<ExecutionTask> tasksToExecute = _executionTaskManager.getInterBrokerReplicaMovementTasks();
-      LOG.info("User task {}: Executor will execute {} task(s)", _uuid, tasksToExecute.size());
-
-      AlterPartitionReassignmentsResult result = null;
-      if (!tasksToExecute.isEmpty()) {
-        throttleHelper.setThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()));
-        // Execute the tasks.
-        _executionTaskManager.markTasksInProgress(tasksToExecute);
-        result = ExecutionUtils.submitReplicaReassignmentTasks(_adminClient, tasksToExecute);
+    try {
+      // Set throttles once for all pending inter-broker tasks before entering the loop.
+      if (_replicationThrottle != null && numTotalPartitionMovements > 0) {
+        ExecutionTasksSummary summaryAtStart = _executionTaskManager.getExecutionTasksSummary(
+            Collections.singleton(INTER_BROKER_REPLICA_ACTION));
+        Map<ExecutionTaskState, Set<ExecutionTask>> interBrokerTasksByState = summaryAtStart.filteredTasksByState()
+            .get(INTER_BROKER_REPLICA_ACTION);
+        Set<ExecutionTask> pendingAtStart = interBrokerTasksByState.getOrDefault(ExecutionTaskState.PENDING,
+            Collections.emptySet());
+        // Filter out proposals for non-existent topics to avoid admin request timeouts on config changes for topics that no longer exist.
+        Set<String> existingTopics = _metadataClient.refreshMetadata().cluster().topics();
+        List<ExecutionProposal> proposalsForExistingTopics = pendingAtStart.stream()
+            .map(ExecutionTask::proposal)
+            .filter(p -> existingTopics.contains(p.topic()))
+            .collect(Collectors.toList());
+        long numExistingTopicsToThrottle = proposalsForExistingTopics.stream().map(ExecutionProposal::topic).distinct().count();
+        LOG.info("User task {}: Applying bulk replication throttle of {} B/s to {} inter-broker movements "
+                 + "({} pending before filtering) across {} topics.",
+                 _uuid, _replicationThrottle, proposalsForExistingTopics.size(), pendingAtStart.size(), numExistingTopicsToThrottle);
+        throttleHelper.setThrottles(proposalsForExistingTopics);
       }
-      // Wait indefinitely for partition movements to finish.
-      List<ExecutionTask> completedTasks = waitForInterBrokerReplicaTasksToFinish(result);
-      partitionsToMove = _executionTaskManager.numRemainingInterBrokerPartitionMovements();
-      int numFinishedPartitionMovements = _executionTaskManager.numFinishedInterBrokerPartitionMovements();
-      long finishedDataMovementInMB = _executionTaskManager.finishedInterBrokerDataMovementInMB();
-      updatePartitionMovementMetrics(numFinishedPartitionMovements, finishedDataMovementInMB, System.currentTimeMillis() - startTime);
-      LOG.info("User task {}: {}/{} ({}%) inter-broker partition movements completed. {}/{} ({}%) MB have been moved.",
-               _uuid,
-               numFinishedPartitionMovements, numTotalPartitionMovements,
-               String.format("%.2f", numFinishedPartitionMovements * UNIT_INTERVAL_TO_PERCENTAGE / numTotalPartitionMovements),
-               finishedDataMovementInMB, totalDataToMoveInMB,
-               totalDataToMoveInMB == 0 ? 100 : String.format("%.2f", finishedDataMovementInMB * UNIT_INTERVAL_TO_PERCENTAGE
-                                                                      / totalDataToMoveInMB));
-      List<ExecutionTask> inProgressTasks = tasksToExecute.stream()
-                                                          .filter(t -> t.state() == ExecutionTaskState.IN_PROGRESS)
-                                                          .collect(Collectors.toList());
-      inProgressTasks.addAll(inExecutionTasks());
-
-      throttleHelper.clearThrottles(completedTasks, inProgressTasks);
+      int partitionsToMove = numTotalPartitionMovements;
+      // Exhaust all the pending partition movements.
+      while ((partitionsToMove > 0 || !inExecutionTasks().isEmpty()) && _stopSignal.get() == NO_STOP_EXECUTION) {
+        // Get tasks to execute.
+        List<ExecutionTask> tasksToExecute = _executionTaskManager.getInterBrokerReplicaMovementTasks();
+        LOG.info("User task {}: Executor will execute {} task(s)", _uuid, tasksToExecute.size());
+
+        AlterPartitionReassignmentsResult result = null;
+        if (!tasksToExecute.isEmpty()) {
+          // Execute the tasks.
+          _executionTaskManager.markTasksInProgress(tasksToExecute);
+          result = ExecutionUtils.submitReplicaReassignmentTasks(_adminClient, tasksToExecute);
+        }
+        // Wait indefinitely for partition movements to finish.
+        waitForInterBrokerReplicaTasksToFinish(result);
+        partitionsToMove = _executionTaskManager.numRemainingInterBrokerPartitionMovements();
+        int numFinishedPartitionMovements = _executionTaskManager.numFinishedInterBrokerPartitionMovements();
+        long finishedDataMovementInMB = _executionTaskManager.finishedInterBrokerDataMovementInMB();
+        updatePartitionMovementMetrics(numFinishedPartitionMovements, finishedDataMovementInMB, System.currentTimeMillis() - startTime);
+        LOG.info("User task {}: {}/{} ({}%) inter-broker partition movements completed. {}/{} ({}%) MB have been moved.",
+                 _uuid,
+                 numFinishedPartitionMovements, numTotalPartitionMovements,
+                 String.format("%.2f", numFinishedPartitionMovements * UNIT_INTERVAL_TO_PERCENTAGE / numTotalPartitionMovements),
+                 finishedDataMovementInMB, totalDataToMoveInMB,
+                 totalDataToMoveInMB == 0 ? 100 : String.format("%.2f", finishedDataMovementInMB * UNIT_INTERVAL_TO_PERCENTAGE
                                                                         / totalDataToMoveInMB));
+      }
+    } finally {
+      // Ensure throttles are cleared even if an exception occurs during inter-broker movements.
+      if (_replicationThrottle != null) {
+        try {
+          ExecutionTasksSummary summaryAtEnd = _executionTaskManager.getExecutionTasksSummary(
+              Collections.singleton(INTER_BROKER_REPLICA_ACTION));
+          Map<ExecutionTaskState, Set<ExecutionTask>> interBrokerTasksByState = summaryAtEnd.filteredTasksByState()
+              .get(INTER_BROKER_REPLICA_ACTION);
+          Map<ExecutionTaskState, Set<ExecutionTask>> tasksByState =
+              interBrokerTasksByState == null ? Collections.emptyMap() : interBrokerTasksByState;
+          logBulkThrottleCleanup(tasksByState);
+          List<ExecutionTask> completedTasks = buildCompletedTasks(tasksByState);
+          List<ExecutionTask> inProgressTasks = buildInProgressTasks(tasksByState);
+          throttleHelper.clearThrottles(completedTasks, inProgressTasks);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          LOG.warn("User task {}: Failed to clear bulk replication throttles during cleanup.", _uuid, e);
+        } catch (ExecutionException | TimeoutException e) {
+          LOG.warn("User task {}: Failed to clear bulk replication throttles during cleanup.", _uuid, e);
+        }
+      }
+      // Currently, _executionProgressCheckIntervalMs is only runtime adjusted for inter broker move tasks, not
+      // in leadership move task. Thus reset it to initial value once interBrokerMoveReplicas has stopped to
+      // have it been safely used in following leadership move tasks.
+      resetExecutionProgressCheckIntervalMs();
     }
-    // Currently, _executionProgressCheckIntervalMs is only runtime adjusted for inter broker move tasks, not
-    // in leadership move task. Thus reset it to initial value once interBrokerMoveReplicas has stopped to
-    // have it been safely used in following leadership move tasks.
-    resetExecutionProgressCheckIntervalMs();
-
     // At this point it is guaranteed that there are no in execution tasks to wait -- i.e. all tasks are completed or dead.
     if (_stopSignal.get() == NO_STOP_EXECUTION) {
       LOG.info("User task {}: Inter-broker partition movements finished", _uuid);
@@ -1784,6 +1816,32 @@ private int moveLeadershipInBatch() {
     return numLeadershipToMove;
   }
 
+  private void logBulkThrottleCleanup(Map<ExecutionTaskState, Set<ExecutionTask>> tasksByState) {
+    LOG.info("User task {}: Clearing bulk replication throttles (configured rate: {} B/s). "
+             + "Completed: {}, Aborted: {}, Dead: {}, InProgress: {}, Aborting: {}.",
+             _uuid, _replicationThrottle,
+             tasksByState.getOrDefault(ExecutionTaskState.COMPLETED, Collections.emptySet()).size(),
+             tasksByState.getOrDefault(ExecutionTaskState.ABORTED, Collections.emptySet()).size(),
+             tasksByState.getOrDefault(ExecutionTaskState.DEAD, Collections.emptySet()).size(),
+             tasksByState.getOrDefault(ExecutionTaskState.IN_PROGRESS, Collections.emptySet()).size(),
+             tasksByState.getOrDefault(ExecutionTaskState.ABORTING, Collections.emptySet()).size());
+  }
+
+  private List<ExecutionTask> buildCompletedTasks(Map<ExecutionTaskState, Set<ExecutionTask>> tasksByState) {
+    List<ExecutionTask> completedTasks = new ArrayList<>();
+    completedTasks.addAll(tasksByState.getOrDefault(ExecutionTaskState.COMPLETED, Collections.emptySet()));
+    completedTasks.addAll(tasksByState.getOrDefault(ExecutionTaskState.ABORTED, Collections.emptySet()));
+    completedTasks.addAll(tasksByState.getOrDefault(ExecutionTaskState.DEAD, Collections.emptySet()));
+    return completedTasks;
+  }
+
+  private List<ExecutionTask> buildInProgressTasks(Map<ExecutionTaskState, Set<ExecutionTask>> tasksByState) {
+    List<ExecutionTask> inProgressTasks = new ArrayList<>();
+    inProgressTasks.addAll(tasksByState.getOrDefault(ExecutionTaskState.IN_PROGRESS, Collections.emptySet()));
+    inProgressTasks.addAll(tasksByState.getOrDefault(ExecutionTaskState.ABORTING, Collections.emptySet()));
+    return inProgressTasks;
+  }
+
   /**
    * First waits for {@link #executionProgressCheckIntervalMs} for the execution to make progress, then retrieves the
    * cluster state for the progress check.
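
The patch above applies the replication throttle once, for all pending inter-broker proposals, before the movement loop starts, and removes it in a finally block so throttle configs cannot leak if the loop throws. The following is a minimal, self-contained sketch of that set-once/clear-in-finally lifecycle, not part of the patch; TaskState, Task, ThrottleHelper, and BulkThrottleLifecycleSketch are hypothetical stand-ins for the Cruise Control types used in the diff.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only: simplified stand-in types, not Cruise Control classes.
public class BulkThrottleLifecycleSketch {
  enum TaskState { PENDING, IN_PROGRESS, ABORTING, COMPLETED, ABORTED, DEAD }

  static final class Task {
    final String topic;
    Task(String topic) { this.topic = topic; }
  }

  interface ThrottleHelper {
    void setThrottles(List<String> topics);                        // apply throttle config once, up front
    void clearThrottles(List<Task> completed, List<Task> ongoing); // remove throttle config at the end
  }

  static void moveReplicas(Map<TaskState, Set<Task>> tasksByState, ThrottleHelper helper) {
    try {
      // 1. Throttle every pending task in one bulk call before the movement loop starts.
      List<String> topicsToThrottle = new ArrayList<>();
      for (Task t : tasksByState.getOrDefault(TaskState.PENDING, Collections.emptySet())) {
        topicsToThrottle.add(t.topic);
      }
      helper.setThrottles(topicsToThrottle);
      // 2. ... run the partition movement loop here ...
    } finally {
      // 3. Always clear throttles, even if the loop threw. Terminal tasks count as completed;
      //    anything still moving keeps its throttle until it finishes.
      List<Task> completed = new ArrayList<>();
      for (TaskState s : List.of(TaskState.COMPLETED, TaskState.ABORTED, TaskState.DEAD)) {
        completed.addAll(tasksByState.getOrDefault(s, Collections.emptySet()));
      }
      List<Task> ongoing = new ArrayList<>();
      for (TaskState s : List.of(TaskState.IN_PROGRESS, TaskState.ABORTING)) {
        ongoing.addAll(tasksByState.getOrDefault(s, Collections.emptySet()));
      }
      helper.clearThrottles(completed, ongoing);
    }
  }
}

Compared with setting and clearing throttles once per batch inside the loop, this shape issues far fewer admin config requests on large rebalances and guarantees cleanup runs exactly once, on every exit path.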