Skip to content

Commit f9745ec

Browse files
CYarros10 and agold-rh
authored
latest composer-workload-simulator, and ruff formatting (#1480)
Co-authored-by: Andrew Gold <[email protected]>
1 parent 095427e commit f9745ec

File tree

14 files changed

+529
-166
lines changed

14 files changed

+529
-166
lines changed

Diff for: tools/cloud-composer-stress-testing/cloud-composer-dag-generator/dag_generator/__main__.py

+45-40
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
b) run $python main.py
2222
c) move dags folder generated to the dag buckets in composer like:
2323
gsutil cp -r out gs://BUCKET_NAME/dags
24-
NOTE: the "number_of_operators_defined" variable in the configuration file
24+
NOTE: the "number_of_operators_defined" variable in the configuration file
2525
(config.json) allows creating up to 5 different kinds of task,
2626
none has complex functionality:
2727
a) bash_operator_echo
@@ -32,22 +32,23 @@
3232
"""
3333

3434
import json
35+
import math
3536
import random
37+
3638
import modules.initDag
3739
import modules.operators
38-
import math
3940

4041

4142
def get_config():
4243
"""module to read configs"""
43-
f = open('config.json', "r")
44+
f = open("config.json", "r")
4445
data = json.loads(f.read())
4546
f.close()
4647
return data
4748

4849

4950
def get_init_content(i):
50-
"""Initialise test DAG with headers """
51+
"""Initialise test DAG with headers"""
5152
modules.initDag.get_init_dag(i)
5253

5354

@@ -60,70 +61,74 @@ def get_task_dag(min_number_of_task_in_dag):
6061
file.close()
6162
return data
6263

63-
# build the dags
64+
65+
# build the dags
6466
def main():
6567
"""main function to create test DAGs"""
6668
# read config file
6769
data = get_config()
6870

69-
number_of_dags_to_generate = data['number_of_dags_to_generate']
70-
min_number_of_task_in_dag = data['min_number_of_task_in_dag']
71-
max_number_of_task_in_dag = data['max_number_of_task_in_dag']
72-
task_min_time_in_sec = data['task_min_time_in_sec']
73-
task_max_time_in_sec = data['task_max_time_in_sec']
74-
percentage_of_job_in_parallel = data['percentage_of_job_in_parallel']
75-
number_of_operators_defined = data['number_of_operators_defined']
76-
file_index = data['file_start_index']
77-
schedules = data['schedules']
71+
number_of_dags_to_generate = data["number_of_dags_to_generate"]
72+
min_number_of_task_in_dag = data["min_number_of_task_in_dag"]
73+
max_number_of_task_in_dag = data["max_number_of_task_in_dag"]
74+
task_min_time_in_sec = data["task_min_time_in_sec"]
75+
task_max_time_in_sec = data["task_max_time_in_sec"]
76+
percentage_of_job_in_parallel = data["percentage_of_job_in_parallel"]
77+
number_of_operators_defined = data["number_of_operators_defined"]
78+
file_index = data["file_start_index"]
79+
schedules = data["schedules"]
7880

7981
# creating DAG files
8082
for i in range(number_of_dags_to_generate):
8183
task_list = []
8284
dagf = open(f"out/dagFile_{file_index+i}.py", "w+")
8385
dagf.write(
8486
modules.initDag.get_init_dag(
85-
file_index + i,
86-
schedules[random.randrange(0,
87-
len(schedules) - 1)]))
87+
file_index + i, schedules[random.randrange(0, len(schedules) - 1)]
88+
)
89+
)
8890
dagf.write(modules.operators.start_task())
8991
dagf.write(modules.operators.stop_task())
9092
for task_index in range(
91-
random.randrange(min_number_of_task_in_dag,
92-
max_number_of_task_in_dag)):
93+
random.randrange(min_number_of_task_in_dag, max_number_of_task_in_dag)
94+
):
9395
task_list.append("task_{index}".format(index=task_index))
94-
if (task_index % number_of_operators_defined == 0):
96+
if task_index % number_of_operators_defined == 0:
9597
dagf.write(modules.operators.bash_operator_echo(task_index))
96-
elif (task_index % number_of_operators_defined == 1):
98+
elif task_index % number_of_operators_defined == 1:
9799
dagf.write(
98100
modules.operators.bash_operator_sleep(
99101
task_index,
100-
random.randrange(task_min_time_in_sec,
101-
task_max_time_in_sec)))
102-
elif (task_index % number_of_operators_defined == 2):
102+
random.randrange(task_min_time_in_sec, task_max_time_in_sec),
103+
)
104+
)
105+
elif task_index % number_of_operators_defined == 2:
103106
dagf.write(
104107
modules.operators.python_operator_task_sleep(
105108
task_index,
106-
random.randrange(task_min_time_in_sec,
107-
task_max_time_in_sec)))
108-
elif (task_index % number_of_operators_defined == 3):
109-
dagf.write(
110-
modules.operators.bash_operator_task_ping(task_index))
109+
random.randrange(task_min_time_in_sec, task_max_time_in_sec),
110+
)
111+
)
112+
elif task_index % number_of_operators_defined == 3:
113+
dagf.write(modules.operators.bash_operator_task_ping(task_index))
111114
else:
112-
dagf.write(
113-
modules.operators.python_operator_task_print(task_index))
114-
no_tasks_in_parallel = math.ceil(percentage_of_job_in_parallel / 100 *
115-
len(task_list))
115+
dagf.write(modules.operators.python_operator_task_print(task_index))
116+
no_tasks_in_parallel = math.ceil(
117+
percentage_of_job_in_parallel / 100 * len(task_list)
118+
)
116119
parallel_tasks = []
117-
if (no_tasks_in_parallel > 1):
120+
if no_tasks_in_parallel > 1:
118121
for parallel_task_index in range(no_tasks_in_parallel):
119122
parallel_tasks.append(task_list.pop())
120-
task_list.insert(random.randrange(1,
121-
len(task_list) - 2),
122-
"[{task}]".format(task=",".join(parallel_tasks)))
123-
dagf.write("\n\tchain(start_task,{tasks},stop_task)".format(
124-
tasks=",".join(task_list)))
123+
task_list.insert(
124+
random.randrange(1, len(task_list) - 2),
125+
"[{task}]".format(task=",".join(parallel_tasks)),
126+
)
127+
dagf.write(
128+
"\n\tchain(start_task,{tasks},stop_task)".format(tasks=",".join(task_list))
129+
)
125130
dagf.close()
126131

127132

128133
if __name__ == "__main__":
129-
main()
134+
main()

Diff for: tools/cloud-composer-stress-testing/cloud-composer-dag-generator/dag_generator/modules/initDag.py

+16-12
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,22 @@
1818

1919
def get_init_dag(dag_number, schedule):
2020
today = datetime.datetime.now()
21-
if (schedule == "min"):
21+
if schedule == "min":
2222
dag_schedule = "{min} * * * *".format(min=random.randrange(1, 10))
23-
elif (schedule == "hour"):
24-
dag_schedule = "{min} {hour} * * *".format(min=random.randrange(0, 59),
25-
hour=random.randrange(0, 23))
26-
elif (schedule == "everyhalfhour"):
23+
elif schedule == "hour":
24+
dag_schedule = "{min} {hour} * * *".format(
25+
min=random.randrange(0, 59), hour=random.randrange(0, 23)
26+
)
27+
elif schedule == "everyhalfhour":
2728
dag_schedule = "*/30 * * * *"
28-
elif (schedule == "everyhour"):
29+
elif schedule == "everyhour":
2930
dag_schedule = "0 * * * *"
3031
else:
3132
dag_schedule = "{min} {hour} {day} * *".format(
3233
min=random.randrange(0, 59),
3334
hour=random.randrange(0, 23),
34-
day=random.randrange(1, 28))
35+
day=random.randrange(1, 28),
36+
)
3537

3638
lines = """import time
3739
from datetime import datetime
@@ -52,9 +54,11 @@ def get_init_dag(dag_number, schedule):
5254
default_args=default_args,
5355
catchup=False
5456
) as dag:
55-
""".format(dag_id=dag_number,
56-
start_year=today.year,
57-
schedule=dag_schedule,
58-
start_month=today.month,
59-
start_day=today.day)
57+
""".format(
58+
dag_id=dag_number,
59+
start_year=today.year,
60+
schedule=dag_schedule,
61+
start_month=today.month,
62+
start_day=today.day,
63+
)
6064
return lines
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
**/.DS_Store
22
taskflow_collections/__pycache__
3-
utils/*
3+
.ruff_cache

Diff for: tools/cloud-composer-stress-testing/cloud-composer-workload-simulator/README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ Sizing a Composer Environment is an estimated process. There are many different
6161
experiment_id: experiment_1
6262
number_of_dags: 10
6363
tasks_per_dag: 3
64+
paused: 0.5 # weighted chance for dag to be paused
6465
6566
# Schedules and weights
6667
schedules:
@@ -97,7 +98,7 @@ default_settings:
9798
deferrable: true
9899
retries: 1
99100
catchup: false
100-
is_paused_upon_creation: false
101+
is_paused_upon_creation: false # true will pause all dags
101102
execution_timeout: 30
102103
sla: 25
103104
project_id: your-project

Diff for: tools/cloud-composer-stress-testing/cloud-composer-workload-simulator/configs/custom.yaml

+9-8
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
experiment_id: custom_experiment_1
1616
number_of_dags: 1
1717
min_tasks_per_dag: 1
18+
paused: 0.5
1819

1920
# Schedules and weights
2021
schedules:
@@ -35,13 +36,13 @@ default_settings:
3536
project_id: your-project
3637
region: your-region
3738
composer_environment: your-environment
38-
deferrable: true
39+
deferrable: false
3940
retries: 1
40-
retry_delay: 2 # minutes
41+
retry_delay: 2 # minutes
4142
catchup: false
42-
is_paused_upon_creation: false
43-
dagrun_timeout: 60 # minutes
44-
execution_timeout: 30 # minutes
45-
sla: 25 # minutes
46-
mode: poke
47-
poke_interval: 120 # seconds
43+
is_paused_upon_creation: false # True will override and set all dags to paused
44+
dagrun_timeout: 60 # minutes
45+
execution_timeout: 30 # minutes
46+
sla: 25 # minutes
47+
mode: reschedule
48+
poke_interval: 60 # seconds

Diff for: tools/cloud-composer-stress-testing/cloud-composer-workload-simulator/configs/sample.yaml

+10-9
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
experiment_id: experiment_4
15+
experiment_id: sample
1616
number_of_dags: 1
1717
min_tasks_per_dag: 10
18+
paused: 0.5
1819

1920
# Schedules and weights
2021
schedules:
@@ -52,13 +53,13 @@ default_settings:
5253
project_id: your-project
5354
region: your-region
5455
composer_environment: your-environment
55-
deferrable: true
56+
deferrable: false
5657
retries: 1
57-
retry_delay: 2 # minutes
58+
retry_delay: 2 # minutes
5859
catchup: false
59-
is_paused_upon_creation: false
60-
dagrun_timeout: 60 # minutes
61-
execution_timeout: 30 # minutes
62-
sla: 25 # minutes
63-
mode: poke
64-
poke_interval: 120 # seconds
60+
is_paused_upon_creation: false # True will override and set all dags to paused
61+
dagrun_timeout: 60 # minutes
62+
execution_timeout: 30 # minutes
63+
sla: 25 # minutes
64+
mode: reschedule
65+
poke_interval: 60 # seconds

0 commit comments

Comments
 (0)