Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 79 additions & 25 deletions data_processing/sante/finess/DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
check_if_modif,
get_finess_columns,
get_geoloc_columns,
build_finess_table,
build_finess_table_entites_juridiques,
build_finess_table_etablissements,
send_to_minio,
publish_on_datagouv,
send_notification_mattermost,
Expand All @@ -32,60 +33,113 @@
tags=["data_processing", "sante", "finess"],
default_args=default_args,
) as dag:
check_if_modif = ShortCircuitOperator(
task_id="check_if_modif",
python_callable=check_if_modif,
)

clean_previous_outputs = BashOperator(
task_id="clean_previous_outputs",
bash_command=f"rm -rf {TMP_FOLDER} && mkdir -p {DATADIR}",
)

get_finess_columns = PythonOperator(
task_id="get_finess_columns",
# pipeline établissements
check_if_modif_etablissements = ShortCircuitOperator(
task_id="check_if_modif_etablissements",
python_callable=check_if_modif,
op_kwargs={"scope": "etablissements"},
)

get_finess_columns_etablissements = PythonOperator(
task_id="get_finess_columns_etablissements",
python_callable=get_finess_columns,
op_kwargs={"scope": "etablissements"},
)

get_geoloc_columns = PythonOperator(
task_id="get_geoloc_columns",
python_callable=get_geoloc_columns,
)

build_finess_table = PythonOperator(
task_id="build_finess_table",
python_callable=build_finess_table,
build_finess_table_etablissements = PythonOperator(
task_id="build_finess_table_etablissements",
python_callable=build_finess_table_etablissements,
)

send_to_minio_etablissements = PythonOperator(
task_id="send_to_minio_etablissements",
python_callable=send_to_minio,
op_kwargs={"scope": "etablissements"},
)

publish_on_datagouv_etablissements = PythonOperator(
task_id="publish_on_datagouv_etablissements",
python_callable=publish_on_datagouv,
op_kwargs={"scope": "etablissements"},
)

check_if_modif_etablissements.set_upstream(clean_previous_outputs)

get_finess_columns_etablissements.set_upstream(check_if_modif_etablissements)
get_geoloc_columns.set_upstream(check_if_modif_etablissements)

build_finess_table_etablissements.set_upstream(get_finess_columns_etablissements)
build_finess_table_etablissements.set_upstream(get_geoloc_columns)

send_to_minio_etablissements.set_upstream(build_finess_table_etablissements)
publish_on_datagouv_etablissements.set_upstream(send_to_minio_etablissements)

# pipeline entités juridiques
check_if_modif_entites_juridiques = ShortCircuitOperator(
task_id="check_if_modif_entites_juridiques",
python_callable=check_if_modif,
op_kwargs={"scope": "entites_juridiques"},
)

get_finess_columns_entites_juridiques = PythonOperator(
task_id="get_finess_columns_entites_juridiques",
python_callable=get_finess_columns,
op_kwargs={"scope": "entites_juridiques"},
)

send_to_minio = PythonOperator(
task_id="send_to_minio",
build_finess_table_entites_juridiques = PythonOperator(
task_id="build_finess_table_entites_juridiques",
python_callable=build_finess_table_entites_juridiques,
)

send_to_minio_entites_juridiques = PythonOperator(
task_id="send_to_minio_entites_juridiques",
python_callable=send_to_minio,
op_kwargs={"scope": "entites_juridiques"},
)

publish_on_datagouv = PythonOperator(
task_id="publish_on_datagouv",
publish_on_datagouv_entites_juridiques = PythonOperator(
task_id="publish_on_datagouv_entites_juridiques",
python_callable=publish_on_datagouv,
op_kwargs={"scope": "entites_juridiques"},
)

check_if_modif_entites_juridiques.set_upstream(clean_previous_outputs)
get_finess_columns_entites_juridiques.set_upstream(
check_if_modif_entites_juridiques
)
build_finess_table_entites_juridiques.set_upstream(
get_finess_columns_entites_juridiques
)
send_to_minio_entites_juridiques.set_upstream(build_finess_table_entites_juridiques)
publish_on_datagouv_entites_juridiques.set_upstream(
send_to_minio_entites_juridiques
)

# final steps
clean_up = BashOperator(
task_id="clean_up",
bash_command=f"rm -rf {TMP_FOLDER}",
trigger_rule="none_failed_or_skipped",
)

send_notification_mattermost = PythonOperator(
task_id="send_notification_mattermost",
python_callable=send_notification_mattermost,
trigger_rule="none_failed_or_skipped",
)

clean_previous_outputs.set_upstream(check_if_modif)

get_finess_columns.set_upstream(clean_previous_outputs)
get_geoloc_columns.set_upstream(clean_previous_outputs)

build_finess_table.set_upstream(get_finess_columns)
build_finess_table.set_upstream(get_geoloc_columns)
clean_up.set_upstream(publish_on_datagouv_etablissements)
clean_up.set_upstream(publish_on_datagouv_entites_juridiques)

send_to_minio.set_upstream(build_finess_table)
publish_on_datagouv.set_upstream(send_to_minio)
clean_up.set_upstream(publish_on_datagouv)
send_notification_mattermost.set_upstream(clean_up)
56 changes: 42 additions & 14 deletions data_processing/sante/finess/config/dgv.json
Original file line number Diff line number Diff line change
@@ -1,22 +1,50 @@
{
"csv" : {
"dev": {
"dataset_id": "",
"resource_id": ""
"etablissements": {
"source_dataset": "53699569a3a729239d2046eb",
"source_doc": "d06a0924-9931-4a60-83b6-93abdb6acfd6",
"csv" : {
"dev": {
"dataset_id": "68ee294549988277f2dd0399",
"resource_id": "be1dd455-a661-49de-b71e-86c9ef8b8453"
},
"prod": {
"dataset_id": "67e43007cd5e91b9fdcbc7b3",
"resource_id": "935e1dc5-afce-4d45-9fda-47b18f7444e6"
}
},
"prod": {
"dataset_id": "67e43007cd5e91b9fdcbc7b3",
"resource_id": "935e1dc5-afce-4d45-9fda-47b18f7444e6"
"parquet" : {
"dev": {
"dataset_id": "68ee294549988277f2dd0399",
"resource_id": "3d5d01a1-4605-4cb4-a79f-8f8ec2ed99a2"
},
"prod": {
"dataset_id": "67e43007cd5e91b9fdcbc7b3",
"resource_id": "5eb67ddc-b58c-4531-a3f2-2c2843cf3986"
}
}
},
"parquet" : {
"dev": {
"dataset_id": "",
"resource_id": ""
"entites_juridiques": {
"source_dataset": "5369956ba3a729239d2046fc",
"source_doc": "9c2bfada-8339-4dcb-aec2-af0be16c99c0",
"csv" : {
"dev": {
"dataset_id": "68ee294549988277f2dd0399",
"resource_id": "b8ebb8f7-6576-44d5-ab65-c820231397da"
},
"prod": {
"dataset_id": "67e43007cd5e91b9fdcbc7b3",
"resource_id": "35d75f72-3ae5-4c4a-abd2-9aad340576a1"
}
},
"prod": {
"dataset_id": "67e43007cd5e91b9fdcbc7b3",
"resource_id": "5eb67ddc-b58c-4531-a3f2-2c2843cf3986"
"parquet" : {
"dev": {
"dataset_id": "68ee294549988277f2dd0399",
"resource_id": "17d6ad4d-7f77-49ce-a860-1a86e1d66be2"
},
"prod": {
"dataset_id": "67e43007cd5e91b9fdcbc7b3",
"resource_id": "69cb3e2f-dc0a-4cee-800a-cf98901d257d"
}
}
}
}
Loading