diff --git a/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/runtime/DAGInteraction.java b/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/runtime/DAGInteraction.java index ed2170a8a..cf42cc5c3 100644 --- a/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/runtime/DAGInteraction.java +++ b/rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/runtime/DAGInteraction.java @@ -32,7 +32,7 @@ public interface DAGInteraction { /** * 提交需执行的DAG任务 */ - void submit(String executionId, DAG dag, Map data, DAGSettings settings, NotifyInfo notifyInfo); + void submit(String executionId, String taskName, DAG dag, Map data, DAGSettings settings, NotifyInfo notifyInfo); /** * 完成task后调用接口 diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java index 81c43f237..c5be80763 100644 --- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java @@ -47,11 +47,10 @@ import java.util.*; import java.util.concurrent.ExecutorService; -import java.util.function.BiConsumer; import java.util.function.Supplier; @Slf4j -public class DAGOperations { +public class DAGOperations implements DAGOperationsInterface { private static final String EXECUTION_ID = "executionId"; private final ExecutorService runnerExecutor; @@ -63,20 +62,6 @@ public class DAGOperations { private final DAGResultHandler dagResultHandler; - public static final BiConsumer OPERATE_WITH_RETRY = (operation, retryTimes) -> { - int exceptionCatchTimes = retryTimes; - for (int i = 1; i <= exceptionCatchTimes; i++) { - try { - operation.run(); - return; - } catch (Exception e) { - log.warn("operateWithRetry fails, invokeTimes:{}", i, e); - } - } - - operation.run(); - }; - public DAGOperations(ExecutorService runnerExecutor, Map taskRunners, DAGRunner dagRunner, TimeCheckRunner timeCheckRunner, DAGTraversal dagTraversal, Callback callback, DAGResultHandler dagResultHandler) { @@ -231,9 +216,9 @@ public void redoTask(String executionId, List taskNames, Map data, NotifyInfo notifyInfo) { + public void submitDAG(String executionId, String taskName, DAG dag, DAGSettings settings, Map data, NotifyInfo notifyInfo) { log.info("submitDAG task begin to execute executionId:{} notifyInfo:{}", executionId, notifyInfo); - ExecutionResult executionResult = dagRunner.submitDAG(executionId, dag, settings, data, notifyInfo); + ExecutionResult executionResult = dagRunner.submitDAG(executionId, taskName, dag, settings, data, notifyInfo); Optional.ofNullable(getTimeoutSeconds(new HashMap<>(), executionResult.getContext(), dag.getTimeline())) .ifPresent(timeoutSeconds -> timeCheckRunner.addDAGToTimeoutCheck(executionId, timeoutSeconds)); dagTraversal.submitTraversal(executionId, null); diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperationsInterface.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperationsInterface.java new file mode 100644 index 000000000..e4ea2c464 --- /dev/null +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperationsInterface.java @@ -0,0 +1,18 @@ +package com.weibo.rill.flow.olympicene.traversal; + +import com.weibo.rill.flow.interfaces.model.task.TaskInfo; +import com.weibo.rill.flow.olympicene.core.model.NotifyInfo; +import com.weibo.rill.flow.olympicene.core.model.dag.DAGInfo; +import com.weibo.rill.flow.olympicene.core.model.dag.DAGInvokeMsg; +import com.weibo.rill.flow.olympicene.core.model.dag.DAGStatus; +import org.apache.commons.lang3.tuple.Pair; + +import java.util.Collection; +import java.util.Map; + +public interface DAGOperationsInterface { + void finishDAG(String executionId, DAGInfo dagInfo, DAGStatus dagStatus, DAGInvokeMsg dagInvokeMsg); + void finishTaskAsync(String executionId, String taskCategory, NotifyInfo notifyInfo, Map output); + void finishTaskSync(String executionId, String taskCategory, NotifyInfo notifyInfo, Map output); + void runTasks(String executionId, Collection>> taskInfoToContexts); +} diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGTraversal.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGTraversal.java index 73b16e4db..df2c6bdb7 100644 --- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGTraversal.java +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGTraversal.java @@ -34,6 +34,7 @@ import com.weibo.rill.flow.olympicene.traversal.helper.ContextHelper; import com.weibo.rill.flow.olympicene.traversal.helper.PluginHelper; import com.weibo.rill.flow.olympicene.traversal.helper.Stasher; +import com.weibo.rill.flow.olympicene.traversal.utils.OperationUtil; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; @@ -56,7 +57,7 @@ public class DAGTraversal { private final DAGStorageProcedure dagStorageProcedure; private final ExecutorService traversalExecutor; @Setter - private DAGOperations dagOperations; + private DAGOperationsInterface dagOperations; @Setter private Stasher stasher; @@ -80,7 +81,7 @@ public void submitTraversal(String executionId, String completedTaskName) { Runnable basicActions = () -> dagStorageProcedure.lockAndRun( LockerKey.buildDagInfoLockName(executionId), () -> doTraversal(executionId, completedTaskName)); Runnable runnable = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TRAVERSAL_CUSTOMIZED_PLUGINS); - DAGOperations.OPERATE_WITH_RETRY.accept(runnable, SystemConfig.getTraversalRetryTimes()); + OperationUtil.OPERATE_WITH_RETRY.accept(runnable, SystemConfig.getTraversalRetryTimes()); } catch (Exception e) { log.error("executionId:{} traversal exception with completedTaskName:{}. ", executionId, completedTaskName, e); } @@ -99,7 +100,7 @@ public void submitTasks(String executionId, Set taskInfos, Map data) { - submit(executionId, dag, data, DAGSettings.DEFAULT, null); + submit(executionId, null, dag, data, DAGSettings.DEFAULT, null); } /** * 提交需执行的DAG任务 */ @Override - public void submit(String executionId, DAG dag, Map data, DAGSettings settings, NotifyInfo notifyInfo) { + public void submit(String executionId, String taskName, DAG dag, Map data, DAGSettings settings, NotifyInfo notifyInfo) { runNotify(executionId, NotifyType.SUBMIT, notifyInfo, - () -> dagOperations.submitDAG(executionId, dag, settings, data, notifyInfo)); + () -> dagOperations.submitDAG(executionId, taskName, dag, settings, data, notifyInfo)); } public void runNotify(String executionId, NotifyType notifyType, NotifyInfo notifyInfo, Runnable actions) { @@ -186,7 +186,7 @@ public DAGResult run(String executionId, DAG dag, Map data, DAGS dagResultHandler.initEnv(executionId); doRunNotify(executionId, NotifyType.RUN, notifyInfo, - () -> dagOperations.submitDAG(executionId, dag, settings, data, notifyInfo)); + () -> dagOperations.submitDAG(executionId, null, dag, settings, data, notifyInfo)); return dagResultHandler.getDAGResult(executionId, timeoutInMillisecond); } } diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/DAGRunner.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/DAGRunner.java index 32d5089cd..2edf3e303 100644 --- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/DAGRunner.java +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/DAGRunner.java @@ -67,8 +67,8 @@ public DAGRunner(DAGContextStorage dagContextStorage, DAGInfoStorage dagInfoStor this.dagStorageProcedure = dagStorageProcedure; } - public ExecutionResult submitDAG(String executionId, DAG dag, DAGSettings settings, Map data, NotifyInfo notifyInfo) { - ExecutionResult ret = ExecutionResult.builder().build(); + public ExecutionResult submitDAG(String executionId, String taskName, DAG dag, DAGSettings settings, Map data, NotifyInfo notifyInfo) { + ExecutionResult ret = ExecutionResult.builder().needRetry(false).retryIntervalInSeconds(0).build(); dagStorageProcedure.lockAndRun(LockerKey.buildDagInfoLockName(executionId), () -> { DAGInfo currentExecutionIdDagInfo = dagInfoStorage.getBasicDAGInfo(executionId); @@ -104,6 +104,15 @@ public ExecutionResult submitDAG(String executionId, DAG dag, DAGSettings settin .dagInvokeMsg(dagInvokeMsg) .dagStatus(DAGStatus.RUNNING) .make(); + + if (MapUtils.isNotEmpty(dagInfoToUpdate.getTasks()) && taskName != null) { + for (Map.Entry taskInfoEntry : dagInfoToUpdate.getTasks().entrySet()) { + if (!taskInfoEntry.getKey().equals(taskName)) { + taskInfoEntry.getValue().setTaskStatus(TaskStatus.SKIPPED); + } + } + } + ret.setDagInfo(dagInfoToUpdate); Optional.ofNullable(dagInvokeMsg) .map(DAGInvokeMsg::getExecutionRoutes) @@ -259,7 +268,7 @@ public ExecutionResult finishDAG(String executionId, DAGInfo dagInfo, DAGStatus } log.info("finishDAG finish, executionId:{}", executionId); - return ExecutionResult.builder().dagInfo(wholeDagInfo).context(context).build(); + return ExecutionResult.builder().dagInfo(wholeDagInfo).context(context).needRetry(false).retryIntervalInSeconds(0).build(); } private void updateDAGInvokeStartTime(DAGInfo dagInfo) { diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/TimeCheckRunner.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/TimeCheckRunner.java index 466ed5e0d..ac4ce716b 100644 --- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/TimeCheckRunner.java +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/TimeCheckRunner.java @@ -32,10 +32,12 @@ import com.weibo.rill.flow.olympicene.core.runtime.DAGInfoStorage; import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure; import com.weibo.rill.flow.olympicene.traversal.DAGOperations; +import com.weibo.rill.flow.olympicene.traversal.DAGOperationsInterface; import com.weibo.rill.flow.olympicene.traversal.checker.TimeCheckMember; import com.weibo.rill.flow.olympicene.traversal.checker.TimeChecker; import com.weibo.rill.flow.olympicene.traversal.helper.ContextHelper; import com.weibo.rill.flow.olympicene.traversal.serialize.DAGTraversalSerializer; +import com.weibo.rill.flow.olympicene.traversal.utils.OperationUtil; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -54,7 +56,7 @@ public class TimeCheckRunner { private final DAGInfoStorage dagInfoStorage; private final DAGContextStorage dagContextStorage; @Setter - private DAGOperations dagOperations; + private DAGOperationsInterface dagOperations; public TimeCheckRunner(TimeChecker timeChecker, DAGInfoStorage dagInfoStorage, DAGContextStorage dagContextStorage, DAGStorageProcedure dagStorageProcedure) { @@ -94,7 +96,7 @@ public void handleTimeCheck(String timeCheckMember) { Map context = ContextHelper.getInstance().getContext(dagContextStorage, executionId, taskInfo); dagOperations.runTasks(executionId, Lists.newArrayList(Pair.of(taskInfo, context))); }; - DAGOperations.OPERATE_WITH_RETRY.accept(operations, SystemConfig.getTimerRetryTimes()); + OperationUtil.OPERATE_WITH_RETRY.accept(operations, SystemConfig.getTimerRetryTimes()); break; default: log.warn("handleTimeCheck time check type nonsupport, type:{}", type); diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/utils/OperationUtil.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/utils/OperationUtil.java new file mode 100644 index 000000000..69e9d3911 --- /dev/null +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/utils/OperationUtil.java @@ -0,0 +1,23 @@ +package com.weibo.rill.flow.olympicene.traversal.utils; + +import lombok.extern.slf4j.Slf4j; + +import java.util.function.ObjIntConsumer; + +@Slf4j +public class OperationUtil { + private OperationUtil() {} + + public static final ObjIntConsumer OPERATE_WITH_RETRY = (operation, retryTimes) -> { + for (int i = 1; i <= retryTimes; i++) { + try { + operation.run(); + return; + } catch (Exception e) { + log.warn("operateWithRetry fails, invokeTimes:{}", i, e); + } + } + + operation.run(); + }; +} diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeMsgTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeMsgTest.groovy index 2b56121de..9307a7a61 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeMsgTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeMsgTest.groovy @@ -83,7 +83,7 @@ class InvokeMsgTest extends Specification { .parentDAGTaskInfoName("A") .parentDAGTaskExecutionType(FunctionPattern.FLOW_SYNC) .build() - olympicene.submit("smallFlow", smallFlow, [:], DAGSettings.DEFAULT, smallFlowSubmit) + olympicene.submit("smallFlow", null, smallFlow, [:], DAGSettings.DEFAULT, smallFlowSubmit) NotifyInfo smallFlowNotify = NotifyInfo.builder() .taskInfoName("B_0-C") @@ -130,4 +130,35 @@ class InvokeMsgTest extends Specification { ((DAGCallbackInfo) event.getData()).getDagInfo().getTask("A").getTaskInvokeMsg().getOutput() == ['flow_root_execution_id':'bigFlow', 'segments':['gopUrl']] }) } + + def "test submit for debug"() { + given: + String flowYaml = "workspace: default\n" + + "dagName: testSubmit\n" + + "alias: release\n" + + "type: flow\n" + + "inputSchema: '[]'\n" + + "tasks:\n" + + " - next: pass1\n" + + " name: pass0\n" + + " category: pass\n" + + " - next: pass2\n" + + " name: pass1\n" + + " category: pass\n" + + " - name: pass2\n" + + " category: pass\n" + DAG testFlow = dagParser.parse(flowYaml) + dispatcher.dispatch(*_) >> '{"execution_id":"testFlow"}' + + when: + olympicene.submit("testFlow", "pass1", testFlow, [:], DAGSettings.DEFAULT, null) + + then: + 1 * callback.onEvent({Event event -> + event.eventCode == DAGEvent.DAG_SUCCEED.getCode() && + ((DAGCallbackInfo) event.getData()).getDagInfo().getTasks().get("pass0").getTaskStatus() == TaskStatus.SKIPPED && + ((DAGCallbackInfo) event.getData()).getDagInfo().getTasks().get("pass1").getTaskStatus() == TaskStatus.SUCCEED && + ((DAGCallbackInfo) event.getData()).getDagInfo().getTasks().get("pass2").getTaskStatus() == TaskStatus.SKIPPED + }) + } } diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/MultiDAGTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/MultiDAGTest.groovy index cf4bd841b..3537aba7c 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/MultiDAGTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/MultiDAGTest.groovy @@ -66,7 +66,7 @@ class MultiDAGTest extends Specification { NotifyInfo notifyInfo = NotifyInfo.builder() .parentDAGExecutionId("level1") .parentDAGTaskInfoName("A").build() - olympicene.submit("level2", dag, [:], DAGSettings.DEFAULT, notifyInfo) + olympicene.submit("level2", null, dag, [:], DAGSettings.DEFAULT, notifyInfo) TaskInfo level1TaskA = dagStorage.getDAGInfo("level1").getTask("A") DAGInfo level2 = dagStorage.getDAGInfo("level2") @@ -87,12 +87,12 @@ class MultiDAGTest extends Specification { NotifyInfo notifyInfoLevel2 = NotifyInfo.builder() .parentDAGExecutionId("level1") .parentDAGTaskInfoName("A").build() - olympicene.submit("level2", dag, [:], dagSettings, notifyInfoLevel2) + olympicene.submit("level2", null, dag, [:], dagSettings, notifyInfoLevel2) NotifyInfo notifyInfoLevel3 = NotifyInfo.builder() .parentDAGExecutionId("level2") .parentDAGTaskInfoName("A").build() - olympicene.submit("level3", dag, [:], dagSettings, notifyInfoLevel3) + olympicene.submit("level3", null, dag, [:], dagSettings, notifyInfoLevel3) then: def e = thrown(DAGTraversalException) diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/utils/OperationUtilTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/utils/OperationUtilTest.groovy new file mode 100644 index 000000000..3cb933fe3 --- /dev/null +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/utils/OperationUtilTest.groovy @@ -0,0 +1,36 @@ +package com.weibo.rill.flow.olympicene.traversal.utils + +import spock.lang.Specification +import java.util.concurrent.atomic.AtomicInteger + +class OperationUtilTest extends Specification { + def "test OPERATE_WITH_RETRY with retries"() { + given: + AtomicInteger counter = new AtomicInteger(0) + Runnable operation = { + if (counter.incrementAndGet() < 3) { + throw new RuntimeException("Operation failed") + } + } + + when: + OperationUtil.OPERATE_WITH_RETRY.accept(operation, 3) + + then: + counter.get() == 3 + } + + def "test OPERATE_WITH_RETRY with zero retries"() { + given: + AtomicInteger counter = new AtomicInteger(0) + Runnable operation = { + counter.incrementAndGet() + } + + when: + OperationUtil.OPERATE_WITH_RETRY.accept(operation, 0) + + then: + counter.get() == 1 + } +} diff --git a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/dispatcher/FlowProtocolDispatcher.java b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/dispatcher/FlowProtocolDispatcher.java index 70ac68664..c26cd9bde 100644 --- a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/dispatcher/FlowProtocolDispatcher.java +++ b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/dispatcher/FlowProtocolDispatcher.java @@ -73,7 +73,7 @@ public String handle(Resource resource, DispatchInfo dispatchInfo) { DAGSettings dagSettings = DAGSettings.builder() .ignoreExist(false) .dagMaxDepth(bizDConfs.getFlowDAGMaxDepth()).build(); - olympicene.submit(executionId, dag, data, dagSettings, notifyInfo); + olympicene.submit(executionId, null, dag, data, dagSettings, notifyInfo); dagResourceStatistic.updateFlowTypeResourceStatus(parentDAGExecutionId, parentTaskName, resource.getResourceName(), dag); ProfileActions.recordTinyDAGSubmit(executionId); // 记录prometheus diff --git a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/OlympiceneFacade.java b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/OlympiceneFacade.java index ff3c69eab..611af12c9 100644 --- a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/OlympiceneFacade.java +++ b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/OlympiceneFacade.java @@ -104,7 +104,7 @@ public Map submit(Long uid, String descriptorId, String callback String businessId = DescriptorIdUtil.changeDescriptorIdToBusinessId(descriptorId); Map context = dagContextInitializer.newSubmitContextBuilder(businessId).withData(data).withIdentity(descriptorId).build(); - return submit(uid, descriptorId, context, callback, resourceCheckConfig); + return submit(uid, descriptorId, null, context, callback, resourceCheckConfig); }; return profileRecordService.runNotifyAndRecordProfile(url, descriptorId, submitActions); @@ -112,11 +112,16 @@ public Map submit(Long uid, String descriptorId, String callback public Map submit(User flowUser, String descriptorId, Map context, String callback, ResourceCheckConfig resourceCheckConfig) { - return submit(Optional.ofNullable(flowUser).map(User::getUid).orElse(0L), descriptorId, context, callback, resourceCheckConfig); + return submit(Optional.ofNullable(flowUser).map(User::getUid).orElse(0L), descriptorId, null, context, callback, resourceCheckConfig); } - public Map submit(Long uid, String descriptorId, Map context, String callback, ResourceCheckConfig resourceCheckConfig) { + public Map submit(User flowUser, String descriptorId, Map context, String callback, ResourceCheckConfig resourceCheckConfig, String taskName) { + return submit(Optional.ofNullable(flowUser).map(User::getUid).orElse(0L), descriptorId, taskName, context, callback, resourceCheckConfig); + } + + public Map submit(Long uid, String descriptorId, String taskName, Map context, String callback, ResourceCheckConfig resourceCheckConfig) { DAG dag = dagDescriptorService.getDAG(uid, context, descriptorId); + String executionId = ExecutionIdUtil.generateExecutionId(dag); dagSubmitChecker.check(executionId, resourceCheckConfig); @@ -128,7 +133,7 @@ public Map submit(Long uid, String descriptorId, Map ret = Maps.newHashMap(); ret.put("execution_id", executionId); return ret; diff --git a/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/facade/OlympiceneFacadeTest.groovy b/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/facade/OlympiceneFacadeTest.groovy index 06e09f473..a9fd8bb7f 100644 --- a/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/facade/OlympiceneFacadeTest.groovy +++ b/rill-flow-service/src/test/groovy/com/weibo/rill/flow/service/facade/OlympiceneFacadeTest.groovy @@ -89,7 +89,7 @@ class OlympiceneFacadeTest extends Specification { user.getUid() >> 1L expect: facade.submit(1L, "testBusiness:testFeatureName", new JSONObject(["resourceName": "testCallbackUrl"]).toJSONString(), null, new JSONObject(["a": 1]), null) - facade.submit(1L, "testBusiness:testFeatureName", ["resourceName": "testCallbackUrl"], null, null) + facade.submit(1L, "testBusiness:testFeatureName", null, ["resourceName": "testCallbackUrl"], null, null) facade.submit(user, "testBusiness:testFeatureName", ["resourceName": "testCallbackUrl"], null, null) } diff --git a/rill-flow-web/src/main/java/com/weibo/rill/flow/controller/FlowController.java b/rill-flow-web/src/main/java/com/weibo/rill/flow/controller/FlowController.java index 3ce0fba86..4d9d16b53 100644 --- a/rill-flow-web/src/main/java/com/weibo/rill/flow/controller/FlowController.java +++ b/rill-flow-web/src/main/java/com/weibo/rill/flow/controller/FlowController.java @@ -95,6 +95,7 @@ public class FlowController { @RequestMapping(value = "submit.json", method = RequestMethod.POST) public Map submit(User flowUser, @ApiParam(value = "工作流ID") @RequestParam(value = "descriptor_id") String descriptorId, + @ApiParam(value = "单步执行任务名称") @RequestParam(value = "task_name", required = false) String taskName, @ApiParam(value = "执行完成后的回调地址") @RequestParam(value = "callback", required = false) String callback, @ApiParam(value = "用于检测资源是否可用的检测规则") @RequestParam(value = "resource_check", required = false) String resourceCheck, @ApiParam(value = "工作流执行的context信息") @RequestBody(required = false) JSONObject data) {