Skip to content

Commit 5aff6bd

Browse files
committed
[FLINK-38338][runtime] Introduce the abstraction to describe a rescale event.
1 parent 4b7d62b commit 5aff6bd

23 files changed

+1702
-29
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public class SlotSharingGroup implements java.io.Serializable {
4141

4242
private final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
4343

44+
private String slotSharingGroupName;
45+
4446
// Represents resources of all tasks in the group. Default to be UNKNOWN.
4547
private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;
4648

@@ -70,6 +72,14 @@ public ResourceProfile getResourceProfile() {
7072
return resourceProfile;
7173
}
7274

75+
/**
* Returns the name currently stored for this slot sharing group.
*
* <p>NOTE(review): the backing field is declared without an initializer, so this presumably
* returns {@code null} until {@link #setSlotSharingGroupName(String)} has been called. Confirm
* against the constructors, which are not visible here.
*
* @return the stored slot sharing group name
*/
public String getSlotSharingGroupName() {
76+
return slotSharingGroupName;
77+
}
78+
79+
/**
* Stores the name of this slot sharing group, replacing any previously stored value.
*
* @param slotSharingGroupName the name to store; it is assigned as given, without validation
*/
public void setSlotSharingGroupName(String slotSharingGroupName) {
80+
this.slotSharingGroupName = slotSharingGroupName;
81+
}
82+
7383
// ------------------------------------------------------------------------
7484
// Utilities
7585
// ------------------------------------------------------------------------

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.api.common.JobID;
2222
import org.apache.flink.api.common.JobStatus;
2323
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
24+
import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
2425

2526
import org.slf4j.Logger;
2627

@@ -34,13 +35,21 @@ class Finished implements State {
3435

3536
private final Logger logger;
3637

38+
private final Durable durable;
39+
3740
/**
* Creates the state: stores the archived execution graph and the logger, creates a fresh
* {@link Durable} for this state and finally reports the archived graph through
* {@code context.onFinished}.
*
* @param context callback target that is notified with the archived execution graph
* @param archivedExecutionGraph the archived execution graph held by this state
* @param logger the logger kept by this state
*/
Finished(Context context, ArchivedExecutionGraph archivedExecutionGraph, Logger logger) {
3841
this.archivedExecutionGraph = archivedExecutionGraph;
3942
this.logger = logger;
43+
// NOTE(review): presumably Durable records the time this state is entered when it is
// constructed; confirm in Durable, which is not visible here.
this.durable = new Durable();
4044

4145
// The notification is sent from within the constructor, after all fields are assigned.
context.onFinished(archivedExecutionGraph);
4246
}
4347

48+
/**
* Returns the {@link Durable} that was created in this state's constructor.
*
* @return the timing record of this state; the same instance on every call
*/
@Override
49+
public Durable getDurable() {
50+
return durable;
51+
}
52+
4453
@Override
4554
public void cancel() {}
4655

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.flink.runtime.scheduler.adaptive;
1919

20+
import org.apache.flink.annotation.VisibleForTesting;
2021
import org.apache.flink.api.common.JobID;
2122
import org.apache.flink.runtime.jobgraph.JobGraph;
2223
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -38,7 +39,7 @@ public class JobGraphJobInformation implements JobInformation {
3839
private final JobGraph jobGraph;
3940
private final JobID jobID;
4041
private final String name;
41-
private final VertexParallelismStore vertexParallelismStore;
42+
protected final VertexParallelismStore vertexParallelismStore;
4243

4344
public JobGraphJobInformation(
4445
JobGraph jobGraph, VertexParallelismStore vertexParallelismStore) {
@@ -78,22 +79,29 @@ public Iterable<JobInformation.VertexInformation> getVertices() {
7879
jobGraph.getVertices(), (vertex) -> getVertexInformation(vertex.getID()));
7980
}
8081

82+
@Override
83+
public String getVertexName(JobVertexID jobVertexID) {
84+
return jobGraph.findVertexByID(jobVertexID).getName();
85+
}
86+
8187
/** Returns a copy of a jobGraph that can be mutated. */
8288
public JobGraph copyJobGraph() {
8389
return InstantiationUtil.cloneUnchecked(jobGraph);
8490
}
8591

92+
/**
* Returns the {@link VertexParallelismStore} held by this instance.
*
* @return the stored instance itself; no copy is made
*/
@Override
8693
public VertexParallelismStore getVertexParallelismStore() {
8794
return vertexParallelismStore;
8895
}
8996

90-
private static final class JobVertexInformation implements JobInformation.VertexInformation {
97+
@VisibleForTesting
98+
public static final class JobVertexInformation implements JobInformation.VertexInformation {
9199

92100
private final JobVertex jobVertex;
93101

94102
private final VertexParallelismInformation parallelismInfo;
95103

96-
private JobVertexInformation(
104+
public JobVertexInformation(
97105
JobVertex jobVertex, VertexParallelismInformation parallelismInfo) {
98106
this.jobVertex = jobVertex;
99107
this.parallelismInfo = parallelismInfo;
@@ -123,5 +131,10 @@ public int getMaxParallelism() {
123131
public SlotSharingGroup getSlotSharingGroup() {
124132
return jobVertex.getSlotSharingGroup();
125133
}
134+
135+
/**
* Returns the parallelism information this vertex information was constructed with.
*
* <p>Marked {@code @VisibleForTesting}, i.e. exposed for use by tests.
*
* @return the stored {@link VertexParallelismInformation}
*/
@VisibleForTesting
136+
public VertexParallelismInformation getVertexParallelismInfo() {
137+
return parallelismInfo;
138+
}
126139
}
127140
}

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,37 @@
2121
import org.apache.flink.api.common.JobID;
2222
import org.apache.flink.api.common.JobStatus;
2323
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
24+
import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
2425
import org.apache.flink.util.function.FunctionWithException;
2526
import org.apache.flink.util.function.ThrowingConsumer;
2627

2728
import org.slf4j.Logger;
2829

30+
import java.time.Instant;
2931
import java.util.Optional;
3032
import java.util.function.Consumer;
3133

3234
/**
3335
* State abstraction of the {@link AdaptiveScheduler}. This interface contains all methods every
3436
* state implementation must support.
3537
*/
36-
interface State extends LabeledGlobalFailureHandler {
38+
public interface State extends LabeledGlobalFailureHandler {
39+
40+
/**
41+
* Returns the {@link Durable} that carries the timing information of this state.
42+
*
43+
* @return the timing record of this state
44+
*/
45+
Durable getDurable();
3746

3847
/**
3948
* This method is called whenever one transitions out of this state.
4049
*
4150
* @param newState newState is the state into which the scheduler transitions
4251
*/
43-
default void onLeave(Class<? extends State> newState) {}
52+
default void onLeave(Class<? extends State> newState) {
53+
// Default behavior: stamp the moment of leaving (system clock, epoch milliseconds) on this
// state's Durable. An implementation that overrides onLeave replaces this body, so it has to
// record the timestamp itself or delegate via State.super.onLeave(newState).
getDurable().setOutTimestamp(Instant.now().toEpochMilli());
54+
}
4455

4556
/** Cancels the job execution. */
4657
void cancel();

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.flink.runtime.scheduler.KvStateHandler;
5656
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
5757
import org.apache.flink.runtime.scheduler.VertexEndOfDataListener;
58+
import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
5859
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
5960
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
6061
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
@@ -69,6 +70,7 @@
6970

7071
import java.io.IOException;
7172
import java.net.InetSocketAddress;
73+
import java.time.Instant;
7274
import java.util.ArrayList;
7375
import java.util.List;
7476
import java.util.Map;
@@ -101,6 +103,8 @@ abstract class StateWithExecutionGraph implements State {
101103

102104
private final VertexEndOfDataListener vertexEndOfDataListener;
103105

106+
private final Durable durable;
107+
104108
StateWithExecutionGraph(
105109
Context context,
106110
ExecutionGraph executionGraph,
@@ -118,6 +122,7 @@ abstract class StateWithExecutionGraph implements State {
118122
this.userCodeClassLoader = userClassCodeLoader;
119123
this.failureCollection = new ArrayList<>(failureCollection);
120124
this.vertexEndOfDataListener = new VertexEndOfDataListener(executionGraph);
125+
this.durable = new Durable();
121126

122127
FutureUtils.assertNoException(
123128
executionGraph
@@ -137,6 +142,11 @@ abstract class StateWithExecutionGraph implements State {
137142
context.getMainThreadExecutor()));
138143
}
139144

145+
/**
* Returns the {@link Durable} that was created in this state's constructor.
*
* @return the timing record of this state; the same instance on every call
*/
@Override
146+
public Durable getDurable() {
147+
return durable;
148+
}
149+
140150
ExecutionGraph getExecutionGraph() {
141151
return executionGraph;
142152
}
@@ -156,6 +166,7 @@ protected ExecutionGraphHandler getExecutionGraphHandler() {
156166

157167
@Override
158168
public void onLeave(Class<? extends State> newState) {
169+
getDurable().setOutTimestamp(Instant.now().toEpochMilli());
159170
if (!StateWithExecutionGraph.class.isAssignableFrom(newState)) {
160171
// we are leaving the StateWithExecutionGraph --> we need to dispose temporary services
161172
operatorCoordinatorHandler.disposeAllOperatorCoordinators();

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.api.common.JobID;
2222
import org.apache.flink.api.common.JobStatus;
2323
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
24+
import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
2425

2526
import org.slf4j.Logger;
2627

@@ -39,9 +40,17 @@ abstract class StateWithoutExecutionGraph implements State {
3940

4041
private final Logger logger;
4142

43+
private final Durable durable;
44+
4245
/**
* Stores the given context and logger and creates a fresh {@link Durable} for this state.
*
* @param context the context kept by this state
* @param logger the logger kept by this state
*/
StateWithoutExecutionGraph(Context context, Logger logger) {
4346
this.context = context;
4447
this.logger = logger;
48+
// NOTE(review): presumably Durable records the time this state is entered when it is
// constructed; confirm in Durable, which is not visible here.
this.durable = new Durable();
49+
}
50+
51+
/**
* Returns the {@link Durable} that was created in this state's constructor.
*
* @return the timing record of this state; the same instance on every call
*/
@Override
52+
public Durable getDurable() {
53+
return durable;
4554
}
4655

4756
@Override

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package org.apache.flink.runtime.scheduler.adaptive.allocator;
2020

21-
import org.apache.flink.runtime.instance.SlotSharingGroupId;
2221
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
2322
import org.apache.flink.runtime.jobmaster.SlotInfo;
2423
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
@@ -38,13 +37,13 @@ class AllocatorUtil {
3837

3938
private AllocatorUtil() {}
4039

41-
static Map<SlotSharingGroupId, SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
40+
static Map<SlotSharingGroup, SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
4241
getSlotSharingGroupMetaInfos(JobInformation jobInformation) {
4342
return SlotSharingSlotAllocator.SlotSharingGroupMetaInfo.from(jobInformation.getVertices());
4443
}
4544

4645
static int getMinimumRequiredSlots(
47-
Map<SlotSharingGroupId, SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
46+
Map<SlotSharingGroup, SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
4847
slotSharingGroupMetaInfos) {
4948
return slotSharingGroupMetaInfos.values().stream()
5049
.map(SlotSharingSlotAllocator.SlotSharingGroupMetaInfo::getMaxLowerBound)
@@ -78,7 +77,10 @@ static void checkMinimumRequiredSlots(
7877
}
7978
});
8079
return sharedSlotToVertexAssignment.values().stream()
81-
.map(SlotSharingSlotAllocator.ExecutionSlotSharingGroup::new)
80+
.map(
81+
ids ->
82+
new SlotSharingSlotAllocator.ExecutionSlotSharingGroup(
83+
slotSharingGroup, ids))
8284
.collect(Collectors.toList());
8385
}
8486
}

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.flink.runtime.jobgraph.JobVertexID;
2121
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
22+
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
2223

2324
import java.util.Collection;
2425

@@ -38,6 +39,14 @@ public interface JobInformation {
3839

3940
Iterable<VertexInformation> getVertices();
4041

42+
default String getVertexName(JobVertexID jobVertexID) {
43+
throw new UnsupportedOperationException();
44+
}
45+
46+
default VertexParallelismStore getVertexParallelismStore() {
47+
throw new UnsupportedOperationException();
48+
}
49+
4150
/** Information about a single vertex. */
4251
interface VertexInformation {
4352
JobVertexID getJobVertexID();

0 commit comments

Comments
 (0)