diff --git a/charmpandas/dask_expr.py b/charmpandas/dask_expr.py
new file mode 100644
index 0000000..2fa8c74
--- /dev/null
+++ b/charmpandas/dask_expr.py
@@ -0,0 +1,88 @@
+import dask.dataframe as dd
+import charmpandas as pd
+from dask.dataframe.dask_expr._expr import Expr
+from dask.dataframe.dask_expr._groupby import GroupBy
+from charmpandas.interface import LocalCluster
+from functools import lru_cache
+
+# NOTE: adjust the home directory, port, and PE counts for your machine
+cluster = LocalCluster(charmpandas_home='/mnt/c/Users/mohan/PPL/charmpandas',
+                       local_port=1239, min_pes=1, max_pes=1, odf=1, activity_timeout=60)
+pd.set_interface(cluster)
+
+def execute_dask(dask_obj, depth=0):
+    """Recursively lower a simplified dask expression tree into charmpandas calls."""
+    if not (isinstance(dask_obj, Expr) or isinstance(dask_obj, GroupBy)):
+        # Leaf operand (path string, column list, scalar, ...): pass through unchanged
+        return dask_obj
+    if isinstance(dask_obj, GroupBy):
+        # GroupBy is not an Expr, so it is handled outside the operand walk
+        pd_df = execute_dask(dask_obj.obj.expr)
+        return pd_df.groupby(dask_obj.by)
+    if not hasattr(dask_obj, '_funcname'):
+        print(f'Operation {dask_obj} not supported in charmpandas')
+        return dask_obj
+    try:
+        args = [execute_dask(o, depth + 1) for o in dask_obj.operands]
+    except Exception as e:
+        print(f"Error while lowering operands of {dask_obj}: {e}")
+        raise
+    result = charm_mapper(dask_obj._funcname, args)
+    # Clear the read cache only at the top level (depth == 0)
+    if depth == 0:
+        read_parquet.cache_clear()
+    return result
+
+@lru_cache(maxsize=None)
+def read_parquet(path, cols, filters):
+    # Cached so repeated scans within one expression tree reuse a single table
+    return pd.read_parquet(path, cols, filters)
+
+def charm_mapper(func_name, args):
+    # Dataframe operations
+    if func_name == 'read_parquet':
+        path = args[0]
+        cols = tuple(args[1]) if args[1] else tuple()
+        filters = tuple(args[2]) if args[2] else tuple()
+        return read_parquet(path, cols, filters)
+    elif func_name == 'projection':
+        return args[0][args[1]]
+    elif func_name == 'merge':
+        return args[0].merge(args[1], how=args[2], left_on=args[3], right_on=args[4])
+    elif func_name == 'groupby':
+        # Difficult to integrate with other exprs since groupby is not an Expr
+        return args[0].groupby(args[1])
+    elif func_name == 'add':
+        return args[0] + args[1]
+    elif func_name == 'sub':
+        return args[0] - args[1]
+    elif func_name == 'mul':
+        return args[0] * args[1]
+    elif func_name == 'div':
+        return args[0] / args[1]
+    elif func_name == 'lt':
+        return args[0] < args[1]
+    elif func_name == 'le':
+        return args[0] <= args[1]
+    elif func_name == 'gt':
+        return args[0] > args[1]
+    elif func_name == 'ge':
+        return args[0] >= args[1]
+    elif func_name == 'eq':
+        return args[0] == args[1]
+    elif func_name == 'ne':
+        return args[0] != args[1]
+    elif func_name == 'count':
+        return args[0].count()
+    elif func_name == 'sum':
+        return args[0].sum()
+    elif func_name == 'assign':
+        if len(args) == 2:
+            # Whole-frame assignment
+            return args[1]
+        else:
+            # Column assignment: (frame, column name, values)
+            args[0][args[1]] = args[2]
+            return args[0]
+    return None
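
For orientation, this is how the new module is meant to be driven, mirroring the demo notebook cell further down in this patch (the sample parquet path is the notebook's; nothing here is new API):

```python
import dask.dataframe as dd
from charmpandas.dask_expr import execute_dask

# Build a lazy dask expression; nothing runs eagerly here.
df = dd.read_parquet("../user_ids_small.parquet", columns=["city"])
df["city"] = df["city"] + df["city"]

# Lower the simplified expression tree to charmpandas and materialize it.
pd_df = execute_dask(df.simplify().expr)
print(pd_df.get())
```
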
diff --git a/charmpandas/dataframe.py b/charmpandas/dataframe.py
index 0d680c3..3b96618 100644
--- a/charmpandas/dataframe.py
+++ b/charmpandas/dataframe.py
@@ -151,11 +151,11 @@ def sum(self):
 
 class DataFrame(object):
-    def __init__(self, data):
+    def __init__(self, data, cols=None, filters=None):
         interface = get_interface()
         self.name = get_table_name()
         if isinstance(data, str):
-            interface.read_parquet(self.name, data)
+            interface.read_parquet(self.name, data, cols, filters)
         elif data == None:
             # this is a result of some operation
             pass
diff --git a/charmpandas/interface.py b/charmpandas/interface.py
index 3a66cfb..ae39cd7 100644
--- a/charmpandas/interface.py
+++ b/charmpandas/interface.py
@@ -183,7 +183,7 @@ def get_deletion_header(self):
     def mark_deletion(self, table_name):
         self.deletion_buffer.append(table_name)
 
-    def read_parquet(self, table_name, file_path):
+    def read_parquet(self, table_name, file_path, columns=None, filters=None):
         self.activity_handler()
 
         cmd = self.get_header(self.epoch)
@@ -191,6 +191,24 @@ def read_parquet(self, table_name, file_path):
         gcmd += to_bytes(Operations.read, 'i')
         gcmd += to_bytes(table_name, 'i')
         gcmd += string_bytes(file_path)
+
+        # Add columns: count, then each column name
+        if columns:
+            gcmd += to_bytes(len(columns), 'i')
+            for col in columns:
+                gcmd += string_bytes(col)
+        else:
+            gcmd += to_bytes(0, 'i')  # No columns specified
+
+        # Add filters: count, then (column, operator, value) triples
+        if filters:
+            gcmd += to_bytes(len(filters), 'i')  # Number of filters
+            for col, op, value in filters:
+                gcmd += string_bytes(col)         # Column name
+                gcmd += string_bytes(op)          # Operator
+                gcmd += string_bytes(str(value))  # Value (converted to string)
+        else:
+            gcmd += to_bytes(0, 'i')  # No filters specified
 
         cmd += to_bytes(len(gcmd), 'i')
         cmd += gcmd
@@ -406,13 +424,15 @@ def reset_timer(self):
 
 class LocalCluster(CCSInterface):
-    def __init__(self, min_pes=1, max_pes=1, odf=4, activity_timeout=60):
+    def __init__(self, charmpandas_home, local_port=1234, min_pes=1, max_pes=1, odf=4, activity_timeout=60):
         self.min_pes = min_pes
         self.max_pes = max_pes
+        self.local_port = local_port
+        self.charmpandas_home = charmpandas_home
         self.logfile = open("server.log", "w")
         self._write_nodelist(max_pes)
         self._run_server()
-        super().__init__("127.0.0.1", 1234, odf=odf, activity_timeout=activity_timeout)
+        super().__init__("127.0.0.1", local_port, odf=odf, activity_timeout=activity_timeout)
 
     def _write_nodelist(self, num_pes):
         nodestr = "host localhost\n" * num_pes
@@ -430,10 +450,13 @@ def activity_handler(self):
             self.current_pes = self.max_pes
 
     def _run_server(self):
-        self.process = subprocess.Popen(['/home/adityapb1546/charm/charmpandas/src/charmrun +p%i '
-                                         '/home/adityapb1546/charm/charmpandas/src/server.out +balancer MetisLB +LBDebug 3'
-                                         ' ++server ++server-port 1234 ++nodelist ./localnodelist' % self.max_pes],
-                                        shell=True, text=True, stdout=self.logfile, stderr=subprocess.STDOUT)
+        charmrun_path = f"{self.charmpandas_home}/src/charmrun"
+        serverout_path = f"{self.charmpandas_home}/src/server.out"
+        self.process = subprocess.Popen([
+            f'{charmrun_path} +p{self.max_pes} '
+            f'{serverout_path} +balancer MetisLB +LBDebug 3 ++server ++local '
+            f'++server-port {self.local_port} ++nodelist ./localnodelist'
+        ], shell=True, text=True, stdout=self.logfile, stderr=subprocess.STDOUT)
         time.sleep(5)
         self.current_pes = self.max_pes
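
The wire format is length-prefixed throughout: judging by the parsing side in partition.cpp (an int size followed by that many bytes), `to_bytes(x, 'i')` and `string_bytes(s)` behave like the sketch below. This is a standalone illustration of the group-command payload, not the actual interface.py helpers; the header and op code are omitted:

```python
import struct

def to_bytes(x, fmt='i'):
    # assumed: 4-byte int, matching interface.py's 'i' format strings
    return struct.pack(fmt, x)

def string_bytes(s):
    # assumed: size-prefixed UTF-8, matching std::string(cmd, size) on the C++ side
    data = s.encode()
    return struct.pack('i', len(data)) + data

def encode_read(table_name, file_path, columns=(), filters=()):
    # Layout mirrors CCSInterface.read_parquet above
    g = to_bytes(table_name) + string_bytes(file_path)
    g += to_bytes(len(columns))
    for col in columns:
        g += string_bytes(col)
    g += to_bytes(len(filters))
    for col, op, value in filters:
        g += string_bytes(col) + string_bytes(op) + string_bytes(str(value))
    return g

payload = encode_read(3, "data/*.parquet", ["city"], [("age", ">", 30)])
```
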
"metadata": {}, - "outputs": [], + "outputs": [ + { + "ename": "ModuleNotFoundError", + "evalue": "No module named 'charmpandas'", + "output_type": "error", + "traceback": [ + "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[1;31mModuleNotFoundError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[1;32mIn[1], line 1\u001b[0m\n\u001b[1;32m----> 1\u001b[0m \u001b[38;5;28;01mimport\u001b[39;00m \u001b[38;5;21;01mcharmpandas\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m \u001b[38;5;21;01mpd\u001b[39;00m\n\u001b[0;32m 2\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mcharmpandas\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01minterface\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m LocalCluster, CCSInterface\n", + "\u001b[1;31mModuleNotFoundError\u001b[0m: No module named 'charmpandas'" + ] + } + ], "source": [ "import charmpandas as pd\n", - "from charmpandas.interface import SLURMCluster, CCSInterface" + "from charmpandas.interface import LocalCluster, CCSInterface" ] }, { @@ -26,10 +38,19 @@ } ], "source": [ - "cluster = SLURMCluster('mzu-delta-cpu', 'cpu', '/u/bhosale/charmpandas', tasks_per_node=32)\n", + "\n", + "cluster = LocalCluster(min_pes=4,max_pes=4, odf=4,activity_timeout=60)\n", "pd.set_interface(cluster)" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "c4f0e291", + "metadata": {}, + "outputs": [], + "source": [] + }, { "cell_type": "code", "execution_count": 3, @@ -37,7 +58,7 @@ "metadata": {}, "outputs": [], "source": [ - "df_ids = pd.read_parquet(\"/u/bhosale/charmpandas/examples/user_ids_small.parquet\")" + "df_ids = pd.read_parquet(\"../user_ids_small.parquet\")" ] }, { @@ -47,7 +68,7 @@ "metadata": {}, "outputs": [], "source": [ - "df_ages = pd.read_parquet(\"/u/bhosale/charmpandas/examples/ages_small.parquet\")" + "df_ages = pd.read_parquet(\"../ages_small.parquet\")" ] }, { @@ -140,12 +161,28 @@ "id": "32b3b987-00f1-4a03-9723-f122f8b44a23", "metadata": {}, "outputs": [], - "source": [] + "source": [ + "import dask.dataframe as dd\n", + "from charmpandas.dask_expr import execute_dask\n", + "\n", + "df = dd.read_parquet(\"../user_ids_small.parquet\", ['city'])\n", + "\n", + "df.city = df.city + df.city\n", + "\n", + "df = df.simplify()\n", + "\n", + "# res = df.groupby(['city'])\n", + "\n", + "pd_df = execute_dask(df.expr) \n", + "\n", + "pd_df.get()\n", + "\n" + ] } ], "metadata": { "kernelspec": { - "display_name": "Python 3 (ipykernel)", + "display_name": "Python 3", "language": "python", "name": "python3" }, @@ -159,7 +196,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.13.1" + "version": "3.12.4" } }, "nbformat": 4, diff --git a/src/Makefile b/src/Makefile index 76b6856..63d448d 100644 --- a/src/Makefile +++ b/src/Makefile @@ -37,7 +37,7 @@ partition.o: partition.cpp partition.decl.h partition.def.h reduction.decl.h red $(CHARMC) `pkg-config --cflags --libs arrow` -module CommonLBs -c $< $(OPTS) server.out: partition.o server.o - $(CHARMC) -Wl,-rpath,/u/bhosale/.conda/envs/charmpandas/lib `pkg-config --cflags --libs arrow` -L$(XXHASH_DIR)/lib -g -lxxhash -lparquet -larrow_acero -language charm++ -module CommonLBs -o $@ partition.o server.o + $(CHARMC) -Wl,-rpath,/u/bhosale/.conda/envs/charmpandas/lib `pkg-config --cflags --libs arrow` -L$(XXHASH_DIR)/lib -g -lxxhash -lparquet -larrow_dataset -larrow_acero -language charm++ -module CommonLBs -o $@ partition.o server.o run-server: server.out 
	./charmrun +p4 ./server.out ++server ++server-port 10000
diff --git a/src/partition.cpp b/src/partition.cpp
index 784bcd9..bf2214d 100644
--- a/src/partition.cpp
+++ b/src/partition.cpp
@@ -329,9 +329,64 @@ void Partition::operation_read(char* cmd)
     int table_name = extract(cmd);
     int path_size = extract(cmd);
     std::string file_path(cmd, path_size);
-    if (thisIndex == 0)
+    cmd += path_size;
+
+    // Extract the optional column selection
+    std::vector<std::string> columns;
+    int num_columns = extract(cmd);
+    for (int i = 0; i < num_columns; i++) {
+        int col_size = extract(cmd);
+        columns.push_back(std::string(cmd, col_size));
+        cmd += col_size;
+    }
+
+    // Extract the optional filters as (column, operator, value) triples
+    std::vector<std::tuple<std::string, std::string, std::string>> filters;
+    int num_filters = extract(cmd);
+    for (int i = 0; i < num_filters; i++) {
+        // Extract column name
+        int col_name_size = extract(cmd);
+        std::string col_name(cmd, col_name_size);
+        cmd += col_name_size;
+
+        // Extract operator
+        int op_size = extract(cmd);
+        std::string op(cmd, op_size);
+        cmd += op_size;
+
+        // Extract value
+        int value_size = extract(cmd);
+        std::string value(cmd, value_size);
+        cmd += value_size;
+
+        filters.push_back(std::make_tuple(col_name, op, value));
+    }
+
     CkPrintf("[%d] Reading file: %s\n", thisIndex, file_path.c_str());
-    read_parquet(table_name, file_path);
+    if (!columns.empty()) {
+        CkPrintf("[%d] Selected columns:", thisIndex);
+        for (const auto& col : columns) {
+            CkPrintf(" %s", col.c_str());
+        }
+        CkPrintf("\n");
+    }
+    if (!filters.empty()) {
+        CkPrintf("[%d] Applying filters:", thisIndex);
+        for (const auto& filter : filters) {
+            CkPrintf(" %s %s %s;",
+                     std::get<0>(filter).c_str(),
+                     std::get<1>(filter).c_str(),
+                     std::get<2>(filter).c_str());
+        }
+        CkPrintf("\n");
+    }
+
+    read_parquet(table_name, file_path, columns, filters);
     complete_operation();
 }
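
The server-side filter construction below re-types each string value with a stoll → stod → raw-string ladder and ANDs the comparisons together. The same logic in Python, using PyArrow expressions purely for illustration (this helper is not part of the patch):

```python
import pyarrow.dataset as ds

OPS = {"==": "__eq__", "!=": "__ne__", ">": "__gt__",
       ">=": "__ge__", "<": "__lt__", "<=": "__le__"}

def to_expression(filters):
    """Re-type each value as int, then float, then string, and AND the
    comparisons together -- mirroring Partition::read_parquet below."""
    expr = None
    for col, op, value in filters:
        for cast in (int, float, str):
            try:
                typed = cast(value)
                break
            except ValueError:
                continue
        term = getattr(ds.field(col), OPS[op])(typed)
        expr = term if expr is None else expr & term
    return expr

print(to_expression([("age", ">", "30"), ("city", "==", "Urbana")]))
```
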
@@ -710,32 +765,102 @@ arrow::Datum Partition::traverse_ast(char* &msg)
     }
 }
 
-void Partition::read_parquet(int table_name, std::string file_path)
+void Partition::read_parquet(int table_name, std::string file_path,
+                             const std::vector<std::string>& columns,
+                             const std::vector<std::tuple<std::string, std::string, std::string>>& filters)
 {
     std::vector<std::string> files = get_matching_files(file_path);
     std::shared_ptr<arrow::io::ReadableFile> input_file;
     TablePtr read_tables = nullptr;
 
-    if (thisIndex == 0)
-        CkPrintf("[%d] Reading %i files\n", thisIndex, files.size());
+    // Construct the filter expression once, outside the per-file loop
+    arrow::compute::Expression filter_expr = arrow::compute::literal(true);
+    for (const auto& filter : filters) {
+        const std::string& col_name = std::get<0>(filter);
+        const std::string& op = std::get<1>(filter);
+        const std::string& value = std::get<2>(filter);
+
+        auto field_ref = arrow::compute::field_ref(col_name);
+        arrow::compute::Expression expr;
+
+        // Re-type the value: try int64, then double, then fall back to string
+        arrow::compute::Expression value_expr;
+        try {
+            size_t idx;
+            int64_t ival = std::stoll(value, &idx);
+            if (idx != value.size())
+                throw std::invalid_argument("");
+            value_expr = arrow::compute::literal(ival);
+        } catch (...) {
+            try {
+                size_t idx;
+                double dval = std::stod(value, &idx);
+                if (idx != value.size())
+                    throw std::invalid_argument("");
+                value_expr = arrow::compute::literal(dval);
+            } catch (...) {
+                value_expr = arrow::compute::literal(value);
+            }
+        }
+
+        if (op == "==") {
+            expr = arrow::compute::equal(field_ref, value_expr);
+        } else if (op == "!=") {
+            expr = arrow::compute::not_equal(field_ref, value_expr);
+        } else if (op == ">") {
+            expr = arrow::compute::greater(field_ref, value_expr);
+        } else if (op == ">=") {
+            expr = arrow::compute::greater_equal(field_ref, value_expr);
+        } else if (op == "<") {
+            expr = arrow::compute::less(field_ref, value_expr);
+        } else if (op == "<=") {
+            expr = arrow::compute::less_equal(field_ref, value_expr);
+        } else {
+            CkPrintf("Warning: unsupported filter operator '%s'\n", op.c_str());
+            continue;
+        }
+
+        filter_expr = arrow::compute::and_(filter_expr, expr);
+    }
 
     for (int i = 0; i < files.size(); i++)
     {
         std::string file = files[i];
         input_file = arrow::io::ReadableFile::Open(file).ValueOrDie();
 
-        // Create a ParquetFileReader instance
+        // Create a ParquetFileReader instance with column selection support
         std::unique_ptr<parquet::arrow::FileReader> reader;
         parquet::arrow::OpenFile(input_file, arrow::default_memory_pool(), &reader);
 
         // Get the file metadata
         std::shared_ptr<parquet::FileMetaData> file_metadata = reader->parquet_reader()->metadata();
 
+        // Map requested column names to indices in this file's schema
+        std::shared_ptr<arrow::Schema> schema;
+        reader->GetSchema(&schema);
+        std::vector<int> column_indices;
+        for (const auto& col : columns) {
+            int idx = schema->GetFieldIndex(col);
+            if (idx != -1)
+                column_indices.push_back(idx);
+            else
+                CkPrintf("Warning: column '%s' not found in parquet file\n", col.c_str());
+        }
+
         int num_rows = file_metadata->num_rows();
-
-        //if (thisIndex == 0)
-        //    CkPrintf("[%d] Reading %i rows from %s\n", thisIndex, num_rows, file.c_str());
-
         int nrows_per_partition = num_rows / num_partitions;
         int start_row = nrows_per_partition * thisIndex;
         int nextra_rows = num_rows - num_partitions * nrows_per_partition;
@@ -772,9 +897,39 @@ void Partition::read_parquet(int table_name, std::string file_path)
             // Calculate how many rows to read from this row group
             int64_t rows_in_group = std::min(rows_to_read, row_group_num_rows - start_row);
 
-            // Read the rows
+            // Read the rows, restricted to the selected columns if any
             TablePtr table;
-            reader->ReadRowGroup(i, &table);
+            if (column_indices.empty())
+                reader->ReadRowGroup(i, &table);
+            else
+                reader->ReadRowGroup(i, column_indices, &table);
+
+            // Slice this partition's row range first: start_row and
+            // rows_in_group index the unfiltered row group, so filtering
+            // before slicing would shift the offsets.
+            TablePtr sliced_table = table->Slice(start_row, rows_in_group);
+
+            // Apply the filters, if any, via an in-memory dataset scan
+            if (sliced_table && !filters.empty()) {
+                auto dataset = std::make_shared<arrow::dataset::InMemoryDataset>(sliced_table);
+                auto builder = dataset->NewScan().ValueOrDie();
+                builder->Filter(filter_expr);
+                auto scanner = builder->Finish().ValueOrDie();
+                auto result = scanner->ToTable();
+                if (result.ok())
+                    sliced_table = result.ValueOrDie();
+                else
+                    CkPrintf("[%d] Filter scan failed on row group %d: %s\n",
+                             thisIndex, i, result.status().ToString().c_str());
+            }
 
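
The InMemoryDataset → Scanner → ToTable sequence above can be sanity-checked without a Charm++ build; PyArrow wraps a table in an InMemoryDataset the same way, and `to_table(filter=...)` runs the scanner with the expression. A small check with made-up data:

```python
import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({"age": [10, 42, 35], "city": ["u", "v", "w"]})

# ds.dataset(table) builds an InMemoryDataset, as in the C++ code above
filtered = ds.dataset(table).to_table(filter=ds.field("age") > 30)
assert filtered.num_rows == 2
```
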
             row_tables.push_back(sliced_table);
@@ -808,9 +965,7 @@ void Partition::read_parquet(int table_name, std::string file_path)
         read_tables->num_rows()).ValueOrDie();
     tables[table_name] = set_column(read_tables, "home_partition", arrow::Datum(
         arrow::ChunkedArray::Make({home_partition_array}).ValueOrDie()))->CombineChunks().ValueOrDie();
-
-
-    //CkPrintf("[%d] Read number of rows = %i\n", thisIndex, combined->num_rows());
+    CkPrintf("[%d] Reading complete\n", thisIndex);
 }
 
 Aggregator::Aggregator(CProxy_Main main_proxy_)
diff --git a/src/partition.hpp b/src/partition.hpp
index 4da7074..fb304a1 100644
--- a/src/partition.hpp
+++ b/src/partition.hpp
@@ -15,6 +15,7 @@
 #include "arrow/acero/exec_plan.h"
 #include 
 #include "arrow/compute/expression.h"
+#include "arrow/dataset/file_ipc.h"
 #include "types.hpp"
 #include "partition.decl.h"
@@ -284,7 +285,9 @@ class Partition : public CBase_Partition
 
     arrow::Datum traverse_ast(char* &msg);
 
-    void read_parquet(int table_name, std::string file_path);
+    void read_parquet(int table_name, std::string file_path,
+                      const std::vector<std::string>& columns = {},
+                      const std::vector<std::tuple<std::string, std::string, std::string>>& filters = {});
 
     template <class T>
     void reduce_scalar(ScalarPtr& scalar, AggregateOperation& op);
diff --git a/src/server.hpp b/src/server.hpp
index 0d99795..e282530 100644
--- a/src/server.hpp
+++ b/src/server.hpp
@@ -41,7 +41,7 @@ class Server
         }
         memcpy(&bitmap[CkNumPes()], &new_procs, sizeof(int));
         bitmap[CkNumPes()+sizeof(int)] = '\0';
-        rescale(pass_msg);
+        // rescale(pass_msg);
         CkPrintf("Rescale epoch = %i\n", epoch);
         //main_proxy.ckLocal()->creation_reply = CcsDelayReply();
         partition_ptr->receive_command(epoch, size, cmd);
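
Taken together, the changes enable the following local workflow. Paths, port, PE counts, and column names are illustrative; `charmpandas_home` must point at a tree where `src/charmrun` and `src/server.out` have been built (see src/Makefile), and `get()` materializes results as in the demo notebook:

```python
import charmpandas as pd
from charmpandas.interface import LocalCluster

cluster = LocalCluster(charmpandas_home="/path/to/charmpandas",
                       local_port=1239, min_pes=4, max_pes=4,
                       odf=4, activity_timeout=60)
pd.set_interface(cluster)

# Column and filter pushdown now happen inside the Charm++ server.
df = pd.read_parquet("../user_ids_small.parquet", cols=["city"])
print(df.get())
```
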