Commit ed3d094

Adding python flex template to convert raw text/image objects to ArrayRecord in GCS
1 parent 1503db3 commit ed3d094

8 files changed: +3071 -0 lines changed

README.md

Lines changed: 1 addition & 0 deletions
@@ -123,6 +123,7 @@ follow [GitHub's branch renaming guide](https://docs.github.com/en/repositories/
   - [Bulk Decompress Files on Cloud Storage](https://github.com/search?q=repo%3AGoogleCloudPlatform%2FDataflowTemplates%20Bulk_Decompress_GCS_Files&type=code)
   - [Bulk Delete Entities in Firestore (Datastore mode)](https://github.com/search?q=repo%3AGoogleCloudPlatform%2FDataflowTemplates%20Firestore_to_Firestore_Delete&type=code)
   - [Convert file formats between Avro, Parquet & CSV](https://github.com/search?q=repo%3AGoogleCloudPlatform%2FDataflowTemplates%20File_Format_Conversion&type=code)
+  - [Convert text or images to ArrayRecord format](https://github.com/search?q=repo%3AGoogleCloudPlatform%2FDataflowTemplates%20ArrayRecord_Converter&type=code)
   - [Streaming Data Generator](https://github.com/search?q=repo%3AGoogleCloudPlatform%2FDataflowTemplates%20Streaming_Data_Generator&type=code)
   - Legacy Templates
   - [Bulk Delete Entities in Datastore [Deprecated]](https://github.com/search?q=repo%3AGoogleCloudPlatform%2FDataflowTemplates%20Datastore_to_Datastore_Delete&type=code)
Lines changed: 158 additions & 0 deletions
@@ -0,0 +1,158 @@

Array Record Converter (Python) template
---
Batch pipeline. Reads text or image files from Cloud Storage and converts each file 1:1 into [ArrayRecord](https://github.com/google/array_record) format; a short read-back sketch follows the parameter list below.

:memo: This is a Google-provided template! Please
check [Provided templates documentation](https://cloud.google.com/dataflow/docs/guides/templates/provided-templates)
on how to use it without having to build from sources using [Create job from template](https://console.cloud.google.com/dataflow/createjob?template=ArrayRecord_Converter).

:bulb: This is generated documentation based
on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplates#metadata-annotations).
Do not change this file directly.

## Parameters

### Required Parameters

* **input_path** (Input file(s) in Cloud Storage): The input file pattern Dataflow reads from. Use the example file (gs://dataflow-samples/shakespeare/kinglear.txt) or enter the path to your own using the same format: gs://your-bucket/your-file.txt.
* **input_format** (Input file format): The format of the files intended for conversion. Currently supports `text` or `image`.
* **output_path** (Output Cloud Storage file prefix): Path prefix for writing output files. Ex: gs://your-bucket/arrayrecord.

### Optional Parameters

(This template has no optional parameters.)

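### Inspecting the Output

As a quick way to inspect what the job produced, records in a generated file can be read back with the same `array_record` package the pipeline uses for writing. This is a minimal sketch rather than part of the template: it assumes the `ArrayRecordReader` companion class from `array_record.python.array_record_module`, and the file name shown is hypothetical.

```python
# Sketch: inspect a generated ArrayRecord file after copying it locally, e.g.
#   gsutil cp gs://your-bucket/arrayrecord/kinglear.txt.arrayrecord /tmp/
from array_record.python.array_record_module import ArrayRecordReader

reader = ArrayRecordReader('/tmp/kinglear.txt.arrayrecord')
print('records:', reader.num_records())   # for text input, one record per line
for record in reader.read_all()[:5]:      # records are raw bytes
    print(record.decode('utf-8'))
reader.close()
```
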
## Getting Started

### Requirements

* Java 11
* Maven
* [gcloud CLI](https://cloud.google.com/sdk/gcloud), and execution of the following commands:
  * `gcloud auth login`
  * `gcloud auth application-default login`

:star2: Those dependencies are pre-installed if you use Google Cloud Shell!

[![Open in Cloud Shell](http://gstatic.com/cloudssh/images/open-btn.svg)](https://console.cloud.google.com/cloudshell/editor?cloudshell_git_repo=https%3A%2F%2Fgithub.com%2FGoogleCloudPlatform%2FDataflowTemplates.git&cloudshell_open_in_editor=python/src/main/java/com/google/cloud/teleport/templates/python/ArrayRecordConverter.java)

### Templates Plugin

This README provides instructions using
the [Templates Plugin](https://github.com/GoogleCloudPlatform/DataflowTemplates#templates-plugin).
Install the plugin with the following command before proceeding:

```shell
mvn clean install -pl plugins/templates-maven-plugin -am
```

### Building Template

This template is a Flex Template, meaning that the pipeline code will be
containerized and the container will be executed on Dataflow. Please
check [Use Flex Templates](https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates)
and [Configure Flex Templates](https://cloud.google.com/dataflow/docs/guides/templates/configuring-flex-templates)
for more information.

#### Staging the Template

If the plan is to just stage the template (i.e., make it available to use) by
the `gcloud` command or the Dataflow "Create job from template" UI,
the `-PtemplatesStage` profile should be used:

```shell
export PROJECT=<my-project>
export BUCKET_NAME=<bucket-name>

mvn clean package -PtemplatesStage \
  -DskipTests \
  -DprojectId="$PROJECT" \
  -DbucketName="$BUCKET_NAME" \
  -DstagePrefix="templates" \
  -DtemplateName="ArrayRecord_Converter" \
  -pl python \
  -am
```

The command should build and save the template to Google Cloud, and then print
the complete location on Cloud Storage:

```
Flex Template was staged! gs://<bucket-name>/templates/flex/ArrayRecord_Converter
```

The specific path should be copied, as it will be used in the following steps.

#### Running the Template

**Using the staged template**:

You can use the path above to run the template (or share it with others for execution).

To start a job with the template at any time using `gcloud`, you need valid
resources for the required parameters.

With those in place, the following command line can be used:

```shell
export PROJECT=<my-project>
export BUCKET_NAME=<bucket-name>
export REGION=us-central1
export TEMPLATE_SPEC_GCSPATH="gs://$BUCKET_NAME/templates/flex/ArrayRecord_Converter"

### Required
export INPUT_PATH=<input>
export INPUT_FORMAT=<text|image>
export OUTPUT_PATH=<output>

### Optional

gcloud dataflow flex-template run "array-record-converter-job" \
  --project "$PROJECT" \
  --region "$REGION" \
  --template-file-gcs-location "$TEMPLATE_SPEC_GCSPATH" \
  --parameters "input_path=$INPUT_PATH" \
  --parameters "input_format=$INPUT_FORMAT" \
  --parameters "output_path=$OUTPUT_PATH"
```

For more information about the command, please check:
https://cloud.google.com/sdk/gcloud/reference/dataflow/flex-template/run

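The same job can also be launched programmatically instead of through `gcloud`. The snippet below is a hedged sketch, not part of this commit: it assumes the `google-cloud-dataflow-client` package (module `google.cloud.dataflow_v1beta3`), and the project, region, bucket, and paths are placeholders.

```python
# Hypothetical programmatic launch of the staged Flex Template
# (assumes: pip install google-cloud-dataflow-client).
from google.cloud import dataflow_v1beta3

client = dataflow_v1beta3.FlexTemplatesServiceClient()
response = client.launch_flex_template(
    request=dataflow_v1beta3.LaunchFlexTemplateRequest(
        project_id='my-project',
        location='us-central1',
        launch_parameter=dataflow_v1beta3.LaunchFlexTemplateParameter(
            job_name='array-record-converter-job',
            container_spec_gcs_path='gs://my-bucket/templates/flex/ArrayRecord_Converter',
            parameters={
                'input_path': 'gs://my-bucket/samples/*.txt',
                'input_format': 'text',
                'output_path': 'gs://my-bucket/arrayrecord-out',
            },
        ),
    )
)
print(response.job.id, response.job.current_state)
```
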
**Using the plugin**:

Instead of just generating the template in the folder, it is possible to stage
and run the template in a single command. This may be useful for testing when
changing the templates.

```shell
export PROJECT=<my-project>
export BUCKET_NAME=<bucket-name>
export REGION=us-central1

### Required
export INPUT_PATH=<input>
export INPUT_FORMAT=<text|image>
export OUTPUT_PATH=<output>

### Optional

mvn clean package -PtemplatesRun \
  -DskipTests \
  -DprojectId="$PROJECT" \
  -DbucketName="$BUCKET_NAME" \
  -Dregion="$REGION" \
  -DjobName="array-record-converter-job" \
  -DtemplateName="ArrayRecord_Converter" \
  -Dparameters="input_path=$INPUT_PATH,input_format=$INPUT_FORMAT,output_path=$OUTPUT_PATH" \
  -pl python \
  -am
```
Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
/*
 * Copyright (C) 2025 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates.python;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;

/** Template class for ArrayRecordConverter in Python. */
@Template(
    name = "ArrayRecord_Converter",
    category = TemplateCategory.UTILITIES,
    type = Template.TemplateType.PYTHON,
    displayName = "ArrayRecord Converter Job",
    description =
        "The ArrayRecord_Converter template is used to convert bulk text/image datasets in GCS into ArrayRecord datasets in GCS. "
            + "An input GCS path can be passed in, and the resulting data will be uploaded to the `output_path`.",
    flexContainerName = "arrayrecord-converter",
    contactInformation = "https://cloud.google.com/support")
public interface ArrayRecordConverter {

  @TemplateParameter.GcsReadFile(
      order = 1,
      name = "input_path",
      optional = false,
      description = "Input GCS path to match all files (e.g., gs://example/*.txt)",
      helpText = "An input path in the form of a GCS path.")
  String getInputPath();

  @TemplateParameter.Text(
      order = 2,
      name = "input_format",
      optional = false,
      description = "Input format of the data; can be either text or image",
      helpText =
          "The format of the input, which can be either text or image. This job does not support other values.")
  String getInputFormat();

  @TemplateParameter.GcsWriteFolder(
      order = 3,
      name = "output_path",
      optional = false,
      description = "Output GCS path where files are generated",
      helpText = "Output path in the form of a GCS path where files will be uploaded.")
  String getOutputPath();
}
Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
FROM gcr.io/dataflow-templates-base/python311-template-launcher-base

ARG WORKDIR=/template
RUN mkdir -p ${WORKDIR}
COPY main.py /template
COPY requirements.txt /template
WORKDIR ${WORKDIR}

ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=requirements.txt
ENV FLEX_TEMPLATE_PYTHON_PY_FILE=main.py

# Install dependencies to launch the pipeline
RUN pip install -U --require-hashes --no-deps -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE
RUN pip download --no-cache-dir --require-hashes --no-deps --dest /tmp/dataflow-requirements-cache -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE

ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]
Lines changed: 141 additions & 0 deletions
@@ -0,0 +1,141 @@
#
# Copyright (C) 2025 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
#
"""A template workflow to convert raw objects (text, images) into ArrayRecord files."""

import argparse
import logging
import os
import urllib.parse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from array_record.python.array_record_module import ArrayRecordWriter
from google.cloud import storage


class ConvertToArrayRecordGCS(beam.DoFn):
  """Write a tuple consisting of a filename and records to GCS ArrayRecords."""

  _WRITE_DIR = '/tmp/'

  def process(
      self,
      element,
      path,
      write_dir=_WRITE_DIR,
      file_path_suffix='.arrayrecord',
      overwrite_extension=False,
  ):

    ## Upload a locally written file to GCS.
    def upload_to_gcs(bucket_name, filename, prefix='', source_dir=self._WRITE_DIR):
      source_filename = os.path.join(source_dir, filename)
      blob_name = os.path.join(prefix, filename)
      storage_client = storage.Client()
      bucket = storage_client.get_bucket(bucket_name)
      blob = bucket.blob(blob_name)
      blob.upload_from_filename(source_filename)

    ## Simple logic for stripping a file extension and replacing it.
    def fix_filename(filename):
      base_name = os.path.splitext(filename)[0]
      new_filename = base_name + file_path_suffix
      return new_filename

    parsed_gcs_path = urllib.parse.urlparse(path)
    bucket_name = parsed_gcs_path.hostname
    gcs_prefix = parsed_gcs_path.path.lstrip('/')

    if overwrite_extension:
      filename = fix_filename(os.path.basename(element[0]))
    else:
      filename = '{}{}'.format(os.path.basename(element[0]), file_path_suffix)

    # Write all records for this source file into a local ArrayRecord file.
    write_path = os.path.join(write_dir, filename)
    writer = ArrayRecordWriter(write_path, 'group_size:1')

    for item in element[1]:
      writer.write(bytes(item, 'utf-8'))

    writer.close()

    upload_to_gcs(bucket_name, filename, prefix=gcs_prefix)
    os.remove(os.path.join(write_dir, filename))


def run(argv=None, save_main_session=True):
  """Main entry point; defines and runs the ArrayRecord converter pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input_path',
      dest='input_path',
      default=(
          'gs://converter-datasets/input-datasets/google-top-terms/csv/1k/*.csv'
      ),
      help='Input file to process.',
  )
  parser.add_argument(
      '--input_format',
      dest='input_format',
      default='text',
      help='Input file format.',
  )
  parser.add_argument(
      '--output_path',
      dest='output_path',
      required=True,
      help='Output destination to write results to.',
  )
  known_args, pipeline_args = parser.parse_known_args(argv)

  # We use the save_main_session option because one or more DoFn's in this
  # workflow rely on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

  # The pipeline will be run on exiting the with block.
  with beam.Pipeline(options=pipeline_options) as p:
    # TODO(iamphani): Move this out to array_record/beam/pipelines.py once it is
    # updated to support raw text.

    files = p | "Start" >> beam.Create([known_args.input_path])
    if known_args.input_format == 'text':
      parsed_files = files | "Read Text Files" >> beam.io.ReadAllFromText(
          with_filename=True
      )
    elif known_args.input_format == 'image':
      parsed_files = files | "Read Image Files" >> beam.io.ReadAllFromBinaryFiles(with_filename=True)
    else:
      raise ValueError(f"Unsupported input format: {known_args.input_format}")

    # Group records by source filename so each input file yields one ArrayRecord file.
    _ = (
        parsed_files
        | "Group" >> beam.GroupByKey()
        | "Write to ArrayRecord in GCS"
        >> beam.ParDo(
            ConvertToArrayRecordGCS(),
            known_args.output_path,
            file_path_suffix=".arrayrecord",
            overwrite_extension=False,
        )
    )


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
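
For quick iteration on main.py before building the Flex Template container, the pipeline can also be exercised on the DirectRunner. The snippet below is a hedged sketch, not part of this commit: the bucket and paths are placeholders, and it assumes `apache-beam[gcp]`, `array_record`, and `google-cloud-storage` are installed locally.

```python
# Hypothetical local smoke test for main.py (not included in this commit).
# Paths are placeholders; output is still uploaded to GCS because the DoFn
# writes through the google-cloud-storage client.
import main  # assumes the file above is saved as main.py on the PYTHONPATH

main.run([
    '--input_path=gs://your-bucket/samples/*.txt',
    '--input_format=text',
    '--output_path=gs://your-bucket/arrayrecord-out',
    '--runner=DirectRunner',
])
```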
