Skip to content

Commit 3c98c9e

Browse files
authored
feat: pgroll support import (#1470)
1 parent 3c248a3 commit 3c98c9e

File tree

4 files changed

+4908
-2909
lines changed

4 files changed

+4908
-2909
lines changed

cli/package.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
"ini": "^4.1.2",
4343
"lodash.compact": "^3.0.1",
4444
"lodash.get": "^4.4.2",
45+
"lodash.keyby": "^4.6.0",
4546
"lodash.set": "^4.3.2",
4647
"node-fetch": "^3.3.2",
4748
"open": "^10.1.0",
@@ -51,6 +52,7 @@
5152
"text-table": "^0.2.0",
5253
"tmp": "^0.2.3",
5354
"tslib": "^2.6.2",
55+
"type-fest": "^4.18.1",
5456
"which": "^4.0.0",
5557
"zod": "^3.23.6"
5658
},
@@ -59,6 +61,7 @@
5961
"@types/babel__core": "^7.20.5",
6062
"@types/lodash.compact": "^3.0.9",
6163
"@types/lodash.get": "^4.4.9",
64+
"@types/lodash.keyby": "^4.6.9",
6265
"@types/lodash.set": "^4.3.9",
6366
"@types/relaxed-json": "^1.0.4",
6467
"@types/text-table": "^0.2.5",

cli/src/commands/import/csv.ts

Lines changed: 63 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,13 @@ import { importColumnTypes } from '@xata.io/importer';
55
import { open, writeFile } from 'fs/promises';
66
import { BaseCommand } from '../../base.js';
77
import { enumFlag } from '../../utils/oclif.js';
8-
import { getBranchDetailsWithPgRoll } from '../../migrations/pgroll.js';
8+
import {
9+
getBranchDetailsWithPgRoll,
10+
waitForMigrationToFinish,
11+
xataColumnTypeToPgRollComment
12+
} from '../../migrations/pgroll.js';
13+
import { compareSchemas } from '../../utils/compareSchema.js';
14+
import keyBy from 'lodash.keyby';
915

1016
const ERROR_CONSOLE_LOG_LIMIT = 200;
1117
const ERROR_LOG_FILE = 'errors.log';
@@ -23,6 +29,8 @@ const bufferEncodings: BufferEncoding[] = [
2329
'hex'
2430
];
2531

