Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;

import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

Expand All @@ -41,6 +42,8 @@ public class SlotSharingGroup implements java.io.Serializable {

private final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();

private String slotSharingGroupName;

// Represents resources of all tasks in the group. Default to be UNKNOWN.
private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;

Expand Down Expand Up @@ -70,12 +73,35 @@ public ResourceProfile getResourceProfile() {
return resourceProfile;
}

public String getSlotSharingGroupName() {
return slotSharingGroupName;
}

public void setSlotSharingGroupName(String slotSharingGroupName) {
this.slotSharingGroupName = slotSharingGroupName;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add the slotSharingGroupName; to the toString(). We should have a hashcode for this object in including slotSharingGroupName.

}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------

@Override
public int hashCode() {
return Objects.hash(ids, slotSharingGroupId, slotSharingGroupName, resourceProfile);
}

@Override
public String toString() {
return "SlotSharingGroup{" + "ids=" + ids + ", resourceProfile=" + resourceProfile + '}';
return "SlotSharingGroup{"
+ "ids="
+ ids
+ ", slotSharingGroupId="
+ slotSharingGroupId
+ ", slotSharingGroupName='"
+ slotSharingGroupName
+ '\''
+ ", resourceProfile="
+ resourceProfile
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;

import org.slf4j.Logger;

Expand All @@ -34,13 +35,21 @@ class Finished implements State {

private final Logger logger;

private final Durable durable;

Finished(Context context, ArchivedExecutionGraph archivedExecutionGraph, Logger logger) {
this.archivedExecutionGraph = archivedExecutionGraph;
this.logger = logger;
this.durable = new Durable();

context.onFinished(archivedExecutionGraph);
}

@Override
public Durable getDurable() {
return durable;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not just have:

public Durable getDurable() {
        return new Durable();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @davidradl for your comments.

Please let me have a try on clarifying it:
In the current scheduler’s terminal state, this duration is used to represent the start and end time of the scheduler state.
Therefore, generally speaking, it is reasonable to record the start time at the moment when the state is created, that is, when creating the Durable object.

}

@Override
public void cancel() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.runtime.scheduler.adaptive;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
Expand All @@ -38,7 +39,7 @@ public class JobGraphJobInformation implements JobInformation {
private final JobGraph jobGraph;
private final JobID jobID;
private final String name;
private final VertexParallelismStore vertexParallelismStore;
protected final VertexParallelismStore vertexParallelismStore;

public JobGraphJobInformation(
JobGraph jobGraph, VertexParallelismStore vertexParallelismStore) {
Expand Down Expand Up @@ -83,17 +84,19 @@ public JobGraph copyJobGraph() {
return InstantiationUtil.cloneUnchecked(jobGraph);
}

@Override
public VertexParallelismStore getVertexParallelismStore() {
return vertexParallelismStore;
}

private static final class JobVertexInformation implements JobInformation.VertexInformation {
@VisibleForTesting
public static final class JobVertexInformation implements JobInformation.VertexInformation {

private final JobVertex jobVertex;

private final VertexParallelismInformation parallelismInfo;

private JobVertexInformation(
public JobVertexInformation(
JobVertex jobVertex, VertexParallelismInformation parallelismInfo) {
this.jobVertex = jobVertex;
this.parallelismInfo = parallelismInfo;
Expand All @@ -104,6 +107,11 @@ public JobVertexID getJobVertexID() {
return jobVertex.getID();
}

@Override
public String getVertexName() {
return jobVertex.getName();
}

@Override
public int getMinParallelism() {
return parallelismInfo.getMinParallelism();
Expand All @@ -123,5 +131,10 @@ public int getMaxParallelism() {
public SlotSharingGroup getSlotSharingGroup() {
return jobVertex.getSlotSharingGroup();
}

@VisibleForTesting
public VertexParallelismInformation getVertexParallelismInfo() {
return parallelismInfo;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,37 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;

import org.slf4j.Logger;

import java.time.Instant;
import java.util.Optional;
import java.util.function.Consumer;

/**
* State abstraction of the {@link AdaptiveScheduler}. This interface contains all methods every
* state implementation must support.
*/
interface State extends LabeledGlobalFailureHandler {
public interface State extends LabeledGlobalFailureHandler {

/**
* Get the durable time information of the current state.
*
* @return The durable time information of the current state.
*/
Durable getDurable();

/**
* This method is called whenever one transitions out of this state.
*
* @param newState newState is the state into which the scheduler transitions
*/
default void onLeave(Class<? extends State> newState) {}
default void onLeave(Class<? extends State> newState) {
getDurable().setOutTimestamp(Instant.now().toEpochMilli());
}

/** Cancels the job execution. */
void cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.flink.runtime.scheduler.KvStateHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.VertexEndOfDataListener;
import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
Expand All @@ -69,6 +70,7 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -101,6 +103,8 @@ abstract class StateWithExecutionGraph implements State {

private final VertexEndOfDataListener vertexEndOfDataListener;

private final Durable durable;

StateWithExecutionGraph(
Context context,
ExecutionGraph executionGraph,
Expand All @@ -118,6 +122,7 @@ abstract class StateWithExecutionGraph implements State {
this.userCodeClassLoader = userClassCodeLoader;
this.failureCollection = new ArrayList<>(failureCollection);
this.vertexEndOfDataListener = new VertexEndOfDataListener(executionGraph);
this.durable = new Durable();

FutureUtils.assertNoException(
executionGraph
Expand All @@ -137,6 +142,11 @@ abstract class StateWithExecutionGraph implements State {
context.getMainThreadExecutor()));
}

@Override
public Durable getDurable() {
return durable;
}

ExecutionGraph getExecutionGraph() {
return executionGraph;
}
Expand All @@ -156,6 +166,7 @@ protected ExecutionGraphHandler getExecutionGraphHandler() {

@Override
public void onLeave(Class<? extends State> newState) {
getDurable().setOutTimestamp(Instant.now().toEpochMilli());
if (!StateWithExecutionGraph.class.isAssignableFrom(newState)) {
// we are leaving the StateWithExecutionGraph --> we need to dispose temporary services
operatorCoordinatorHandler.disposeAllOperatorCoordinators();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;

import org.slf4j.Logger;

Expand All @@ -39,9 +40,17 @@ abstract class StateWithoutExecutionGraph implements State {

private final Logger logger;

private final Durable durable;

StateWithoutExecutionGraph(Context context, Logger logger) {
this.context = context;
this.logger = logger;
this.durable = new Durable();
}

@Override
public Durable getDurable() {
return durable;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.scheduler.adaptive.allocator;

import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
Expand All @@ -38,13 +37,13 @@ class AllocatorUtil {

private AllocatorUtil() {}

static Map<SlotSharingGroupId, SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
static Map<SlotSharingGroup, SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
getSlotSharingGroupMetaInfos(JobInformation jobInformation) {
return SlotSharingSlotAllocator.SlotSharingGroupMetaInfo.from(jobInformation.getVertices());
}

static int getMinimumRequiredSlots(
Map<SlotSharingGroupId, SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
Map<SlotSharingGroup, SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
slotSharingGroupMetaInfos) {
return slotSharingGroupMetaInfos.values().stream()
.map(SlotSharingSlotAllocator.SlotSharingGroupMetaInfo::getMaxLowerBound)
Expand Down Expand Up @@ -78,7 +77,10 @@ static void checkMinimumRequiredSlots(
}
});
return sharedSlotToVertexAssignment.values().stream()
.map(SlotSharingSlotAllocator.ExecutionSlotSharingGroup::new)
.map(
ids ->
new SlotSharingSlotAllocator.ExecutionSlotSharingGroup(
slotSharingGroup, ids))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;

import java.util.Collection;

Expand All @@ -38,10 +39,16 @@ public interface JobInformation {

Iterable<VertexInformation> getVertices();

default VertexParallelismStore getVertexParallelismStore() {
throw new UnsupportedOperationException();
}

/** Information about a single vertex. */
interface VertexInformation {
JobVertexID getJobVertexID();

String getVertexName();

int getMinParallelism();

int getParallelism();
Expand Down
Loading