Skip to content

Conversation

dongjinleekr
Copy link
Contributor

This PR resolves #1851.

@dongjinleekr
Copy link
Contributor Author

This PR is an WIP; I will test this feature in our in-house fork (@naver) this week. Stay tuned!! 🙃

@dongjinleekr
Copy link
Contributor Author

@aswinayyolath @mhratson @jiao-zhangS Could you kindly have a look? We adopted this patch to our in-house distribution and confirmed it works correctly. 🙏

Copy link

@cpaika cpaika left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will make things a lot more stable for JBOD clusters, good change

inExecutionTasks = inExecutionTasks();
}

if (_logDirThrottle != null) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for my understanding, why the underscore in the variable names here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In consistency with _replicationThrottle.

@dongjinleekr
Copy link
Contributor Author

@cpaika @adkafka

Excuse me. Is there any issue with this PR? It initially worked on 2.5.138, and 2.5.141 was already released last month, but this PR has yet to be merged or released.

If any modifications are needed, don't hesitate to leave me a mention.

@adkafka
Copy link

adkafka commented Nov 18, 2024

@cpaika @adkafka

Excuse me. Is there any issue with this PR? It initially worked on 2.5.138, and 2.5.141 was already released last month, but this PR has yet to be merged or released.

If any modifications are needed, don't hesitate to leave me a mention.

I don't see any issues with this PR, but I'm not a maintainer of this project. We need a maintainer to review this and add their approval.

@aswinayyolath
Copy link
Contributor

@CCisGG could you please review this PR?

@dongjinleekr
Copy link
Contributor Author

@CCisGG Hello. Could you please have a look when you are free? 🙏

@CCisGG
Copy link
Contributor

CCisGG commented Dec 8, 2024

Hi @mhratson, would you mind taking a look at this one? Thanks!

@imans777
Copy link

imans777 commented Jan 4, 2025

Anyone caring to review this?
It's really a must-have feature.

Copy link
Contributor

@CCisGG CCisGG left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some comments/questions.

Some other questions I have:

  1. Did you test that with your change, when no replication/intra broker throttling was set (which I think is by default), does this work properly?
  2. Is their any case that this intra-broker throttling can interfere with inter-broker thorttling? E.g. what happens if they are enabled together?
  3. Please update Configuration.md for this new config.

@dongjinleekr
Copy link
Contributor Author

@CCisGG

Sorry for being late. Here are the updates, rebased onto the latest main, and the anwsers:

Did you test that with your change, when no replication/intra broker throttling was set (which I think is by default), does this work properly?

YES. I tested it before I opened the PR, and we have been running it with our in-house distribution for more than 6 months and experienced no trouble. (As you already pointed out, default.log.dir.throttle is null by default.)

Is their any case that this intra-broker throttling can interfere with inter-broker thorttling? E.g. what happens if they are enabled together?

As far as I know, there is no case where intra and inter-broker throttlings interfere with each other. The reason is simple: the intra-broker rebalancing is only executed when rebalance_disk parameter is set to true. If this parameter is set to false (default), only inter-broker rebalancing is executed.

In short, intra- and inter-broker rebalancings are executed exclusively for each other, so if default.replication.throttle and default.log.dir.throttle are enabled together, they will not impact each other.

@luciano-sabenca
Copy link

Any news about this MR?
It's a really useful feature for us!
:D

Copy link
Contributor

@CCisGG CCisGG left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the change! I left some minor comments.

- Add: setLogDirThrottles, setLogDirThrottledRateIfNecessary
- Rename: removeReplicationThrottledRateFromBroker → removeThrottledRatesFromBroker
…oker rebalanacing

    - Rename: ReplicationThrottleHelper.clearThrottles → clearInterBrokerThrottles
    - Add ReplicationThrottleHelper.clearIntraBrokerThrottles
    - Executor.intraBrokerMoveReplicas now calls ReplicationThrottleHelper.setLogDirThrottles, ReplicationThrottleHelper.clearIntraBrokerThrottles
…, and UpdateTopicConfigurationRunnable. (reason: they do not incur any intra-broker replica movements)
@dongjinleekr
Copy link
Contributor Author

@CCisGG Rebased onto the latest main, with some additional improvements.


Let's create a task to track the split. Thanks!

You mean splitting the Executor#startExecution method call to inter-broker mode and intra-broker mode, applying replicationThrottle and logDirThrottle respectively, right? I tried to, but it seems impossible. Here is why:

The Executor#executeProposals method, which calls Executor#startExecution, is agnostic to the inter-broker and intra-broker modes.

