Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public class BalancingConstraint {
private final long _fastModePerBrokerMoveTimeoutMs;
private final BrokerSetResolver _brokerSetResolver;
private final ReplicaToBrokerSetMappingPolicy _replicaToBrokerSetMappingPolicy;
private final double _topicLeaderReplicaBalancePercentage;
private final int _topicLeaderReplicaBalanceMinGap;
private final int _topicLeaderReplicaBalanceMaxGap;
private final double _topicLeaderReplicaDistributionGoalBalanceMargin;

/**
* Constructor for Balancing Constraint.
Expand Down Expand Up @@ -78,6 +82,10 @@ public BalancingConstraint(KafkaCruiseControlConfig config) {
_topicReplicaBalancePercentage = config.getDouble(AnalyzerConfig.TOPIC_REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG);
_topicReplicaBalanceMinGap = config.getInt(AnalyzerConfig.TOPIC_REPLICA_COUNT_BALANCE_MIN_GAP_CONFIG);
_topicReplicaBalanceMaxGap = config.getInt(AnalyzerConfig.TOPIC_REPLICA_COUNT_BALANCE_MAX_GAP_CONFIG);
_topicLeaderReplicaBalancePercentage = config.getDouble(AnalyzerConfig.TOPIC_LEADER_REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG);
_topicLeaderReplicaBalanceMinGap = config.getInt(AnalyzerConfig.TOPIC_LEADER_REPLICA_COUNT_BALANCE_MIN_GAP_CONFIG);
_topicLeaderReplicaBalanceMaxGap = config.getInt(AnalyzerConfig.TOPIC_LEADER_REPLICA_COUNT_BALANCE_MAX_GAP_CONFIG);
_topicLeaderReplicaDistributionGoalBalanceMargin = config.getDouble(AnalyzerConfig.TOPIC_LEADER_REPLICA_DISTRIBUTION_GOAL_BALANCE_MARGIN_CONFIG);
_goalViolationDistributionThresholdMultiplier = config.getDouble(AnalyzerConfig.GOAL_VIOLATION_DISTRIBUTION_THRESHOLD_MULTIPLIER_CONFIG);
// Set default value for the topics that must have a minimum number of leader replicas on brokers that are not
// excluded for replica move.
Expand All @@ -88,13 +96,13 @@ public BalancingConstraint(KafkaCruiseControlConfig config) {
// BrokerSet Data resolver class
Map<String, Object> parameterConfigOverridesForBrokerSetResolver = new HashMap<>();
parameterConfigOverridesForBrokerSetResolver.put(BrokerSetFileResolver.BROKER_SET_ASSIGNMENT_POLICY_OBJECT_CONFIG,
config.getConfiguredInstance(AnalyzerConfig.BROKER_SET_ASSIGNMENT_POLICY_CLASS_CONFIG,
BrokerSetAssignmentPolicy.class));
config.getConfiguredInstance(AnalyzerConfig.BROKER_SET_ASSIGNMENT_POLICY_CLASS_CONFIG,
BrokerSetAssignmentPolicy.class));
_brokerSetResolver = config.getConfiguredInstance(AnalyzerConfig.BROKER_SET_RESOLVER_CLASS_CONFIG, BrokerSetResolver.class,
parameterConfigOverridesForBrokerSetResolver);
parameterConfigOverridesForBrokerSetResolver);
// Replica to Broker Set mapping policy class
_replicaToBrokerSetMappingPolicy = config.getConfiguredInstance(AnalyzerConfig.REPLICA_TO_BROKER_SET_MAPPING_POLICY_CLASS_CONFIG,
ReplicaToBrokerSetMappingPolicy.class);
ReplicaToBrokerSetMappingPolicy.class);
}

Properties setProps(Properties props) {
Expand Down Expand Up @@ -131,6 +139,31 @@ Properties setProps(Properties props) {
return props;
}

/**
* @return Topic replica balance percentage for {@link com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicLeaderReplicaDistributionGoal}.
*/
public double topicLeaderReplicaBalancePercentage() {
return _topicLeaderReplicaBalancePercentage;
}

/**
* @return Topic replica balance minimum gap for {@link com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicLeaderReplicaDistributionGoal}.
*/
public int topicLeaderReplicaBalanceMinGap() {
return _topicLeaderReplicaBalanceMinGap;
}

/**
* @return Hysteresis margin multiplier for TopicLeaderReplicaDistributionGoal percentage band.
*/
public double topicLeaderReplicaDistributionGoalBalanceMargin() {
return _topicLeaderReplicaDistributionGoalBalanceMargin;
}

public int topicLeaderReplicaBalanceMaxGap() {
return _topicLeaderReplicaBalanceMaxGap;
}

/**
* @return The maximum number of replicas per broker.
*/
Expand Down Expand Up @@ -328,23 +361,26 @@ void setCapacityThreshold(double capacityThreshold) {
@Override
public String toString() {
return String.format("BalancingConstraint[cpuBalancePercentage=%.4f,diskBalancePercentage=%.4f,"
+ "inboundNwBalancePercentage=%.4f,outboundNwBalancePercentage=%.4f,cpuCapacityThreshold=%.4f,"
+ "diskCapacityThreshold=%.4f,inboundNwCapacityThreshold=%.4f,outboundNwCapacityThreshold=%.4f,"
+ "maxReplicasPerBroker=%d,replicaBalancePercentage=%.4f,leaderReplicaBalancePercentage=%.4f,"
+ "topicReplicaBalancePercentage=%.4f,topicReplicaBalanceGap=[%d,%d],"
+ "goalViolationDistributionThresholdMultiplier=%.4f,"
+ "topicsWithMinLeadersPerBrokerPattern=%s,"
+ "minTopicLeadersPerBroker=%d,fastModePerBrokerMoveTimeoutMs=%d,"
+ "brokerSetDataStore=%s,"
+ "replicaToBrokerSetMappingPolicy=%s]",
_resourceBalancePercentage.get(Resource.CPU), _resourceBalancePercentage.get(Resource.DISK),
_resourceBalancePercentage.get(Resource.NW_IN), _resourceBalancePercentage.get(Resource.NW_OUT),
_capacityThreshold.get(Resource.CPU), _capacityThreshold.get(Resource.DISK),
_capacityThreshold.get(Resource.NW_IN), _capacityThreshold.get(Resource.NW_OUT),
_maxReplicasPerBroker, _replicaBalancePercentage, _leaderReplicaBalancePercentage,
_topicReplicaBalancePercentage, _topicReplicaBalanceMinGap, _topicReplicaBalanceMaxGap,
_goalViolationDistributionThresholdMultiplier, _topicsWithMinLeadersPerBrokerPattern.pattern(),
_minTopicLeadersPerBroker, _fastModePerBrokerMoveTimeoutMs, _brokerSetResolver.getClass().getName(),
_replicaToBrokerSetMappingPolicy.getClass().getName());
+ "inboundNwBalancePercentage=%.4f,outboundNwBalancePercentage=%.4f,cpuCapacityThreshold=%.4f,"
+ "diskCapacityThreshold=%.4f,inboundNwCapacityThreshold=%.4f,outboundNwCapacityThreshold=%.4f,"
+ "maxReplicasPerBroker=%d,replicaBalancePercentage=%.4f,leaderReplicaBalancePercentage=%.4f,"
+ "topicReplicaBalancePercentage=%.4f,topicReplicaBalanceGap=[%d,%d],"
+ "topicLeaderReplicaBalancePercentage=%.4f,topicLeaderReplicaBalanceGap=[%d,%d],_topicLeaderReplicaDistributionGoalBalanceMargin=%.4f,"
+ "goalViolationDistributionThresholdMultiplier=%.4f,"
+ "topicsWithMinLeadersPerBrokerPattern=%s,"
+ "minTopicLeadersPerBroker=%d,fastModePerBrokerMoveTimeoutMs=%d,"
+ "brokerSetDataStore=%s,"
+ "replicaToBrokerSetMappingPolicy=%s]",
_resourceBalancePercentage.get(Resource.CPU), _resourceBalancePercentage.get(Resource.DISK),
_resourceBalancePercentage.get(Resource.NW_IN), _resourceBalancePercentage.get(Resource.NW_OUT),
_capacityThreshold.get(Resource.CPU), _capacityThreshold.get(Resource.DISK),
_capacityThreshold.get(Resource.NW_IN), _capacityThreshold.get(Resource.NW_OUT),
_maxReplicasPerBroker, _replicaBalancePercentage, _leaderReplicaBalancePercentage,
_topicReplicaBalancePercentage, _topicReplicaBalanceMinGap, _topicReplicaBalanceMaxGap,
_topicLeaderReplicaBalancePercentage, _topicLeaderReplicaBalanceMinGap, _topicLeaderReplicaBalanceMaxGap,
_topicLeaderReplicaDistributionGoalBalanceMargin,
_goalViolationDistributionThresholdMultiplier, _topicsWithMinLeadersPerBrokerPattern.pattern(),
_minTopicLeadersPerBroker, _fastModePerBrokerMoveTimeoutMs, _brokerSetResolver.getClass().getName(),
_replicaToBrokerSetMappingPolicy.getClass().getName());
}
}
Loading
Loading