Skip to content

Commit df34882

Browse files
committed
Address David's comments.
1 parent f7cfdcd commit df34882

File tree

14 files changed

+286
-123
lines changed

14 files changed

+286
-123
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.runtime.jobgraph.JobVertexID;
2424

2525
import java.util.Collections;
26+
import java.util.Objects;
2627
import java.util.Set;
2728
import java.util.TreeSet;
2829

@@ -84,8 +85,23 @@ public void setSlotSharingGroupName(String slotSharingGroupName) {
8485
// Utilities
8586
// ------------------------------------------------------------------------
8687

88+
@Override
89+
public int hashCode() {
90+
return Objects.hash(ids, slotSharingGroupId, slotSharingGroupName, resourceProfile);
91+
}
92+
8793
@Override
8894
public String toString() {
89-
return "SlotSharingGroup{" + "ids=" + ids + ", resourceProfile=" + resourceProfile + '}';
95+
return "SlotSharingGroup{"
96+
+ "ids="
97+
+ ids
98+
+ ", slotSharingGroupId="
99+
+ slotSharingGroupId
100+
+ ", slotSharingGroupName='"
101+
+ slotSharingGroupName
102+
+ '\''
103+
+ ", resourceProfile="
104+
+ resourceProfile
105+
+ '}';
90106
}
91107
}

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,6 @@ public Iterable<JobInformation.VertexInformation> getVertices() {
7979
jobGraph.getVertices(), (vertex) -> getVertexInformation(vertex.getID()));
8080
}
8181

