Skip to content

Commit 9dfa63d

Browse files
committed
Add support for Migrate::*
This satisfies SDK tester's `schema_migrations_input_dml.json`. Part 2: Make it work.
1 parent f0cd1eb commit 9dfa63d

File tree

5 files changed

+189
-49
lines changed

5 files changed

+189
-49
lines changed

src/cratedb_fivetran_destination/main.py

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ def __init__(self):
3636
self.metadata = sa.MetaData()
3737
self.engine: sa.Engine = None
3838
self.processor: Processor = None
39-
self.migration_helper = SchemaMigrationHelper(CrateDBDestinationImpl.table_map)
4039

4140
def ConfigurationForm(self, request, context):
4241
log_message(LOG_INFO, "Fetching configuration form")
@@ -246,9 +245,10 @@ def DescribeTable(self, request, context):
246245
"""
247246
self._configure_database(request.configuration.get("url"))
248247
table_name = self.table_name(request)
248+
schema_name = self.schema_name(request)
249249
table: common_pb2.Table = common_pb2.Table(name=table_name)
250250
try:
251-
sa_table = self._reflect_table(schema=request.schema_name, table=table_name)
251+
sa_table = self._reflect_table(schema=schema_name, table=table_name)
252252
except sa.exc.NoSuchTableError:
253253
msg = f"Table not found: {table_name}"
254254
log_message(LOG_WARNING, f"DescribeTable: {msg}")
@@ -270,7 +270,7 @@ def Migrate(self, request, context):
270270
"""
271271
Example implementation of the new Migrate RPC introduced for schema migration support.
272272
This method inspects which migration operation (oneof) was requested and logs / handles it.
273-
For demonstration, all recognized operations return success.
273+
For demonstration, all recognized operations return `success`.
274274
275275
:param request: The migration request contains details of the operation.
276276
:param context: gRPC context
@@ -279,34 +279,37 @@ def Migrate(self, request, context):
279279
rather different migration methods are just manipulating table_map to simulate
280280
the migration operations.
281281
"""
282+
self._configure_database(request.configuration.get("url"))
283+
migration_helper = SchemaMigrationHelper(self.engine, CrateDBDestinationImpl.table_map)
284+
282285
details = request.details
283286
schema = details.schema
284287
table = details.table
285288

286289
operation_case = details.WhichOneof("operation")
287290
log_message(LOG_INFO, f"[Migrate] schema={schema} table={table} operation={operation_case}")
288291

289-
response = None
292+
table_obj = self.DescribeTable(request, context).table
290293

291294
if operation_case == "drop":
292-
response = self.migration_helper.handle_drop(details.drop, schema, table)
295+
response = migration_helper.handle_drop(details.drop, schema, table)
293296

294297
elif operation_case == "copy":
295-
response = self.migration_helper.handle_copy(details.copy, schema, table)
298+
response = migration_helper.handle_copy(details.copy, schema, table, table_obj)
296299

297300
elif operation_case == "rename":
298-
response = self.migration_helper.handle_rename(details.rename, schema, table)
301+
response = migration_helper.handle_rename(details.rename, schema, table)
299302

300303
elif operation_case == "add":
301-
response = self.migration_helper.handle_add(details.add, schema, table)
304+
response = migration_helper.handle_add(details.add, schema, table, table_obj)
302305

303306
elif operation_case == "update_column_value":
304-
response = self.migration_helper.handle_update_column_value(
307+
response = migration_helper.handle_update_column_value(
305308
details.update_column_value, schema, table
306309
)
307310

308311
elif operation_case == "table_sync_mode_migration":
309-
response = self.migration_helper.handle_table_sync_mode_migration(
312+
response = migration_helper.handle_table_sync_mode_migration(
310313
details.table_sync_mode_migration, schema, table
311314
)
312315

@@ -368,11 +371,22 @@ def _table_info_from_fivetran(self, table: common_pb2.Table) -> TableInfo: # pr
368371
primary_keys = [column.name for column in table.columns if column.primary_key]
369372
return TableInfo(fullname=table.name, primary_keys=primary_keys)
370373

374+
@staticmethod
375+
def schema_name(request):
376+
"""
377+
Return schema name from request object.
378+
"""
379+
if hasattr(request, "details"):
380+
return request.details.schema
381+
return request.schema_name
382+
371383
@staticmethod
372384
def table_name(request):
373385
"""
374386
Return table name from request object.
375387
"""
388+
if hasattr(request, "details"):
389+
return request.details.table
376390
if hasattr(request, "table"):
377391
return request.table.name
378392
return request.table_name

src/cratedb_fivetran_destination/schema_migration_helper.py

Lines changed: 52 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1-
# ruff: noqa: E501
1+
# ruff: noqa: E501,S608
22
# https://github.com/fivetran/fivetran_partner_sdk/blob/main/examples/destination_connector/python/schema_migration_helper.py
33
import sys
44

5+
import sqlalchemy as sa
6+
7+
from cratedb_fivetran_destination.model import SqlBag, TypeMap
8+
59
sys.path.append("sdk_pb2")
610

711
from fivetran_sdk import common_pb2, destination_sdk_pb2
@@ -18,16 +22,18 @@
1822
class SchemaMigrationHelper:
1923
"""Helper class for handling migration operations"""
2024

21-
def __init__(self, table_map):
25+
def __init__(self, engine: sa.Engine, table_map):
26+
self.engine = engine
2227
self.table_map = table_map
2328

2429
def handle_drop(self, drop_op, schema, table):
2530
"""Handles drop operations (drop table, drop column in history mode)."""
2631
entity_case = drop_op.WhichOneof("entity")
2732

2833
if entity_case == "drop_table":
29-
# table-map manipulation to simulate drop, replace with actual logic.
30-
self.table_map.pop(table, None)
34+
sql = f'DROP TABLE "{schema}"."{table}"'
35+
with self.engine.connect() as conn:
36+
conn.execute(sa.text(sql))
3137

3238
log_message(INFO, f"[Migrate:Drop] Dropping table {schema}.{table}")
3339
return destination_sdk_pb2.MigrateResponse(success=True)
@@ -53,15 +59,18 @@ def handle_drop(self, drop_op, schema, table):
5359
log_message(WARNING, "[Migrate:Drop] No drop entity specified")
5460
return destination_sdk_pb2.MigrateResponse(unsupported=True)
5561

56-
def handle_copy(self, copy_op, schema, table):
62+
def handle_copy(self, copy_op, schema, table, table_obj: common_pb2.Table):
5763
"""Handles copy operations (copy table, copy column, copy table to history mode)."""
5864
entity_case = copy_op.WhichOneof("entity")
5965

6066
if entity_case == "copy_table":
61-
# table-map manipulation to simulate copy, replace with actual logic.
6267
copy_table = copy_op.copy_table
63-
if copy_table.from_table in self.table_map:
64-
self.table_map[copy_table.to_table] = self.table_map[copy_table.from_table]
68+
sql = (
69+
f'CREATE TABLE "{schema}"."{copy_table.to_table}" '
70+
f'AS SELECT * FROM "{schema}"."{copy_table.from_table}"'
71+
)
72+
with self.engine.connect() as conn:
73+
conn.execute(sa.text(sql))
6574

6675
log_message(
6776
INFO,
@@ -70,17 +79,25 @@ def handle_copy(self, copy_op, schema, table):
7079
return destination_sdk_pb2.MigrateResponse(success=True)
7180

7281
if entity_case == "copy_column":
73-
# table-map manipulation to simulate copy column, replace with actual logic.
82+
sql_bag = SqlBag()
7483
copy_column = copy_op.copy_column
75-
table_obj = self.table_map.get(table)
76-
if table_obj:
77-
for col in table_obj.columns:
78-
if col.name == copy_column.from_column:
79-
new_col = type(col)()
80-
new_col.CopyFrom(col)
81-
new_col.name = copy_column.to_column
82-
table_obj.columns.add().CopyFrom(new_col)
83-
break
84+
for col in table_obj.columns:
85+
if col.name == copy_column.from_column:
86+
new_col = type(col)()
87+
new_col.CopyFrom(col)
88+
new_col.name = copy_column.to_column
89+
table_obj.columns.add().CopyFrom(new_col)
90+
type_ = TypeMap.to_cratedb(new_col.type, new_col.params)
91+
sql_bag.add(
92+
f'ALTER TABLE "{schema}"."{table}" ADD COLUMN "{new_col.name}" {type_};'
93+
)
94+
sql_bag.add(f'UPDATE "{schema}"."{table}" SET "{new_col.name}"="{col.name}";')
95+
break
96+
97+
if sql_bag:
98+
with self.engine.connect() as conn:
99+
for command in sql_bag.statements:
100+
conn.execute(sa.text(command))
84101

85102
log_message(
86103
INFO,
@@ -116,31 +133,21 @@ def handle_rename(self, rename_op, schema, table):
116133
entity_case = rename_op.WhichOneof("entity")
117134

118135
if entity_case == "rename_table":
119-
# table-map manipulation to simulate rename, replace with actual logic.
120136
rt = rename_op.rename_table
121-
if rt.from_table in self.table_map:
122-
tbl = self.table_map.pop(rt.from_table)
123-
# Adjust name inside the Table metadata if needed
124-
tbl_copy = tbl.__class__.FromString(tbl.SerializeToString())
125-
if hasattr(tbl_copy, "name"):
126-
tbl_copy.name = rt.to_table
127-
self.table_map[rt.to_table] = tbl_copy
137+
sql = f'ALTER TABLE "{schema}"."{rt.from_table}" RENAME TO "{rt.to_table}";'
138+
with self.engine.connect() as conn:
139+
conn.execute(sa.text(sql))
128140

129141
log_message(
130142
INFO, f"[Migrate:RenameTable] from={rt.from_table} to={rt.to_table} schema={schema}"
131143
)
132144
return destination_sdk_pb2.MigrateResponse(success=True)
133145

134146
if entity_case == "rename_column":
135-
# table-map manipulation to simulate rename column, replace with actual logic.
136147
rename_column = rename_op.rename_column
137-
table_obj = self.table_map.get(table)
138-
if table_obj:
139-
# Rename the column
140-
for col in table_obj.columns:
141-
if col.name == rename_column.from_column:
142-
col.name = rename_column.to_column
143-
break
148+
sql = f'ALTER TABLE "{schema}"."{table}" RENAME "{rename_column.from_column}" TO "{rename_column.to_column}";'
149+
with self.engine.connect() as conn:
150+
conn.execute(sa.text(sql))
144151

145152
log_message(
146153
INFO,
@@ -151,7 +158,7 @@ def handle_rename(self, rename_op, schema, table):
151158
log_message(WARNING, "[Migrate:Rename] No rename entity specified")
152159
return destination_sdk_pb2.MigrateResponse(unsupported=True)
153160

154-
def handle_add(self, add_op, schema, table):
161+
def handle_add(self, add_op, schema, table, table_obj: common_pb2.Table):
155162
"""Handles add operations (add column in history mode, add column with default value)."""
156163
entity_case = add_op.WhichOneof("entity")
157164

@@ -171,13 +178,15 @@ def handle_add(self, add_op, schema, table):
171178
return destination_sdk_pb2.MigrateResponse(success=True)
172179

173180
if entity_case == "add_column_with_default_value":
174-
# table-map manipulation to simulate add column with default value, replace with actual logic.
175181
add_col_default_with_value = add_op.add_column_with_default_value
176-
table_obj = self.table_map.get(table)
177182
if table_obj:
178183
new_col = table_obj.columns.add()
179184
new_col.name = add_col_default_with_value.column
180185
new_col.type = add_col_default_with_value.column_type
186+
type_ = TypeMap.to_cratedb(new_col.type, new_col.params)
187+
sql = f'ALTER TABLE "{schema}"."{table}" ADD COLUMN "{new_col.name}" {type_};'
188+
with self.engine.connect() as conn:
189+
conn.execute(sa.text(sql))
181190

182191
log_message(
183192
INFO,
@@ -190,7 +199,12 @@ def handle_add(self, add_op, schema, table):
190199

191200
def handle_update_column_value(self, upd, schema, table):
192201
"""Handles update column value operation."""
193-
# Placeholder: Update all existing rows' column value.
202+
with self.engine.connect() as conn:
203+
conn.execute(
204+
sa.text(f'UPDATE "{schema}"."{table}" SET "{upd.column}"=:value;'), # noqa: S608
205+
parameters={"value": upd.value},
206+
)
207+
conn.execute(sa.text(f'REFRESH TABLE "{schema}"."{table}";'))
194208

195209
log_message(
196210
INFO,

tests/conftest.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ def engine():
88
with engine.connect() as conn:
99
# Clean up stale "read-only" mode states.
1010
try:
11-
conn.execute(sa.text('ALTER TABLE testdrive.foo RESET ("blocks.write");'))
11+
conn.execute(sa.text('ALTER TABLE testdrive.foo SET ("blocks.write"=false);'))
12+
conn.execute(sa.text('ALTER TABLE tester.transaction SET ("blocks.write"=false);'))
1213
except Exception: # noqa: S110
1314
pass
1415
conn.execute(sa.text("DROP TABLE IF EXISTS testdrive.foo"))
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"writerType": "Database",
3+
"url": "crate://",
4+
"database": "doc",
5+
"table": "testdrive",
6+
"host": "localhost",
7+
"port": "4200",
8+
"user": "crate",
9+
"password": "",
10+
"enableEncryption": "false"
11+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
{
2+
"create_table" : {
3+
"transaction": {
4+
"columns": {
5+
"id": "INT",
6+
"amount" : "DOUBLE",
7+
"desc": "STRING"
8+
},
9+
"primary_key": ["id"]
10+
},
11+
"transaction_drop": {
12+
"columns": {
13+
"id": "INT",
14+
"amount" : "DOUBLE",
15+
"desc": "STRING"
16+
},
17+
"primary_key": ["id"]
18+
}
19+
},
20+
"ops" : [
21+
{
22+
"upsert": {
23+
"transaction": [
24+
{"id":1, "amount": 100.45, "desc":null},
25+
{"id":2, "amount": 150.33, "desc": "two"},
26+
{"id":3, "amount": 150.33, "desc": "two"},
27+
{"id":4, "amount": 150.33, "desc": "two"},
28+
{"id":10, "amount": 200, "desc": "three"},
29+
{"id":20, "amount": 50, "desc": "money"}
30+
],
31+
"transaction_drop": [
32+
{"id":1, "amount": 100.45, "desc":null},
33+
{"id":2, "amount": 150.33, "desc": "two"}
34+
]
35+
}
36+
}
37+
],
38+
"schema_migration" : [
39+
{
40+
"copy_column": [
41+
{
42+
"table": "transaction",
43+
"from_column": "desc",
44+
"to_column": "desc_detailed"
45+
}
46+
],
47+
"update_column_value": [
48+
{
49+
"table": "transaction",
50+
"column": "amount",
51+
"value": "202.57"
52+
}
53+
],
54+
"add_column_with_default_value": [
55+
{
56+
"table": "transaction",
57+
"column": "operation_time",
58+
"data_type": "UTC_DATETIME",
59+
"default_value": "2005-05-23T20:57:00Z"
60+
}
61+
],
62+
"set_column_to_null": [
63+
{
64+
"table": "transaction",
65+
"column": "desc"
66+
}
67+
],
68+
"copy_table": [
69+
{
70+
"from_table": "transaction",
71+
"to_table": "transaction_new"
72+
}
73+
],
74+
"rename_column": [
75+
{
76+
"table": "transaction",
77+
"from_column": "amount",
78+
"to_column": "amount_renamed"
79+
}
80+
],
81+
"rename_table": [
82+
{
83+
"from_table": "transaction_new",
84+
"to_table": "transaction_renamed"
85+
}
86+
],
87+
"drop_table": [
88+
{
89+
"table": "transaction_drop"
90+
}
91+
]
92+
}
93+
],
94+
"describe_table" : [
95+
"transaction",
96+
"transaction_drop",
97+
"transaction_new",
98+
"transaction_renamed"
99+
]
100+
}

0 commit comments

Comments
 (0)