-
Notifications
You must be signed in to change notification settings - Fork 111
SeaDatabricksClient: Add Metadata Commands #593
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: sea-migration
Are you sure you want to change the base?
Changes from all commits
138c2ae
3e3ab94
4a78165
0dac4aa
1b794c7
da5a6fe
686ade4
31e6c83
69ea238
66d7517
71feef9
ae9862f
d8aa69e
db139bc
b977b12
da615c0
0da04a6
ea9d456
8985c62
d9bcdbe
ee9fa1c
24c6152
67fd101
271fcaf
bf26ea3
ed7cf91
dae15e3
db5bbea
d5d3699
6137a3d
75b0773
4494dcd
4d0aeca
7cece5e
8977c06
0216d7a
4cb15fd
dee47f7
e385d5b
484064e
030edf8
30f8266
033ae73
33821f4
3e22c6c
787f1f7
165c4f3
a6e40d0
52e3088
641c09b
8bd12d8
ffded6e
227f6b3
68657a3
3940eec
37813ba
267c9f4
2967119
47fd60d
982fdf2
9e14d48
be1997e
e8e8ee7
05ee4e7
3ffa898
2952d8d
89e2aa0
cbace3f
c075b07
c62f76d
199402e
8ac574b
398ca70
b1acc5b
ef2a7ee
699942d
af8f74e
5540c5c
efe3881
36ab59b
1d57c99
df6dac2
ad0e527
ed446a0
38e4b5c
94879c0
1809956
da5260c
0385ffb
349c021
6229848
fd52356
64e58b0
0a2cdfd
90bb09c
cd22389
82e0f8b
e64b81b
5ab9bbe
1ab6e87
f469c24
68ec65f
ffd478e
f6d873d
28675f5
3578659
8713023
22dc252
390f592
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
""" | ||
Client-side filtering utilities for Databricks SQL connector. | ||
|
||
This module provides filtering capabilities for result sets returned by different backends. | ||
""" | ||
|
||
import logging | ||
from typing import ( | ||
List, | ||
Optional, | ||
Any, | ||
Callable, | ||
cast, | ||
) | ||
|
||
from databricks.sql.backend.sea.backend import SeaDatabricksClient | ||
from databricks.sql.backend.types import ExecuteResponse | ||
|
||
from databricks.sql.result_set import ResultSet, SeaResultSet | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class ResultSetFilter:
    """
    A general-purpose, client-side filter for result sets.

    The public entry points accept any ``ResultSet``; the actual row filtering
    and re-wrapping is delegated to a backend-specific builder chosen from the
    concrete type of the result set that was passed in. Only ``SeaResultSet``
    is handled here; other types are returned unfiltered with a warning.
    """

    @staticmethod
    def _filter_sea_result_set(
        result_set: SeaResultSet, filter_func: Callable[[List[Any]], bool]
    ) -> SeaResultSet:
        """
        Filter a SEA result set using the provided filter function.

        Args:
            result_set: The SEA result set to filter
            filter_func: Function that takes a row and returns True if the row
                should be included

        Returns:
            A new SEA result set holding only the rows accepted by
            ``filter_func``. It reuses the command id, status, description and
            connection/backend settings of ``result_set``.
        """
        # Imported locally, as in the original change (ResultData is not
        # imported at module level).
        from databricks.sql.backend.sea.models.base import ResultData

        # Get all remaining rows and keep only the accepted ones
        all_rows = result_set.results.remaining_rows()
        filtered_rows = [row for row in all_rows if filter_func(row)]

        # Create an ExecuteResponse mirroring the original result set,
        # reusing its command_id
        execute_response = ExecuteResponse(
            command_id=result_set.command_id,
            status=result_set.status,
            description=result_set.description,
            has_been_closed_server_side=result_set.has_been_closed_server_side,
            lz4_compressed=result_set.lz4_compressed,
            arrow_schema_bytes=result_set._arrow_schema_bytes,
            is_staging_operation=False,
        )

        # Wrap the filtered rows as inline result data (no external links)
        result_data = ResultData(data=filtered_rows, external_links=None)

        # Create a new SeaResultSet with the filtered data
        return SeaResultSet(
            connection=result_set.connection,
            execute_response=execute_response,
            sea_client=cast(SeaDatabricksClient, result_set.backend),
            buffer_size_bytes=result_set.buffer_size_bytes,
            arraysize=result_set.arraysize,
            # NOTE(review): per the PR discussion, result_data is not a
            # parameter of the base ResultSet class; it is passed here so the
            # new result set is built from the rows filtered above.
            result_data=result_data,
        )

    @staticmethod
    def filter_by_column_values(
        result_set: ResultSet,
        column_index: int,
        allowed_values: List[str],
        case_sensitive: bool = False,
    ) -> ResultSet:
        """
        Filter a result set by values in a specific column.

        A row is kept only when it has a column at ``column_index``, that
        column holds a ``str``, and the value is one of ``allowed_values``.
        Rows that are too short for ``column_index`` (positive or negative)
        are dropped rather than raising ``IndexError``.

        Args:
            result_set: The result set to filter
            column_index: The index of the column to filter on
            allowed_values: List of allowed values for the column
            case_sensitive: Whether to perform case-sensitive comparison

        Returns:
            A filtered result set. Result sets that are not ``SeaResultSet``
            instances are returned unchanged after logging a warning.
        """
        # Normalise the allowed values once; a frozenset gives O(1) membership
        # tests instead of scanning a list for every row.
        if case_sensitive:
            allowed = frozenset(allowed_values)
        else:
            allowed = frozenset(value.upper() for value in allowed_values)

        def row_matches(row: List[Any]) -> bool:
            # Bounds check covers negative indexes too, so a short row is
            # excluded instead of raising IndexError.
            if not -len(row) <= column_index < len(row):
                return False
            value = row[column_index]
            if not isinstance(value, str):
                return False
            return (value if case_sensitive else value.upper()) in allowed

        # Determine the type of result set and apply appropriate filtering
        if isinstance(result_set, SeaResultSet):
            return ResultSetFilter._filter_sea_result_set(result_set, row_matches)

        # For other result set types, return the original (should be handled
        # by specific implementations)
        logger.warning(
            "Filtering not implemented for result set type: %s",
            type(result_set).__name__,
        )
        return result_set

    @staticmethod
    def filter_tables_by_type(
        result_set: ResultSet, table_types: Optional[List[str]] = None
    ) -> ResultSet:
        """
        Filter a result set of tables by the specified table types.

        This is a client-side filter that processes the result set after it has
        been retrieved from the server. It filters out tables whose type does
        not match any of the types in the table_types list.

        Args:
            result_set: The original result set containing tables
            table_types: List of table types to include (e.g., ["TABLE", "VIEW"]).
                ``None`` or an empty list selects the default types.

        Returns:
            A filtered result set containing only tables of the specified types
        """
        # Default table types if none specified
        DEFAULT_TABLE_TYPES = ["TABLE", "VIEW", "SYSTEM TABLE"]
        valid_types = table_types if table_types else DEFAULT_TABLE_TYPES

        # Table type is the 6th column (index 5).
        # NOTE(review): the comparison is case-sensitive, so "view" will not
        # match "VIEW" -- confirm this is the intended contract for callers.
        return ResultSetFilter.filter_by_column_values(
            result_set, 5, valid_types, case_sensitive=True
        )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this specific to the SEA result set, so that it can't be used for a generic result set class? Let's try to make it generic for any result set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need some service-specific methods at some point during the filtering process to know what kind of result set to return, since our concrete instances are service-specific. I tried to keep the root methods that are invoked (`filter by table type`) general; they then invoke the service-specific builders based on the type of the instance passed to them.