ExecutorConfig.java
@@ -610,6 +610,15 @@ public final class ExecutorConfig {
   public static final String AUTO_STOP_EXTERNAL_AGENT_DOC = "When starting a new proposal execution while external agent is reassigning partitions,"
       + " automatically stop the external agent and start the execution."
       + " Set to false to keep the external agent reassignment and skip starting the execution.";
 
+  /**
+   * <code>bulk.replication.throttle.enabled</code>
+   */
+  public static final String BULK_REPLICATION_THROTTLE_ENABLED_CONFIG = "bulk.replication.throttle.enabled";
+  public static final boolean DEFAULT_BULK_REPLICATION_THROTTLE_ENABLED = true;
+  public static final String BULK_REPLICATION_THROTTLE_ENABLED_DOC = "If true, Cruise Control sets replication throttles once "
+      + "before starting inter-broker replica movements and clears them once after inter-broker replica movements end. "
+      + "If false, throttles are set/cleared per batch.";
   private ExecutorConfig() {
   }

@@ -990,6 +999,11 @@ public static ConfigDef define(ConfigDef configDef) {
                             ConfigDef.Type.BOOLEAN,
                             DEFAULT_AUTO_STOP_EXTERNAL_AGENT,
                             ConfigDef.Importance.MEDIUM,
-                            AUTO_STOP_EXTERNAL_AGENT_DOC);
+                            AUTO_STOP_EXTERNAL_AGENT_DOC)
+                    .define(BULK_REPLICATION_THROTTLE_ENABLED_CONFIG,
+                            ConfigDef.Type.BOOLEAN,
+                            DEFAULT_BULK_REPLICATION_THROTTLE_ENABLED,
+                            ConfigDef.Importance.MEDIUM,
+                            BULK_REPLICATION_THROTTLE_ENABLED_DOC);
   }
 }
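
Note: the default (true) enables bulk throttling out of the box. For operators who need the previous behavior, a minimal snippet, assuming the standard cruisecontrol.properties file (hypothetical, not part of this diff):

# Revert to per-batch set/clear of replication throttles.
bulk.replication.throttle.enabled=false

In per-batch mode the executor reapplies and removes throttle configs for every batch of inter-broker moves, so a large rebalance can generate many admin config updates; bulk mode performs a single set before the first movement and a single clear after the last.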
Executor.java
@@ -1606,53 +1606,97 @@ private void updateOngoingExecutionState() {

   private void interBrokerMoveReplicas() throws InterruptedException, ExecutionException, TimeoutException {
     Set<Integer> currentDeadBrokersWithReplicas = _loadMonitor.deadBrokersWithReplicas(MAX_METADATA_WAIT_MS);
-    ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient, _replicationThrottle,
-        currentDeadBrokersWithReplicas);
+    ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient, _replicationThrottle, currentDeadBrokersWithReplicas);
+    final boolean bulkThrottleEnabled = _config.getBoolean(ExecutorConfig.BULK_REPLICATION_THROTTLE_ENABLED_CONFIG);
     int numTotalPartitionMovements = _executionTaskManager.numRemainingInterBrokerPartitionMovements();
     long totalDataToMoveInMB = _executionTaskManager.remainingInterBrokerDataToMoveInMB();
     long startTime = System.currentTimeMillis();
     LOG.info("User task {}: Starting {} inter-broker partition movements.", _uuid, numTotalPartitionMovements);

-    int partitionsToMove = numTotalPartitionMovements;
-    // Exhaust all the pending partition movements.
-    while ((partitionsToMove > 0 || !inExecutionTasks().isEmpty()) && _stopSignal.get() == NO_STOP_EXECUTION) {
-      // Get tasks to execute.
-      List<ExecutionTask> tasksToExecute = _executionTaskManager.getInterBrokerReplicaMovementTasks();
-      LOG.info("User task {}: Executor will execute {} task(s)", _uuid, tasksToExecute.size());
-
-      AlterPartitionReassignmentsResult result = null;
-      if (!tasksToExecute.isEmpty()) {
-        throttleHelper.setThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()));
-        // Execute the tasks.
-        _executionTaskManager.markTasksInProgress(tasksToExecute);
-        result = ExecutionUtils.submitReplicaReassignmentTasks(_adminClient, tasksToExecute);
-      }
+    try {
+      // If bulk throttle is enabled, set throttles once for all pending inter-broker tasks before entering the loop.
+      if (bulkThrottleEnabled && _replicationThrottle != null && numTotalPartitionMovements > 0) {
+        ExecutionTasksSummary summaryAtStart = _executionTaskManager.getExecutionTasksSummary(
+            Collections.singleton(INTER_BROKER_REPLICA_ACTION));
+        Map<ExecutionTaskState, Set<ExecutionTask>> interBrokerTasksByState = summaryAtStart.filteredTasksByState()
+            .get(INTER_BROKER_REPLICA_ACTION);
+        Set<ExecutionTask> pendingAtStart = interBrokerTasksByState.getOrDefault(ExecutionTaskState.PENDING,
+            Collections.emptySet());
+        // Filter out proposals for non-existent topics to avoid admin timeouts on config changes for non-existent topics.
+        Set<String> existingTopics = _metadataClient.refreshMetadata().cluster().topics();
+        List<ExecutionProposal> proposalsForExistingTopics = pendingAtStart.stream()
+            .map(ExecutionTask::proposal)
+            .filter(p -> existingTopics.contains(p.topic()))
+            .collect(Collectors.toList());
+        long numExistingTopicsToThrottle = proposalsForExistingTopics.stream().map(ExecutionProposal::topic).distinct().count();
+        LOG.info("User task {}: Applying bulk replication throttle of {} B/s to {} inter-broker movements "
+            + "({} pending before filtering) across {} topics.",
+            _uuid, _replicationThrottle, proposalsForExistingTopics.size(), pendingAtStart.size(), numExistingTopicsToThrottle);
+        throttleHelper.setThrottles(proposalsForExistingTopics);
+      }
-      // Wait indefinitely for partition movements to finish.
-      List<ExecutionTask> completedTasks = waitForInterBrokerReplicaTasksToFinish(result);
-      partitionsToMove = _executionTaskManager.numRemainingInterBrokerPartitionMovements();
-      int numFinishedPartitionMovements = _executionTaskManager.numFinishedInterBrokerPartitionMovements();
-      long finishedDataMovementInMB = _executionTaskManager.finishedInterBrokerDataMovementInMB();
-      updatePartitionMovementMetrics(numFinishedPartitionMovements, finishedDataMovementInMB, System.currentTimeMillis() - startTime);
-      LOG.info("User task {}: {}/{} ({}%) inter-broker partition movements completed. {}/{} ({}%) MB have been moved.",
-          _uuid,
-          numFinishedPartitionMovements, numTotalPartitionMovements,
-          String.format("%.2f", numFinishedPartitionMovements * UNIT_INTERVAL_TO_PERCENTAGE / numTotalPartitionMovements),
-          finishedDataMovementInMB, totalDataToMoveInMB,
-          totalDataToMoveInMB == 0 ? 100 : String.format("%.2f", finishedDataMovementInMB * UNIT_INTERVAL_TO_PERCENTAGE
-              / totalDataToMoveInMB));
-      List<ExecutionTask> inProgressTasks = tasksToExecute.stream()
-          .filter(t -> t.state() == ExecutionTaskState.IN_PROGRESS)
-          .collect(Collectors.toList());
-      inProgressTasks.addAll(inExecutionTasks());
-
-      throttleHelper.clearThrottles(completedTasks, inProgressTasks);
-    }
+
+      int partitionsToMove = numTotalPartitionMovements;
+      // Exhaust all the pending partition movements.
+      while ((partitionsToMove > 0 || !inExecutionTasks().isEmpty()) && _stopSignal.get() == NO_STOP_EXECUTION) {
+        // Get tasks to execute.
+        List<ExecutionTask> tasksToExecute = _executionTaskManager.getInterBrokerReplicaMovementTasks();
+        LOG.info("User task {}: Executor will execute {} task(s)", _uuid, tasksToExecute.size());
+
+        AlterPartitionReassignmentsResult result = null;
+        if (!tasksToExecute.isEmpty()) {
+          if (!bulkThrottleEnabled) {
+            throttleHelper.setThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()));
+          }
+          // Execute the tasks.
+          _executionTaskManager.markTasksInProgress(tasksToExecute);
+          result = ExecutionUtils.submitReplicaReassignmentTasks(_adminClient, tasksToExecute);
+        }
+        // Wait indefinitely for partition movements to finish.
+        List<ExecutionTask> completedTasks = waitForInterBrokerReplicaTasksToFinish(result);
+        partitionsToMove = _executionTaskManager.numRemainingInterBrokerPartitionMovements();
+        int numFinishedPartitionMovements = _executionTaskManager.numFinishedInterBrokerPartitionMovements();
+        long finishedDataMovementInMB = _executionTaskManager.finishedInterBrokerDataMovementInMB();
+        updatePartitionMovementMetrics(numFinishedPartitionMovements, finishedDataMovementInMB, System.currentTimeMillis() - startTime);
+        LOG.info("User task {}: {}/{} ({}%) inter-broker partition movements completed. {}/{} ({}%) MB have been moved.",
+            _uuid,
+            numFinishedPartitionMovements, numTotalPartitionMovements,
+            String.format("%.2f", numFinishedPartitionMovements * UNIT_INTERVAL_TO_PERCENTAGE / numTotalPartitionMovements),
+            finishedDataMovementInMB, totalDataToMoveInMB,
+            totalDataToMoveInMB == 0 ? 100 : String.format("%.2f", finishedDataMovementInMB * UNIT_INTERVAL_TO_PERCENTAGE
+                / totalDataToMoveInMB));
+        List<ExecutionTask> inProgressTasks = tasksToExecute.stream()
+            .filter(t -> t.state() == ExecutionTaskState.IN_PROGRESS)
+            .collect(Collectors.toList());
+        inProgressTasks.addAll(inExecutionTasks());
+        if (!bulkThrottleEnabled) {
+          throttleHelper.clearThrottles(completedTasks, inProgressTasks);
+        }
+      }
+    } finally {
+      // Ensure bulk throttles are cleared even if an exception occurs during inter-broker movements.
+      if (bulkThrottleEnabled && _replicationThrottle != null) {
+        try {
+          ExecutionTasksSummary summaryAtEnd = _executionTaskManager.getExecutionTasksSummary(
+              Collections.singleton(INTER_BROKER_REPLICA_ACTION));
+          Map<ExecutionTaskState, Set<ExecutionTask>> interBrokerTasksByState = summaryAtEnd.filteredTasksByState()
+              .get(INTER_BROKER_REPLICA_ACTION);
+          Map<ExecutionTaskState, Set<ExecutionTask>> tasksByState =
+              interBrokerTasksByState == null ? Collections.emptyMap() : interBrokerTasksByState;
+          logBulkThrottleCleanup(tasksByState);
+          List<ExecutionTask> completedTasks = buildCompletedTasks(tasksByState);
+          List<ExecutionTask> inProgressTasks = buildInProgressTasks(tasksByState);
+          throttleHelper.clearThrottles(completedTasks, inProgressTasks);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          LOG.warn("User task {}: Failed to clear bulk replication throttles during cleanup.", _uuid, e);
+        } catch (ExecutionException | TimeoutException e) {
+          LOG.warn("User task {}: Failed to clear bulk replication throttles during cleanup.", _uuid, e);
+        }
+      }
+      // Currently, _executionProgressCheckIntervalMs is only runtime-adjusted for inter-broker move tasks, not
+      // for leadership move tasks. Thus, reset it to its initial value once interBrokerMoveReplicas has stopped so
+      // it can be safely used in subsequent leadership move tasks.
+      resetExecutionProgressCheckIntervalMs();
+    }

-    // Currently, _executionProgressCheckIntervalMs is only runtime adjusted for inter broker move tasks, not
-    // in leadership move task. Thus reset it to initial value once interBrokerMoveReplicas has stopped to
-    // have it been safely used in following leadership move tasks.
-    resetExecutionProgressCheckIntervalMs();

     // At this point it is guaranteed that there are no in execution tasks to wait -- i.e. all tasks are completed or dead.
     if (_stopSignal.get() == NO_STOP_EXECUTION) {
       LOG.info("User task {}: Inter-broker partition movements finished", _uuid);
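
Reviewer note: the try/finally restructure above is the heart of this change. Bulk throttles are set once before the movement loop and are guaranteed to be cleared even if waitForInterBrokerReplicaTasksToFinish throws mid-execution. A self-contained sketch of that lifecycle (toy Throttle interface and names, not the Cruise Control types):

import java.util.List;

// Toy model of the set-once/clear-once throttle lifecycle; all names are illustrative.
final class ThrottleLifecycleSketch {
  interface Throttle {
    void set(List<String> topics);   // stands in for ReplicationThrottleHelper#setThrottles
    void clear(List<String> topics); // stands in for ReplicationThrottleHelper#clearThrottles
  }

  static void moveReplicas(Throttle throttle, List<String> topics, Runnable movements) {
    throttle.set(topics);      // one bulk set before any movement starts
    try {
      movements.run();         // may throw at any point mid-execution
    } finally {
      throttle.clear(topics);  // always reached, so throttle configs cannot leak
    }
  }
}

The same reasoning is why the real method clears throttles in its finally block rather than at the end of the loop body.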
@@ -1784,6 +1828,32 @@ private int moveLeadershipInBatch() {
     return numLeadershipToMove;
   }

+  private void logBulkThrottleCleanup(Map<ExecutionTaskState, Set<ExecutionTask>> tasksByState) {
+    LOG.info("User task {}: Clearing bulk replication throttles (configured rate: {} B/s). "
+        + "Completed: {}, Aborted: {}, Dead: {}, InProgress: {}, Aborting: {}.",
+        _uuid, _replicationThrottle,
+        tasksByState.getOrDefault(ExecutionTaskState.COMPLETED, Collections.emptySet()).size(),
+        tasksByState.getOrDefault(ExecutionTaskState.ABORTED, Collections.emptySet()).size(),
+        tasksByState.getOrDefault(ExecutionTaskState.DEAD, Collections.emptySet()).size(),
+        tasksByState.getOrDefault(ExecutionTaskState.IN_PROGRESS, Collections.emptySet()).size(),
+        tasksByState.getOrDefault(ExecutionTaskState.ABORTING, Collections.emptySet()).size());
+  }

+  private List<ExecutionTask> buildCompletedTasks(Map<ExecutionTaskState, Set<ExecutionTask>> tasksByState) {
+    List<ExecutionTask> completedTasks = new ArrayList<>();
+    completedTasks.addAll(tasksByState.getOrDefault(ExecutionTaskState.COMPLETED, Collections.emptySet()));
+    completedTasks.addAll(tasksByState.getOrDefault(ExecutionTaskState.ABORTED, Collections.emptySet()));
+    completedTasks.addAll(tasksByState.getOrDefault(ExecutionTaskState.DEAD, Collections.emptySet()));
+    return completedTasks;
+  }

+  private List<ExecutionTask> buildInProgressTasks(Map<ExecutionTaskState, Set<ExecutionTask>> tasksByState) {
+    List<ExecutionTask> inProgressTasks = new ArrayList<>();
+    inProgressTasks.addAll(tasksByState.getOrDefault(ExecutionTaskState.IN_PROGRESS, Collections.emptySet()));
+    inProgressTasks.addAll(tasksByState.getOrDefault(ExecutionTaskState.ABORTING, Collections.emptySet()));
+    return inProgressTasks;
+  }

   /**
    * First waits for {@link #executionProgressCheckIntervalMs} for the execution to make progress, then retrieves the
    * cluster state for the progress check.
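
Note on the helpers added in this hunk: getExecutionTasksSummary buckets tasks by state, and the builders split those buckets into terminal tasks (COMPLETED, ABORTED, DEAD), whose throttles are safe to remove, and still-active tasks (IN_PROGRESS, ABORTING), which presumably keep throttle overrides on their topics. A standalone sketch of that partitioning (toy enum and String task IDs, not the real ExecutionTask types):

import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy partitioning of tasks by state, mirroring buildCompletedTasks/buildInProgressTasks.
final class StatePartitionSketch {
  enum State { PENDING, IN_PROGRESS, ABORTING, ABORTED, COMPLETED, DEAD }

  static List<String> collect(Map<State, Set<String>> byState, Set<State> states) {
    List<String> out = new ArrayList<>();
    // getOrDefault guards against states that have no tasks, as in the helpers above.
    states.forEach(s -> out.addAll(byState.getOrDefault(s, Collections.emptySet())));
    return out;
  }

  public static void main(String[] args) {
    Map<State, Set<String>> byState = new EnumMap<>(State.class);
    byState.put(State.COMPLETED, Set.of("topicA-0"));
    byState.put(State.ABORTING, Set.of("topicB-2"));
    System.out.println("completed=" + collect(byState, EnumSet.of(State.COMPLETED, State.ABORTED, State.DEAD)));
    System.out.println("inProgress=" + collect(byState, EnumSet.of(State.IN_PROGRESS, State.ABORTING)));
  }
}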