Skip to content

More general python codegen #1053

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@
import time
from pathlib import Path
import generated_transformer
import awkward as ak
import uproot
import pyarrow.parquet as pq
import numpy as np
instance = os.environ.get('INSTANCE_NAME', 'Unknown')
default_tree_name = "servicex"
default_branch_name = "branch"
Expand All @@ -22,58 +18,74 @@ def transform_single_file(file_path: str, output_path: Path, output_format: str)
try:
stime = time.time()

output = generated_transformer.run_query(file_path)

ttime = time.time()

if output_format == 'root-file':
# We first see if the function has the signature to directly write output
# If it doesn't, then we assume it's giving us back awkward array results
try:
generated_transformer.run_query(file_path, str(output_path))
if not output_path.exists():
raise RuntimeError("Transformation did not produce expected output file "
f"{output_path}")
ttime = time.time()
etime = time.time()
if isinstance(output, ak.Array):
awkward_arrays = {default_tree_name: output}
elif isinstance(output, dict):
awkward_arrays = output
with open(output_path, 'b+w') as wfile:
with uproot.recreate(wfile) as writer:
for key in awkward_arrays.keys():
total_events = awkward_arrays[key].__len__()
if awkward_arrays[key].fields and total_events:
o_dict = {field: awkward_arrays[key][field]
for field in awkward_arrays[key].fields}
elif awkward_arrays[key].fields and not total_events:
o_dict = {field: np.array([])
for field in awkward_arrays[key].fields}
elif not awkward_arrays[key].fields and total_events:
o_dict = {default_branch_name: awkward_arrays[key]}
else:
o_dict = {default_branch_name: np.array([])}
writer[key] = o_dict

wtime = time.time()
elif output_format == 'raw-file':
etime = time.time()
total_events = 0
output_path = output
wtime = time.time()
else:
if isinstance(output, dict):
tree_name = list(output.keys())[0]
awkward_array = output[tree_name]
print(f'Returned type from your Python function is a dictionary - '
f'Only the first key {tree_name} will be written as parquet files. '
f'Please use root-file output to write all trees.')
except AttributeError:
import awkward as ak
import uproot
import pyarrow.parquet as pq
import numpy as np

output = generated_transformer.run_query(file_path)

ttime = time.time()
if output_format == 'root-file':
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about parquet, or all the other output formats that are allowed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, this is why we should really have a "raw" option for the output format. The most obvious need for this code path is running RDataFrame transformations, where the output is in fact going to be a ROOT file.

Conceptually I guess the big question is whether we want to try to share the Python transformer code between the uproot science image (where a "translate awkward output to root/parquet" step is natural) and the C++ ROOT image (where in the end the user writing the Python code is responsible for the output ... unless we ask them to return an RDF object for snapshotting, or something - note this isn't an entirely implausible route: it's possible to do RDF -> Awkward -> Parquet).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that is a good point. In my mind I was thinking this was common code.

We have a capabilities matrix, right - so the user can't request an unsupported format? Or if you request parquet from something that doesn't support it, do you get back root? I can't remember how we handle that now (in the old days, we'd just make a best effort to pay attention to that request).

etime = time.time()
if isinstance(output, ak.Array):
awkward_arrays = {default_tree_name: output}
elif isinstance(output, dict):
awkward_arrays = output
with open(output_path, 'b+w') as wfile:
with uproot.recreate(wfile) as writer:
for key in awkward_arrays.keys():
total_events = awkward_arrays[key].__len__()
if awkward_arrays[key].fields and total_events:
o_dict = {field: awkward_arrays[key][field]
for field in awkward_arrays[key].fields}
elif awkward_arrays[key].fields and not total_events:
o_dict = {field: np.array([])
for field in awkward_arrays[key].fields}
elif not awkward_arrays[key].fields and total_events:
o_dict = {default_branch_name: awkward_arrays[key]}
else:
o_dict = {default_branch_name: np.array([])}
writer[key] = o_dict

wtime = time.time()
elif output_format == 'raw-file':
etime = time.time()
total_events = 0
output_path = output
wtime = time.time()
else:
awkward_array = output
if isinstance(output, dict):
tree_name = list(output.keys())[0]
awkward_array = output[tree_name]
print(f'Returned type from your Python function is a dictionary - '
f'Only the first key {tree_name} will be written as parquet files. '
f'Please use root-file output to write all trees.')
else:
awkward_array = output

total_events = ak.num(awkward_array, axis=0)
arrow = ak.to_arrow_table(awkward_array)
total_events = ak.num(awkward_array, axis=0)
arrow = ak.to_arrow_table(awkward_array)

etime = time.time()
etime = time.time()

writer = pq.ParquetWriter(output_path, arrow.schema)
writer.write_table(table=arrow)
writer.close()
writer = pq.ParquetWriter(output_path, arrow.schema)
writer.write_table(table=arrow)
writer.close()

wtime = time.time()
wtime = time.time()

output_size = os.stat(output_path).st_size
print(f'Detailed transformer times. query_time:{round(ttime - stime, 3)} '
Expand Down