(Note: The method call chain is KafkaCruiseControl#executeProposalsExecutor#executeProposalsExecutor#startExecution)

The rebalance_disk parameter (ParameterUtils.REBALANCE_DISK_MODE_PARAM) is only used to generate the proposal, and then the proposal is delivered to the Executor#executeProposals method to be executed. That is, the Executor#executeProposals method can not discriminate between the two modes.

Then, is splitting KafkaCruiseControl#executeProposals into inter- and intra-broker modes feasible? I also investigated the possibility, but it seems impossible. Here is why: it is called by the following 5 API calls/Runnables:

  1. add_broker (AddBrokersRunnable)
  2. fix_offline_replicas (FixOfflineReplicasRunnable)
  3. rebalance (RebalanceRunnable)
  4. RemoveDisksRunnable
  5. UpdateTopicConfigurationRunnable

5 incurs no movements, 1 incurs inter-broker movements only, 3 incurs either inter-broker or intra-broker movements, and 2 and 4 incur BOTH. For this reason, splitting the two methods' calls seems unreasonable - we can not assert whether 2 or 4 invocations need intra-broker movements beforehand.

For the reasons above, I did nothing to the call chain. It is true that these methods seem too overloaded, but improving them with another PR may be much better IMHO. (I have an idea, and will open another PR on this issue after this PR is merged.)

@dongjinleekr
Copy link
Contributor Author

@CCisGG Instead, I did:

  1. removed logDirThrottle from the Executor#executeRemoval method.
    Since its goal is to move all replicas in the specified broker, intra-broker replica movement does not occur at all. So, it is totally useless.
  2. Remove _logDirThrottle from the AddBrokersRunnable, RemoveBrokersRunnable, and UpdateTopicConfigurationRunnable.
    DITTO. These Runnables do not incur any intra-broker replica movements. So, it is also useless.

With one commit for Configurations.md, one commit for KafkaCruiseControl Javadoc, and two commits for the improvements above. In total, I added four commits after your last review. 🙏 Please have a look when you are free!! Thanks.

@CCisGG
Copy link
Contributor

CCisGG commented Apr 2, 2025

@dongjinleekr Thanks for the explanations and the new commits. I think they are all reasonable.

Since I probably won't have much time to do another round of review and audit, would you mind inviting some other person (maybe from your org) to review the PR? And Let's do a final round of testing to ensure this won't interfere with inter-broker rebalance. Once those are done, I can help to merge to PR. Let me know if this works. Thank you!

Copy link
Contributor

@CCisGG CCisGG left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good in general to me. Please ask for another review from a different person if possible. Then I can help to merge the PR.

@dongjinleekr
Copy link
Contributor Author

@CCisGG

Would you mind inviting some other person (maybe from your org) to review the PR?

Oh no, no problem. In fact, I am now working on supporting this feature in the Strimzi Kafka operator, and it will be completed by this weekend. As soon as I open the PR, there may be other people who can review it. (Sadly, I am the only person who is working on CC internals in my org 😢)

Plus: We are running this feature in our in-house fork but, let me double-check if there is any problem 🙏 Thank you!

Copy link
Contributor

@kyguy kyguy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @dongjinleekr, the PR looks great overall! I just left a few minor suggestions, mostly related to refactoring for consistency and readability.

LOG.info("User task {}: Starting {} intra-broker partition movements.", _uuid, numTotalPartitionMovements);

int partitionsToMove = numTotalPartitionMovements;
Set<Integer> participatingBrokers = Sets.newHashSet();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use new HashSet<>() from java.util instead?

It appears that Sets.newHashSet() is deprecated and uses java.util.HashSet under the hood anyway.

}

void setThrottles(List<ExecutionProposal> replicaMovementProposals)
void setReplicationThrottles(List<ExecutionProposal> replicaMovementProposals, long throttleRate)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we refactor setThrottles() -> setReplicationThrottles() but then refactor clearThrottles() -> clearInterBrokerThrottles() instead of clearReplicationThrottles(). To improve readability, could we use a consistent naming scheme for the new and refactored method names in this class? For example, use:

(a) setReplicationThrottles() and clearReplicationThrottles()

or

(b) setInterBrokerThrottles() and clearInterBrokerThrottles()

Option (b) seems better aligned with Cruise Control terminology but either would be fine so long as it is consistent!

}
}

Set<Integer> setLogDirThrottles(List<ExecutionProposal> replicaMovementProposals, long throttleRate)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment as above

