dask expr - DO NOT MERGE #2

Open · wants to merge 8 commits into base: slurm
86 changes: 86 additions & 0 deletions charmpandas/dask_expr.py
@@ -0,0 +1,86 @@
import dask.dataframe as dd
import charmpandas as pd
from dask.dataframe.dask_expr._expr import Expr
from dask.dataframe.dask_expr._groupby import GroupBy
from charmpandas.interface import LocalCluster, CCSInterface
from functools import lru_cache
import copy
cluster = LocalCluster(min_pes=4, max_pes=4, odf=4, activity_timeout=60)
pd.set_interface(cluster)
def execute_dask(dask_obj, depth=0):  # recursively executes a simplified dask expression tree with charmpandas
    if not isinstance(dask_obj, (Expr, GroupBy)):
        # Plain operands (file paths, column names, scalars) are returned unchanged
        return dask_obj
else:
if isinstance(dask_obj, GroupBy):
pd_df = execute_dask(dask_obj.obj.expr)
return pd_df.groupby(dask_obj.by)
else:
args = []
            if not hasattr(dask_obj, '_funcname'):
                # Expression type not yet supported by charmpandas
                return dask_obj
try:
args = [execute_dask(o, depth+1) for o in dask_obj.operands]
except Exception as e:
print(f"Error in executing {dask_obj}: {e}")
result = charm_mapper(dask_obj._funcname, args)
# Clear the cache only at the top level (depth=0)
if depth == 0:
read_parquet.cache_clear()
return result

@lru_cache(maxsize=None)
def read_parquet(path, cols, filters):
return pd.read_parquet(path, cols, filters)

def charm_mapper(func_name, args):
# Dataframe operations
if func_name == 'read_parquet':
        path = args[0]
        # lru_cache needs hashable arguments, so convert column/filter lists to tuples
        cols = tuple(args[1]) if args[1] else ()
        filters = tuple(args[2]) if args[2] else ()
        return read_parquet(path, cols, filters)
elif func_name == 'projection':
return args[0][args[1]]
elif func_name == 'merge':
return args[0].merge(args[1], how=args[2], left_on=args[3], right_on=args[4])
elif func_name == 'groupby': # Difficult to integrate with other expr since groupby is not an expr
return args[0].groupby(args[1])
elif func_name == 'add':
return args[0] + args[1]
elif func_name == 'sub':
return args[0] - args[1]
elif func_name == 'mul':
return args[0] * args[1]
elif func_name == 'div':
return args[0] / args[1]
elif func_name == 'lt':
return args[0] < args[1]
elif func_name == 'le':
return args[0] <= args[1]
elif func_name == 'gt':
return args[0] > args[1]
elif func_name == 'ge':
return args[0] >= args[1]
elif func_name == 'eq':
return args[0] == args[1]
elif func_name == 'ne':
return args[0] != args[1]
elif func_name == 'count':
return args[0].count()
elif func_name == 'sum':
return args[0].sum()
    elif func_name == 'assign':
        if len(args) == 2:  # assign a whole dataframe
args[0] = args[1]
return args[0]
else: # Assign a column
args[0][args[1]] = args[2]
return args[0]
    # Fallback for operations charm_mapper does not support yet
    return None

# New function to handle groupby operations
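
A note on the caching in charmpandas/dask_expr.py above: read_parquet is wrapped in functools.lru_cache so that a parquet source appearing in several leaves of one expression tree is read only once, and execute_dask clears the cache when the top-level call (depth 0) returns. Because lru_cache keys must be hashable, charm_mapper converts the column and filter lists to tuples before the lookup. A minimal, self-contained sketch of that behaviour (the path here is illustrative):

from functools import lru_cache

reads = 0

@lru_cache(maxsize=None)
def read_parquet(path, cols, filters):
    global reads
    reads += 1                      # count how many real reads happen
    return (path, cols, filters)    # stand-in for a charmpandas DataFrame

# Lists are unhashable, so the caller passes tuples, as charm_mapper does
t1 = read_parquet("trips.parquet", ("city",), ())
t2 = read_parquet("trips.parquet", ("city",), ())
assert t1 is t2 and reads == 1      # second lookup is served from the cache

read_parquet.cache_clear()          # what execute_dask does once the depth-0 call finishes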
4 changes: 2 additions & 2 deletions charmpandas/dataframe.py
@@ -151,11 +151,11 @@ def sum(self):


class DataFrame(object):
def __init__(self, data):
def __init__(self, data, cols=None, filters=None):
interface = get_interface()
self.name = get_table_name()
if isinstance(data, str):
interface.read_parquet(self.name, data)
interface.read_parquet(self.name, data, cols, filters)
elif data == None:
# this is a result of some operation
pass
20 changes: 19 additions & 1 deletion charmpandas/interface.py
@@ -183,14 +183,32 @@ def get_deletion_header(self):
def mark_deletion(self, table_name):
self.deletion_buffer.append(table_name)

def read_parquet(self, table_name, file_path):
def read_parquet(self, table_name, file_path, columns=None, filters=None):
self.activity_handler()
cmd = self.get_header(self.epoch)

gcmd = self.get_deletion_header()
gcmd += to_bytes(Operations.read, 'i')
gcmd += to_bytes(table_name, 'i')
gcmd += string_bytes(file_path)

# Add columns
if columns:
gcmd += to_bytes(len(columns), 'i')
for col in columns:
gcmd += string_bytes(col)
else:
gcmd += to_bytes(0, 'i') # No columns specified

# Add filters
if filters:
gcmd += to_bytes(len(filters), 'i') # Number of filters
for col, op, value in filters:
gcmd += string_bytes(col) # Column name
gcmd += string_bytes(op) # Operator
gcmd += string_bytes(str(value)) # Value (converted to string)
else:
gcmd += to_bytes(0, 'i') # No filters specified

cmd += to_bytes(len(gcmd), 'i')
cmd += gcmd
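
For reference, the payload that the extended read_parquet above builds is what the new extraction loop in src/partition.cpp consumes: after the table id and file path it appends a count of columns followed by the column names, then a count of filters followed by (column, operator, value) string triples. The sketch below reproduces just that portion of the payload standalone; the 32-bit packing and the exact behaviour of to_bytes/string_bytes are assumptions inferred from the C++ extract calls, not taken from the charmpandas source.

import struct

def encode_read_payload(table_name, path, columns=None, filters=None):
    # Standalone sketch of the layout parsed by Partition::operation_read
    def i32(x):                       # stand-in for to_bytes(x, 'i')
        return struct.pack('<i', x)
    def s(text):                      # stand-in for string_bytes(text): length-prefixed bytes
        raw = text.encode()
        return i32(len(raw)) + raw

    buf = i32(table_name) + s(path)
    columns = columns or []
    buf += i32(len(columns))          # number of selected columns (0 if none)
    for col in columns:
        buf += s(col)
    filters = filters or []
    buf += i32(len(filters))          # number of filters (0 if none)
    for col, op, value in filters:
        buf += s(col) + s(op) + s(str(value))   # value is always sent as a string
    return buf

payload = encode_read_payload(7, "users.parquet",
                              columns=["city", "age"],
                              filters=[("age", ">", 30)])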
4 changes: 2 additions & 2 deletions charmpandas/operations.py
@@ -1,7 +1,7 @@
from charmpandas.dataframe import get_interface, get_table_name, DataFrame

def read_parquet(file_path):
return DataFrame(file_path)
def read_parquet(file_path, cols=None, filters=None):
return DataFrame(file_path, cols, filters)

def concat(objs):
if (objs and len(objs) == 0) or objs is None:
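
With the wrapper change above, column projection and filters flow from operations.read_parquet through the DataFrame constructor to the interface. A small usage sketch follows; the file name and predicate are illustrative, and note that, per the comment in src/partition.cpp below, the filters are currently parsed on the worker side but not yet applied during the read.

import charmpandas as pd
from charmpandas.interface import LocalCluster

cluster = LocalCluster(min_pes=4, max_pes=4, odf=4, activity_timeout=60)
pd.set_interface(cluster)

# Fetch only two columns and ship a (column, operator, value) predicate
df = pd.read_parquet("users.parquet",
                     cols=["city", "age"],
                     filters=[("age", ">", 30)])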
53 changes: 45 additions & 8 deletions examples/Demo.ipynb
@@ -5,10 +5,22 @@
"execution_count": 1,
"id": "2da208e1-3efd-4375-a9f9-739407f1d4eb",
"metadata": {},
"outputs": [],
"outputs": [
{
"ename": "ModuleNotFoundError",
"evalue": "No module named 'charmpandas'",
"output_type": "error",
"traceback": [
"\u001b[1;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[1;31mModuleNotFoundError\u001b[0m Traceback (most recent call last)",
"Cell \u001b[1;32mIn[1], line 1\u001b[0m\n\u001b[1;32m----> 1\u001b[0m \u001b[38;5;28;01mimport\u001b[39;00m \u001b[38;5;21;01mcharmpandas\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m \u001b[38;5;21;01mpd\u001b[39;00m\n\u001b[0;32m 2\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mcharmpandas\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01minterface\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m LocalCluster, CCSInterface\n",
"\u001b[1;31mModuleNotFoundError\u001b[0m: No module named 'charmpandas'"
]
}
],
"source": [
"import charmpandas as pd\n",
"from charmpandas.interface import SLURMCluster, CCSInterface"
"from charmpandas.interface import LocalCluster, CCSInterface"
]
},
{
@@ -26,18 +38,27 @@
}
],
"source": [
"cluster = SLURMCluster('mzu-delta-cpu', 'cpu', '/u/bhosale/charmpandas', tasks_per_node=32)\n",
"\n",
"cluster = LocalCluster(min_pes=4,max_pes=4, odf=4,activity_timeout=60)\n",
"pd.set_interface(cluster)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c4f0e291",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 3,
"id": "7f983ab6-54a4-4ece-a44f-1af66c915715",
"metadata": {},
"outputs": [],
"source": [
"df_ids = pd.read_parquet(\"/u/bhosale/charmpandas/examples/user_ids_small.parquet\")"
"df_ids = pd.read_parquet(\"../user_ids_small.parquet\")"
]
},
{
@@ -47,7 +68,7 @@
"metadata": {},
"outputs": [],
"source": [
"df_ages = pd.read_parquet(\"/u/bhosale/charmpandas/examples/ages_small.parquet\")"
"df_ages = pd.read_parquet(\"../ages_small.parquet\")"
]
},
{
@@ -140,12 +161,28 @@
"id": "32b3b987-00f1-4a03-9723-f122f8b44a23",
"metadata": {},
"outputs": [],
"source": []
"source": [
"import dask.dataframe as dd\n",
"from charmpandas.dask_expr import execute_dask\n",
"\n",
"df = dd.read_parquet(\"../user_ids_small.parquet\", ['city'])\n",
"\n",
"df.city = df.city + df.city\n",
"\n",
"df = df.simplify()\n",
"\n",
"# res = df.groupby(['city'])\n",
"\n",
"pd_df = execute_dask(df.expr) \n",
"\n",
"pd_df.get()\n",
"\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
@@ -159,7 +196,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.1"
"version": "3.12.4"
}
},
"nbformat": 4,
97 changes: 84 additions & 13 deletions src/partition.cpp
@@ -329,9 +329,61 @@ void Partition::operation_read(char* cmd)
int table_name = extract<int>(cmd);
int path_size = extract<int>(cmd);
std::string file_path(cmd, path_size);
if (thisIndex == 0)
cmd += path_size;

// Extract column selection if present
std::vector<std::string> columns;
int num_columns = extract<int>(cmd);
for (int i = 0; i < num_columns; i++) {
int col_size = extract<int>(cmd);
columns.push_back(std::string(cmd, col_size));
cmd += col_size;
}

// Extract filters if present
std::vector<std::tuple<std::string, std::string, std::string>> filters;
int num_filters = extract<int>(cmd);
for (int i = 0; i < num_filters; i++) {
// Extract column name
int col_name_size = extract<int>(cmd);
std::string col_name(cmd, col_name_size);
cmd += col_name_size;

// Extract operator
int op_size = extract<int>(cmd);
std::string op(cmd, op_size);
cmd += op_size;

// Extract value
int value_size = extract<int>(cmd);
std::string value(cmd, value_size);
cmd += value_size;

filters.push_back(std::make_tuple(col_name, op, value));
}

if (thisIndex == 0) {
CkPrintf("[%d] Reading file: %s\n", thisIndex, file_path.c_str());
read_parquet(table_name, file_path);
if (!columns.empty()) {
CkPrintf("[%d] Selected columns:", thisIndex);
for (const auto& col : columns) {
CkPrintf(" %s", col.c_str());
}
CkPrintf("\n");
}
if (!filters.empty()) {
CkPrintf("[%d] Applying filters:", thisIndex);
for (const auto& filter : filters) {
CkPrintf(" %s %s %s;",
std::get<0>(filter).c_str(),
std::get<1>(filter).c_str(),
std::get<2>(filter).c_str());
}
CkPrintf("\n");
}
}

read_parquet(table_name, file_path, columns, filters);
complete_operation();
}

@@ -710,7 +762,8 @@ arrow::Datum Partition::traverse_ast(char* &msg)
}
}

void Partition::read_parquet(int table_name, std::string file_path)
void Partition::read_parquet(int table_name, std::string file_path, const std::vector<std::string>& columns,
const std::vector<std::tuple<std::string, std::string, std::string>>& filters)
{
std::vector<std::string> files = get_matching_files(file_path);
std::shared_ptr<arrow::io::ReadableFile> input_file;
@@ -724,18 +777,31 @@ void Partition::read_parquet(int table_name, std::string file_path)
std::string file = files[i];
input_file = arrow::io::ReadableFile::Open(file).ValueOrDie();

// Create a ParquetFileReader instance
// Create a ParquetFileReader instance with options for column selection
std::unique_ptr<parquet::arrow::FileReader> reader;
parquet::arrow::OpenFile(input_file, arrow::default_memory_pool(), &reader);

// Get the file metadata
std::shared_ptr<parquet::FileMetaData> file_metadata = reader->parquet_reader()->metadata();

// Get schema and create column selection
std::shared_ptr<arrow::Schema> schema;
reader->GetSchema(&schema);
std::vector<int> column_indices;

// Convert column names to indices if columns are specified
if (!columns.empty()) {
for (const auto& col : columns) {
int idx = schema->GetFieldIndex(col);
if (idx != -1) {
column_indices.push_back(idx);
} else {
CkPrintf("Warning: Column '%s' not found in parquet file\n", col.c_str());
}
}
}

int num_rows = file_metadata->num_rows();

//if (thisIndex == 0)
// CkPrintf("[%d] Reading %i rows from %s\n", thisIndex, num_rows, file.c_str());

int nrows_per_partition = num_rows / num_partitions;
int start_row = nrows_per_partition * thisIndex;
int nextra_rows = num_rows - num_partitions * nrows_per_partition;
@@ -772,9 +838,17 @@
// Calculate how many rows to read from this row group
int64_t rows_in_group = std::min(rows_to_read, row_group_num_rows - start_row);

// Read the rows
// Read the rows with column selection
TablePtr table;
reader->ReadRowGroup(i, &table);
if (column_indices.empty()) {
reader->ReadRowGroup(i, &table);
} else {
reader->ReadRowGroup(i, column_indices, &table);
}

// Note: We're skipping the dataset-based filtering for now since it caused errors
// Filter implementation would need the arrow::dataset module properly included

TablePtr sliced_table = table->Slice(start_row, rows_in_group);
row_tables.push_back(sliced_table);

@@ -808,9 +882,6 @@ void Partition::read_parquet(int table_name, std::string file_path)
read_tables->num_rows()).ValueOrDie();
tables[table_name] = set_column(read_tables, "home_partition", arrow::Datum(
arrow::ChunkedArray::Make({home_partition_array}).ValueOrDie()))->CombineChunks().ValueOrDie();


//CkPrintf("[%d] Read number of rows = %i\n", thisIndex, combined->num_rows());
}

Aggregator::Aggregator(CProxy_Main main_proxy_)
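
For readers more comfortable on the Python side, the column-selection path added to Partition::read_parquet (map requested column names to schema indices, warn on misses, then ReadRowGroup with those indices) corresponds roughly to the pyarrow sketch below; this is an illustrative analogue with a made-up file name, not code from the PR.

import pyarrow.parquet as pq

pf = pq.ParquetFile("users.parquet")                 # illustrative path
wanted = ["city", "age", "missing_col"]

available = set(pf.schema_arrow.names)
selected = [c for c in wanted if c in available]     # keep only known names
for c in wanted:
    if c not in available:
        print(f"Warning: Column '{c}' not found in parquet file")

# Read each row group with only the selected columns, mirroring ReadRowGroup(i, column_indices, &table)
tables = [pf.read_row_group(i, columns=selected) for i in range(pf.num_row_groups)]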