Skip to content

Commit 2714c98

Browse files
committed
Add support for Fivetran's TIME_NAIVE data type
1 parent 16ce705 commit 2714c98

File tree

6 files changed

+33
-3
lines changed

6 files changed

+33
-3
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
- Added support for `AlterTable::drop_columns` operation.
1010
- Added support for schema migrations.
1111
- Added support for history mode.
12+
- Added support for Fivetran's `TIME_NAIVE` data type.
1213

1314
## v0.0.3 - 2025-05-23
1415
- Dependencies: Updated to `sqlalchemy-cratedb==0.42.0.dev2`

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ dependencies = [
8080
"grpcio-tools<1.77",
8181
"protobuf<6.34",
8282
"pycryptodome<3.24",
83+
"python-dateutil<2.10",
8384
"sqlalchemy-cratedb==0.42.0.dev2",
8485
"toolz<2",
8586
"zstandard<0.26",

src/cratedb_fivetran_destination/main.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
WriteHistoryBatchProcessor,
1515
)
1616
from cratedb_fivetran_destination.model import (
17+
CrateDBKnowledge,
1718
FieldMap,
1819
FivetranKnowledge,
1920
FivetranTable,
@@ -366,7 +367,10 @@ def _files_to_records(request, files: t.List[str]):
366367
for record in read_csv.decrypt_file(filename, value):
367368
# Rename keys according to field map.
368369
record = FieldMap.rename_keys(record)
370+
# Replace magic Fivetran values.
369371
FivetranKnowledge.replace_values(record)
372+
# Adjust values to data types for CrateDB.
373+
CrateDBKnowledge.replace_values(request, record)
370374
yield record
371375

372376
def _reflect_table(self, schema: str, table: str):

src/cratedb_fivetran_destination/model.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import typing as t
22
from textwrap import dedent
33

4+
import dateutil
45
import sqlalchemy as sa
56
from attr import Factory
67
from attrs import define
@@ -74,6 +75,7 @@ class TypeMap:
7475
# CrateDB can not store `DATE` types, so converge to `TIMESTAMP`.
7576
DataType.NAIVE_DATE: sa.TIMESTAMP(),
7677
DataType.NAIVE_DATETIME: sa.TIMESTAMP(),
78+
DataType.NAIVE_TIME: sa.TIMESTAMP(),
7779
DataType.UTC_DATETIME: sa.TIMESTAMP(),
7880
# TODO: The parameters are coming from the API, here `input_fivetran.json`.
7981
# How to loop them into this type resolution machinery?
@@ -82,7 +84,6 @@ class TypeMap:
8284
DataType.STRING: sa.String(),
8385
DataType.JSON: ObjectTypeImpl(),
8486
DataType.XML: sa.String(),
85-
DataType.NAIVE_TIME: sa.TIMESTAMP(),
8687
}
8788

8889
cratedb_map = {
@@ -109,6 +110,8 @@ class TypeMap:
109110
# sa.DateTime: DataType.NAIVE_DATETIME,
110111
sa.DateTime: DataType.UTC_DATETIME,
111112
sa.DATETIME: DataType.UTC_DATETIME,
113+
sa.Time: DataType.NAIVE_TIME,
114+
sa.TIME: DataType.NAIVE_TIME,
112115
sa.Numeric: DataType.DECIMAL,
113116
sa.DECIMAL: DataType.DECIMAL,
114117
sa.LargeBinary: DataType.BINARY,
@@ -156,6 +159,27 @@ def replace_values(cls, record):
156159
record.pop(rm)
157160

158161

162+
class CrateDBKnowledge:
163+
"""
164+
Manage special knowledge about CrateDB.
165+
166+
CrateDB can't store values of the `TIME` type, so we selected to store it as `DATETIME`
167+
This routine converges the value.
168+
"""
169+
170+
@classmethod
171+
def replace_values(cls, request, record):
172+
for column in request.table.columns:
173+
if column.type == common_pb2.DataType.NAIVE_TIME and column.name in record:
174+
obj = dateutil.parser.parse(record[column.name])
175+
obj = obj.replace(year=1970, month=1, day=1)
176+
# Calculate milliseconds since midnight (timezone-independent).
177+
ms = (
178+
obj.hour * 3600 + obj.minute * 60 + obj.second
179+
) * 1000 + obj.microsecond // 1000
180+
record[column.name] = str(ms)
181+
182+
159183
@define
160184
class TableInfo:
161185
"""

tests/data/cratedb_canonical/input_cratedb.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
"string": "Hotzenplotz",
4747
"json": "{\"foo\": \"bar\", \"count\": 42}",
4848
"xml": "XML",
49-
"naive_time": "1970-01-02T00:00:00+01:00"
49+
"naive_time": "12:34:56"
5050
}
5151
]
5252
}

tests/test_integration.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ def starter():
106106
string="Hotzenplotz",
107107
json={"count": 42, "foo": "bar"},
108108
xml="XML",
109-
naive_time=86400000,
109+
naive_time=45296000,
110110
__fivetran_synced=mock.ANY,
111111
__fivetran_id="zyx-987-abc",
112112
)

0 commit comments

Comments
 (0)