This repository was archived by the owner on Apr 2, 2025. It is now read-only.

Commit 347fcc8

Merge pull request #109 from C4IROcean/feature/tabular-v2

Integrate kTable with sdk

2 parents: 6583910 + c22d8da

25 files changed, +1943 -3 lines

poetry.lock

Lines changed: 74 additions & 3 deletions
Some generated files are not rendered by default.

pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -11,6 +11,7 @@ python = "^3.9"
 odp-dto = { path = "./src/dto", develop = true }
 odp-sdk = { path = "./src/sdk", develop = true }
 jupyter = "^1.0.0"
+pyarrow = "^18.1.0"
 
 [tool.poetry.group.dev.dependencies]
 python-dotenv = "^1.0.1"

src/sdk/odp/client/client.py

Lines changed: 9 additions & 0 deletions
@@ -3,11 +3,14 @@
 
 from pydantic import BaseModel, Field, PrivateAttr
 
+from ..dto import DatasetDto
 from .auth import TokenProvider, get_default_token_provider
 from .http_client import OdpHttpClient
 from .raw_storage_client import OdpRawStorageClient
 from .resource_client import OdpResourceClient
 from .tabular_storage_client import OdpTabularStorageClient
+from .tabular_storage_v2_client import ClientAuthorization
+from .tabular_v2.client import TableHandler
 
 
 class OdpClient(BaseModel):
@@ -28,6 +31,9 @@ def __init__(self, **data):
         self._catalog_client = OdpResourceClient(http_client=self._http_client, resource_endpoint="/catalog")
         self._raw_storage_client = OdpRawStorageClient(http_client=self._http_client)
         self._tabular_storage_client = OdpTabularStorageClient(http_client=self._http_client)
+        self._tabular_storage_v2_client = ClientAuthorization(
+            base_url=self.base_url, token_provider=self.token_provider
+        )
 
     def personalize_name(self, name: str, fmt: Optional[str] = None) -> str:
         """Personalize a name by adding a postfix unique to the user
@@ -78,3 +84,6 @@ def raw(self) -> OdpRawStorageClient:
     @property
     def tabular(self) -> OdpTabularStorageClient:
         return self._tabular_storage_client
+
+    def table_v2(self, dataset_dto: DatasetDto) -> TableHandler:
+        return self._tabular_storage_v2_client.table(str(dataset_dto.uuid))
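The new table_v2 accessor returns a tabular_v2 TableHandler keyed by the dataset's UUID and authorized through ClientAuthorization. A minimal usage sketch, assuming default client configuration and the existing catalog lookup helper (the dataset reference and the catalog.get() call are assumptions, only table_v2 itself comes from this commit):

    from odp.client.client import OdpClient  # the package __init__ may also re-export OdpClient

    client = OdpClient()

    # Assumed helper: fetch a DatasetDto through the catalog client; the lookup
    # itself is outside this diff and the reference below is a placeholder.
    dataset_dto = client.catalog.get("catalog.hubocean.io/dataset/my-dataset")

    # New in this commit: a tabular_v2 TableHandler for str(dataset_dto.uuid).
    table = client.table_v2(dataset_dto)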

src/sdk/odp/client/tabular_storage_v2_client.py

Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
+from typing import Dict, Optional, Union
+
+from odp.client.auth import TokenProvider
+from odp.client.tabular_v2.client import Client
+
+
+class ClientAuthorization(Client):
+    def __init__(self, base_url, token_provider: TokenProvider):
+        if base_url.endswith(":8888"):
+            base_url = base_url.replace(":8888", ":31337")
+        super().__init__(base_url)
+        self.token_provider = token_provider
+
+    def _request(
+        self,
+        path: str,
+        data: Union[Dict, bytes, None] = None,
+        params: Optional[Dict] = None,
+        headers: Optional[Dict] = None,
+    ) -> Client.Response:
+        headers = headers or {}
+        headers["Authorization"] = self.token_provider.get_token()
+        return super()._request(path, data, params, headers)
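
ClientAuthorization wraps the tabular_v2 Client so that every request carries a token from the SDK's TokenProvider, and local URLs on port 8888 are redirected to port 31337. A minimal construction sketch, assuming the module path shown above and that only get_token() is needed (FakeTokenProvider and the URL are made up for illustration):

    from odp.client.tabular_storage_v2_client import ClientAuthorization


    class FakeTokenProvider:
        """Stand-in for TokenProvider; only get_token() is used by ClientAuthorization."""

        def get_token(self) -> str:
            return "Bearer dummy-token"


    auth = ClientAuthorization(
        base_url="http://localhost:8888",  # rewritten to http://localhost:31337 in __init__
        token_provider=FakeTokenProvider(),
    )
    # Every call routed through _request() now gets an Authorization header with the
    # provider's token before being forwarded to Client._request().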

src/sdk/odp/client/tabular_v2/__init__.py

Whitespace-only changes.

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+from .big import BigCol, inner_exp
+from .buffer import Buffer
+from .local import LocalBigCol
+from .remote import RemoteBigCol

Lines changed: 168 additions & 0 deletions
@@ -0,0 +1,168 @@
+import json
+import logging
+from abc import abstractmethod
+from typing import Iterable, Optional
+
+import pyarrow as pa
+from odp.client.tabular_v2.util.exp import BinOp, Field, Op, Parens, Scalar, UnaryOp
+
+STR_LIMIT = 128  # when to start using a reference
+STR_MIN = 12  # what to keep as prefix in the reference
+MAX_BIGFILE_SIZE = 64 * 1024 * 1024  # max size of a big file
+
+
+class BigCol:
+    def __init__(self):
+        pass
+
+    @abstractmethod
+    def fetch(self, md5: str) -> bytes:
+        """fetch data, called often, should cache"""
+        raise NotImplementedError()
+
+    @abstractmethod
+    def upload(self, md5: str, data: Iterable[bytes]):
+        """upload data"""
+        raise NotImplementedError()
+
+    def decode(self, batch: pa.RecordBatch) -> pa.RecordBatch:
+        cache = {}
+
+        def decode(x):
+            if x is None:
+                return None
+            if isinstance(x, str):
+                prefix, ref = x.rsplit("~", 1)
+            else:
+                prefix, ref = x.rsplit(b"~", 1)
+                ref = ref.decode("utf-8")
+            if ref == "":
+                return prefix
+            big_id, start, size = ref.split(":")
+            start = int(start)
+            size = int(size)
+            if big_id in cache:
+                data = cache[big_id]
+            else:
+                data = self.fetch(big_id)
+                cache[big_id] = data
+            if isinstance(x, str):
+                return data[start : start + size].decode("utf-8")
+            else:
+                return data[start : start + size]
+
+        fields = _fields_from_schema(batch.schema)
+        df = batch.to_pandas()
+        df[fields] = df[fields].map(decode)
+        return pa.RecordBatch.from_pandas(df, schema=batch.schema)
+
+
+def _fields_from_schema(schema: pa.Schema) -> list[str]:
+    fields = []
+    for name in schema.names:
+        field: pa.Field = schema.field(name)
+        if field.type == pa.string():
+            fields.append(name)
+        if field.type == pa.binary():
+            fields.append(name)
+    return fields
+
+
+def inner_exp(schema: pa.Schema, op: Optional[Op]) -> Optional[Op]:
+    if op is None:
+        return None
+
+    fields = _fields_from_schema(schema)
+
+    # TODO don't use the visitor, instead parse manually and use negation context
+    def visitor(neg: bool, op: Op) -> Op:
+        if isinstance(op, Field):
+            return op
+        if isinstance(op, Scalar):
+            return op
+        if isinstance(op, Parens):
+            op.exp = visitor(neg, op.exp)
+            return op
+        if isinstance(op, UnaryOp):
+            if op.prefix in ["~", "not", "!", "invert"]:
+                return UnaryOp(prefix=op.prefix, exp=visitor(~neg, op.exp), suffix=op.suffix)
+            return op
+        if isinstance(op, BinOp):
+            op = BinOp(left=visitor(neg, op.left), op=op.op, right=visitor(neg, op.right))
+            if isinstance(op.left, Field):
+                if str(op.left) in fields:
+                    return _inner_exp_binop(neg, op.left, op.op, op.right)
+                return op
+            elif isinstance(op.right, Field):
+                try:
+                    op = op.flip()
+                except NotImplementedError:
+                    logging.warning("can't flip big-col expression: %s", op)
+                    return Scalar(src="True", type="bool")
+                return visitor(neg, op)
+            else:
+                return op
+        raise ValueError(f"can't convert big-col expression: {type(op)}")
+
+    op = visitor(False, op)
+    logging.info("big: inner_exp: %s", repr(op))
+    return op
+
+
+def _inner_exp_binop_str(neg: bool, field: Field, op: str, right: str) -> Op:
+    if len(right) > STR_MIN:
+        a = right[:STR_MIN]
+        b = right[: STR_MIN - 1] + chr(ord(right[STR_MIN - 1]) + 1)
+        logging.info("big: str: %s .. %s", json.dumps(a), json.dumps(b))
+
+        if op == "==":
+            if neg:
+                return Scalar.from_py(False)
+            return BinOp(
+                left=BinOp(
+                    left=Scalar.from_py(a),
+                    op="<",
+                    right=field,
+                ),
+                op="and",
+                right=BinOp(
+                    left=field,
+                    op="<",
+                    right=Scalar.from_py(b),
+                ),
+            )
+        elif op == "!=":
+            if neg:
+                return Scalar.from_py(False)
+            else:
+                return Scalar.from_py(True)
+        elif op == ">" or op == ">=":
+            return BinOp(
+                left=field,
+                op=op,
+                right=Scalar.from_py(a),
+            )
+        elif op == "<" or op == "<=":
+            return BinOp(
+                left=field,
+                op=op,
+                right=Scalar.from_py(b),
+            )
+        else:
+            return BinOp(
+                left=field,
+                op=op,
+                right=Scalar.from_py(right + "~"),
+            )
+    logging.error("can't convert big-col expression: %s %s %s", field, op, right)
+    raise ValueError("can't convert big-col expression")
+
+
+def _inner_exp_binop(neg: bool, left: Field, op: str, right: Op) -> Op:
+    if isinstance(right, Scalar):
+        v = right.to_py()
+        if isinstance(v, str):
+            return _inner_exp_binop_str(neg, left, op, v)
+        else:
+            raise ValueError("can't convert big-col expression for scalar %s", right)
+    raise ValueError("can't convert big-col expression: %s %s %s", left, op, right)

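The inner_exp rewrite exists because only the first STR_MIN characters of a big value are stored inline, so an exact equality cannot be pushed down directly and is turned into a prefix range instead. A worked sketch of the bounds computed in _inner_exp_binop_str (the query value is made up; the real code builds BinOp/Scalar nodes rather than strings):

    STR_MIN = 12  # same constant as in the module above

    value = "oceanographic_station_42"  # example query value, longer than STR_MIN
    a = value[:STR_MIN]                                          # 'oceanographi'
    b = value[: STR_MIN - 1] + chr(ord(value[STR_MIN - 1]) + 1)  # 'oceanographj'

    # field == value is pushed down as the prefix range  a < field < b.
    # Any stored "<prefix>~<ref>" encoding of the value sorts inside that range,
    # so the inner store can pre-filter rows on the stored prefix alone.
    print(f"{a!r} < field < {b!r}")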