diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/BalancingConstraint.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/BalancingConstraint.java
index f087efe9ec..5129854c14 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/BalancingConstraint.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/BalancingConstraint.java
@@ -25,6 +25,9 @@ public class BalancingConstraint {
   private final Map<Resource, Double> _resourceBalancePercentage;
   private final double _replicaBalancePercentage;
   private final double _leaderReplicaBalancePercentage;
+  private final double _topicLeaderReplicaBalancePercentage;
+  private final int _topicLeaderReplicaBalanceMinGap;
+  private final int _topicLeaderReplicaBalanceMaxGap;
   private final double _topicReplicaBalancePercentage;
   private final int _topicReplicaBalanceMinGap;
   private final int _topicReplicaBalanceMaxGap;
@@ -75,6 +78,9 @@ public BalancingConstraint(KafkaCruiseControlConfig config) {
     // Set default value for the balance percentage of (1) replica, (2) leader replica and (3) topic replica distribution.
     _replicaBalancePercentage = config.getDouble(AnalyzerConfig.REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG);
     _leaderReplicaBalancePercentage = config.getDouble(AnalyzerConfig.LEADER_REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG);
+    _topicLeaderReplicaBalancePercentage = config.getDouble(AnalyzerConfig.TOPIC_LEADER_REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG);
+    _topicLeaderReplicaBalanceMinGap = config.getInt(AnalyzerConfig.TOPIC_LEADER_REPLICA_COUNT_BALANCE_MIN_GAP_CONFIG);
+    _topicLeaderReplicaBalanceMaxGap = config.getInt(AnalyzerConfig.TOPIC_LEADER_REPLICA_COUNT_BALANCE_MAX_GAP_CONFIG);
     _topicReplicaBalancePercentage = config.getDouble(AnalyzerConfig.TOPIC_REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG);
     _topicReplicaBalanceMinGap = config.getInt(AnalyzerConfig.TOPIC_REPLICA_COUNT_BALANCE_MIN_GAP_CONFIG);
     _topicReplicaBalanceMaxGap = config.getInt(AnalyzerConfig.TOPIC_REPLICA_COUNT_BALANCE_MAX_GAP_CONFIG);
@@ -122,6 +128,9 @@ Properties setProps(Properties props) {
     props.put(AnalyzerConfig.TOPIC_REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG, Double.toString(_topicReplicaBalancePercentage));
     props.put(AnalyzerConfig.TOPIC_REPLICA_COUNT_BALANCE_MIN_GAP_CONFIG, Integer.toString(_topicReplicaBalanceMinGap));
     props.put(AnalyzerConfig.TOPIC_REPLICA_COUNT_BALANCE_MAX_GAP_CONFIG, Integer.toString(_topicReplicaBalanceMaxGap));
+    props.put(AnalyzerConfig.TOPIC_LEADER_REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG, Double.toString(_topicLeaderReplicaBalancePercentage));
+    props.put(AnalyzerConfig.TOPIC_LEADER_REPLICA_COUNT_BALANCE_MIN_GAP_CONFIG, Integer.toString(_topicLeaderReplicaBalanceMinGap));
+    props.put(AnalyzerConfig.TOPIC_LEADER_REPLICA_COUNT_BALANCE_MAX_GAP_CONFIG, Integer.toString(_topicLeaderReplicaBalanceMaxGap));
     props.put(AnalyzerConfig.GOAL_VIOLATION_DISTRIBUTION_THRESHOLD_MULTIPLIER_CONFIG, Double.toString(_goalViolationDistributionThresholdMultiplier));
     props.put(AnalyzerConfig.TOPICS_WITH_MIN_LEADERS_PER_BROKER_CONFIG, _topicsWithMinLeadersPerBrokerPattern.pattern());
     props.put(AnalyzerConfig.MIN_TOPIC_LEADERS_PER_BROKER_CONFIG, Integer.toString(_minTopicLeadersPerBroker));
@@ -197,6 +206,27 @@ public int topicReplicaBalanceMaxGap() {
     return _topicReplicaBalanceMaxGap;
   }
 
+  /**
+   * @return Topic leader replica balance percentage for {@link com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicLeaderReplicaDistributionGoal}.
+   */
+  public double topicLeaderReplicaBalancePercentage() {
+    return _topicLeaderReplicaBalancePercentage;
+  }
+
+  /**
+   * @return Topic leader replica balance minimum gap for {@link com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicLeaderReplicaDistributionGoal}.
+   */
+  public int topicLeaderReplicaBalanceMinGap() {
+    return _topicLeaderReplicaBalanceMinGap;
+  }
+
+  /**
+   * @return Topic leader replica balance maximum gap for {@link com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicLeaderReplicaDistributionGoal}.
+   */
+  public int topicLeaderReplicaBalanceMaxGap() {
+    return _topicLeaderReplicaBalanceMaxGap;
+  }
+
   /**
    * @return Goal violation distribution threshold multiplier to be used in detection and fixing goal violations.
    */
@@ -332,6 +362,7 @@ public String toString() {
                + "diskCapacityThreshold=%.4f,inboundNwCapacityThreshold=%.4f,outboundNwCapacityThreshold=%.4f,"
                + "maxReplicasPerBroker=%d,replicaBalancePercentage=%.4f,leaderReplicaBalancePercentage=%.4f,"
                + "topicReplicaBalancePercentage=%.4f,topicReplicaBalanceGap=[%d,%d],"
+               + "topicLeaderReplicaBalancePercentage=%.4f,topicLeaderReplicaBalanceGap=[%d,%d],"
                + "goalViolationDistributionThresholdMultiplier=%.4f,"
                + "topicsWithMinLeadersPerBrokerPattern=%s,"
                + "minTopicLeadersPerBroker=%d,fastModePerBrokerMoveTimeoutMs=%d,"
@@ -343,6 +374,7 @@ public String toString() {
         _capacityThreshold.get(Resource.NW_IN), _capacityThreshold.get(Resource.NW_OUT), _maxReplicasPerBroker,
         _replicaBalancePercentage, _leaderReplicaBalancePercentage, _topicReplicaBalancePercentage,
         _topicReplicaBalanceMinGap, _topicReplicaBalanceMaxGap,
+        _topicLeaderReplicaBalancePercentage, _topicLeaderReplicaBalanceMinGap, _topicLeaderReplicaBalanceMaxGap,
         _goalViolationDistributionThresholdMultiplier, _topicsWithMinLeadersPerBrokerPattern.pattern(),
         _minTopicLeadersPerBroker, _fastModePerBrokerMoveTimeoutMs,
         _brokerSetResolver.getClass().getName(), _replicaToBrokerSetMappingPolicy.getClass().getName());
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/TopicLeaderReplicaDistributionGoal.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/TopicLeaderReplicaDistributionGoal.java
new file mode 100644
index 0000000000..def222d521
--- /dev/null
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/goals/TopicLeaderReplicaDistributionGoal.java
@@ -0,0 +1,610 @@
+/*
+ * Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
+ *
+ */
+
+package com.linkedin.kafka.cruisecontrol.analyzer.goals;
+
+import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
+import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
+import com.linkedin.kafka.cruisecontrol.analyzer.BalancingAction;
+import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
+import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
+import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionRecommendation;
+import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionStatus;
+import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
+import com.linkedin.kafka.cruisecontrol.model.Broker;
+import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
+import com.linkedin.kafka.cruisecontrol.model.Replica;
+import com.linkedin.kafka.cruisecontrol.model.ReplicaSortFunctionFactory;
+import com.linkedin.kafka.cruisecontrol.model.SortedReplicasHelper;
+import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance.ACCEPT;
+import static com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance.REPLICA_REJECT;
+import static com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalUtils.MIN_NUM_VALID_WINDOWS_FOR_SELF_HEALING;
+import static com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalUtils.replicaSortName;
+import static com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionAbstractGoal.ChangeType.ADD;
+import static com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionAbstractGoal.ChangeType.REMOVE;
+
+
+/**
+ * Soft goal to balance collocations of leader replicas of the same topic over alive brokers not excluded for replica moves.
+ *
+ * @see com.linkedin.kafka.cruisecontrol.config.constants.AnalyzerConfig#TOPIC_LEADER_REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG
+ * @see com.linkedin.kafka.cruisecontrol.config.constants.AnalyzerConfig#GOAL_VIOLATION_DISTRIBUTION_THRESHOLD_MULTIPLIER_CONFIG
+ * @see #balancePercentageWithMargin(OptimizationOptions)
+ */
+public class TopicLeaderReplicaDistributionGoal extends AbstractGoal {
+  private static final Logger LOG = LoggerFactory.getLogger(TopicLeaderReplicaDistributionGoal.class);
+  private static final double BALANCE_MARGIN = 0.9;
+  // Flag to indicate whether the self healing failed to relocate all offline replicas away from dead brokers or broken
+  // disks in its initial attempt and is currently omitting the replica balance limit to relocate the remaining replicas.
+  private boolean _fixOfflineReplicasOnly;
+
+  private final Map<String, Set<Integer>> _brokerIdsAboveBalanceUpperLimitByTopic;
+  private final Map<String, Set<Integer>> _brokerIdsUnderBalanceLowerLimitByTopic;
+  // Must contain only the topics to be rebalanced.
+  private final Map<String, Double> _avgTopicLeaderReplicasOnAliveBroker;
+  // Must contain all topics to ensure that the lower priority goals work w/o an NPE.
+  private final Map<String, Integer> _balanceUpperLimitByTopic;
+  private final Map<String, Integer> _balanceLowerLimitByTopic;
+  // This is used to identify brokers not excluded for replica moves.
+  private Set<Integer> _brokersAllowedReplicaMove;
+
+  /**
+   * A soft goal to balance collocations of leader replicas of the same topic.
+   */
+  public TopicLeaderReplicaDistributionGoal() {
+    _brokerIdsAboveBalanceUpperLimitByTopic = new HashMap<>();
+    _brokerIdsUnderBalanceLowerLimitByTopic = new HashMap<>();
+    _avgTopicLeaderReplicasOnAliveBroker = new HashMap<>();
+    _balanceUpperLimitByTopic = new HashMap<>();
+    _balanceLowerLimitByTopic = new HashMap<>();
+  }
+
+  public TopicLeaderReplicaDistributionGoal(BalancingConstraint balancingConstraint) {
+    this();
+    _balancingConstraint = balancingConstraint;
+  }
+
+  /**
+   * To avoid churns, we add a balance margin to the user specified rebalance threshold. e.g. when user sets the
+   * threshold to be {@link BalancingConstraint#topicLeaderReplicaBalancePercentage()}, we use
+   * ({@link BalancingConstraint#topicLeaderReplicaBalancePercentage()}-1)*{@link #BALANCE_MARGIN} instead.
+   *
+   * @param optimizationOptions Options to adjust balance percentage with margin in case goal optimization is triggered
+   *                            by goal violation detector.
+   * @return The rebalance threshold with a margin.
+   */
+  private double balancePercentageWithMargin(OptimizationOptions optimizationOptions) {
+    double balancePercentage = _balancingConstraint.topicLeaderReplicaBalancePercentage();
+    if (optimizationOptions.isTriggeredByGoalViolation()) {
+      balancePercentage *= _balancingConstraint.goalViolationDistributionThresholdMultiplier();
+    }
+    // E.g. the default threshold of 1.10 yields a margin of (1.10 - 1) * 0.9 = 0.09 around the average.
+    return (balancePercentage - 1) * BALANCE_MARGIN;
+  }
+
+  /**
+   * Ensure that the given balance limit falls into min/max limits determined by min/max gaps for topic leader replica
+   * balance. If the computed balance limit is out of these gap-based limits, use the relevant max/min gap-based
+   * balance limit.
+   *
+   * @param computedLimit Computed balance upper or lower limit.
+   * @param average Average topic leader replicas per broker.
+   * @param isLowerLimit {@code true} if determining the lower limit, {@code false} for the upper limit.
+   *
+   * @return A balance limit that falls into [minGap, maxGap] for topic leader replica balance.
+   * @see com.linkedin.kafka.cruisecontrol.config.constants.AnalyzerConfig#TOPIC_LEADER_REPLICA_COUNT_BALANCE_MIN_GAP_DOC
+   * @see com.linkedin.kafka.cruisecontrol.config.constants.AnalyzerConfig#TOPIC_LEADER_REPLICA_COUNT_BALANCE_MAX_GAP_DOC
+   */
+  private int gapBasedBalanceLimit(int computedLimit, double average, boolean isLowerLimit) {
+    int minLimit;
+    int maxLimit;
+    if (isLowerLimit) {
+      maxLimit = Math.max(0, (int) (Math.floor(average) - _balancingConstraint.topicLeaderReplicaBalanceMinGap()));
+      minLimit = Math.max(0, (int) (Math.floor(average) - _balancingConstraint.topicLeaderReplicaBalanceMaxGap()));
+    } else {
+      minLimit = (int) (Math.ceil(average) + _balancingConstraint.topicLeaderReplicaBalanceMinGap());
+      maxLimit = (int) (Math.ceil(average) + _balancingConstraint.topicLeaderReplicaBalanceMaxGap());
+    }
+    // Clamp the computed limit into the gap-based [minLimit, maxLimit] window.
+    return Math.max(minLimit, Math.min(computedLimit, maxLimit));
+  }
+
+  /**
+   * @param topic Topic for which the upper limit is requested.
+   * @param optimizationOptions Options to adjust balance upper limit in case goal optimization is triggered by goal
+   *                            violation detector.
+   * @return The topic leader replica balance upper threshold in number of topic leader replicas.
+   */
+  private int balanceUpperLimit(String topic, OptimizationOptions optimizationOptions) {
+    int computedUpperLimit = (int) Math.ceil(_avgTopicLeaderReplicasOnAliveBroker.get(topic)
+                                             * (1 + balancePercentageWithMargin(optimizationOptions)));
+    return gapBasedBalanceLimit(computedUpperLimit, _avgTopicLeaderReplicasOnAliveBroker.get(topic), false);
+  }
+
+  /**
+   * @param topic Topic for which the lower limit is requested.
+   * @param optimizationOptions Options to adjust balance lower limit in case goal optimization is triggered by goal
+   *                            violation detector.
+   * @return The topic leader replica balance lower threshold in number of topic leader replicas.
+   */
+  private int balanceLowerLimit(String topic, OptimizationOptions optimizationOptions) {
+    int computedLowerLimit = (int) Math.floor(_avgTopicLeaderReplicasOnAliveBroker.get(topic)
+                                              * Math.max(0, (1 - balancePercentageWithMargin(optimizationOptions))));
+    return gapBasedBalanceLimit(computedLowerLimit, _avgTopicLeaderReplicasOnAliveBroker.get(topic), true);
+  }
+
+  /**
+   * Check whether the given action is acceptable by this goal. An action is acceptable if the number of leader
+   * replicas of the topic at (1) the source broker does not go under the allowed limit -- unless the source broker is
+   * excluded for replica moves -- and (2) the destination broker does not go over the allowed limit.
+   *
+   * @param action Action to be checked for acceptance.
+   * @param clusterModel The state of the cluster.
+   * @return {@link ActionAcceptance#ACCEPT} if the action is acceptable by this goal,
+   * {@link ActionAcceptance#REPLICA_REJECT} otherwise.
+   */
+  @Override
+  public ActionAcceptance actionAcceptance(BalancingAction action, ClusterModel clusterModel) {
+    Broker sourceBroker = clusterModel.broker(action.sourceBrokerId());
+    Broker destinationBroker = clusterModel.broker(action.destinationBrokerId());
+    String sourceTopic = action.topic();
+    Replica sourceReplica = sourceBroker.replica(action.topicPartition());
+
+    switch (action.balancingAction()) {
+      case INTER_BROKER_REPLICA_SWAP:
+        String destinationTopic = action.destinationTopic();
+        Replica destinationReplica = destinationBroker.replica(action.destinationTopicPartition());
+        if (sourceTopic.equals(destinationTopic) && sourceReplica.isLeader() && destinationReplica.isLeader()) {
+          return ACCEPT;
+        }
+        if (!sourceReplica.isLeader() && !destinationReplica.isLeader()) {
+          return ACCEPT;
+        }
+        if (sourceReplica.isLeader() && !destinationReplica.isLeader()) {
+          return isLeadershipGoalSatisfiable(sourceTopic, sourceBroker, destinationBroker) ? ACCEPT : REPLICA_REJECT;
+        }
+        if (!sourceReplica.isLeader() && destinationReplica.isLeader()) {
+          return isLeadershipGoalSatisfiable(destinationTopic, destinationBroker, sourceBroker) ? ACCEPT : REPLICA_REJECT;
+        }
+        // Both replicas are leaders, but for different topics.
+        return (isLeadershipGoalSatisfiable(sourceTopic, sourceBroker, destinationBroker)
+                && isLeadershipGoalSatisfiable(destinationTopic, destinationBroker, sourceBroker)) ? ACCEPT : REPLICA_REJECT;
+      case LEADERSHIP_MOVEMENT:
+        return isLeadershipGoalSatisfiable(sourceTopic, sourceBroker, destinationBroker) ? ACCEPT : REPLICA_REJECT;
+      case INTER_BROKER_REPLICA_MOVEMENT:
+        if (!sourceReplica.isLeader()) {
+          return ACCEPT;
+        }
+        return isLeadershipGoalSatisfiable(sourceTopic, sourceBroker, destinationBroker) ? ACCEPT : REPLICA_REJECT;
+      default:
+        throw new IllegalArgumentException("Unsupported balancing action " + action.balancingAction() + " is provided.");
+    }
+  }
+
+  private boolean isLeadershipGoalSatisfiable(String sourceLeaderTopic, Broker sourceBroker, Broker destinationBroker) {
+    return isTopicLeaderCountUnderBalanceUpperLimitAfterChange(sourceLeaderTopic, destinationBroker, ADD)
+           && (isExcludedForReplicaMove(sourceBroker)
+               || isTopicLeaderCountAboveBalanceLowerLimitAfterChange(sourceLeaderTopic, sourceBroker, REMOVE));
+  }
+
+  private boolean isTopicLeaderCountUnderBalanceUpperLimitAfterChange(String topic,
+                                                                      Broker broker,
+                                                                      ReplicaDistributionAbstractGoal.ChangeType changeType) {
+    int numTopicLeaders = broker.numLeadersFor(topic);
+    int brokerBalanceUpperLimit = broker.isAlive() ? _balanceUpperLimitByTopic.get(topic) : 0;
+
+    return changeType == ADD ? numTopicLeaders + 1 <= brokerBalanceUpperLimit : numTopicLeaders - 1 <= brokerBalanceUpperLimit;
+  }
+
+  private boolean isTopicLeaderCountAboveBalanceLowerLimitAfterChange(String topic,
+                                                                      Broker broker,
+                                                                      ReplicaDistributionAbstractGoal.ChangeType changeType) {
+    int numTopicLeaders = broker.numLeadersFor(topic);
+    int brokerBalanceLowerLimit = broker.isAlive() ? _balanceLowerLimitByTopic.get(topic) : 0;
+
+    return changeType == ADD ? numTopicLeaders + 1 >= brokerBalanceLowerLimit : numTopicLeaders - 1 >= brokerBalanceLowerLimit;
+  }
+
+  /**
+   * Check whether the given broker is excluded for replica moves.
+   * Such a broker cannot receive replicas, but can give them away.
+   *
+   * @param broker Broker to check for exclusion from replica moves.
+   * @return {@code true} if the given broker is excluded for replica moves, {@code false} otherwise.
+   */
+  private boolean isExcludedForReplicaMove(Broker broker) {
+    return !_brokersAllowedReplicaMove.contains(broker.id());
+  }
+
+  @Override
+  public ClusterModelStatsComparator clusterModelStatsComparator() {
+    return new GoalUtils.HardGoalStatsComparator();
+  }
+
+  @Override
+  public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
+    return new ModelCompletenessRequirements(MIN_NUM_VALID_WINDOWS_FOR_SELF_HEALING, 0.0, true);
+  }
+
+  @Override
+  public String name() {
+    return TopicLeaderReplicaDistributionGoal.class.getSimpleName();
+  }
+
+  @Override
+  public boolean isHardGoal() {
+    return true;
+  }
+
+  /**
+   * Initiates this goal.
+   *
+   * @param clusterModel The state of the cluster.
+   * @param optimizationOptions Options to take into account during optimization.
+   */
+  @Override
+  protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions)
+      throws OptimizationFailureException {
+    Set<String> excludedTopics = optimizationOptions.excludedTopics();
+    Set<String> topicsToRebalance = GoalUtils.topicsToRebalance(clusterModel, excludedTopics);
+    if (topicsToRebalance.isEmpty()) {
+      LOG.warn("All topics are excluded from {}.", name());
+    }
+
+    _brokersAllowedReplicaMove = GoalUtils.aliveBrokersNotExcludedForReplicaMove(clusterModel, optimizationOptions);
+    if (_brokersAllowedReplicaMove.isEmpty()) {
+      // Handle the case when all alive brokers are excluded from replica moves.
+      ProvisionRecommendation recommendation = new ProvisionRecommendation.Builder(ProvisionStatus.UNDER_PROVISIONED)
+          .numBrokers(clusterModel.maxReplicationFactor()).build();
+      throw new OptimizationFailureException(String.format("[%s] All alive brokers are excluded from replica moves.", name()), recommendation);
+    }
+    // Initialize the average leader replicas on an alive broker.
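+    // Illustrative walk-through of the math below (hypothetical numbers, not from this patch): with 30 leaders of a
+    // topic, 6 brokers allowed replica moves and the default threshold of 1.10, the average is 30 / 6 = 5.0 leaders
+    // per broker. With the 0.9 BALANCE_MARGIN this yields an upper limit of ceil(5.0 * 1.09) = 6 and a lower limit of
+    // floor(5.0 * 0.91) = 4, before gapBasedBalanceLimit(...) clamps both into the configured min/max gap window.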
+    final Map<String, Integer> numTopicLeadersMap = clusterModel.numLeadersPerTopic(clusterModel.topics());
+    for (String topic : clusterModel.topics()) {
+      int numTopicLeaders = numTopicLeadersMap.get(topic);
+      _avgTopicLeaderReplicasOnAliveBroker.put(topic, (numTopicLeaders / (double) _brokersAllowedReplicaMove.size()));
+      _balanceUpperLimitByTopic.put(topic, balanceUpperLimit(topic, optimizationOptions));
+      _balanceLowerLimitByTopic.put(topic, balanceLowerLimit(topic, optimizationOptions));
+      // Retain only the topics to rebalance in _avgTopicLeaderReplicasOnAliveBroker.
+      if (!topicsToRebalance.contains(topic)) {
+        _avgTopicLeaderReplicasOnAliveBroker.remove(topic);
+      }
+    }
+    // Filter out replicas to be considered for replica movement.
+    for (Broker broker : clusterModel.brokers()) {
+      new SortedReplicasHelper().maybeAddSelectionFunc(ReplicaSortFunctionFactory.selectImmigrants(),
+                                                       optimizationOptions.onlyMoveImmigrantReplicas())
+                                .maybeAddSelectionFunc(ReplicaSortFunctionFactory.selectImmigrantOrOfflineReplicas(),
+                                                       !clusterModel.selfHealingEligibleReplicas().isEmpty() && broker.isAlive())
+                                .addSelectionFunc(ReplicaSortFunctionFactory.selectReplicasBasedOnExcludedTopics(excludedTopics))
+                                .addSelectionFunc(ReplicaSortFunctionFactory.selectLeaders())
+                                .trackSortedReplicasFor(replicaSortName(this, false, true), broker);
+    }
+
+    _fixOfflineReplicasOnly = false;
+  }
+
+  /**
+   * Check whether the requirements of this goal are still satisfied if the given action is applied to the given
+   * cluster state.
+   *
+   * @param clusterModel The state of the cluster.
+   * @param action Action containing information about potential modification to the given cluster model. Assumed to be
+   *               of type {@link ActionType#INTER_BROKER_REPLICA_MOVEMENT}.
+   * @return True if requirements of this goal are not violated if this proposal is applied to the given cluster state,
+   * false otherwise.
+   */
+  @Override
+  protected boolean selfSatisfied(ClusterModel clusterModel, BalancingAction action) {
+    Broker sourceBroker = clusterModel.broker(action.sourceBrokerId());
+    // The action must be executed if currently fixing offline replicas only and the offline source replica is proposed
+    // to be moved to another broker.
+    if (_fixOfflineReplicasOnly && sourceBroker.replica(action.topicPartition()).isCurrentOffline()) {
+      return action.balancingAction() == ActionType.INTER_BROKER_REPLICA_MOVEMENT;
+    }
+
+    // Check that the destination and the source would not become unbalanced.
+    Broker destinationBroker = clusterModel.broker(action.destinationBrokerId());
+    String sourceTopic = action.topic();
+
+    return isLeadershipGoalSatisfiable(sourceTopic, sourceBroker, destinationBroker);
+  }
+
+  /**
+   * Update goal state after one round of self-healing / rebalance.
+   *
+   * @param clusterModel The state of the cluster.
+   * @param optimizationOptions Options to take into account during optimization.
+   */
+  @Override
+  protected void updateGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions)
+      throws OptimizationFailureException {
+    if (!_brokerIdsAboveBalanceUpperLimitByTopic.isEmpty()) {
+      _brokerIdsAboveBalanceUpperLimitByTopic.clear();
+      _succeeded = false;
+    }
+    if (!_brokerIdsUnderBalanceLowerLimitByTopic.isEmpty()) {
+      _brokerIdsUnderBalanceLowerLimitByTopic.clear();
+      _succeeded = false;
+    }
+    // Sanity check: No self-healing eligible replica should remain at a dead broker/disk.
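+    // Self-healing is a two-pass process: the first pass respects the balance limits; if offline replicas still
+    // remain after it, _fixOfflineReplicasOnly is set and the optimization is retried while ignoring the limits.
+    // A failure with _fixOfflineReplicasOnly already set is rethrown below as fatal.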
+    try {
+      GoalUtils.ensureNoOfflineReplicas(clusterModel, name());
+    } catch (OptimizationFailureException ofe) {
+      if (_fixOfflineReplicasOnly) {
+        throw ofe;
+      }
+      _fixOfflineReplicasOnly = true;
+      LOG.info("Ignoring the topic leader replica balance limit to move replicas from dead brokers/disks.");
+      return;
+    }
+    // Sanity check: No replica should be moved to a broker, which used to host any replica of the same partition on its broken disk.
+    GoalUtils.ensureReplicasMoveOffBrokersWithBadDisks(clusterModel, name());
+    finish();
+  }
+
+  private static boolean skipBrokerRebalance(Broker broker,
+                                             ClusterModel clusterModel,
+                                             Collection<Replica> leaderReplicas,
+                                             boolean requireLessReplicas,
+                                             boolean requireMoreReplicas,
+                                             boolean hasOfflineTopicReplicas,
+                                             boolean moveImmigrantReplicaOnly) {
+    if (broker.isAlive() && !requireMoreReplicas && !requireLessReplicas) {
+      LOG.trace("Skip rebalance: Broker {} is already within the limit for leader replicas {}.", broker, leaderReplicas);
+      return true;
+    } else if (!clusterModel.newBrokers().isEmpty() && !broker.isNew() && !requireLessReplicas) {
+      LOG.trace("Skip rebalance: Cluster has new brokers and this broker {} is not new, but does not require less load "
+                + "for leader replicas {}. Hence, it does not have any offline replicas.", broker, leaderReplicas);
+      return true;
+    }
+    boolean hasImmigrantTopicReplicas = leaderReplicas.stream()
+        .anyMatch(replica -> replica.isLeader() && broker.immigrantReplicas().contains(replica));
+    if (!clusterModel.selfHealingEligibleReplicas().isEmpty() && requireLessReplicas
+        && !hasOfflineTopicReplicas && !hasImmigrantTopicReplicas) {
+      LOG.trace("Skip rebalance: Cluster is in self-healing mode and the broker {} requires less load, but none of its "
+                + "current offline or immigrant replicas are from the topic being balanced {}.", broker, leaderReplicas);
+      return true;
+    } else if (moveImmigrantReplicaOnly && requireLessReplicas && !hasImmigrantTopicReplicas) {
+      LOG.trace("Skip rebalance: Only immigrant replicas can be moved, but none of broker {}'s "
+                + "current immigrant replicas are from the topic being balanced {}.", broker, leaderReplicas);
+      return true;
+    }
+
+    return false;
+  }
+
+  private boolean isTopicExcludedFromRebalance(String topic) {
+    return _avgTopicLeaderReplicasOnAliveBroker.get(topic) == null;
+  }
+
+  /**
+   * Rebalance the given broker without violating the constraints of the current goal and optimized goals.
+   *
+   * @param broker Broker to be balanced.
+   * @param clusterModel The state of the cluster.
+   * @param optimizedGoals Optimized goals.
+   * @param optimizationOptions Options to take into account during optimization.
+   */
+  @Override
+  protected void rebalanceForBroker(Broker broker,
+                                    ClusterModel clusterModel,
+                                    Set<Goal> optimizedGoals,
+                                    OptimizationOptions optimizationOptions) {
+    LOG.debug("Rebalancing broker {} [limits] lower: {} upper: {}.", broker.id(), _balanceLowerLimitByTopic,
+              _balanceUpperLimitByTopic);
+
+    for (String topic : broker.topics()) {
+      if (isTopicExcludedFromRebalance(topic)) {
+        continue;
+      }
+
+      Collection<Replica> leaderReplicas = leadersOfTopicInBroker(broker, topic);
+      long numTopicLeaders = leaderReplicas.size();
+      int numOfflineTopicLeaders = GoalUtils.retainCurrentOfflineBrokerReplicas(broker, leaderReplicas).size();
+      boolean isExcludedForReplicaMove = isExcludedForReplicaMove(broker);
+
+      boolean requireLessLeaders = numOfflineTopicLeaders > 0 || numTopicLeaders > _balanceUpperLimitByTopic.get(topic)
+                                   || isExcludedForReplicaMove;
+      boolean requireMoreLeaders = !isExcludedForReplicaMove && broker.isAlive()
+                                   && numTopicLeaders - numOfflineTopicLeaders < _balanceLowerLimitByTopic.get(topic);
+
+      if (skipBrokerRebalance(broker, clusterModel, leaderReplicas, requireLessLeaders, requireMoreLeaders, numOfflineTopicLeaders > 0,
+                              optimizationOptions.onlyMoveImmigrantReplicas())) {
+        continue;
+      }
+
+      // Update broker ids over the balance limit for logging purposes.
+      if (requireLessLeaders && rebalanceByMovingLeadersOut(broker, topic, clusterModel, optimizedGoals, optimizationOptions)) {
+        _brokerIdsAboveBalanceUpperLimitByTopic.computeIfAbsent(topic, t -> new HashSet<>()).add(broker.id());
+        LOG.debug("Failed to sufficiently decrease leaders of topic {} in broker {} with replica movements. Leaders: {}.",
+                  topic, broker.id(), broker.numLeadersFor(topic));
+      }
+      if (requireMoreLeaders && rebalanceByMovingLeadersIn(broker, topic, clusterModel, optimizedGoals, optimizationOptions)) {
+        _brokerIdsUnderBalanceLowerLimitByTopic.computeIfAbsent(topic, t -> new HashSet<>()).add(broker.id());
+        LOG.debug("Failed to sufficiently increase leaders of topic {} in broker {} with replica movements. Leaders: {}.",
+                  topic, broker.id(), broker.numLeadersFor(topic));
+      }
+      if (!_brokerIdsAboveBalanceUpperLimitByTopic.getOrDefault(topic, Collections.emptySet()).contains(broker.id())
+          && !_brokerIdsUnderBalanceLowerLimitByTopic.getOrDefault(topic, Collections.emptySet()).contains(broker.id())) {
+        LOG.debug("Successfully balanced leaders of topic {} in broker {} by moving replicas. Leaders: {}",
+                  topic, broker.id(), broker.numLeadersFor(topic));
+      }
+    }
+  }
+
+  private Set<Replica> leadersOfTopicInBroker(Broker broker, String topic) {
+    return broker.replicasOfTopicInBroker(topic).stream().filter(Replica::isLeader).collect(Collectors.toSet());
+  }
+
+  private SortedSet<Replica> replicasToMoveOut(Broker broker, String topic) {
+    SortedSet<Replica> replicasToMoveOut = new TreeSet<>(broker.replicaComparator());
+    replicasToMoveOut.addAll(leadersOfTopicInBroker(broker, topic));
+    replicasToMoveOut.retainAll(broker.trackedSortedReplicas(replicaSortName(this, false, true)).sortedReplicas(false));
+    return replicasToMoveOut;
+  }
+
+  private boolean rebalanceByMovingLeadersOut(Broker broker,
+                                              String topic,
+                                              ClusterModel clusterModel,
+                                              Set<Goal> optimizedGoals,
+                                              OptimizationOptions optimizationOptions) {
+    // Get the eligible brokers.
+    SortedSet<Broker> candidateBrokers = new TreeSet<>(
+        Comparator.comparingInt((Broker b) -> b.numLeadersFor(topic))
+                  .thenComparingInt(b -> b.leaderReplicas().size())
+                  .thenComparingInt(Broker::id));
+
+    candidateBrokers.addAll(_fixOfflineReplicasOnly ? clusterModel.aliveBrokers()
+                                                    : clusterModel.aliveBrokers()
+                                                                  .stream()
+                                                                  .filter(b -> b.numLeadersFor(topic) < _balanceUpperLimitByTopic.get(topic))
+                                                                  .collect(Collectors.toSet()));
+
+    Collection<Replica> leadersOfTopicInBroker = leadersOfTopicInBroker(broker, topic);
+    int numLeadersOfTopicInBroker = leadersOfTopicInBroker.size();
+    int numOfflineTopicReplicas = GoalUtils.retainCurrentOfflineBrokerReplicas(broker, leadersOfTopicInBroker).size();
+    // If the source broker is excluded for replica move, set its upper limit to 0.
+    int balanceUpperLimitForSourceBroker = isExcludedForReplicaMove(broker) ? 0 : _balanceUpperLimitByTopic.get(topic);
+
+    boolean wasUnableToMoveOfflineReplica = false;
+    for (Replica replica : replicasToMoveOut(broker, topic)) {
+      if (wasUnableToMoveOfflineReplica && !replica.isCurrentOffline() && numLeadersOfTopicInBroker <= balanceUpperLimitForSourceBroker) {
+        // Was unable to move offline replicas from the broker, and the remaining replica count is under the balance limit.
+        return false;
+      }
+
+      boolean wasOffline = replica.isCurrentOffline();
+      Broker b = maybeApplyBalancingAction(clusterModel, replica, candidateBrokers, ActionType.INTER_BROKER_REPLICA_MOVEMENT,
+                                           optimizedGoals, optimizationOptions);
+      if (b == null) {
+        b = maybeApplyBalancingAction(clusterModel, replica, candidateBrokers, ActionType.LEADERSHIP_MOVEMENT,
+                                      optimizedGoals, optimizationOptions);
+      }
+      // Only check if we successfully moved something.
+      if (b != null) {
+        if (wasOffline) {
+          numOfflineTopicReplicas--;
+        }
+        if (--numLeadersOfTopicInBroker <= (numOfflineTopicReplicas == 0 ? balanceUpperLimitForSourceBroker : 0)) {
+          return false;
+        }
+
+        // Remove and reinsert the destination broker so that its position in the sorted set reflects its new leader
+        // count. A plain remove(b) can fail here: a TreeSet locates elements via the comparator, whose inputs (the
+        // leader counts) have just changed, so remove by broker id instead.
+        final int brokerId = b.id();
+        boolean isRemoved = candidateBrokers.removeIf(cb -> cb.id() == brokerId);
+        LOG.trace("Removed broker {} from candidate brokers: {}", brokerId, isRemoved);
+        if (b.numLeadersFor(topic) < _balanceUpperLimitByTopic.get(topic) || _fixOfflineReplicasOnly) {
+          candidateBrokers.add(b);
+        }
+      } else if (wasOffline) {
+        wasUnableToMoveOfflineReplica = true;
+      }
+    }
+    // Fail (return true) if any leaders of this topic remain on the broker.
+    return !leadersOfTopicInBroker(broker, topic).isEmpty();
+  }
+
+  private boolean rebalanceByMovingLeadersIn(Broker aliveDestBroker,
+                                             String topic,
+                                             ClusterModel clusterModel,
+                                             Set<Goal> optimizedGoals,
+                                             OptimizationOptions optimizationOptions) {
+    PriorityQueue<Broker> eligibleBrokers = new PriorityQueue<>((b1, b2) -> {
+      // Brokers are sorted by (1) current offline topic leader count, then (2) topic leader count, then
+      // (3) total leader count, then (4) broker id.
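+      // Each criterion compares b2 against b1, i.e. in descending order: the queue polls the broker with the most
+      // offline topic leaders (and then the most topic leaders, etc.) first; broker id is the final tie-breaker.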
+
+      // B2 Info
+      Collection<Replica> leadersOfTopicInB2 = leadersOfTopicInBroker(b2, topic);
+      int numLeadersOfTopicInB2 = leadersOfTopicInB2.size();
+      int numOfflineTopicReplicasInB2 = GoalUtils.retainCurrentOfflineBrokerReplicas(b2, leadersOfTopicInB2).size();
+      // B1 Info
+      Collection<Replica> leadersOfTopicInB1 = leadersOfTopicInBroker(b1, topic);
+      int numLeadersOfTopicInB1 = leadersOfTopicInB1.size();
+      int numOfflineTopicReplicasInB1 = GoalUtils.retainCurrentOfflineBrokerReplicas(b1, leadersOfTopicInB1).size();
+
+      int resultByOfflineLeaders = Integer.compare(numOfflineTopicReplicasInB2, numOfflineTopicReplicasInB1);
+      if (resultByOfflineLeaders == 0) {
+        int resultByTopicLeaders = Integer.compare(numLeadersOfTopicInB2, numLeadersOfTopicInB1);
+        if (resultByTopicLeaders == 0) {
+          int resultByAllLeaders = Integer.compare(b2.leaderReplicas().size(), b1.leaderReplicas().size());
+          return resultByAllLeaders == 0 ? Integer.compare(b2.id(), b1.id()) : resultByAllLeaders;
+        } else {
+          return resultByTopicLeaders;
+        }
+      }
+      return resultByOfflineLeaders;
+    });
+
+    // The source broker can be dead, alive, or may have bad disks.
+    if (_fixOfflineReplicasOnly) {
+      clusterModel.brokers().stream().filter(sourceBroker -> sourceBroker.id() != aliveDestBroker.id())
+                  .forEach(eligibleBrokers::add);
+    } else {
+      for (Broker sourceBroker : clusterModel.brokers()) {
+        if (sourceBroker.numLeadersFor(topic) > _balanceLowerLimitByTopic.get(topic)
+            || !sourceBroker.currentOfflineReplicas().isEmpty() || isExcludedForReplicaMove(sourceBroker)) {
+          eligibleBrokers.add(sourceBroker);
+        }
+      }
+    }
+
+    Collection<Replica> leadersOfTopicInBroker = leadersOfTopicInBroker(aliveDestBroker, topic);
+    int numLeadersOfTopicInBroker = leadersOfTopicInBroker.size();
+
+    Set<Broker> candidateBrokers = Collections.singleton(aliveDestBroker);
+
+    // Stop when no topic leader replicas can be moved in anymore.
+    while (!eligibleBrokers.isEmpty()) {
+      Broker sourceBroker = eligibleBrokers.poll();
+      SortedSet<Replica> replicasToMove = replicasToMoveOut(sourceBroker, topic);
+      int numOfflineTopicReplicas = GoalUtils.retainCurrentOfflineBrokerReplicas(sourceBroker, replicasToMove).size();
+
+      for (Replica replica : replicasToMove) {
+        boolean wasOffline = replica.isCurrentOffline();
+        ActionType action = (aliveDestBroker.replica(replica.topicPartition()) == null)
+                            ? ActionType.INTER_BROKER_REPLICA_MOVEMENT : ActionType.LEADERSHIP_MOVEMENT;
+        Broker b = maybeApplyBalancingAction(clusterModel, replica, candidateBrokers, action,
+                                             optimizedGoals, optimizationOptions);
+        // Only need to check the status if the action is taken. This also handles the case that the source broker
+        // has nothing to move out; in that case we never re-enqueue that source broker.
+        if (b != null) {
+          if (wasOffline) {
+            numOfflineTopicReplicas--;
+          }
+          if (++numLeadersOfTopicInBroker >= _balanceLowerLimitByTopic.get(topic)) {
+            // Note that the broker passed to this method is always alive; hence, there is no need to check if it is dead.
+            return false;
+          }
+          // If the source broker has no offline replicas and fewer leaders of this topic than the next broker in the
+          // queue, re-enqueue the source broker and switch to the next broker.
+          if (!eligibleBrokers.isEmpty() && numOfflineTopicReplicas == 0
+              && sourceBroker.numLeadersFor(topic) < eligibleBrokers.peek().numLeadersFor(topic)) {
+            eligibleBrokers.add(sourceBroker);
+            break;
+          }
+        }
+      }
+    }
+    return true;
+  }
+}
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/AnalyzerConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/AnalyzerConfig.java
index 99c94230c2..7778a616f3 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/AnalyzerConfig.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/AnalyzerConfig.java
@@ -26,6 +26,7 @@
 import com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal;
 import com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal;
 import com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal;
+import com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicLeaderReplicaDistributionGoal;
 import com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal;
 import com.linkedin.kafka.cruisecontrol.analyzer.goals.rackaware.NoOpRackAwareGoalRackIdMapper;
 import com.linkedin.kafka.cruisecontrol.analyzer.goals.rackaware.RackAwareGoalRackIdMapper;
@@ -105,6 +106,35 @@ public final class AnalyzerConfig {
       + "leader replica distribution. For example, 1.10 means the highest leader replica count of a broker should not be "
       + "above 1.10x of average leader replica count of all alive brokers.";
 
+  /**
+   * <code>topic.leader.replica.count.balance.threshold</code>
+   */
+  public static final String TOPIC_LEADER_REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG = "topic.leader.replica.count.balance.threshold";
+  public static final double DEFAULT_TOPIC_LEADER_REPLICA_COUNT_BALANCE_THRESHOLD = 1.10;
+  public static final String TOPIC_LEADER_REPLICA_COUNT_BALANCE_THRESHOLD_DOC = "The maximum allowed extent of unbalance for "
+      + "leader replica distribution from each topic. For example, 1.80 means the highest topic leader replica count of a broker "
+      + "should not be above 1.80x of average leader replica count of all brokers for the same topic.";
+
+  /**
+   * <code>topic.leader.replica.count.balance.min.gap</code>
+   */
+  public static final String TOPIC_LEADER_REPLICA_COUNT_BALANCE_MIN_GAP_CONFIG = "topic.leader.replica.count.balance.min.gap";
+  public static final int DEFAULT_TOPIC_LEADER_REPLICA_COUNT_BALANCE_MIN_GAP = 2;
+  public static final String TOPIC_LEADER_REPLICA_COUNT_BALANCE_MIN_GAP_DOC = "The minimum allowed gap between a balance limit and"
+      + " the average leader replica count for each topic. A balance limit is set via topic.leader.replica.count.balance.threshold config."
+      + " If the difference between the computed limit and the average leader replica count for the relevant topic is smaller than"
+      + " the value specified by this config, the limit is adjusted accordingly.";
+
+  /**
+   * <code>topic.leader.replica.count.balance.max.gap</code>
+   */
+  public static final String TOPIC_LEADER_REPLICA_COUNT_BALANCE_MAX_GAP_CONFIG = "topic.leader.replica.count.balance.max.gap";
+  public static final int DEFAULT_TOPIC_LEADER_REPLICA_COUNT_BALANCE_MAX_GAP = 10;
+  public static final String TOPIC_LEADER_REPLICA_COUNT_BALANCE_MAX_GAP_DOC = "The maximum allowed gap between a balance limit and"
+      + " the average leader replica count for each topic. A balance limit is set via topic.leader.replica.count.balance.threshold config."
+ + " If the difference between the computed limit and the average leader replica count for the relevant topic is greater than" + + " the value specified by this config, the limit is adjusted accordingly."; + /** * topic.replica.count.balance.threshold */ @@ -262,6 +292,7 @@ public final class AnalyzerConfig { .add(RackAwareDistributionGoal.class.getName()) .add(MinTopicLeadersPerBrokerGoal.class.getName()) .add(ReplicaCapacityGoal.class.getName()) + .add(TopicLeaderReplicaDistributionGoal.class.getName()) .add(DiskCapacityGoal.class.getName()) .add(NetworkInboundCapacityGoal.class.getName()) .add(NetworkOutboundCapacityGoal.class.getName()) @@ -524,6 +555,24 @@ public static ConfigDef define(ConfigDef configDef) { atLeast(1), ConfigDef.Importance.MEDIUM, TOPIC_REPLICA_COUNT_BALANCE_MAX_GAP_DOC) + .define(TOPIC_LEADER_REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG, + ConfigDef.Type.DOUBLE, + DEFAULT_TOPIC_LEADER_REPLICA_COUNT_BALANCE_THRESHOLD, + atLeast(1), + ConfigDef.Importance.HIGH, + TOPIC_LEADER_REPLICA_COUNT_BALANCE_THRESHOLD_DOC) + .define(TOPIC_LEADER_REPLICA_COUNT_BALANCE_MIN_GAP_CONFIG, + ConfigDef.Type.INT, + DEFAULT_TOPIC_LEADER_REPLICA_COUNT_BALANCE_MIN_GAP, + atLeast(0), + ConfigDef.Importance.MEDIUM, + TOPIC_LEADER_REPLICA_COUNT_BALANCE_MIN_GAP_DOC) + .define(TOPIC_LEADER_REPLICA_COUNT_BALANCE_MAX_GAP_CONFIG, + ConfigDef.Type.INT, + DEFAULT_TOPIC_LEADER_REPLICA_COUNT_BALANCE_MAX_GAP, + atLeast(0), + ConfigDef.Importance.MEDIUM, + TOPIC_LEADER_REPLICA_COUNT_BALANCE_MAX_GAP_DOC) .define(CPU_CAPACITY_THRESHOLD_CONFIG, ConfigDef.Type.DOUBLE, DEFAULT_CPU_CAPACITY_THRESHOLD, diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/model/Broker.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/model/Broker.java index 73cfe34b59..292d581cf4 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/model/Broker.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/model/Broker.java @@ -650,8 +650,8 @@ public void writeTo(OutputStream out) throws IOException { @Override public String toString() { - return String.format("Broker[id=%d,rack=%s,state=%s,replicaCount=%d,logdirs=%s]", - _id, rack().id(), _state, _replicas.size(), _diskByLogdir.keySet()); + return String.format("Broker[id=%d,rack=%s,state=%s,replicaCount=%d,logdirs=%s,leaderCount=%d]", + _id, rack().id(), _state, _replicas.size(), _diskByLogdir.keySet(), _leaderReplicas.size()); } /** diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/TopicLeaderReplicaDistributionGoalTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/TopicLeaderReplicaDistributionGoalTest.java new file mode 100644 index 0000000000..7a90a1056c --- /dev/null +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/TopicLeaderReplicaDistributionGoalTest.java @@ -0,0 +1,136 @@ +/* + * Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. 
+ */
+
+package com.linkedin.kafka.cruisecontrol.analyzer;
+
+import com.codahale.metrics.MetricRegistry;
+import com.linkedin.cruisecontrol.monitor.sampling.aggregator.AggregatedMetricValues;
+import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUnitTestUtils;
+import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
+import com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicLeaderReplicaDistributionGoal;
+import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress;
+import com.linkedin.kafka.cruisecontrol.common.TestConstants;
+import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityInfo;
+import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
+import com.linkedin.kafka.cruisecontrol.config.constants.AnalyzerConfig;
+import com.linkedin.kafka.cruisecontrol.executor.Executor;
+import com.linkedin.kafka.cruisecontrol.model.Broker;
+import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
+import com.linkedin.kafka.cruisecontrol.monitor.ModelGeneration;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.SystemTime;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUnitTestUtils.getAggregatedMetricValues;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TopicLeaderReplicaDistributionGoalTest {
+
+  private ClusterModel makeSimpleClusterModel(int numBrokers, BiFunction<Integer, Integer, Integer> leaderReplicaAssigner) {
+    ClusterModel clusterModel = new ClusterModel(new ModelGeneration(0, 0L), 1.0);
+    BrokerCapacityInfo brokerCapacity = new BrokerCapacityInfo(TestConstants.BROKER_CAPACITY, "");
+    Map<String, AtomicInteger> topicPartitionCnt = new HashMap<>();
+    Function<Integer, String> rackIdGetter = bidx -> "rack" + (bidx % 3);
+    for (int bidx = 0; bidx < numBrokers; bidx++) {
+      String rackId = rackIdGetter.apply(bidx);
+      clusterModel.createRack(rackId);
+      clusterModel.createBroker(rackId, "broker" + bidx, bidx, brokerCapacity, false);
+    }
+    for (int bidx = 0; bidx < numBrokers; bidx++) {
+      Broker broker = clusterModel.broker(bidx);
+      for (int tidx = 0; tidx < 2; tidx++) {
+        String topic = "T" + tidx;
+        final int numPartitions = leaderReplicaAssigner.apply(bidx, tidx);
+        for (int pidx = 0; pidx < numPartitions; pidx++) {
+          int offset = topicPartitionCnt.computeIfAbsent(topic, k -> new AtomicInteger()).getAndIncrement();
+          TopicPartition tp = new TopicPartition(topic, offset);
+          // Leader replica on this broker, follower on the next broker (round-robin).
+          clusterModel.createReplica(broker.rack().id(), broker.id(), tp, 0, true);
+          AggregatedMetricValues aggregatedMetricValues = getAggregatedMetricValues(1.0, 10.0, 13.0, 5.0);
+          clusterModel.setReplicaLoad(broker.rack().id(), broker.id(), tp, aggregatedMetricValues, Collections.singletonList(3L));
+          final int nextBrokerId = (broker.id() + 1) % numBrokers;
+          final String followerRackId = rackIdGetter.apply(nextBrokerId);
+          clusterModel.createReplica(followerRackId, nextBrokerId, tp, 1, false);
+          clusterModel.setReplicaLoad(followerRackId, nextBrokerId, tp, aggregatedMetricValues, Collections.singletonList(3L));
+        }
+      }
+    }
+    return clusterModel;
+  }
+
+  @Test
+  public void testGoalNoopOnSatisfiable() throws Exception {
+    final ClusterModel clusterModel = makeSimpleClusterModel(6, (bidx, tid) -> 2);
+    final OptimizerResult result = getOptimizerResult(clusterModel);
+    assertTrue(result.violatedGoalsBeforeOptimization().isEmpty());
+    assertTrue(result.violatedGoalsAfterOptimization().isEmpty());
+    for (Broker b : clusterModel.brokers()) {
+      assertEquals(4, b.leaderReplicas().size());
+    }
+  }
+
+  @Test
+  public void testGoalLinearLeaderGrowth() throws Exception {
+    final ClusterModel clusterModel = makeSimpleClusterModel(6, (bidx, tidx) -> 2 * bidx);
+    final OptimizerResult result = getOptimizerResult(clusterModel);
+
+    assertFalse(result.violatedGoalsBeforeOptimization().isEmpty());
+    assertTrue(result.violatedGoalsAfterOptimization().isEmpty());
+    for (Broker b : clusterModel.brokers()) {
+      assertEquals(10, b.leaderReplicas().size());
+    }
+  }
+
+  @Test
+  public void testGoalPreferBrokerWithHigherTotalLeaderOnEquality() throws Exception {
+    List<int[]> topicLeaderAssignment = new ArrayList<>();
+    topicLeaderAssignment.add(new int[]{6, 6, 4});
+    topicLeaderAssignment.add(new int[]{4, 5, 5});
+    BiFunction<Integer, Integer, Integer> replicaAssigner = (brokerId, topicId) -> {
+      if (topicId >= topicLeaderAssignment.size()) {
+        return 5;
+      } else if (brokerId >= topicLeaderAssignment.get(topicId).length) {
+        return 5;
+      } else {
+        return topicLeaderAssignment.get(topicId)[brokerId];
+      }
+    };
+    final ClusterModel clusterModel = makeSimpleClusterModel(6, replicaAssigner);
+    final OptimizerResult result = getOptimizerResult(clusterModel);
+
+    assertFalse(result.violatedGoalsBeforeOptimization().isEmpty());
+    assertTrue(result.violatedGoalsAfterOptimization().isEmpty());
+    for (Broker b : clusterModel.brokers()) {
+      assertEquals(10, b.leaderReplicas().size());
+    }
+  }
+
+  private static OptimizerResult getOptimizerResult(ClusterModel clusterModel) throws Exception {
+    Properties props = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties();
+    props.setProperty(AnalyzerConfig.TOPIC_LEADER_REPLICA_COUNT_BALANCE_MIN_GAP_CONFIG, "0");
+    props.setProperty(AnalyzerConfig.TOPIC_LEADER_REPLICA_COUNT_BALANCE_MAX_GAP_CONFIG, "0");
+    KafkaCruiseControlConfig kafkaCruiseControlConfig = new KafkaCruiseControlConfig(props);
+    BalancingConstraint balancingConstraint = new BalancingConstraint(kafkaCruiseControlConfig);
+    GoalOptimizer goalOptimizer = new GoalOptimizer(kafkaCruiseControlConfig,
+                                                    null,
+                                                    new SystemTime(),
+                                                    new MetricRegistry(),
+                                                    EasyMock.mock(Executor.class),
+                                                    EasyMock.mock(AdminClient.class));
+    List<Goal> goals = Collections.singletonList(new TopicLeaderReplicaDistributionGoal(balancingConstraint));
+    return goalOptimizer.optimizations(clusterModel, goals, new OperationProgress());
+  }
+}
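
For reviewers who want to exercise the new goal outside the unit test above, a minimal sketch (the property names are the ones added in this diff; the values and the surrounding setup, which mirrors getOptimizerResult(), are illustrative assumptions, not recommendations):

    Properties props = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties();
    // Allow up to 1.2x of the per-topic average leader count per broker, keeping the limit
    // at least 1 and at most 5 leaders away from the average.
    props.setProperty(AnalyzerConfig.TOPIC_LEADER_REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG, "1.20");
    props.setProperty(AnalyzerConfig.TOPIC_LEADER_REPLICA_COUNT_BALANCE_MIN_GAP_CONFIG, "1");
    props.setProperty(AnalyzerConfig.TOPIC_LEADER_REPLICA_COUNT_BALANCE_MAX_GAP_CONFIG, "5");
    BalancingConstraint constraint = new BalancingConstraint(new KafkaCruiseControlConfig(props));
    Goal goal = new TopicLeaderReplicaDistributionGoal(constraint);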