From 597d689d1d678ab98d08259e9a5235f54f20f98d Mon Sep 17 00:00:00 2001 From: Abigail Giles Date: Thu, 20 Jun 2024 16:35:50 -0400 Subject: [PATCH 1/2] Try DaskTaskRunner --- end_of_run_workflow.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/end_of_run_workflow.py b/end_of_run_workflow.py index a0add6a..3040e1f 100644 --- a/end_of_run_workflow.py +++ b/end_of_run_workflow.py @@ -1,4 +1,5 @@ from prefect import task, flow, get_run_logger +from prefect_dask.task_runners import DaskTaskRunner from data_validation import data_validation from prefect2_test_flow import hello_world # from long_flow import long_flow @@ -10,11 +11,11 @@ def log_completion(): logger.info("Complete") -@flow +@flow(task_runner=DaskTaskRunner()) def end_of_run_workflow(stop_doc): uid = stop_doc["run_start"] - hello_world() - data_validation(uid, return_state=True) + hello_world.submit() + data_validation.submit(uid, return_state=True) # long_flow(iterations=100, sleep_length=10) log_completion() From 05ad64fac35c5114ef375dfc2a33628d3e2fbc07 Mon Sep 17 00:00:00 2001 From: Juan Marulanda Date: Mon, 24 Jun 2024 17:30:53 -0400 Subject: [PATCH 2/2] Modified flow to run tasks with DaskTaskRunner directly --- end_of_run_workflow.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/end_of_run_workflow.py b/end_of_run_workflow.py index a0add6a..e81d028 100644 --- a/end_of_run_workflow.py +++ b/end_of_run_workflow.py @@ -1,6 +1,7 @@ from prefect import task, flow, get_run_logger -from data_validation import data_validation -from prefect2_test_flow import hello_world +from prefect_dask.task_runners import DaskTaskRunner +from data_validation import data_validation, read_all_streams +from prefect2_test_flow import hello_world, print_task # from long_flow import long_flow @@ -10,11 +11,13 @@ def log_completion(): logger.info("Complete") -@flow +@flow(task_runner=DaskTaskRunner()) def end_of_run_workflow(stop_doc): uid = stop_doc["run_start"] - hello_world() - data_validation(uid, return_state=True) + # hello_world() + print_task.submit() + # data_validation(uid, return_state=True) + read_all_streams.submit(uid, beamline_acronym="tst") # long_flow(iterations=100, sleep_length=10) log_completion()