1+ """A word-counting workflow."""
2+
3+ # pytype: skip-file
4+
5+ import argparse
6+ import logging
7+ import re
8+
9+ import apache_beam as beam
10+ from apache_beam .io import ReadFromText
11+ from apache_beam .io import WriteToText
12+ from apache_beam .options .pipeline_options import PipelineOptions
13+ from apache_beam .options .pipeline_options import SetupOptions
14+
15+
class WordExtractingDoFn(beam.DoFn):
    """DoFn that breaks a line of text into its individual words."""

    def process(self, element):
        """Extract every word found in ``element``.

        A word is a maximal run of word characters and apostrophes, so
        contractions such as "don't" stay in one piece.

        Args:
          element: the line of text to split.

        Returns:
          A list of the words matched in the line, in order of appearance;
          the list is empty when nothing matches.
        """
        word_pattern = re.compile(r'[\w\']+', re.UNICODE)
        return word_pattern.findall(element)
27+
28+
def run(argv=None, save_main_session=True):
    """Build the word-count pipeline from command-line flags and execute it.

    Args:
      argv: argument list handed to ``parse_known_args``; flags other than
        ``--input`` and ``--output`` are forwarded to ``PipelineOptions``.
      save_main_session: value assigned to
        ``SetupOptions.save_main_session`` on the pipeline options.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '--input',
        dest='input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='Input file to process.')
    arg_parser.add_argument(
        '--output',
        dest='output',
        required=True,
        help='Output file to write results to.')
    wordcount_args, beam_args = arg_parser.parse_known_args(argv)

    # save_main_session is set because the DoFn used below depends on names
    # from the module's global scope (e.g. the module-level `re` import).
    options = PipelineOptions(beam_args)
    options.view_as(SetupOptions).save_main_session = save_main_session

    def format_result(word, count):
        """Render one (word, count) pair as a line of output text."""
        return '%s: %d' % (word, count)

    # Leaving the `with` block is what actually runs the pipeline.
    with beam.Pipeline(options=options) as pipeline:
        # Load the input text file (or file pattern) as a PCollection of lines.
        lines = pipeline | 'Read' >> ReadFromText(wordcount_args.input)

        # Lines -> words -> (word, 1) pairs -> per-word totals.
        extract_words = beam.ParDo(WordExtractingDoFn()).with_output_types(str)
        words = lines | 'Split' >> extract_words
        pairs = words | 'PairWIthOne' >> beam.Map(lambda word: (word, 1))
        counts = pairs | 'GroupAndSum' >> beam.CombinePerKey(sum)

        # Turn each (word, count) pair into a printable string.
        output = counts | 'Format' >> beam.MapTuple(format_result)

        # The write is applied only for its side effect; the result is unused.
        _ = output | 'Write' >> WriteToText(wordcount_args.output)
70+
71+
if __name__ == '__main__':
    # Script entry point: set the root logger's level to INFO, emit a
    # separator line on stdout, then run the pipeline with default arguments.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    print("...............................")
    run()
0 commit comments