Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,11 @@ private static boolean hasProposalsToExecute(Collection<ExecutionProposal> propo
* @param replicaMovementStrategy The strategy used to determine the execution order of generated replica movement tasks
* (if null, use default.replica.movement.strategies).
* @param replicationThrottle The replication throttle (bytes/second) to apply to both leaders and followers
* when executing proposals (if null, no throttling is applied).
* when executing proposals; that is, this parameter only affects inter-broker replica
* movements (if null, no throttling is applied).
* @param logDirThrottle The throttle (bytes/second) to apply to replicas being moved between the log dirs
* when executing proposals; that is, this parameter only affects intra-broker replica
* movements (if null, no throttling is applied).
* @param isTriggeredByUserRequest Whether the execution is triggered by a user request.
* @param uuid UUID of the execution.
* @param skipInterBrokerReplicaConcurrencyAdjustment {@code true} to skip auto adjusting concurrency of inter-broker
Expand All @@ -667,14 +671,15 @@ public void executeProposals(Set<ExecutionProposal> proposals,
Long executionProgressCheckIntervalMs,
ReplicaMovementStrategy replicaMovementStrategy,
Long replicationThrottle,
Long logDirThrottle,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not add logDirThrottle to the executeRemoval() method as well? This way we could clear the throttles from the broker metadata when brokers are removed from the cluster.

boolean isTriggeredByUserRequest,
String uuid,
boolean skipInterBrokerReplicaConcurrencyAdjustment) throws OngoingExecutionException {
if (hasProposalsToExecute(proposals, uuid)) {
_executor.executeProposals(proposals, unthrottledBrokers, null, _loadMonitor, concurrentInterBrokerPartitionMovements,
maxInterBrokerPartitionMovements, concurrentIntraBrokerPartitionMovements, clusterConcurrentLeaderMovements,
brokerConcurrentLeaderMovements, executionProgressCheckIntervalMs, replicaMovementStrategy, replicationThrottle,
isTriggeredByUserRequest, uuid, isKafkaAssignerMode, skipInterBrokerReplicaConcurrencyAdjustment);
logDirThrottle, isTriggeredByUserRequest, uuid, isKafkaAssignerMode, skipInterBrokerReplicaConcurrencyAdjustment);
} else {
failGeneratingProposalsForExecution(uuid);
}
Expand Down Expand Up @@ -720,7 +725,7 @@ public void executeRemoval(Set<ExecutionProposal> proposals,
_executor.executeProposals(proposals, throttleDecommissionedBroker ? Collections.emptySet() : removedBrokers, removedBrokers,
_loadMonitor, concurrentInterBrokerPartitionMovements, maxInterBrokerPartitionMovements, 0,
clusterLeaderMovementConcurrency, brokerLeaderMovementConcurrency,
executionProgressCheckIntervalMs, replicaMovementStrategy, replicationThrottle,
executionProgressCheckIntervalMs, replicaMovementStrategy, replicationThrottle, null,
isTriggeredByUserRequest, uuid, isKafkaAssignerMode, false);
} else {
failGeneratingProposalsForExecution(uuid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,14 @@ public final class ExecutorConfig {
public static final String DEFAULT_REPLICATION_THROTTLE_DOC = "The replication throttle applied to replicas being "
+ "moved, in bytes per second.";

/**
* <code>default.log.dir.throttle</code>
*/
public static final String DEFAULT_LOG_DIR_THROTTLE_CONFIG = "default.log.dir.throttle";
public static final Long DEFAULT_DEFAULT_LOG_DIR_THROTTLE = null;
public static final String DEFAULT_LOG_DIR_THROTTLE_DOC = "The throttle applied to replicas being moved between "
+ "the log dirs, in bytes per second.";

/**
* <code>replica.movement.strategies</code>
*/
Expand Down Expand Up @@ -741,6 +749,11 @@ public static ConfigDef define(ConfigDef configDef) {
DEFAULT_DEFAULT_REPLICATION_THROTTLE,
ConfigDef.Importance.MEDIUM,
DEFAULT_REPLICATION_THROTTLE_DOC)
.define(DEFAULT_LOG_DIR_THROTTLE_CONFIG,
ConfigDef.Type.LONG,
DEFAULT_DEFAULT_LOG_DIR_THROTTLE,
ConfigDef.Importance.MEDIUM,
DEFAULT_LOG_DIR_THROTTLE_DOC)
.define(REPLICA_MOVEMENT_STRATEGIES_CONFIG,
ConfigDef.Type.LIST,
DEFAULT_REPLICA_MOVEMENT_STRATEGIES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.collect.Sets;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use new HashSet<>() from java.util here instead?

import com.google.common.util.concurrent.AtomicDouble;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.TopicMinIsrCache;
Expand Down Expand Up @@ -801,6 +802,8 @@ public boolean setConcurrencyAdjusterMinIsrCheck(boolean isMinIsrBasedConcurrenc
* @param replicaMovementStrategy The strategy used to determine the execution order of generated replica movement tasks.
* @param replicationThrottle The replication throttle (bytes/second) to apply to both leaders and followers
* when executing a proposal (if null, no throttling is applied).
* @param logDirThrottle The throttle (bytes/second) to apply to replicas being moved between the log dirs
* when executing a proposal (if null, no throttling is applied).
* @param isTriggeredByUserRequest Whether the execution is triggered by a user request.
* @param uuid UUID of the execution.
* @param isKafkaAssignerMode {@code true} if kafka assigner mode, {@code false} otherwise.
Expand All @@ -819,6 +822,7 @@ public synchronized void executeProposals(Collection<ExecutionProposal> proposal
Long requestedExecutionProgressCheckIntervalMs,
ReplicaMovementStrategy replicaMovementStrategy,
Long replicationThrottle,
Long logDirThrottle,
boolean isTriggeredByUserRequest,
String uuid,
boolean isKafkaAssignerMode,
Expand All @@ -831,7 +835,7 @@ public synchronized void executeProposals(Collection<ExecutionProposal> proposal
requestedIntraBrokerPartitionMovementConcurrency, requestedClusterLeadershipMovementConcurrency,
requestedBrokerLeadershipMovementConcurrency, requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy,
isTriggeredByUserRequest, loadMonitor);
startExecution(loadMonitor, null, removedBrokers, replicationThrottle, isTriggeredByUserRequest);
startExecution(loadMonitor, null, removedBrokers, replicationThrottle, logDirThrottle, isTriggeredByUserRequest);
} catch (Exception e) {
if (e instanceof OngoingExecutionException) {
LOG.info("User task {}: Broker removal operation aborted due to ongoing execution", uuid);
Expand Down Expand Up @@ -924,7 +928,7 @@ public synchronized void executeDemoteProposals(Collection<ExecutionProposal> pr
initProposalExecution(proposals, demotedBrokers, concurrentSwaps, null, 0,
requestedClusterLeadershipMovementConcurrency, requestedBrokerLeadershipMovementConcurrency,
requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy, isTriggeredByUserRequest, loadMonitor);
startExecution(loadMonitor, demotedBrokers, null, replicationThrottle, isTriggeredByUserRequest);
startExecution(loadMonitor, demotedBrokers, null, replicationThrottle, null, isTriggeredByUserRequest);
} catch (Exception e) {
processExecuteProposalsFailure();
throw e;
Expand Down Expand Up @@ -1005,12 +1009,15 @@ private int numExecutionStartedInNonKafkaAssignerMode() {
* @param removedBrokers Brokers to be removed, null if no broker has been removed.
* @param replicationThrottle The replication throttle (bytes/second) to apply to both leaders and followers
* while moving partitions (if null, no throttling is applied).
* @param logDirThrottle The throttle (bytes/second) to apply to replicas being moved between the log dirs
* while moving partitions (if null, no throttling is applied).
* @param isTriggeredByUserRequest Whether the execution is triggered by a user request.
*/
private void startExecution(LoadMonitor loadMonitor,
Collection<Integer> demotedBrokers,
Collection<Integer> removedBrokers,
Long replicationThrottle,
Long logDirThrottle,
boolean isTriggeredByUserRequest) throws OngoingExecutionException {
_executionStoppedByUser.set(false);
sanityCheckOngoingMovement();
Expand Down Expand Up @@ -1046,7 +1053,7 @@ private void startExecution(LoadMonitor loadMonitor,
_numExecutionStartedInNonKafkaAssignerMode.incrementAndGet();
}
_proposalExecutor.execute(
new ProposalExecutionRunnable(loadMonitor, demotedBrokers, removedBrokers, replicationThrottle, isTriggeredByUserRequest));
new ProposalExecutionRunnable(loadMonitor, demotedBrokers, removedBrokers, replicationThrottle, logDirThrottle, isTriggeredByUserRequest));
}

/**
Expand Down Expand Up @@ -1300,6 +1307,7 @@ private class ProposalExecutionRunnable implements Runnable {
private final Set<Integer> _recentlyDemotedBrokers;
private final Set<Integer> _recentlyRemovedBrokers;
private final Long _replicationThrottle;
private final Long _logDirThrottle;
private Throwable _executionException;
private final boolean _isTriggeredByUserRequest;
private long _lastSlowTaskReportingTimeMs;
Expand All @@ -1314,6 +1322,7 @@ private class ProposalExecutionRunnable implements Runnable {
Collection<Integer> demotedBrokers,
Collection<Integer> removedBrokers,
Long replicationThrottle,
Long logDirThrottle,
boolean isTriggeredByUserRequest) {
_loadMonitor = loadMonitor;
_demotedBrokers = demotedBrokers;
Expand Down Expand Up @@ -1349,6 +1358,7 @@ private class ProposalExecutionRunnable implements Runnable {
_recentlyDemotedBrokers = recentlyDemotedBrokers();
_recentlyRemovedBrokers = recentlyRemovedBrokers();
_replicationThrottle = replicationThrottle;
_logDirThrottle = logDirThrottle;
_isTriggeredByUserRequest = isTriggeredByUserRequest;
_lastSlowTaskReportingTimeMs = -1L;
if (_removedBrokers != null && !_removedBrokers.isEmpty()) {
Expand Down Expand Up @@ -1606,8 +1616,7 @@ private void updateOngoingExecutionState() {

private void interBrokerMoveReplicas() throws InterruptedException, ExecutionException, TimeoutException {
Set<Integer> currentDeadBrokersWithReplicas = _loadMonitor.deadBrokersWithReplicas(MAX_METADATA_WAIT_MS);
ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient, _replicationThrottle,
currentDeadBrokersWithReplicas);
ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient, currentDeadBrokersWithReplicas);
int numTotalPartitionMovements = _executionTaskManager.numRemainingInterBrokerPartitionMovements();
long totalDataToMoveInMB = _executionTaskManager.remainingInterBrokerDataToMoveInMB();
long startTime = System.currentTimeMillis();
Expand All @@ -1622,7 +1631,10 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc

AlterPartitionReassignmentsResult result = null;
if (!tasksToExecute.isEmpty()) {
throttleHelper.setThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()));
if (_replicationThrottle != null) {
throttleHelper.setReplicationThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()),
_replicationThrottle);
}
// Execute the tasks.
_executionTaskManager.markTasksInProgress(tasksToExecute);
result = ExecutionUtils.submitReplicaReassignmentTasks(_adminClient, tasksToExecute);
Expand All @@ -1645,7 +1657,9 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc
.collect(Collectors.toList());
inProgressTasks.addAll(inExecutionTasks());

throttleHelper.clearThrottles(completedTasks, inProgressTasks);
if (_replicationThrottle != null) {
throttleHelper.clearInterBrokerThrottles(completedTasks, inProgressTasks);
}
}

// Currently, _executionProgressCheckIntervalMs is only runtime adjusted for inter broker move tasks, not
Expand Down Expand Up @@ -1676,20 +1690,28 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc
}
}

private void intraBrokerMoveReplicas() {
private void intraBrokerMoveReplicas() throws InterruptedException, ExecutionException, TimeoutException {
ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient);
int numTotalPartitionMovements = _executionTaskManager.numRemainingIntraBrokerPartitionMovements();
long totalDataToMoveInMB = _executionTaskManager.remainingIntraBrokerDataToMoveInMB();
long startTime = System.currentTimeMillis();
LOG.info("User task {}: Starting {} intra-broker partition movements.", _uuid, numTotalPartitionMovements);

int partitionsToMove = numTotalPartitionMovements;
Set<Integer> participatingBrokers = Sets.newHashSet();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use new HashSet<>() from java.util instead?

It appears that Sets.newHashSet() is deprecated and uses java.util.HashSet under the hood anyway.

// Exhaust all the pending partition movements.
while ((partitionsToMove > 0 || !inExecutionTasks().isEmpty()) && _stopSignal.get() == NO_STOP_EXECUTION) {
// Get tasks to execute.
List<ExecutionTask> tasksToExecute = _executionTaskManager.getIntraBrokerReplicaMovementTasks();
LOG.info("User task {}: Executor will execute {} task(s)", _uuid, tasksToExecute.size());

if (!tasksToExecute.isEmpty()) {
if (_logDirThrottle != null) {
participatingBrokers = throttleHelper.setLogDirThrottles(
tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()),
_logDirThrottle
);
}
// Execute the tasks.
_executionTaskManager.markTasksInProgress(tasksToExecute);
executeIntraBrokerReplicaMovements(tasksToExecute, _adminClient, _executionTaskManager, _config);
Expand All @@ -1715,6 +1737,11 @@ private void intraBrokerMoveReplicas() {
waitForIntraBrokerReplicaTasksToFinish();
inExecutionTasks = inExecutionTasks();
}

if (_logDirThrottle != null) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding, why do the variable names here start with an underscore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with _replicationThrottle.

throttleHelper.clearIntraBrokerThrottles(participatingBrokers);
}

if (inExecutionTasks().isEmpty()) {
LOG.info("User task {}: Intra-broker partition movements finished.", _uuid);
} else if (_stopSignal.get() != NO_STOP_EXECUTION) {
Expand Down
Loading
Loading