82-
@Override
83-
public String getVertexName(JobVertexID jobVertexID) {
84-
return jobGraph.findVertexByID(jobVertexID).getName();
85-
}
86-
8782
/** Returns a copy of a jobGraph that can be mutated. */
8883
public JobGraph copyJobGraph() {
8984
return InstantiationUtil.cloneUnchecked(jobGraph);
@@ -112,6 +107,11 @@ public JobVertexID getJobVertexID() {
112107
return jobVertex.getID();
113108
}
114109

110+
@Override
111+
public String getVertexName() {
112+
return jobVertex.getName();
113+
}
114+
115115
@Override
116116
public int getMinParallelism() {
117117
return parallelismInfo.getMinParallelism();

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,6 @@ public interface JobInformation {
3939

4040
Iterable<VertexInformation> getVertices();
4141

42-
default String getVertexName(JobVertexID jobVertexID) {
43-
throw new UnsupportedOperationException();
44-
}
45-
4642
default VertexParallelismStore getVertexParallelismStore() {
4743
throw new UnsupportedOperationException();
4844
}
@@ -51,6 +47,8 @@ default VertexParallelismStore getVertexParallelismStore() {
5147
interface VertexInformation {
5248
JobVertexID getJobVertexID();
5349

50+
String getVertexName();
51+
5452
int getMinParallelism();
5553

5654
int getParallelism();

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Durable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
import java.time.Instant;
2424

25-
/** Durable to record enter timestamp and leave timestamp. */
25+
/** Durable that records the in timestamp and the out timestamp. */
2626
public class Durable {
2727

2828
private final Long inTimestamp;

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java

Lines changed: 128 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,65 @@
5151
import java.util.Set;
5252
import java.util.stream.Collectors;
5353

54-
/** Rescale event. */
54+
/**
55+
* Records the changes to the related vertices and slots during the rescaling process.
56+
*
57+
* <p>This rescale begins when the scheduler initiates a rescaling operation and ends when the
58+
* rescaling succeeds.
59+
*
60+
* <pre>
61+
*
62+
* The structure of the rescale is as follows:
63+
*
64+
* +--> rescale id information:
65+
* + +-->rescale uuid
66+
* + +-->resource requirements id
67+
* + +-->rescale attempt id
68+
* +--> vertices:
69+
* + +--> job vertex id-1 -> vertex-1 parallelism rescale:
70+
* + + +--> vertex id
71+
* + + +--> vertex name
72+
* + + +--> slot sharing group id
73+
* + + +--> slot sharing group name
74+
* + + +--> desired parallelism
75+
* + + +--> sufficient parallelism
76+
* + +--> job vertex id-2 -> vertex-2 parallelism rescale:
77+
* + + +--> ...
78+
* + + ...
79+
* + ...
80+
* +--> slots:
81+
* + +--> slot sharing group id-1 -> slot-1 sharing group rescale:
82+
* + + +--> slot sharing group id
83+
* + + +--> slot sharing group name
84+
* + + +--> required resource profile
85+
* + + +--> minimal required slots
86+
* + + +--> pre-rescale slots
87+
* + + +--> post-rescale slots
88+
* + + +--> acquired resource profile
89+
* + +--> slot sharing group id-2 -> slot-2 sharing group rescale:
90+
* + + +--> ...
91+
* + + ...
92+
* + ...
93+
* +--> scheduler states:
94+
* + +--> scheduler state span:
95+
* + + +--> state
96+
* + + +--> in timestamp
97+
* + + +--> out timestamp
98+
* + + +--> duration
99+
* + + +--> exception information
100+
* + +--> ...
101+
* + ...
102+
* +--> start timestamp
103+
* +--> end timestamp
104+
* +--> trigger cause
105+
* +--> terminal state
106+
* +--> terminated reason
107+
*
108+
* </pre>
109+
*
110+
* <p>More design details about the rescale can be found in <a
111+
* href="https://cwiki.apache.org/confluence/x/TQr0Ew">FLIP-495</a>.
112+
*/
55113
public class Rescale implements Serializable {
56114

57115
private static final long serialVersionUID = 1L;
@@ -167,6 +225,10 @@ public Rescale setStartTimestamp(long timestamp) {
167225
return this;
168226
}
169227

228+
public Long getStartTimestamp() {
229+
return startTimestamp;
230+
}
231+
170232
public Rescale setEndTimestamp(Long endTimestamp) {
171233
if (this.endTimestamp != null) {
172234
LOG.warn("The old endTimestamp was already set to '{}'", this.endTimestamp);
@@ -175,6 +237,10 @@ public Rescale setEndTimestamp(Long endTimestamp) {
175237
return this;
176238
}
177239

240+
public Long getEndTimestamp() {
241+
return endTimestamp;
242+
}
243+
178244
public Rescale setDesiredSlots(JobInformation jobInformation) {
179245
for (SlotSharingGroup sharingGroup : jobInformation.getSlotSharingGroups()) {
180246
int desiredSlot =
@@ -186,10 +252,10 @@ public Rescale setDesiredSlots(JobInformation jobInformation) {
186252
.getParallelism())
187253
.max(Integer::compare)
188254
.orElse(0);
189-
SlotSharingGroupId sharingGroupId = sharingGroup.getSlotSharingGroupId();
190255
SlotSharingGroupRescale sharingGroupRescaleInfo =
191-
slots.computeIfAbsent(sharingGroupId, SlotSharingGroupRescale::new);
192-
sharingGroupRescaleInfo.setSlotSharingGroupMetaInfo(sharingGroup);
256+
slots.computeIfAbsent(
257+
sharingGroup.getSlotSharingGroupId(),
258+
ignored -> new SlotSharingGroupRescale(sharingGroup));
193259
sharingGroupRescaleInfo.setDesiredSlots(desiredSlot);
194260
}
195261
return this;
@@ -201,15 +267,16 @@ public Rescale setDesiredVertexParallelism(JobInformation jobInformation) {
201267
for (Map.Entry<JobVertexID, VertexParallelismInformation> entry :
202268
allParallelismInfo.entrySet()) {
203269
JobVertexID jvId = entry.getKey();
204-
VertexParallelismInformation vertexParallelInfo = entry.getValue();
205-
VertexParallelismRescale vertexParallelismRescale =
206-
this.vertices.computeIfAbsent(
207-
jvId, jobVertexID -> new VertexParallelismRescale(jvId));
208270
SlotSharingGroup slotSharingGroup =
209271
jobInformation.getVertexInformation(jvId).getSlotSharingGroup();
210-
vertexParallelismRescale.setSlotSharingGroupMetaInfo(slotSharingGroup);
211-
vertexParallelismRescale.setJobVertexName(jobInformation.getVertexName(jvId));
212-
vertexParallelismRescale.setRequiredParallelisms(vertexParallelInfo);
272+
String vertexName = jobInformation.getVertexInformation(jvId).getVertexName();
273+
VertexParallelismRescale vertexParallelismRescale =
274+
this.vertices.computeIfAbsent(
275+
jvId,
276+
jobVertexID ->
277+
new VertexParallelismRescale(
278+
jvId, vertexName, slotSharingGroup));
279+
vertexParallelismRescale.setRequiredParallelisms(entry.getValue());
213280
}
214281
return this;
215282
}
@@ -219,43 +286,69 @@ public Rescale setMinimalRequiredSlots(JobInformation jobInformation) {
219286
SlotSharingGroupMetaInfo.from(jobInformation.getVertices());
220287
for (Map.Entry<SlotSharingGroup, SlotSharingGroupMetaInfo> entry :
221288
slotSharingGroupMetaInfo.entrySet()) {
222-
SlotSharingGroupId groupId = entry.getKey().getSlotSharingGroupId();
289+
SlotSharingGroup sharingGroup = entry.getKey();
223290
SlotSharingGroupRescale slotSharingGroupRescale =
224-
slots.computeIfAbsent(groupId, SlotSharingGroupRescale::new);
291+
slots.computeIfAbsent(
292+
sharingGroup.getSlotSharingGroupId(),
293+
ignored -> new SlotSharingGroupRescale(sharingGroup));
225294
slotSharingGroupRescale.setMinimalRequiredSlots(entry.getValue().getMaxLowerBound());
226295
}
227296
return this;
228297
}
229298

230-
public Rescale setPreRescaleSlotsAndParallelisms(@Nullable Rescale lastCompletedRescale) {
299+
public Rescale setPreRescaleSlotsAndParallelisms(
300+
JobInformation jobInformation, @Nullable Rescale lastCompletedRescale) {
231301
if (lastCompletedRescale == null) {
232302
LOG.info("No available previous parallelism to set.");
233303
return this;
234304
}
235305
for (JobVertexID jobVertexID : lastCompletedRescale.getVertices().keySet()) {
236306
Integer preRescaleParallelism =
237307
lastCompletedRescale.vertices.get(jobVertexID).getPostRescaleParallelism();
308+
JobInformation.VertexInformation vertexInformation =
309+
jobInformation.getVertexInformation(jobVertexID);
238310
VertexParallelismRescale vertexParallelismRescale =
239-
vertices.computeIfAbsent(jobVertexID, VertexParallelismRescale::new);
311+
vertices.computeIfAbsent(
312+
jobVertexID,
313+
jobVertexId ->
314+
new VertexParallelismRescale(
315+
jobVertexId,
316+
vertexInformation.getVertexName(),
317+
vertexInformation.getSlotSharingGroup()));
240318
vertexParallelismRescale.setPreRescaleParallelism(preRescaleParallelism);
241319
}
242320

243-
for (SlotSharingGroupId sharingGroupId : lastCompletedRescale.getSlots().keySet()) {
244-
Integer preRescaleSlot =
245-
lastCompletedRescale.slots.get(sharingGroupId).getPostRescaleSlots();
321+
Map<SlotSharingGroupId, SlotSharingGroupRescale> slotsRescales =
322+
lastCompletedRescale.getSlots();
323+
for (SlotSharingGroup sharingGroup : jobInformation.getSlotSharingGroups()) {
324+
SlotSharingGroupId slotSharingGroupId = sharingGroup.getSlotSharingGroupId();
325+
Integer preRescaleSlot = slotsRescales.get(slotSharingGroupId).getPostRescaleSlots();
246326
SlotSharingGroupRescale slotSharingGroupRescale =
247-
slots.computeIfAbsent(sharingGroupId, SlotSharingGroupRescale::new);
327+
slots.computeIfAbsent(
328+
slotSharingGroupId,
329+
ignored -> new SlotSharingGroupRescale(sharingGroup));
248330
slotSharingGroupRescale.setPreRescaleSlots(preRescaleSlot);
249331
}
250332

251333
return this;
252334
}
253335

254-
public Rescale setPostRescaleVertexParallelism(VertexParallelism postRescaleVertexParallelism) {
336+
public Rescale setPostRescaleVertexParallelism(
337+
JobInformation jobInformation, VertexParallelism postRescaleVertexParallelism) {
338+
255339
Set<JobVertexID> vertices = postRescaleVertexParallelism.getVertices();
256340
for (JobVertexID vertexID : vertices) {
341+
JobInformation.VertexInformation vertexInformation =
342+
jobInformation.getVertexInformation(vertexID);
257343
VertexParallelismRescale vertexParallelismRescale =
258-
this.vertices.computeIfAbsent(vertexID, VertexParallelismRescale::new);
344+
this.vertices.computeIfAbsent(
345+
vertexID,
346+
jobVertexId ->
347+
new VertexParallelismRescale(
348+
jobVertexId,
349+
vertexInformation.getVertexName(),
350+
vertexInformation.getSlotSharingGroup()));
351+
259352
vertexParallelismRescale.setPostRescaleParallelism(
260353
postRescaleVertexParallelism.getParallelism(vertexID));
261354
}
@@ -264,27 +357,28 @@ public Rescale setPostRescaleVertexParallelism(VertexParallelism postRescaleVert
264357

265358
public Rescale setPostRescaleSlots(
266359
Collection<JobSchedulingPlan.SlotAssignment> postRescaleSlotAssignments) {
267-
Map<SlotSharingGroupId, Set<JobSchedulingPlan.SlotAssignment>> assignmentsPerSharingGroup =
360+
Map<SlotSharingGroup, Set<JobSchedulingPlan.SlotAssignment>> assignmentsPerSharingGroup =
268361
postRescaleSlotAssignments.stream()
269362
.collect(
270363
Collectors.groupingBy(
271364
slotAssignment ->
272365
slotAssignment
273366
.getTargetAs(
274367
ExecutionSlotSharingGroup.class)
275-
.getSlotSharingGroup()
276-
.getSlotSharingGroupId(),
368+
.getSlotSharingGroup(),
277369
Collectors.toSet()));
278-
for (Map.Entry<SlotSharingGroupId, Set<JobSchedulingPlan.SlotAssignment>> entry :
370+
for (Map.Entry<SlotSharingGroup, Set<JobSchedulingPlan.SlotAssignment>> entry :
279371
assignmentsPerSharingGroup.entrySet()) {
280-
SlotSharingGroupId sharingGroupId = entry.getKey();
372+
SlotSharingGroup sharingGroup = entry.getKey();
281373
Set<JobSchedulingPlan.SlotAssignment> assignments =
282-
assignmentsPerSharingGroup.get(sharingGroupId);
374+
assignmentsPerSharingGroup.get(sharingGroup);
283375
int postRescaleSlot = assignments.size();
284376
ResourceProfile acquiredResource =
285377
assignments.iterator().next().getSlotInfo().getResourceProfile();
286378
SlotSharingGroupRescale slotSharingGroupRescale =
287-
slots.computeIfAbsent(sharingGroupId, SlotSharingGroupRescale::new);
379+
slots.computeIfAbsent(
380+
sharingGroup.getSlotSharingGroupId(),
381+
ignored -> new SlotSharingGroupRescale(sharingGroup));
288382
slotSharingGroupRescale.setPostRescaleSlots(postRescaleSlot);
289383
slotSharingGroupRescale.setAcquiredResourceProfile(acquiredResource);
290384
}
@@ -296,16 +390,20 @@ public Rescale setTriggerCause(TriggerCause triggerCause) {
296390
return this;
297391
}
298392

393+
public TriggerCause getTriggerCause() {
394+
return triggerCause;
395+
}
396+
299397
public void log() {
300398
LOG.info("Updated rescale is: {}", this);
301399
}
302400

303401
public Map<JobVertexID, VertexParallelismRescale> getVertices() {
304-
return vertices;
402+
return Collections.unmodifiableMap(vertices);
305403
}
306404

307405
public Map<SlotSharingGroupId, SlotSharingGroupRescale> getSlots() {
308-
return slots;
406+
return Collections.unmodifiableMap(slots);
309407
}
310408

311409
public static boolean isTerminated(Rescale rescale) {

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/SchedulerStateSpan.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@
2525
import java.io.Serializable;
2626
import java.util.Objects;
2727

28-
/** Utils class to record the information of a scheduler state. */
28+
/**
29+
* Utility class that records information about an adaptive scheduler state: the time span of the
30+
* state (in timestamp, out timestamp and duration) and the exception, if one occurred during the
31+
* state.
32+
*/
2933
public class SchedulerStateSpan implements Serializable {
3034
private static final long serialVersionUID = 1L;
3135

@@ -61,6 +65,20 @@ public void setStringedException(@Nullable String stringedException) {
6165
this.stringedException = stringedException;
6266
}
6367

68+
public String getState() {
69+
return state;
70+
}
71+
72+
@Nullable
73+
public Long getInTimestamp() {
74+
return inTimestamp;
75+
}
76+
77+
@Nullable
78+
public Long getDuration() {
79+
return duration;
80+
}
81+
6482
@Nullable
6583
public String getStringedException() {
6684
return stringedException;

0 commit comments

Comments
 (0)