Comment on lines +206 to +215
List<AlterConfigOp> ops = new ArrayList<>();
ConfigEntry currThrottleRate = brokerConfigs.get(LOG_DIR_THROTTLED_RATE);
if (currThrottleRate == null || !currThrottleRate.value().equals(String.valueOf(throttleRate))) {
LOG.debug("Setting {} to {} bytes/second for broker {}", LOG_DIR_THROTTLED_RATE, throttleRate, brokerId);
ops.add(new AlterConfigOp(new ConfigEntry(LOG_DIR_THROTTLED_RATE, String.valueOf(throttleRate)), AlterConfigOp.OpType.SET));
}
if (!ops.isEmpty()) {
changeBrokerConfigs(brokerId, ops);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there will only ever be one "ops", we could refactor to something like this:

Suggested change
List<AlterConfigOp> ops = new ArrayList<>();
ConfigEntry currThrottleRate = brokerConfigs.get(LOG_DIR_THROTTLED_RATE);
if (currThrottleRate == null || !currThrottleRate.value().equals(String.valueOf(throttleRate))) {
LOG.debug("Setting {} to {} bytes/second for broker {}", LOG_DIR_THROTTLED_RATE, throttleRate, brokerId);
ops.add(new AlterConfigOp(new ConfigEntry(LOG_DIR_THROTTLED_RATE, String.valueOf(throttleRate)), AlterConfigOp.OpType.SET));
}
if (!ops.isEmpty()) {
changeBrokerConfigs(brokerId, ops);
}
}
ConfigEntry currThrottleRate = brokerConfigs.get(LOG_DIR_THROTTLED_RATE);
if (currThrottleRate == null || !currThrottleRate.value().equals(String.valueOf(throttleRate))) {
LOG.debug("Setting {} to {} bytes/second for broker {}", LOG_DIR_THROTTLED_RATE, throttleRate, brokerId);
changeBrokerConfigs(brokerId, List.of(new AlterConfigOp(new ConfigEntry(LOG_DIR_THROTTLED_RATE, String.valueOf(throttleRate)), AlterConfigOp.OpType.SET)));
}
}

Comment on lines +387 to +398
List<AlterConfigOp> ops = new ArrayList<>();
if (currLogDirThrottle != null) {
if (currLogDirThrottle.source().equals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)) {
LOG.debug("Skipping removal for static log dir throttle rate: {}", currLogDirThrottle);
} else {
LOG.debug("Removing log dir throttle rate: {} on broker {}", currLogDirThrottle, brokerId);
ops.add(new AlterConfigOp(new ConfigEntry(LOG_DIR_THROTTLED_RATE, null), AlterConfigOp.OpType.DELETE));
}
}
if (!ops.isEmpty()) {
changeBrokerConfigs(brokerId, ops);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment as above, since there is only ever one "ops", this could be refactored like the following:

Suggested change
List<AlterConfigOp> ops = new ArrayList<>();
if (currLogDirThrottle != null) {
if (currLogDirThrottle.source().equals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)) {
LOG.debug("Skipping removal for static log dir throttle rate: {}", currLogDirThrottle);
} else {
LOG.debug("Removing log dir throttle rate: {} on broker {}", currLogDirThrottle, brokerId);
ops.add(new AlterConfigOp(new ConfigEntry(LOG_DIR_THROTTLED_RATE, null), AlterConfigOp.OpType.DELETE));
}
}
if (!ops.isEmpty()) {
changeBrokerConfigs(brokerId, ops);
}
if (currLogDirThrottle != null) {
if (currLogDirThrottle.source().equals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)) {
LOG.debug("Skipping removal for static log dir throttle rate: {}", currLogDirThrottle);
} else {
LOG.debug("Removing log dir throttle rate: {} on broker {}", currLogDirThrottle, brokerId);
changeBrokerConfigs(brokerId, List.of(new AlterConfigOp(new ConfigEntry(LOG_DIR_THROTTLED_RATE, String.valueOf(throttleRate)), AlterConfigOp.OpType.SET)));
}
}

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.collect.Sets;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use new HashSet<>() from java.util here instead?

Long executionProgressCheckIntervalMs,
ReplicaMovementStrategy replicaMovementStrategy,
Long replicationThrottle,
Long logDirThrottle,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not add logDirThrottle to the executeRemoval() method as well? This way we could clear the throttles from the broker metadata when brokers are removed from the cluster.

parameterConfigOverrides.put(MAINTENANCE_EVENT_TYPE_CONFIG, eventType);
// Expect mocks.
EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).times(4);
EasyMock.expect(_mockKafkaCruiseControl.config()).andReturn(_config).anyTimes();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any specific reason we changed this to anyTimes()?

0, _clusterLeaderMovementConcurrency, _brokerLeaderMovementConcurrency,
_executionProgressCheckIntervalMs, _replicaMovementStrategy, _replicationThrottle, _isTriggeredByUserRequest, _uuid,
SKIP_AUTO_REFRESHING_CONCURRENCY);
_executionProgressCheckIntervalMs, _replicaMovementStrategy, _replicationThrottle, null,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logDirThrottle is null here because partitions are never any intra-broker partition movements as part of topic configuration changes, is my understanding correct?

_executionProgressCheckIntervalMs,
_replicaMovementStrategy,
_replicationThrottle,
null,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the logDirThrottle argument set to null here? Wouldn't we want new brokers to have the configured logDirThrottle set?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

disk balance
8 participants