Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ interface TestWorkflowMutableState {

ExecutionId getExecutionId();

String getFirstExecutionRunId();

WorkflowExecutionStatus getWorkflowExecutionStatus();

StartWorkflowExecutionRequest getStartRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,11 @@ public ExecutionId getExecutionId() {
return executionId;
}

@Override
public String getFirstExecutionRunId() {
return workflow.getData().firstExecutionRunId;
}

@Override
public WorkflowExecutionStatus getWorkflowExecutionStatus() {
switch (workflow.getState()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,15 @@
public final class TestWorkflowService extends WorkflowServiceGrpc.WorkflowServiceImplBase
implements Closeable {
private static final Logger log = LoggerFactory.getLogger(TestWorkflowService.class);
private static final JsonFormat.Printer JSON_PRINTER = JsonFormat.printer();
private static final JsonFormat.Parser JSON_PARSER = JsonFormat.parser();

private static final String FAILURE_TYPE_STRING = Failure.getDescriptor().getFullName();

private final Map<ExecutionId, TestWorkflowMutableState> executions = new HashMap<>();
// key->WorkflowId
private final Map<WorkflowId, TestWorkflowMutableState> executionsByWorkflowId = new HashMap<>();
private final Map<WorkflowChainId, TestWorkflowMutableState> executionsByFirstExecutionRunId =
new HashMap<>();
private final ExecutorService executor = Executors.newCachedThreadPool();
private final Lock lock = new ReentrantLock();

Expand Down Expand Up @@ -166,6 +167,52 @@ private TestWorkflowMutableState getMutableState(ExecutionId executionId) {
return getMutableState(executionId, true);
}

private static final class WorkflowChainId {
private final String namespace;
private final String workflowId;
private final String firstExecutionRunId;

private WorkflowChainId(String namespace, String workflowId, String firstExecutionRunId) {
this.namespace = Objects.requireNonNull(namespace);
this.workflowId = Objects.requireNonNull(workflowId);
this.firstExecutionRunId = Objects.requireNonNull(firstExecutionRunId);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WorkflowChainId that = (WorkflowChainId) o;
return namespace.equals(that.namespace)
&& workflowId.equals(that.workflowId)
&& firstExecutionRunId.equals(that.firstExecutionRunId);
}

@Override
public int hashCode() {
return Objects.hash(namespace, workflowId, firstExecutionRunId);
}

@Override
public String toString() {
return "WorkflowChainId{"
+ "namespace='"
+ namespace
+ '\''
+ ", workflowId='"
+ workflowId
+ '\''
+ ", firstExecutionRunId='"
+ firstExecutionRunId
+ '\''
+ '}';
}
}

private TestWorkflowMutableState getMutableState(ExecutionId executionId, boolean failNotExists) {
lock.lock();
try {
Expand Down Expand Up @@ -205,6 +252,39 @@ private TestWorkflowMutableState getMutableState(WorkflowId workflowId, boolean
}
}

private TestWorkflowMutableState getMutableState(
WorkflowChainId workflowChainId, boolean failNotExists) {
lock.lock();
try {
TestWorkflowMutableState mutableState = executionsByFirstExecutionRunId.get(workflowChainId);
if (mutableState == null && failNotExists) {
throw Status.NOT_FOUND
.withDescription("Execution not found in mutable state: " + workflowChainId)
.asRuntimeException();
}
return mutableState;
} finally {
lock.unlock();
}
}

private TestWorkflowMutableState getMutableState(
String namespace,
WorkflowExecution execution,
String firstExecutionRunId,
boolean failNotExists) {
ExecutionId executionId = new ExecutionId(namespace, execution);
WorkflowChainId workflowChainId =
firstExecutionRunId.isEmpty()
? null
: new WorkflowChainId(namespace, execution.getWorkflowId(), firstExecutionRunId);

if (workflowChainId != null) {
return getMutableState(workflowChainId, failNotExists);
}
return getMutableState(executionId, failNotExists);
}

@Override
public void startWorkflowExecution(
StartWorkflowExecutionRequest request,
Expand Down Expand Up @@ -454,6 +534,11 @@ private StartWorkflowExecutionResponse startWorkflowExecutionNoRunningCheckLocke
WorkflowExecution execution = mutableState.getExecutionId().getExecution();
ExecutionId executionId = new ExecutionId(namespace, execution);
executionsByWorkflowId.put(workflowId, mutableState);
if (!firstExecutionRunId.isEmpty()) {
executionsByFirstExecutionRunId.put(
new WorkflowChainId(namespace, workflowId.getWorkflowId(), firstExecutionRunId),
mutableState);
}
executions.put(executionId, mutableState);

PollWorkflowTaskQueueRequest eagerWorkflowTaskPollRequest =
Expand Down Expand Up @@ -1094,9 +1179,12 @@ public void requestCancelWorkflowExecution(
void requestCancelWorkflowExecution(
RequestCancelWorkflowExecutionRequest cancelRequest,
Optional<TestWorkflowMutableStateImpl.CancelExternalWorkflowExecutionCallerInfo> callerInfo) {
ExecutionId executionId =
new ExecutionId(cancelRequest.getNamespace(), cancelRequest.getWorkflowExecution());
TestWorkflowMutableState mutableState = getMutableState(executionId);
TestWorkflowMutableState mutableState =
getMutableState(
cancelRequest.getNamespace(),
cancelRequest.getWorkflowExecution(),
cancelRequest.getFirstExecutionRunId(),
true);
mutableState.requestCancelWorkflowExecution(cancelRequest, callerInfo);
}

Expand All @@ -1114,9 +1202,12 @@ public void terminateWorkflowExecution(
}

private void terminateWorkflowExecution(TerminateWorkflowExecutionRequest request) {
ExecutionId executionId =
new ExecutionId(request.getNamespace(), request.getWorkflowExecution());
TestWorkflowMutableState mutableState = getMutableState(executionId);
TestWorkflowMutableState mutableState =
getMutableState(
request.getNamespace(),
request.getWorkflowExecution(),
request.getFirstExecutionRunId(),
true);
mutableState.terminateWorkflowExecution(request);
}

Expand Down
Loading
Loading