32+
const INTERNAL_COLUMNS_PGROLL = ['xata_id', 'xata_createdat', 'xata_updatedat', 'xata_version'];
33+
2634
export default class ImportCSV extends BaseCommand<typeof ImportCSV> {
2735
static description = 'Import a CSV file';
2836

@@ -144,12 +152,26 @@ export default class ImportCSV extends BaseCommand<typeof ImportCSV> {
144152
if (!parseResults.success) {
145153
throw new Error('Failed to parse CSV file');
146154
}
147-
const batchRows = parseResults.data.map(({ data }) => data);
155+
const batchRows = parseResults.data.map(({ data }) => {
156+
const formattedRow: { [k: string]: any } = {};
157+
const keys = Object.keys(data);
158+
for (const key of keys) {
159+
if (INTERNAL_COLUMNS_PGROLL.includes(key) && key !== 'xata_id') continue;
160+
formattedRow[key] = data[key];
161+
}
162+
return formattedRow;
163+
});
164+
148165
const importResult = await xata.import.importBatch(
149166
{ workspace, region, database, branch },
150-
{ columns: parseResults.columns, table, batchRows }
167+
{
168+
columns: parseResults.columns.filter(
169+
({ name }) => name === 'xata_id' || !INTERNAL_COLUMNS_PGROLL.includes(name)
170+
),
171+
table,
172+
batchRows
173+
}
151174
);
152-
153175
await xata.import.importFiles(
154176
{ database, branch, region, workspace: workspace },
155177
{
@@ -212,22 +234,39 @@ export default class ImportCSV extends BaseCommand<typeof ImportCSV> {
212234
const xata = await this.getXataClient();
213235
const { workspace, region, database, branch } = await this.parseDatabase();
214236
const { schema: existingSchema } = await getBranchDetailsWithPgRoll(xata, { workspace, region, database, branch });
215-
const newSchema = {
216-
tables: [
217-
...existingSchema.tables.filter((t) => t.name !== table),
218-
{ name: table, columns: columns.filter((c) => c.name !== 'id') }
219-
]
220-
};
221237

222-
const { edits } = await xata.api.migrations.compareBranchWithUserSchema({
223-
pathParams: { workspace, region, dbBranchName: `${database}:main` },
224-
body: { schema: newSchema }
225-
});
226-
if (edits.operations.length > 0) {
227-
const destructiveOperations = edits.operations
238+
const { edits } = compareSchemas(
239+
{},
240+
{
241+
tables: {
242+
[table]: {
243+
name: table,
244+
xataCompatible: false,
245+
columns: keyBy(
246+
columns
247+
.filter((c) => !INTERNAL_COLUMNS_PGROLL.includes(c.name as any))
248+
.map((c) => {
249+
return {
250+
name: c.name,
251+
type: c.type,
252+
nullable: c.notNull !== false,
253+
default: c.defaultValue ?? null,
254+
unique: c.unique,
255+
comment: xataColumnTypeToPgRollComment(c)
256+
};
257+
}),
258+
'name'
259+
)
260+
}
261+
}
262+
}
263+
);
264+
265+
if (edits.length > 0) {
266+
const destructiveOperations = edits
228267
.map((op) => {
229-
if (!('removeColumn' in op)) return undefined;
230-
return op.removeColumn.column;
268+
if (!('drop_column' in op)) return undefined;
269+
return op.drop_column.column;
231270
})
232271
.filter((x) => x !== undefined);
233272

@@ -262,10 +301,14 @@ export default class ImportCSV extends BaseCommand<typeof ImportCSV> {
262301
process.exit(1);
263302
}
264303

265-
await xata.api.migrations.applyBranchSchemaEdit({
304+
const { jobID } = await xata.api.migrations.applyMigration({
266305
pathParams: { workspace, region, dbBranchName: `${database}:${branch}` },
267-
body: { edits }
306+
body: {
307+
adaptTables: true,
308+
operations: edits
309+
}
268310
});
311+
await waitForMigrationToFinish(xata.api, workspace, region, database, branch, jobID);
269312
}
270313
}
271314
}

cli/src/utils/compareSchema.ts

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
import { PgRollOperation } from '@xata.io/pgroll';
2+
import { PartialDeep } from 'type-fest';
3+
import { Schemas } from '@xata.io/client';
4+
import { generateLinkReference, tableNameFromLinkComment, xataColumnTypeToPgRoll } from '../migrations/pgroll.js';
5+
6+
export function compareSchemas(
7+
source: PartialDeep<Schemas.BranchSchema>,
8+
target: PartialDeep<Schemas.BranchSchema>
9+
): { edits: PgRollOperation[] } {
10+
const edits: PgRollOperation[] = [];
11+
12+
// Compare tables
13+
const sourceTables = Object.keys(source.tables ?? {});
14+
const targetTables = Object.keys(target.tables ?? {});
15+
const newTables = targetTables.filter((table) => !sourceTables.includes(table));
16+
const deletedTables = sourceTables.filter((table) => !targetTables.includes(table));
17+
18+
// Compare columns
19+
for (const table of sourceTables) {
20+
const sourceColumns = Object.keys(source.tables?.[table]?.columns ?? {});
21+
const targetColumns = Object.keys(target.tables?.[table]?.columns ?? {});
22+
const newColumns = targetColumns.filter((column) => !sourceColumns.includes(column));
23+
const deletedColumns = sourceColumns.filter((column) => !targetColumns.includes(column));
24+
25+
// Add columns
26+
for (const column of newColumns) {
27+
const props = target.tables?.[table]?.columns?.[column] ?? {};
28+
edits.push({
29+
add_column: {
30+
table,
31+
column: {
32+
name: column,
33+
type: xataColumnTypeToPgRoll(props?.type as any),
34+
comment: props?.comment,
35+
nullable: !(props?.nullable === false),
36+
unique: props?.unique,
37+
default: props?.default ?? undefined,
38+
references:
39+
props?.type === 'link' && props?.name
40+
? generateLinkReference({
41+
column: props.name,
42+
table: tableNameFromLinkComment(props?.comment ?? '') ?? ''
43+
})
44+
: undefined
45+
}
46+
}
47+
});
48+
}
49+
50+
// Delete columns
51+
for (const column of deletedColumns) {
52+
edits.push({ drop_column: { table, column } });
53+
}
54+
55+
// Compare column properties
56+
for (const column of targetColumns) {
57+
const sourceProps = source.tables?.[table]?.columns?.[column] ?? {};
58+
const targetProps = target.tables?.[table]?.columns?.[column] ?? {};
59+
60+
if (sourceProps.type !== targetProps.type) {
61+
edits.push({
62+
alter_column: {
63+
table,
64+
column,
65+
type: targetProps.type,
66+
references:
67+
targetProps?.type === 'link' && targetProps?.name
68+
? generateLinkReference({
69+
column: targetProps.name,
70+
table: tableNameFromLinkComment(targetProps?.comment ?? '') ?? ''
71+
})
72+
: undefined
73+
}
74+
});
75+
}
76+
77+
if (sourceProps.nullable !== targetProps.nullable) {
78+
edits.push({ alter_column: { table, column, nullable: targetProps.nullable } });
79+
}
80+
81+
if (sourceProps.unique !== targetProps.unique) {
82+
edits.push({
83+
alter_column: {
84+
table,
85+
column,
86+
unique: {
87+
name: `${table}_${column}_unique`
88+
}
89+
}
90+
});
91+
}
92+
}
93+
}
94+
95+
// Delete tables
96+
for (const table of deletedTables) {
97+
edits.push({ drop_table: { name: table } });
98+
}
99+
100+
// Add new tables
101+
for (const table of newTables) {
102+
const props = target.tables?.[table] ?? {};
103+
edits.push({
104+
create_table: {
105+
name: table,
106+
comment: props.comment,
107+
columns: Object.entries(props.columns ?? {}).map(([name, column]) => {
108+
return {
109+
name,
110+
type: xataColumnTypeToPgRoll(column?.type as any),
111+
comment: column?.comment,
112+
nullable: !(column?.nullable === false),
113+
unique: column?.unique,
114+
default: column?.default ?? undefined,
115+
references:
116+
column?.type === 'link' && column?.name
117+
? generateLinkReference({
118+
column: column?.name,
119+
table: tableNameFromLinkComment(column?.comment ?? '') ?? ''
120+
})
121+
: undefined
122+
};
123+
})
124+
}
125+
});
126+
}
127+
128+
return { edits };
129+
}

0 commit comments

Comments
 (0)