This repository was archived by the owner on Apr 3, 2024. It is now read-only.
forked from apache/airflow
-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathexample_dataprep.py
77 lines (69 loc) · 2.65 KB
/
example_dataprep.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG that shows how to use Google Dataprep.
"""
import os
from airflow import models
from airflow.providers.google.cloud.operators.dataprep import (
DataprepGetJobGroupOperator,
DataprepGetJobsForJobGroupOperator,
DataprepRunJobGroupOperator,
)
from airflow.utils import dates
# ID of the Dataprep job group queried by the "get" tasks; override with the
# DATAPREP_JOB_ID environment variable (the value must parse as an int).
DATAPREP_JOB_ID = int(os.environ.get('DATAPREP_JOB_ID', 12345677))
# ID of the wrangled dataset (recipe) to run; override with DATAPREP_JOB_RECIPE_ID.
DATAPREP_JOB_RECIPE_ID = int(os.environ.get('DATAPREP_JOB_RECIPE_ID', 12345677))
# Output location for the job's results (a gs:// path); override with DATAPREP_BUCKET.
# The default is a deliberately invalid placeholder that must be replaced to run the job.
DATAPREP_BUCKET = os.environ.get("DATAPREP_BUCKET", "gs://INVALID BUCKET NAME/name@email.com")

# Request body for DataprepRunJobGroupOperator: run the recipe above on Dataflow
# with profiling disabled, writing a new (possibly multi-file) uncompressed CSV
# output without a header row to DATAPREP_BUCKET.
DATA = {
    "wrangledDataset": {"id": DATAPREP_JOB_RECIPE_ID},
    "overrides": {
        "execution": "dataflow",
        "profiler": False,
        "writesettings": [
            {
                "path": DATAPREP_BUCKET,
                "action": "create",
                "format": "csv",
                "compression": "none",
                "header": False,
                "asSingleFile": False,
            }
        ],
    },
}
# Manually-triggered DAG: start one Dataprep job group, then fetch details
# about a job group in two parallel downstream tasks.
with models.DAG(
    dag_id="example_dataprep",
    start_date=dates.days_ago(1),  # Override to match your needs
    schedule_interval=None,  # no automatic schedule
) as dag:
    # [START how_to_dataprep_run_job_group_operator]
    run_job_group = DataprepRunJobGroupOperator(
        task_id="run_job_group",
        body_request=DATA,
    )
    # [END how_to_dataprep_run_job_group_operator]
    # [START how_to_dataprep_get_jobs_for_job_group_operator]
    get_jobs_for_job_group = DataprepGetJobsForJobGroupOperator(
        task_id="get_jobs_for_job_group",
        job_id=DATAPREP_JOB_ID,
    )
    # [END how_to_dataprep_get_jobs_for_job_group_operator]
    # [START how_to_dataprep_get_job_group_operator]
    get_job_group = DataprepGetJobGroupOperator(
        task_id="get_job_group",
        job_group_id=DATAPREP_JOB_ID,
        embed="",  # no related objects embedded in the response
        include_deleted=False,
    )
    # [END how_to_dataprep_get_job_group_operator]
    # Both "get" tasks run only after the job group has been started.
    run_job_group.set_downstream([get_jobs_for_job_group, get_job_group])