Skip to content

Commit 4184432

Browse files
committed
Address the david's comments.
1 parent 5aff6bd commit 4184432

File tree

14 files changed

+253
-119
lines changed

14 files changed

+253
-119
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.runtime.jobgraph.JobVertexID;
2424

2525
import java.util.Collections;
26+
import java.util.Objects;
2627
import java.util.Set;
2728
import java.util.TreeSet;
2829

@@ -84,8 +85,23 @@ public void setSlotSharingGroupName(String slotSharingGroupName) {
8485
// Utilities
8586
// ------------------------------------------------------------------------
8687

88+
@Override
89+
public int hashCode() {
90+
return Objects.hash(ids, slotSharingGroupId, slotSharingGroupName, resourceProfile);
91+
}
92+
8793
@Override
8894
public String toString() {
89-
return "SlotSharingGroup{" + "ids=" + ids + ", resourceProfile=" + resourceProfile + '}';
95+
return "SlotSharingGroup{"
96+
+ "ids="
97+
+ ids
98+
+ ", slotSharingGroupId="
99+
+ slotSharingGroupId
100+
+ ", slotSharingGroupName='"
101+
+ slotSharingGroupName
102+
+ '\''
103+
+ ", resourceProfile="
104+
+ resourceProfile
105+
+ '}';
90106
}
91107
}

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,6 @@ public Iterable<JobInformation.VertexInformation> getVertices() {
7979
jobGraph.getVertices(), (vertex) -> getVertexInformation(vertex.getID()));
8080
}
8181

