Hello,
I spotted an error when running some code which I've managed to reproduce by modifying one of the petastorm tests:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

from petastorm import make_reader
from petastorm.codecs import ScalarCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.predicates import in_lambda
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField

import numpy as np
import pytest


def test_predicate_on_partitioned_dataset(tmpdir):
    """
    Generates a partitioned dataset and ensures that readers evaluate the type of the partition
    column according to the type given in the Unischema.
    """
    TestSchema = Unischema('TestSchema', [
        UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
        UnischemaField('id2', np.int32, (), ScalarCodec(IntegerType()), False),
        UnischemaField('test_field', np.int32, (), ScalarCodec(IntegerType()), False),
    ])

    def test_row_generator(x):
        """Returns a single entry in the generated dataset."""
        return {'id': x,
                'id2': x + 1,
                'test_field': x * x}

    rowgroup_size_mb = 256
    dataset_url = "file://{0}/partitioned_test_dataset".format(tmpdir)

    spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()
    sc = spark.sparkContext

    rows_count = 10
    with materialize_dataset(spark, dataset_url, TestSchema, rowgroup_size_mb):
        rows_rdd = sc.parallelize(range(rows_count))\
            .map(test_row_generator)\
            .map(lambda x: dict_to_spark_row(TestSchema, x))

        spark.createDataFrame(rows_rdd, TestSchema.as_spark_schema()) \
            .write \
            .partitionBy('id', 'id2') \
            .parquet(dataset_url)

    with make_reader(dataset_url, predicate=in_lambda(['id'], lambda x: x == 3)) as reader:
        assert next(reader).id == 3

    with make_reader(dataset_url, predicate=in_lambda(['id'], lambda x: x == '3')) as reader:
        with pytest.raises(StopIteration):
            # Predicate should have selected none, so a StopIteration should be raised.
            next(reader)

    print("all okay")


import tempfile

tmpfile = tempfile.TemporaryDirectory()
tmpdir = tmpfile.name
print(tmpdir)
test_predicate_on_partitioned_dataset(tmpdir)
tmpfile.cleanup()
The error (note: the line numbers differ slightly because I added some print statements while debugging):
File "/home/jamespr/horovod-env/venv/lib64/python3.6/site-packages/petastorm/py_dict_reader_worker.py", line 221, in _load_rows_with_predicate
    shuffle_row_drop_partition)
File "/home/jamespr/horovod-env/venv/lib64/python3.6/site-packages/petastorm/py_dict_reader_worker.py", line 283, in _read_with_shuffle_row_drop
    partition_indexes = np.floor(np.arange(num_rows) / (float(num_rows) / min(num_rows, num_partitions)))
ZeroDivisionError: float division by zero
I've logged the values of num_partitions and num_rows; the latter seems to be the culprit causing the division by zero.
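For what it's worth, the failure can be reproduced in isolation. The helper name below is mine, but the expression is copied from _read_with_shuffle_row_drop; with num_rows == 0 the denominator becomes 0 before numpy is even involved:

```python
import numpy as np

def partition_indexes(num_rows, num_partitions):
    # Expression copied from _read_with_shuffle_row_drop. When num_rows == 0,
    # min(num_rows, num_partitions) is 0, so float(num_rows) / 0 raises
    # ZeroDivisionError: float division by zero.
    return np.floor(np.arange(num_rows) / (float(num_rows) / min(num_rows, num_partitions)))

print(partition_indexes(10, 4))  # works: rows spread across 4 partitions

try:
    partition_indexes(0, 4)  # no rows survived the predicate
except ZeroDivisionError as e:
    print(e)
```

So it looks like the string predicate matches zero rows in the row group, and the empty selection trips this expression.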
I've had a look through the code in py_dict_reader_worker.py, but I'm not particularly familiar with the petastorm APIs. I'm hoping someone has seen something similar before, which would make it easier to get a fix out.
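In case it helps, here is a hypothetical guard (not petastorm's actual code, just a sketch of one possible fix): short-circuit when the predicate filtered out every row, so the reader can fall through to StopIteration instead of dividing by zero.

```python
import numpy as np

def read_with_shuffle_row_drop_guarded(num_rows, num_partitions):
    # Hypothetical guard: an empty selection returns an empty index array
    # rather than evaluating float(0) / min(0, num_partitions).
    if num_rows == 0:
        return np.array([], dtype=np.int64)
    return np.floor(np.arange(num_rows) / (float(num_rows) / min(num_rows, num_partitions)))
```

Whether returning an empty array is the right behaviour for the surrounding shuffle-row-drop logic is something a maintainer would need to confirm.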
Versions:
pyarrow==0.15.1
petastorm==0.8.2