33from typing import TYPE_CHECKING , Iterator , List , Optional , Union
44
55import pyarrow as pa
6- from odp .client .tabular_v2 .big import big
6+ from odp .client .tabular_v2 .big import big , convert_schema_inward
77from odp .client .tabular_v2 .big .remote import RemoteBigCol
88from odp .client .tabular_v2 .bsquare import bsquare
99
@@ -41,7 +41,8 @@ def _fetch_schema(self):
4141 assert len (empty ) == 1
4242 assert empty [0 ].num_rows == 0
4343 self ._inner_schema = empty [0 ].schema
44- self ._outer_schema = bsquare .convert_schema_outward (self ._inner_schema )
44+ mid = big .convert_schema_outward (self ._inner_schema )
45+ self ._outer_schema = bsquare .convert_schema_outward (mid )
4546
4647 def drop (self ):
4748 try :
@@ -58,7 +59,8 @@ def drop(self):
5859
5960 def create (self , schema : pa .Schema ):
6061 self ._outer_schema = schema
61- self ._inner_schema = bsquare .convert_schema_inward (schema )
62+ schema = bsquare .convert_schema_inward (schema )
63+ self ._inner_schema = convert_schema_inward (schema )
6264 buf = io .BytesIO ()
6365 w = pa .ipc .RecordBatchStreamWriter (buf , self ._inner_schema )
6466 w .write_batch (pa .RecordBatch .from_pylist ([], schema = self ._inner_schema ))
@@ -131,9 +133,9 @@ def scanner(scanner_cursor: str) -> Iterator[pa.RecordBatch]:
131133 tab = pa .Table .from_batches ([b ], schema = b .schema )
132134 if e is not None : # drop false positives
133135 before = tab .num_rows
134- logging .debug ("filtering before %d rows..." , tab .num_rows )
135136 tab = tab .filter (e )
136- logging .info ("filtered %d rows to %d" , before , tab .num_rows )
137+ if tab .num_rows < before :
138+ logging .debug ("filtered %d rows to %d" , before , tab .num_rows )
137139 if cols :
138140 tab = tab .select (cols ) # drop the cols used for filtering
139141 for x in tab .to_batches ():
@@ -188,10 +190,6 @@ def __exit__(self, exc_type, exc_val, exc_tb):
188190 def schema (self ) -> pa .Schema :
189191 return self ._outer_schema
190192
191- @property
192- def name (self ) -> str :
193- return self ._id
194-
195193 # used as a filter in Cursor, encode in tx
196194 def _decode (self , b : pa .RecordBatch ) -> pa .RecordBatch :
197195 b = self ._bigcol .decode (b ) # convert to panda first, then do the magic
0 commit comments