Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({ "mode", "brokers", "goals", "skipHardGoalCheck", "rebalanceDisk", "excludedTopics", "concurrentPartitionMovementsPerBroker",
"concurrentIntraBrokerPartitionMovements", "concurrentLeaderMovements", "replicationThrottle", "replicaMovementStrategies", "moveReplicasOffVolumes" })
"concurrentIntraBrokerPartitionMovements", "concurrentLeaderMovements", "replicationThrottle", "logDirThrottle", "replicaMovementStrategies", "moveReplicasOffVolumes" })
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class KafkaRebalanceSpec extends Spec {
Expand All @@ -45,6 +45,7 @@ public class KafkaRebalanceSpec extends Spec {
private int concurrentIntraBrokerPartitionMovements;
private int concurrentLeaderMovements;
private long replicationThrottle;
private long logDirThrottle;
private List<String> replicaMovementStrategies;
private List<BrokerAndVolumeIds> moveReplicasOffVolumes;

Expand Down Expand Up @@ -150,7 +151,7 @@ public void setConcurrentLeaderMovements(int movements) {
this.concurrentLeaderMovements = movements;
}

@Description("The upper bound, in bytes per second, on the bandwidth used to move replicas. There is no limit by default.")
@Description("The upper bound, in bytes per second, on the bandwidth used to move replicas between brokers (i.e., inter-broker replica movements). There is no limit by default.")
@Minimum(0)
public long getReplicationThrottle() {
return replicationThrottle;
Expand All @@ -160,6 +161,16 @@ public void setReplicationThrottle(long bandwidth) {
this.replicationThrottle = bandwidth;
}

@Description("The upper bound, in bytes per second, on the bandwidth used to move replicas between disks (i.e., intra-broker replica movements). There is no limit by default.")
@Minimum(0)
public long getLogDirThrottle() {
return logDirThrottle;
}

public void setLogDirThrottle(long bandwidth) {
this.logDirThrottle = bandwidth;
}

@Description("A list of strategy class names used to determine the execution order for the replica movements in the generated optimization proposal. " +
"By default BaseReplicaMovementStrategy is used, which will execute the replica movements in the order that they were generated.")
public List<String> getReplicaMovementStrategies() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ spec:
concurrentPartitionMovementsPerBroker: 0
concurrentIntraBrokerPartitionMovements: 0
concurrentLeaderMovements: 0
replicationThrottle: 0
replicationThrottle: 0
logDirThrottle: 0
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ private KafkaRebalanceStatus updateStatus(KafkaRebalance kafkaRebalance,
if (kafkaRebalanceSpec.getReplicationThrottle() > 0) {
rebalanceOptionsBuilder.withReplicationThrottle(kafkaRebalanceSpec.getReplicationThrottle());
}
if (kafkaRebalanceSpec.getLogDirThrottle() > 0) {
rebalanceOptionsBuilder.withLogDirThrottle(kafkaRebalanceSpec.getLogDirThrottle());
}
if (kafkaRebalanceSpec.getReplicaMovementStrategies() != null) {
rebalanceOptionsBuilder.withReplicaMovementStrategies(kafkaRebalanceSpec.getReplicaMovementStrategies());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ public abstract class AbstractRebalanceOptions {
private final int concurrentPartitionMovementsPerBroker;
/** The upper bound of ongoing leadership movements */
private final int concurrentLeaderMovements;
/** The upper bound, in bytes per second, on the bandwidth used to move replicas */
/** The upper bound, in bytes per second, on the bandwidth used to move replicas between brokers (i.e., inter-broker replica movements) */
private final long replicationThrottle;
/** The upper bound, in bytes per second, on the bandwidth used to move replicas between disks (i.e., intra-broker replica movements) */
private final long logDirThrottle;
/** A list of strategy class names used to determine the execution order for the replica movements in the generated optimization proposal. */
private final List<String> replicaMovementStrategies;

Expand Down Expand Up @@ -95,6 +97,13 @@ public long getReplicationThrottle() {
return replicationThrottle;
}

/**
* @return Log Dir throttle
*/
public long getLogDirThrottle() {
return logDirThrottle;
}

/**
* @return List of configured replica movement strategies
*/
Expand All @@ -112,6 +121,7 @@ public List<String> getReplicaMovementStrategies() {
this.concurrentPartitionMovementsPerBroker = builder.concurrentPartitionMovementsPerBroker;
this.concurrentLeaderMovements = builder.concurrentLeaderMovements;
this.replicationThrottle = builder.replicationThrottle;
this.logDirThrottle = builder.logDirThrottle;
this.replicaMovementStrategies = builder.replicaMovementStrategies;
}

Expand All @@ -131,6 +141,7 @@ public abstract static class AbstractRebalanceOptionsBuilder<B extends AbstractR
private int concurrentPartitionMovementsPerBroker;
private int concurrentLeaderMovements;
private long replicationThrottle;
private long logDirThrottle;
private List<String> replicaMovementStrategies;

AbstractRebalanceOptionsBuilder() {
Expand All @@ -143,6 +154,7 @@ public abstract static class AbstractRebalanceOptionsBuilder<B extends AbstractR
concurrentPartitionMovementsPerBroker = 0;
concurrentLeaderMovements = 0;
replicationThrottle = 0;
logDirThrottle = 0;
replicaMovementStrategies = null;
}

Expand Down Expand Up @@ -246,12 +258,27 @@ public B withConcurrentLeaderMovements(int movements) {
*/
public B withReplicationThrottle(long bandwidth) {
if (bandwidth < 0) {
throw new IllegalArgumentException("The max replication bandwidth should be greater than zero");
throw new IllegalArgumentException("The max inter-broker replica movement bandwidth should be greater than zero");
}
this.replicationThrottle = bandwidth;
return self();
}

/**
* Sets log dir throttle
*
* @param bandwidth The throttle bandwidth
*
* @return Instance of this builder
*/
public B withLogDirThrottle(long bandwidth) {
if (bandwidth < 0) {
throw new IllegalArgumentException("The max intra-broker replica movement bandwidth should be greater than zero");
}
this.logDirThrottle = bandwidth;
return self();
}

/**
* Sets replica movement strategies
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ private PathBuilder withAbstractRebalanceParameters(AbstractRebalanceOptions opt
addIfNotZero(builder, CruiseControlParameters.CONCURRENT_PARTITION_MOVEMENTS, options.getConcurrentPartitionMovementsPerBroker());
addIfNotZero(builder, CruiseControlParameters.CONCURRENT_LEADER_MOVEMENTS, options.getConcurrentLeaderMovements());
addIfNotZero(builder, CruiseControlParameters.REPLICATION_THROTTLE, options.getReplicationThrottle());
addIfNotZero(builder, CruiseControlParameters.LOG_DIR_THROTTLE, options.getLogDirThrottle());

if (options.getGoals() != null) {
builder.withParameter(CruiseControlParameters.GOALS, options.getGoals());
Expand Down
5 changes: 4 additions & 1 deletion documentation/modules/appendix_crds.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -4308,7 +4308,10 @@ If not specified, the `full` mode is used by default.
|The upper bound of ongoing partition leadership movements. Default is 1000.
|replicationThrottle
|integer
|The upper bound, in bytes per second, on the bandwidth used to move replicas. There is no limit by default.
|The upper bound, in bytes per second, on the bandwidth used to move replicas between brokers (i.e., inter-broker replica movements). There is no limit by default.
|logDirThrottle
|integer
|The upper bound, in bytes per second, on the bandwidth used to move replicas between disks (i.e., intra-broker replica movements). There is no limit by default.
|replicaMovementStrategies
|string array
|A list of strategy class names used to determine the execution order for the replica movements in the generated optimization proposal. By default BaseReplicaMovementStrategy is used, which will execute the replica movements in the order that they were generated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,18 @@ You can set the following rebalance tuning options when configuring Cruise Contr
| `num.concurrent.leader.movements`
| `concurrentLeaderMovements`
| 1000
| 1000
| The maximum number of partition leadership changes in each partition reassignment batch

| `default.replication.throttle`
| `replicationThrottle`
| Null (no limit)
| The bandwidth (in bytes per second) to assign to partition reassignment
| The bandwidth (in bytes per second) to assign to inter-broker partition reassignment (i.e., replica movements between brokers)

| `default.log.dir.throttle`
| `logDirThrottle`
| Null (no limit)
| The bandwidth (in bytes per second) to assign to intra-broker partition reassignment (i.e., replica movements between disks)


| `default.replica.movement.strategies`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ spec:
replicationThrottle:
type: integer
minimum: 0
description: "The upper bound, in bytes per second, on the bandwidth used to move replicas. There is no limit by default."
description: "The upper bound, in bytes per second, on the bandwidth used to move replicas between brokers. (i.e., inter-broker movements) There is no limit by default."
logDirThrottle:
type: integer
minimum: 0
description: "The upper bound, in bytes per second, on the bandwidth used to move replicas between disks. (i.e., intra-broker movements) There is no limit by default."
replicaMovementStrategies:
type: array
items:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public enum CruiseControlConfigurationParameters {
*/
REPLICATION_THROTTLE("default.replication.throttle"),

/**
* Default log dir throttle
*/
LOG_DIR_THROTTLE("default.log.dir.throttle"),

/**
* Size of the partition aggregation window
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public enum CruiseControlParameters {
*/
REPLICATION_THROTTLE("replication_throttle"),

/**
* Log Dir throttle
*/
LOG_DIR_THROTTLE("log_dir_throttle"),

/**
* Replica movement strategies
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ spec:
replicationThrottle:
type: integer
minimum: 0
description: "The upper bound, in bytes per second, on the bandwidth used to move replicas. There is no limit by default."
description: "The upper bound, in bytes per second, on the bandwidth used to move replicas between brokers. (i.e., inter-broker movements) There is no limit by default."
logDirThrottle:
type: integer
minimum: 0
description: "The upper bound, in bytes per second, on the bandwidth used to move replicas between disks. (i.e., intra-broker movements) There is no limit by default."
replicaMovementStrategies:
type: array
items:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ spec:
replicationThrottle:
type: integer
minimum: 0
description: "The upper bound, in bytes per second, on the bandwidth used to move replicas. There is no limit by default."
description: "The upper bound, in bytes per second, on the bandwidth used to move replicas between brokers (i.e., inter-broker replica movements). There is no limit by default."
logDirThrottle:
type: integer
minimum: 0
description: "The upper bound, in bytes per second, on the bandwidth used to move replicas between disks (i.e., intra-broker replica movements). There is no limit by default."
replicaMovementStrategies:
type: array
items:
Expand Down
Loading