RocMarshal
Contributor

What is the purpose of the change

Introduce the abstraction to describe a rescale event.

Brief change log

Introduce the abstraction to describe a rescale event.

Verifying this change

This change added tests and can be verified as follows:

  • org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleTest.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Collaborator

flinkbot commented Sep 12, 2025

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

SlotSharingGroupId sharingGroupId = sharingGroup.getSlotSharingGroupId();
SlotSharingGroupRescale sharingGroupRescaleInfo =
slots.computeIfAbsent(sharingGroupId, SlotSharingGroupRescale::new);
sharingGroupRescaleInfo.setSlotSharingGroupMetaInfo(sharingGroup);
Contributor

Can we pass this sharing group name in the constructor rather than via a setter? Or is there a need to set it after construction?
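A minimal sketch of the constructor-based variant suggested here. `GroupRescaleSketch`, `ConstructorSketchDemo`, and the plain `String` id and name are hypothetical stand-ins, not the PR's actual `SlotSharingGroupRescale` or `SlotSharingGroupId` types.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for SlotSharingGroupRescale. */
final class GroupRescaleSketch {
    private final String groupId;   // stand-in for SlotSharingGroupId
    private final String groupName; // set once, at construction

    GroupRescaleSketch(String groupId, String groupName) {
        this.groupId = groupId;
        this.groupName = groupName;
    }

    String getGroupId() {
        return groupId;
    }

    String getGroupName() {
        return groupName;
    }
}

class ConstructorSketchDemo {
    /** Creates the entry together with its name on first access, so no setter call is needed. */
    static GroupRescaleSketch getOrCreate(
            Map<String, GroupRescaleSketch> slots, String id, String name) {
        return slots.computeIfAbsent(id, key -> new GroupRescaleSketch(key, name));
    }

    public static void main(String[] args) {
        Map<String, GroupRescaleSketch> slots = new HashMap<>();
        GroupRescaleSketch first = getOrCreate(slots, "group-1", "default");
        // The second call finds the existing entry; its name argument is not applied.
        GroupRescaleSketch second = getOrCreate(slots, "group-1", "other");
        System.out.println(first.getGroupName());
        System.out.println(first == second);
    }
}
```

One trade-off to weigh: with the setter, the meta info is refreshed on every call, whereas here it is fixed when the entry is first created. Whether that matters depends on whether the sharing group info can change between calls, which the diff context does not show.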

}

public void setSlotSharingGroupName(String slotSharingGroupName) {
this.slotSharingGroupName = slotSharingGroupName;
Contributor

We should add slotSharingGroupName to toString(). This object should also have a hashCode() that includes slotSharingGroupName.
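A sketch of what this could look like, assuming a simplified class with two `String` fields. `NamedGroupSketch` is a hypothetical name, and the fields are made final here for simplicity, whereas the PR sets the name through a setter.

```java
import java.util.Objects;

/** Hypothetical, simplified class; not the PR's actual implementation. */
final class NamedGroupSketch {
    private final String slotSharingGroupId;
    private final String slotSharingGroupName;

    NamedGroupSketch(String slotSharingGroupId, String slotSharingGroupName) {
        this.slotSharingGroupId = slotSharingGroupId;
        this.slotSharingGroupName = slotSharingGroupName;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof NamedGroupSketch)) {
            return false;
        }
        NamedGroupSketch that = (NamedGroupSketch) o;
        // Same field order as hashCode() and toString(), so the three are easy to compare.
        return Objects.equals(slotSharingGroupId, that.slotSharingGroupId)
                && Objects.equals(slotSharingGroupName, that.slotSharingGroupName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(slotSharingGroupId, slotSharingGroupName);
    }

    @Override
    public String toString() {
        return "NamedGroupSketch{slotSharingGroupId=" + slotSharingGroupId
                + ", slotSharingGroupName=" + slotSharingGroupName + "}";
    }
}
```

If the name stays mutable through a setter, including it in hashCode() means the hash can change after the object has been put into a hash-based collection, so that is worth considering before adopting this shape.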


@Override
public Durable getDurable() {
return durable;
Contributor

Can we not just have:

public Durable getDurable() {
    return new Durable();
}

Contributor Author

Thanks @davidradl for your comments.

Let me try to clarify:
In the current scheduler’s terminal state, the Durable is used to represent the start and end times of the scheduler state.
Therefore, it is generally reasonable to record the start time at the moment the state is created, that is, when the Durable object is created.
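The reasoning above can be sketched as follows, under stated assumptions: `DurableSketch`, `StateSketch`, `markLeft`, and the use of `java.time.Clock` are all hypothetical and only illustrate why the getter returns a stored instance rather than a new one.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

/** Hypothetical stand-in for Durable: the "in" timestamp is captured at construction. */
final class DurableSketch {
    private final Instant inTimestamp;
    private Instant outTimestamp; // null until the state is left

    DurableSketch(Clock clock) {
        this.inTimestamp = clock.instant();
    }

    void markLeft(Clock clock) {
        this.outTimestamp = clock.instant();
    }

    Instant getInTimestamp() {
        return inTimestamp;
    }

    /** Time spent in the state; only defined once the state has been left. */
    Duration elapsed() {
        if (outTimestamp == null) {
            throw new IllegalStateException("State has not been left yet.");
        }
        return Duration.between(inTimestamp, outTimestamp);
    }
}

/** Hypothetical scheduler state that owns exactly one DurableSketch. */
final class StateSketch {
    private final DurableSketch durable;

    StateSketch(Clock clock) {
        // The start time is recorded when the state is created.
        this.durable = new DurableSketch(clock);
    }

    DurableSketch getDurable() {
        // Returning a new instance here would reset the start time on every call.
        return durable;
    }
}

class DurableSketchDemo {
    public static void main(String[] args) {
        Clock clock = Clock.systemUTC();
        StateSketch state = new StateSketch(clock);
        state.getDurable().markLeft(clock);
        System.out.println(state.getDurable().elapsed());
    }
}
```

With this shape, every call to getDurable() observes the same start time, which is what makes the later end timestamp meaningful.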


import java.time.Instant;

/** Durable to record enter timestamp and leave timestamp. */
Contributor

nit: the comments say enter and leave but the code says in and out.

import java.util.Set;
import java.util.stream.Collectors;

/** Rescale event. */
Contributor

Could you please add more detail to the Javadoc for this new class, covering its content and lifecycle?

import java.io.Serializable;
import java.util.Objects;

/** Utils class to record the information of a scheduler state. */
Contributor

I am curious what Scan means in the class name. I think we should explain that in the Javadoc for the class.

@Override
public int hashCode() {
return Objects.hash(
slotSharingGroupId,
Contributor

nit: the field order here differs from the order in equals(), which makes it trickier to check that both cover the same fields. It is up to you whether to change it, but I wanted to make you aware of this.

/** It represents the target rescale event is failed due to some exceptions. */
FAILED,
/**
* It represents the target rescale event is ignored by some new conditions that could trigger a
Contributor

davidradl Sep 12, 2025

When you say "some new conditions", does this mean a subset of new conditions? If so, we should indicate which conditions. If this applies to all conditions, then we should remove the word "some".

public enum TerminalState {
/** It represents the target rescale event is completed successfully. */
COMPLETED,
/** It represents the target rescale event is failed due to some exceptions. */
Contributor

When you say "some exceptions", does this mean a subset of exceptions? If so, we should indicate which exceptions. If this applies to all exceptions, then we should remove the word "some".


/** The enum to represent the reason why a rescale event is terminated. */
public enum TerminatedReason {
SUCCEEDED(TerminalState.COMPLETED),
Contributor

As this is a reason, I would have expected the enum to also carry a readable string describing the state in more detail.
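One possible shape for this, as a sketch only. The `...Sketch` enums are hypothetical, `EXCEPTION_OCCURRED` is an invented constant used for illustration, and the description texts are placeholder wording rather than anything from the PR.

```java
/** Hypothetical stand-in for TerminalState, limited to two constants. */
enum TerminalStateSketch {
    COMPLETED,
    FAILED
}

/** Hypothetical stand-in for TerminatedReason that also carries a readable description. */
enum TerminatedReasonSketch {
    SUCCEEDED(TerminalStateSketch.COMPLETED, "The rescale completed successfully."),
    // Invented constant, for illustration only.
    EXCEPTION_OCCURRED(
            TerminalStateSketch.FAILED, "The rescale failed because an exception was thrown.");

    private final TerminalStateSketch terminalState;
    private final String description;

    TerminatedReasonSketch(TerminalStateSketch terminalState, String description) {
        this.terminalState = terminalState;
        this.description = description;
    }

    TerminalStateSketch getTerminalState() {
        return terminalState;
    }

    String getDescription() {
        return description;
    }
}

class TerminatedReasonSketchDemo {
    public static void main(String[] args) {
        // Prints each reason with its terminal state and description.
        for (TerminatedReasonSketch reason : TerminatedReasonSketch.values()) {
            System.out.println(
                    reason + " -> " + reason.getTerminalState() + ": " + reason.getDescription());
        }
    }
}
```

Keeping the description on the enum means logs and UIs can show it directly, and the Javadoc comment and the runtime text are less likely to drift apart.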


/** The cause of trigger rescaling. */
public enum TriggerCause {
/** The first scheduling when the job starting. */
Contributor

nit: the English is not great. Maybe: "The first schedule of the job starting."

/** The cause of trigger rescaling. */
public enum TriggerCause {
/** The first scheduling when the job starting. */
INITIAL_SCHEDULE,
Contributor

We could use the comments as the enum descriptions, so that the enums have readable descriptions as well as names.

github-actions bot added the community-reviewed label ("PR has been reviewed by the community.") Sep 12, 2025