82-
@Override
83-
public String getVertexName(JobVertexID jobVertexID) {
84-
return jobGraph.findVertexByID(jobVertexID).getName();
85-
}
86-
8782
/** Returns a copy of a jobGraph that can be mutated. */
8883
public JobGraph copyJobGraph() {
8984
return InstantiationUtil.cloneUnchecked(jobGraph);
@@ -112,6 +107,11 @@ public JobVertexID getJobVertexID() {
112107
return jobVertex.getID();
113108
}
114109

110+
@Override
111+
public String getVertexName() {
112+
return jobVertex.getName();
113+
}
114+
115115
@Override
116116
public int getMinParallelism() {
117117
return parallelismInfo.getMinParallelism();

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,6 @@ public interface JobInformation {
3939

4040
Iterable<VertexInformation> getVertices();
4141

42-
default String getVertexName(JobVertexID jobVertexID) {
43-
throw new UnsupportedOperationException();
44-
}
45-
4642
default VertexParallelismStore getVertexParallelismStore() {
4743
throw new UnsupportedOperationException();
4844
}
@@ -51,6 +47,8 @@ default VertexParallelismStore getVertexParallelismStore() {
5147
interface VertexInformation {
5248
JobVertexID getJobVertexID();
5349

50+
String getVertexName();
51+
5452
int getMinParallelism();
5553

5654
int getParallelism();

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Durable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
import java.time.Instant;
2424

25-
/** Durable to record enter timestamp and leave timestamp. */
25+
/** Durable to record in timestamp and out timestamp. */
2626
public class Durable {
2727

2828
private final Long inTimestamp;

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java

Lines changed: 111 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,62 @@
5151
import java.util.Set;
5252
import java.util.stream.Collectors;
5353

54-
/** Rescale event. */
54+
/**
55+
* The rescale to record the related vertices and slots change during the rescaling process.
56+
*
57+
* <p>This rescale begins when the scheduler initiates a rescaling operation and ends when the
58+
* rescaling succeeds.
59+
*
60+
* <pre>
61+
* The structure of the rescale as follows:
62+
* - rescale id information:
63+
* +--> rescale uuid
64+
* +--> resource requirements id
65+
* +--> rescale attempt id
66+
* +--> vertices:
67+
* + +--> job vertex id-1 -> vertex-1 parallelism rescale:
68+
* + + +--> vertex id
69+
* + + +--> vertex name
70+
* + + +--> slot sharing group id
71+
* + + +--> slot sharing group name
72+
* + + +--> desired parallelism
73+
* + + +--> sufficient parallelism
74+
* + +--> job vertex id-2 -> vertex-2 parallelism rescale:
75+
* + + +--> ...
76+
* + + ...
77+
* + ...
78+
* +--> slots:
79+
* + +--> slot sharing group id-1 -> slot-1 sharing group rescale:
80+
* + + +--> slot sharing group id
81+
* + + +--> slot sharing group name
82+
* + + +--> required resource profile
83+
* + + +--> minimal required slots
84+
* + + +--> pre-rescale slots
85+
* + + +--> post-rescale slots
86+
* + + +--> acquired resource profile
87+
* + +--> slot sharing group id-2 -> slot-2 sharing group rescale:
88+
* + + +--> ...
89+
* + + ...
90+
* + ...
91+
* +--> scheduler states:
92+
* + +--> scheduler state span:
93+
* + + +--> state
94+
* + + +--> in timestamp
95+
* + + +--> out timestamp
96+
* + + +--> duration
97+
* + + +--> exception information
98+
* + +--> ...
99+
* + ...
100+
* +--> start timestamp
101+
* +--> end timestamp
102+
* +--> trigger cause
103+
* +--> terminal state
104+
* +--> terminated reason
105+
* </pre>
106+
*
107+
* <p>The more design details about the rescale could be viewed in <a
108+
* href="https://cwiki.apache.org/confluence/x/TQr0Ew">FLIP-495</a>.
109+
*/
55110
public class Rescale implements Serializable {
56111

57112
private static final long serialVersionUID = 1L;
@@ -186,10 +241,10 @@ public Rescale setDesiredSlots(JobInformation jobInformation) {
186241
.getParallelism())
187242
.max(Integer::compare)
188243
.orElse(0);
189-
SlotSharingGroupId sharingGroupId = sharingGroup.getSlotSharingGroupId();
190244
SlotSharingGroupRescale sharingGroupRescaleInfo =
191-
slots.computeIfAbsent(sharingGroupId, SlotSharingGroupRescale::new);
192-
sharingGroupRescaleInfo.setSlotSharingGroupMetaInfo(sharingGroup);
245+
slots.computeIfAbsent(
246+
sharingGroup.getSlotSharingGroupId(),
247+
ignored -> new SlotSharingGroupRescale(sharingGroup));
193248
sharingGroupRescaleInfo.setDesiredSlots(desiredSlot);
194249
}
195250
return this;
@@ -201,15 +256,16 @@ public Rescale setDesiredVertexParallelism(JobInformation jobInformation) {
201256
for (Map.Entry<JobVertexID, VertexParallelismInformation> entry :
202257
allParallelismInfo.entrySet()) {
203258
JobVertexID jvId = entry.getKey();
204-
VertexParallelismInformation vertexParallelInfo = entry.getValue();
205-
VertexParallelismRescale vertexParallelismRescale =
206-
this.vertices.computeIfAbsent(
207-
jvId, jobVertexID -> new VertexParallelismRescale(jvId));
208259
SlotSharingGroup slotSharingGroup =
209260
jobInformation.getVertexInformation(jvId).getSlotSharingGroup();
210-
vertexParallelismRescale.setSlotSharingGroupMetaInfo(slotSharingGroup);
211-
vertexParallelismRescale.setJobVertexName(jobInformation.getVertexName(jvId));
212-
vertexParallelismRescale.setRequiredParallelisms(vertexParallelInfo);
261+
String vertexName = jobInformation.getVertexInformation(jvId).getVertexName();
262+
VertexParallelismRescale vertexParallelismRescale =
263+
this.vertices.computeIfAbsent(
264+
jvId,
265+
jobVertexID ->
266+
new VertexParallelismRescale(
267+
jvId, vertexName, slotSharingGroup));
268+
vertexParallelismRescale.setRequiredParallelisms(entry.getValue());
213269
}
214270
return this;
215271
}
@@ -219,43 +275,69 @@ public Rescale setMinimalRequiredSlots(JobInformation jobInformation) {
219275
SlotSharingGroupMetaInfo.from(jobInformation.getVertices());
220276
for (Map.Entry<SlotSharingGroup, SlotSharingGroupMetaInfo> entry :
221277
slotSharingGroupMetaInfo.entrySet()) {
222-
SlotSharingGroupId groupId = entry.getKey().getSlotSharingGroupId();
278+
SlotSharingGroup sharingGroup = entry.getKey();
223279
SlotSharingGroupRescale slotSharingGroupRescale =
224-
slots.computeIfAbsent(groupId, SlotSharingGroupRescale::new);
280+
slots.computeIfAbsent(
281+
sharingGroup.getSlotSharingGroupId(),
282+
ignored -> new SlotSharingGroupRescale(sharingGroup));
225283
slotSharingGroupRescale.setMinimalRequiredSlots(entry.getValue().getMaxLowerBound());
226284
}
227285
return this;
228286
}
229287

230-
public Rescale setPreRescaleSlotsAndParallelisms(@Nullable Rescale lastCompletedRescale) {
288+
public Rescale setPreRescaleSlotsAndParallelisms(
289+
JobInformation jobInformation, @Nullable Rescale lastCompletedRescale) {
231290
if (lastCompletedRescale == null) {
232291
LOG.info("No available previous parallelism to set.");
233292
return this;
234293
}
235294
for (JobVertexID jobVertexID : lastCompletedRescale.getVertices().keySet()) {
236295
Integer preRescaleParallelism =
237296
lastCompletedRescale.vertices.get(jobVertexID).getPostRescaleParallelism();
297+
JobInformation.VertexInformation vertexInformation =
298+
jobInformation.getVertexInformation(jobVertexID);
238299
VertexParallelismRescale vertexParallelismRescale =
239-
vertices.computeIfAbsent(jobVertexID, VertexParallelismRescale::new);
300+
vertices.computeIfAbsent(
301+
jobVertexID,
302+
jobVertexId ->
303+
new VertexParallelismRescale(
304+
jobVertexId,
305+
vertexInformation.getVertexName(),
306+
vertexInformation.getSlotSharingGroup()));
240307
vertexParallelismRescale.setPreRescaleParallelism(preRescaleParallelism);
241308
}
242309

243-
for (SlotSharingGroupId sharingGroupId : lastCompletedRescale.getSlots().keySet()) {
244-
Integer preRescaleSlot =
245-
lastCompletedRescale.slots.get(sharingGroupId).getPostRescaleSlots();
310+
Map<SlotSharingGroupId, SlotSharingGroupRescale> slotsRescales =
311+
lastCompletedRescale.getSlots();
312+
for (SlotSharingGroup sharingGroup : jobInformation.getSlotSharingGroups()) {
313+
SlotSharingGroupId slotSharingGroupId = sharingGroup.getSlotSharingGroupId();
314+
Integer preRescaleSlot = slotsRescales.get(slotSharingGroupId).getPostRescaleSlots();
246315
SlotSharingGroupRescale slotSharingGroupRescale =
247-
slots.computeIfAbsent(sharingGroupId, SlotSharingGroupRescale::new);
316+
slots.computeIfAbsent(
317+
slotSharingGroupId,
318+
ignored -> new SlotSharingGroupRescale(sharingGroup));
248319
slotSharingGroupRescale.setPreRescaleSlots(preRescaleSlot);
249320
}
250321

251322
return this;
252323
}
253324

254-
public Rescale setPostRescaleVertexParallelism(VertexParallelism postRescaleVertexParallelism) {
325+
public Rescale setPostRescaleVertexParallelism(
326+
JobInformation jobInformation, VertexParallelism postRescaleVertexParallelism) {
327+
255328
Set<JobVertexID> vertices = postRescaleVertexParallelism.getVertices();
256329
for (JobVertexID vertexID : vertices) {
330+
JobInformation.VertexInformation vertexInformation =
331+
jobInformation.getVertexInformation(vertexID);
257332
VertexParallelismRescale vertexParallelismRescale =
258-
this.vertices.computeIfAbsent(vertexID, VertexParallelismRescale::new);
333+
this.vertices.computeIfAbsent(
334+
vertexID,
335+
jobVertexId ->
336+
new VertexParallelismRescale(
337+
jobVertexId,
338+
vertexInformation.getVertexName(),
339+
vertexInformation.getSlotSharingGroup()));
340+
259341
vertexParallelismRescale.setPostRescaleParallelism(
260342
postRescaleVertexParallelism.getParallelism(vertexID));
261343
}
@@ -264,27 +346,28 @@ public Rescale setPostRescaleVertexParallelism(VertexParallelism postRescaleVert
264346

265347
public Rescale setPostRescaleSlots(
266348
Collection<JobSchedulingPlan.SlotAssignment> postRescaleSlotAssignments) {
267-
Map<SlotSharingGroupId, Set<JobSchedulingPlan.SlotAssignment>> assignmentsPerSharingGroup =
349+
Map<SlotSharingGroup, Set<JobSchedulingPlan.SlotAssignment>> assignmentsPerSharingGroup =
268350
postRescaleSlotAssignments.stream()
269351
.collect(
270352
Collectors.groupingBy(
271353
slotAssignment ->
272354
slotAssignment
273355
.getTargetAs(
274356
ExecutionSlotSharingGroup.class)
275-
.getSlotSharingGroup()
276-
.getSlotSharingGroupId(),
357+
.getSlotSharingGroup(),
277358
Collectors.toSet()));
278-
for (Map.Entry<SlotSharingGroupId, Set<JobSchedulingPlan.SlotAssignment>> entry :
359+
for (Map.Entry<SlotSharingGroup, Set<JobSchedulingPlan.SlotAssignment>> entry :
279360
assignmentsPerSharingGroup.entrySet()) {
280-
SlotSharingGroupId sharingGroupId = entry.getKey();
361+
SlotSharingGroup sharingGroup = entry.getKey();
281362
Set<JobSchedulingPlan.SlotAssignment> assignments =
282-
assignmentsPerSharingGroup.get(sharingGroupId);
363+
assignmentsPerSharingGroup.get(sharingGroup);
283364
int postRescaleSlot = assignments.size();
284365
ResourceProfile acquiredResource =
285366
assignments.iterator().next().getSlotInfo().getResourceProfile();
286367
SlotSharingGroupRescale slotSharingGroupRescale =
287-
slots.computeIfAbsent(sharingGroupId, SlotSharingGroupRescale::new);
368+
slots.computeIfAbsent(
369+
sharingGroup.getSlotSharingGroupId(),
370+
ignored -> new SlotSharingGroupRescale(sharingGroup));
288371
slotSharingGroupRescale.setPostRescaleSlots(postRescaleSlot);
289372
slotSharingGroupRescale.setAcquiredResourceProfile(acquiredResource);
290373
}

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/SchedulerStateSpan.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@
2525
import java.io.Serializable;
2626
import java.util.Objects;
2727

28-
/** Utils class to record the information of a scheduler state. */
28+
/**
29+
* Utils class to record the information of a scheduler state that contains the span of the time of
30+
* the adaptive scheduler state, in timestamp, out timestamp, the exception if occurred during the
31+
* adaptive scheduler state.
32+
*/
2933
public class SchedulerStateSpan implements Serializable {
3034
private static final long serialVersionUID = 1L;
3135

0 commit comments

Comments
 (0)