diff --git a/end_of_run_workflow.py b/end_of_run_workflow.py index a0add6a..e81d028 100644 --- a/end_of_run_workflow.py +++ b/end_of_run_workflow.py @@ -1,6 +1,7 @@ from prefect import task, flow, get_run_logger -from data_validation import data_validation -from prefect2_test_flow import hello_world +from prefect_dask.task_runners import DaskTaskRunner +from data_validation import data_validation, read_all_streams +from prefect2_test_flow import hello_world, print_task # from long_flow import long_flow @@ -10,11 +11,13 @@ def log_completion(): logger.info("Complete") -@flow +@flow(task_runner=DaskTaskRunner()) def end_of_run_workflow(stop_doc): uid = stop_doc["run_start"] - hello_world() - data_validation(uid, return_state=True) + # hello_world() + print_task.submit() + # data_validation(uid, return_state=True) + read_all_streams.submit(uid, beamline_acronym="tst") # long_flow(iterations=100, sleep_length=10) log_completion()