diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
index a57ac4f65..b05089d0a 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
@@ -164,7 +164,22 @@ private void initializeTableIfRequired(InternalTable internalTable) {
   @Override
   public void syncSchema(InternalSchema schema) {
     Schema latestSchema = schemaExtractor.toIceberg(schema);
-    schemaSync.sync(transaction.table().schema(), latestSchema, transaction);
+    if (!transaction.table().schema().sameSchema(latestSchema)) {
+      boolean hasFieldIds =
+          schema.getAllFields().stream().anyMatch(field -> field.getFieldId() != null);
+      if (hasFieldIds) {
+        // There is no clean way to sync the schema with the provided field IDs using the
+        // transaction API so we commit the current transaction and interact directly with
+        // the operations API.
+        transaction.commitTransaction();
+        schemaSync.syncWithProvidedIds(latestSchema, table);
+        // Start a new transaction for remaining operations
+        table.refresh();
+        transaction = table.newTransaction();
+      } else {
+        schemaSync.sync(transaction.table().schema(), latestSchema, transaction);
+      }
+    }
   }
 
   @Override
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaSync.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaSync.java
index 800938cb4..4b5705601 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaSync.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaSync.java
@@ -28,7 +28,10 @@
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.types.Types;
@@ -58,6 +61,14 @@ public void sync(Schema current, Schema latest, Transaction transaction) {
     }
   }
 
+  public void syncWithProvidedIds(Schema latest, Table table) {
+    BaseTable baseTable = ((BaseTable) table);
+    TableMetadata current = baseTable.operations().current();
+    TableMetadata updated =
+        TableMetadata.buildFrom(current).setCurrentSchema(latest, latest.highestFieldId()).build();
+    baseTable.operations().commit(current, updated);
+  }
+
   /**
    * Return a mapping of fieldId in the latest schema to an update action to perform. This allows
    * updates to happen in the same order as the source system.
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java
index 431184ce5..06b625c03 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java
@@ -29,11 +29,13 @@
 
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -75,24 +77,34 @@ Table getOrCreateTable(
       return getTable(catalogConfig, tableIdentifier, basePath);
     } else {
       try {
-        return getCatalog(catalogConfig)
-            .map(
-                catalog ->
-                    catalog.createTable(
-                        tableIdentifier,
-                        schema,
-                        partitionSpec,
-                        basePath,
-                        getDefaultMappingProperties(schema)))
-            .orElseGet(
-                () ->
-                    getHadoopTables()
-                        .create(
-                            schema,
-                            partitionSpec,
-                            SortOrder.unsorted(),
-                            getDefaultMappingProperties(schema),
-                            basePath));
+        // initialize the table with an empty schema, then manually set the schema to prevent the
+        // Iceberg API from remapping the field IDs.
+        Table tableWithEmptySchema =
+            getCatalog(catalogConfig)
+                .map(
+                    catalog ->
+                        catalog.createTable(
+                            tableIdentifier,
+                            new Schema(),
+                            PartitionSpec.unpartitioned(),
+                            basePath,
+                            getDefaultMappingProperties(schema)))
+                .orElseGet(
+                    () ->
+                        getHadoopTables()
+                            .create(
+                                new Schema(),
+                                PartitionSpec.unpartitioned(),
+                                getDefaultMappingProperties(schema),
+                                basePath));
+        // set the schema with the provided field IDs
+        TableOperations operations = ((BaseTable) tableWithEmptySchema).operations();
+        TableMetadata tableMetadata = operations.current();
+        TableMetadata.Builder builder = TableMetadata.buildFrom(tableMetadata);
+        builder.setCurrentSchema(schema, schema.highestFieldId());
+        builder.setDefaultPartitionSpec(partitionSpec);
+        operations.commit(tableMetadata, builder.build());
+        return getTable(catalogConfig, tableIdentifier, basePath);
       } catch (AlreadyExistsException ex) {
         log.info("Table {} not created since it already exists", tableIdentifier);
         return getTable(catalogConfig, tableIdentifier, basePath);
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
index 89460c409..8295ce516 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
@@ -590,7 +590,10 @@ protected static Schema addTopLevelField(Schema schema) {
 
   @SneakyThrows
   protected HoodieTableMetaClient getMetaClient(
-      TypedProperties keyGenProperties, HoodieTableType hoodieTableType, Configuration conf) {
+      TypedProperties keyGenProperties,
+      HoodieTableType hoodieTableType,
+      Configuration conf,
+      boolean populateMetaFields) {
     LocalFileSystem fs = (LocalFileSystem) FSUtils.getFs(basePath, conf);
     // Enforce checksum such that fs.open() is consistent to DFS
     fs.setVerifyChecksum(true);
@@ -614,6 +617,7 @@ protected HoodieTableMetaClient getMetaClient(
             .setPayloadClass(OverwriteWithLatestAvroPayload.class)
             .setCommitTimezone(HoodieTimelineTimeZone.UTC)
             .setBaseFileFormat(HoodieFileFormat.PARQUET.toString())
+            .setPopulateMetaFields(populateMetaFields)
             .build();
     return HoodieTableMetaClient.initTableAndGetMetaClient(conf, this.basePath, properties);
   }
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
index abbe7fe67..2f5b73e42 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
@@ -66,6 +66,7 @@ public class TestJavaHudiTable extends TestAbstractHudiTable {
 
   private HoodieJavaWriteClient writeClient;
   private final Configuration conf;
+  private final boolean addFieldIds;
 
   /**
    * Create a test table instance for general testing. The table is created with the schema defined
@@ -83,7 +84,13 @@ public class TestJavaHudiTable extends TestAbstractHudiTable {
   public static TestJavaHudiTable forStandardSchema(
       String tableName, Path tempDir, String partitionConfig, HoodieTableType tableType) {
     return new TestJavaHudiTable(
-        tableName, BASIC_SCHEMA, tempDir, partitionConfig, tableType, null);
+        tableName, BASIC_SCHEMA, tempDir, partitionConfig, tableType, null, false);
+  }
+
+  public static TestJavaHudiTable forStandardSchemaWithFieldIds(
+      String tableName, Path tempDir, String partitionConfig, HoodieTableType tableType) {
+    return new TestJavaHudiTable(
+        tableName, BASIC_SCHEMA, tempDir, partitionConfig, tableType, null, true);
   }
 
   public static TestJavaHudiTable forStandardSchema(
@@ -93,7 +100,7 @@ public static TestJavaHudiTable forStandardSchema(
       HoodieTableType tableType,
       HoodieArchivalConfig archivalConfig) {
     return new TestJavaHudiTable(
-        tableName, BASIC_SCHEMA, tempDir, partitionConfig, tableType, archivalConfig);
+        tableName, BASIC_SCHEMA, tempDir, partitionConfig, tableType, archivalConfig, false);
   }
 
   /**
@@ -119,7 +126,20 @@ public static TestJavaHudiTable withAdditionalColumns(
         tempDir,
         partitionConfig,
         tableType,
-        null);
+        null,
+        false);
+  }
+
+  public static TestJavaHudiTable withAdditionalColumnsAndFieldIds(
+      String tableName, Path tempDir, String partitionConfig, HoodieTableType tableType) {
+    return new TestJavaHudiTable(
+        tableName,
+        addSchemaEvolutionFieldsToBase(BASIC_SCHEMA),
+        tempDir,
+        partitionConfig,
+        tableType,
+        null,
+        true);
   }
 
   public static TestJavaHudiTable withAdditionalTopLevelField(
@@ -129,7 +149,13 @@ public static TestJavaHudiTable withAdditionalTopLevelField(
       HoodieTableType tableType,
       Schema previousSchema) {
     return new TestJavaHudiTable(
-        tableName, addTopLevelField(previousSchema), tempDir, partitionConfig, tableType, null);
+        tableName,
+        addTopLevelField(previousSchema),
+        tempDir,
+        partitionConfig,
+        tableType,
+        null,
+        false);
   }
 
   public static TestJavaHudiTable withSchema(
@@ -138,7 +164,8 @@ public static TestJavaHudiTable withSchema(
       String partitionConfig,
       HoodieTableType tableType,
       Schema schema) {
-    return new TestJavaHudiTable(tableName, schema, tempDir, partitionConfig, tableType, null);
+    return new TestJavaHudiTable(
+        tableName, schema, tempDir, partitionConfig, tableType, null, false);
   }
 
   private TestJavaHudiTable(
@@ -147,10 +174,12 @@ private TestJavaHudiTable(
       Path tempDir,
       String partitionConfig,
       HoodieTableType hoodieTableType,
-      HoodieArchivalConfig archivalConfig) {
+      HoodieArchivalConfig archivalConfig,
+      boolean addFieldIds) {
     super(name, schema, tempDir, partitionConfig);
     this.conf = new Configuration();
     this.conf.set("parquet.avro.write-old-list-structure", "false");
+    this.addFieldIds = addFieldIds;
     try {
       this.metaClient = initMetaClient(hoodieTableType, typedProperties);
     } catch (IOException ex) {
@@ -297,13 +326,14 @@ private List> copyRecords(
 
   private HoodieTableMetaClient initMetaClient(
       HoodieTableType hoodieTableType, TypedProperties keyGenProperties) throws IOException {
-    return getMetaClient(keyGenProperties, hoodieTableType, conf);
+    return getMetaClient(keyGenProperties, hoodieTableType, conf, !addFieldIds);
   }
 
   private HoodieJavaWriteClient initJavaWriteClient(
       Schema schema, TypedProperties keyGenProperties, HoodieArchivalConfig archivalConfig) {
     HoodieWriteConfig writeConfig =
         HoodieWriteConfig.newBuilder()
+            .withPopulateMetaFields(!addFieldIds)
            .withProperties(generateWriteConfig(schema, keyGenProperties).getProps())
            .withClusteringConfig(
                HoodieClusteringConfig.newBuilder()
@@ -321,6 +351,18 @@ private HoodieJavaWriteClient initJavaWriteClient(
              .withArchivalConfig(archivalConfig)
              .build();
     }
+    if (addFieldIds) {
+      writeConfig
+          .getProps()
+          .put(
+              "hoodie.avro.write.support.class",
+              "org.apache.xtable.hudi.extensions.HoodieAvroWriteSupportWithFieldIds");
+      writeConfig
+          .getProps()
+          .put(
+              "hoodie.client.init.callback.classes",
+              "org.apache.xtable.hudi.extensions.AddFieldIdsClientInitCallback");
+    }
     HoodieEngineContext context = new HoodieJavaEngineContext(conf);
     return new HoodieJavaWriteClient<>(context, writeConfig);
   }
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestSparkHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestSparkHudiTable.java
index 79316f5d9..1aaf61f96 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestSparkHudiTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestSparkHudiTable.java
@@ -271,6 +271,6 @@ private SparkRDDWriteClient initSparkWriteClient(
 
   private HoodieTableMetaClient initMetaClient(
       JavaSparkContext jsc, HoodieTableType hoodieTableType, TypedProperties keyGenProperties) {
-    return getMetaClient(keyGenProperties, hoodieTableType, jsc.hadoopConfiguration());
+    return getMetaClient(keyGenProperties, hoodieTableType, jsc.hadoopConfiguration(), true);
   }
 }
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
index ab13ae2d6..ffe6a2177 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
@@ -364,7 +364,7 @@ private DataFile generateTestDataFile(int partition, Table table, String filePat
             .schema(csSchema)
             .createWriterFunc(GenericParquetWriter::buildWriter)
             .overwrite()
-            .withSpec(csPartitionSpec)
+            .withSpec(table.spec())
             .withPartition(partitionInfo)
             .build();
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaSync.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaSync.java
index 98254591e..b07fac4c5 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaSync.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaSync.java
@@ -18,6 +18,7 @@
 
 package org.apache.xtable.iceberg;
 
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -29,9 +30,12 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.InOrder;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.types.Type;
@@ -335,6 +339,25 @@ void testAddMapFieldComment() {
     verify(mockUpdateSchema).commit();
   }
 
+  @Test
+  void testSyncWithProvidedIds() {
+    BaseTable mockBaseTable = Mockito.mock(BaseTable.class, RETURNS_DEEP_STUBS);
+    TableMetadata mockCurrent = Mockito.mock(TableMetadata.class);
+    when(mockBaseTable.operations().current()).thenReturn(mockCurrent);
+    try (MockedStatic<TableMetadata> tableMetadataMockedStatic =
+        Mockito.mockStatic(TableMetadata.class)) {
+      TableMetadata.Builder mockBuilder = Mockito.mock(TableMetadata.Builder.class);
+      tableMetadataMockedStatic
+          .when(() -> TableMetadata.buildFrom(mockCurrent))
+          .thenReturn(mockBuilder);
+      when(mockBuilder.setCurrentSchema(SCHEMA, SCHEMA.highestFieldId())).thenReturn(mockBuilder);
+      TableMetadata mockUpdated = Mockito.mock(TableMetadata.class);
+      when(mockBuilder.build()).thenReturn(mockUpdated);
+      schemaSync.syncWithProvidedIds(SCHEMA, mockBaseTable);
+      verify(mockBaseTable.operations()).commit(mockCurrent, mockUpdated);
+    }
+  }
+
   private Schema addColumnToDefault(Schema schema, Types.NestedField field, Integer parentId) {
     List<Types.NestedField> fields = new ArrayList<>();
     for (Types.NestedField existingField : schema.columns()) {
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
index c02d7f268..d5a25b022 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
@@ -170,9 +170,9 @@ public class TestIcebergSync {
           .build();
   private final Schema icebergSchema =
       new Schema(
-          Types.NestedField.required(1, "timestamp_field", Types.TimestampType.withoutZone()),
+          Types.NestedField.required(3, "timestamp_field", Types.TimestampType.withoutZone()),
           Types.NestedField.required(2, "date_field", Types.DateType.get()),
-          Types.NestedField.required(3, "group_id", Types.IntegerType.get()),
+          Types.NestedField.required(1, "group_id", Types.IntegerType.get()),
           Types.NestedField.required(
               4,
               "record",
@@ -244,11 +244,13 @@ public void testCreateSnapshotControlFlow() throws Exception {
 
     TableFormatSync.getInstance()
         .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
-    validateIcebergTable(tableName, table1, Sets.newHashSet(dataFile1, dataFile2), null);
+    validateIcebergTable(
+        tableName, table1, Sets.newHashSet(dataFile1, dataFile2), null, icebergSchema);
 
     TableFormatSync.getInstance()
         .syncSnapshot(Collections.singletonList(conversionTarget), snapshot2);
-    validateIcebergTable(tableName, table2, Sets.newHashSet(dataFile2, dataFile3), null);
+    validateIcebergTable(
+        tableName, table2, Sets.newHashSet(dataFile2, dataFile3), null, icebergSchema);
 
     ArgumentCaptor<Transaction> transactionArgumentCaptor =
         ArgumentCaptor.forClass(Transaction.class);
@@ -256,7 +258,7 @@ public void testCreateSnapshotControlFlow() throws Exception {
     ArgumentCaptor<PartitionSpec> partitionSpecArgumentCaptor =
         ArgumentCaptor.forClass(PartitionSpec.class);
 
-    verify(mockSchemaSync, times(2))
+    verify(mockSchemaSync, times(1))
        .sync(
            schemaArgumentCaptor.capture(),
            schemaArgumentCaptor.capture(),
@@ -274,13 +276,9 @@ public void testCreateSnapshotControlFlow() throws Exception {
     assertTrue(
         partitionSpecSchemaArgumentCaptor.getAllValues().stream()
             .allMatch(capturedSchema -> capturedSchema.sameSchema(icebergSchema)));
-    // schema sync args for first iteration
-    assertTrue(
-        schemaArgumentCaptor.getAllValues().subList(0, 2).stream()
-            .allMatch(capturedSchema -> capturedSchema.sameSchema(icebergSchema)));
     // second snapshot sync will evolve the schema
-    assertTrue(schemaArgumentCaptor.getAllValues().get(2).sameSchema(icebergSchema));
-    assertTrue(schemaArgumentCaptor.getAllValues().get(3).sameSchema(icebergSchema2));
+    assertTrue(schemaArgumentCaptor.getAllValues().get(0).sameSchema(icebergSchema));
+    assertTrue(schemaArgumentCaptor.getAllValues().get(1).sameSchema(icebergSchema2));
     // check that the correct partition spec is used in calls to the mocks
     assertTrue(
         partitionSpecArgumentCaptor.getAllValues().stream()
@@ -292,9 +290,6 @@ public void testCreateSnapshotControlFlow() throws Exception {
     assertSame(
         transactionArgumentCaptor.getAllValues().get(0),
         transactionArgumentCaptor.getAllValues().get(2));
-    assertSame(
-        transactionArgumentCaptor.getAllValues().get(1),
-        transactionArgumentCaptor.getAllValues().get(3));
     // validate that transactions are different between runs
     assertNotSame(
         transactionArgumentCaptor.getAllValues().get(1),
@@ -358,7 +353,8 @@ public void testIncompleteWriteRollback() throws Exception {
     // get a new iceberg sync to make sure table is re-read from disk and no metadata is cached
     TableFormatSync.getInstance()
         .syncSnapshot(Collections.singletonList(conversionTarget), snapshot3);
-    validateIcebergTable(tableName, table2, Sets.newHashSet(dataFile3, dataFile4), null);
+    validateIcebergTable(
+        tableName, table2, Sets.newHashSet(dataFile3, dataFile4), null, icebergSchema);
     // Validate Iceberg table state
     Table table = getTable(basePath);
     assertEquals(4, table.history().size());
@@ -425,7 +421,8 @@ public void testTimestampPartitioning() throws Exception {
         Expressions.and(
             Expressions.greaterThanOrEqual(
                 partitionField.getSourceField().getName(), "2022-10-01T00:00"),
-            Expressions.lessThan(partitionField.getSourceField().getName(), "2022-10-02T00:00")));
+            Expressions.lessThan(partitionField.getSourceField().getName(), "2022-10-02T00:00")),
+        icebergSchema);
   }
 
   @Test
@@ -485,7 +482,8 @@ public void testDatePartitioning() throws Exception {
         Sets.newHashSet(dataFile1, dataFile2),
         Expressions.and(
             Expressions.greaterThanOrEqual(partitionField.getSourceField().getName(), "2022-10-01"),
-            Expressions.lessThan(partitionField.getSourceField().getName(), "2022-10-02")));
+            Expressions.lessThan(partitionField.getSourceField().getName(), "2022-10-02")),
+        icebergSchema);
   }
 
   @Test
@@ -539,7 +537,8 @@ public void testNumericFieldPartitioning() throws Exception {
         Sets.newHashSet(dataFile1, dataFile2),
         Expressions.and(
             Expressions.greaterThanOrEqual(partitionField.getSourceField().getName(), 1),
-            Expressions.lessThan(partitionField.getSourceField().getName(), 2)));
+            Expressions.lessThan(partitionField.getSourceField().getName(), 2)),
+        icebergSchema);
   }
 
   @Test
@@ -619,7 +618,8 @@ public void testMultipleFieldPartitioning() throws Exception {
             Expressions.greaterThanOrEqual(
                 partitionField2.getSourceField().getName(), "2022-10-01T00:00"),
             Expressions.lessThan(
-                partitionField2.getSourceField().getName(), "2022-10-02T00:00"))));
+                partitionField2.getSourceField().getName(), "2022-10-02T00:00"))),
+        icebergSchema);
   }
 
   @Test
@@ -678,7 +678,8 @@ public void testNestedFieldPartitioning() throws Exception {
         tableName,
         table,
         Sets.newHashSet(dataFile1, dataFile2),
-        Expressions.equal(partitionField.getSourceField().getPath(), "value1"));
+        Expressions.equal(partitionField.getSourceField().getPath(), "value1"),
+        icebergSchema);
   }
 
   @Test
@@ -822,13 +823,16 @@ private void validateIcebergTable(
       String tableName,
       InternalTable table,
       Set expectedFiles,
-      Expression filterExpression)
+      Expression filterExpression,
+      Schema expectedSchema)
       throws IOException {
     Path warehouseLocation = Paths.get(table.getBasePath()).getParent();
     try (HadoopCatalog catalog = new HadoopCatalog(CONFIGURATION, warehouseLocation.toString())) {
       TableIdentifier tableId = TableIdentifier.of(Namespace.empty(), tableName);
       assertTrue(catalog.tableExists(tableId));
-      TableScan scan = catalog.loadTable(tableId).newScan();
+      Table icebergTable = catalog.loadTable(tableId);
+      assertTrue(expectedSchema.sameSchema(icebergTable.schema()));
+      TableScan scan = icebergTable.newScan();
       if (filterExpression != null) {
         scan = scan.filter(filterExpression);
       }
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java
index f81f1336a..f424e3a96 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java
@@ -20,7 +20,9 @@
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -30,10 +32,14 @@
 import org.apache.hadoop.conf.Configuration;
 
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
 
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -101,27 +107,46 @@ void catalogGetOrCreateWithNewTable() {
             .catalogName(catalogName)
             .catalogOptions(OPTIONS)
             .build();
-    Table mockTable = mock(Table.class);
+    BaseTable mockInitialTable = mock(BaseTable.class);
+    Table loadedTable = mock(Table.class);
     when(mockCatalog.tableExists(IDENTIFIER)).thenReturn(false);
     Schema schema = new Schema();
     PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
     when(mockCatalog.createTable(
-            IDENTIFIER,
-            schema,
-            partitionSpec,
-            BASE_PATH,
-            Collections.singletonMap(
-                TableProperties.DEFAULT_NAME_MAPPING,
-                NameMappingParser.toJson(MappingUtil.create(schema)))))
-        .thenReturn(mockTable);
+            eq(IDENTIFIER),
+            any(),
+            eq(PartitionSpec.unpartitioned()),
+            eq(BASE_PATH),
+            eq(
+                Collections.singletonMap(
+                    TableProperties.DEFAULT_NAME_MAPPING,
+                    NameMappingParser.toJson(MappingUtil.create(schema))))))
+        .thenReturn(mockInitialTable);
+    when(mockCatalog.loadTable(IDENTIFIER)).thenReturn(loadedTable);
 
-    IcebergTableManager tableManager = IcebergTableManager.of(CONFIGURATION);
+    TableOperations tableOperations = mock(TableOperations.class);
+    when(mockInitialTable.operations()).thenReturn(tableOperations);
+    TableMetadata initialMetadata = mock(TableMetadata.class);
+    when(tableOperations.current()).thenReturn(initialMetadata);
+    try (MockedStatic<TableMetadata> tableMetadataMockedStatic = mockStatic(TableMetadata.class)) {
+      TableMetadata.Builder mockBuilder = mock(TableMetadata.Builder.class);
+      tableMetadataMockedStatic
+          .when(() -> TableMetadata.buildFrom(initialMetadata))
+          .thenReturn(mockBuilder);
+      TableMetadata updatedMetadata = mock(TableMetadata.class);
+      when(mockBuilder.build()).thenReturn(updatedMetadata);
 
-    Table actual =
-        tableManager.getOrCreateTable(catalogConfig, IDENTIFIER, BASE_PATH, schema, partitionSpec);
-    assertEquals(mockTable, actual);
-    verify(mockCatalog).initialize(catalogName, OPTIONS);
-    verify(mockCatalog, never()).loadTable(any());
+      IcebergTableManager tableManager = IcebergTableManager.of(CONFIGURATION);
+
+      Table actual =
+          tableManager.getOrCreateTable(
+              catalogConfig, IDENTIFIER, BASE_PATH, schema, partitionSpec);
+      assertEquals(loadedTable, actual);
+      verify(mockCatalog).initialize(catalogName, OPTIONS);
+      verify(tableOperations).commit(initialMetadata, updatedMetadata);
+      verify(mockBuilder).setCurrentSchema(schema, schema.highestFieldId());
+      verify(mockBuilder).setDefaultPartitionSpec(partitionSpec);
+    }
   }
 
   @Test
@@ -139,13 +164,14 @@ void catalogGetOrCreateWithRaceConditionOnCreation() {
     Schema schema = new Schema();
     PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
     when(mockCatalog.createTable(
-            IDENTIFIER,
-            schema,
-            partitionSpec,
-            BASE_PATH,
-            Collections.singletonMap(
-                TableProperties.DEFAULT_NAME_MAPPING,
-                NameMappingParser.toJson(MappingUtil.create(schema)))))
+            eq(IDENTIFIER),
+            any(),
+            any(),
+            eq(BASE_PATH),
+            eq(
+                Collections.singletonMap(
+                    TableProperties.DEFAULT_NAME_MAPPING,
+                    NameMappingParser.toJson(MappingUtil.create(schema))))))
        .thenThrow(new AlreadyExistsException("Table already exists"));
     when(mockCatalog.loadTable(IDENTIFIER)).thenReturn(mockTable);
diff --git a/xtable-utilities/pom.xml b/xtable-utilities/pom.xml
index 5e37be6c6..b0f09a3e4 100644
--- a/xtable-utilities/pom.xml
+++ b/xtable-utilities/pom.xml
@@ -43,6 +43,12 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.xtable</groupId>
+            <artifactId>xtable-hudi-support-extensions_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.xtable</groupId>
diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunCatalogSync.java b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunCatalogSync.java
index 52cff85ad..24aa5adfc 100644
--- a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunCatalogSync.java
+++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunCatalogSync.java
@@ -133,7 +133,7 @@ private void validateTargetMetadataIsPresent(String basePath) {
     Path icebergMetadataPath = Paths.get(URI.create(basePath + "/metadata"));
     long icebergMetadataFiles =
         Files.list(icebergMetadataPath).filter(p -> p.toString().endsWith("metadata.json")).count();
-    Assertions.assertEquals(2, icebergMetadataFiles);
+    Assertions.assertEquals(3, icebergMetadataFiles);
     Path deltaMetadataPath = Paths.get(URI.create(basePath + "/_delta_log"));
     long deltaMetadataFiles =
         Files.list(deltaMetadataPath).filter(p -> p.toString().endsWith(".json")).count();
diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java
index 2294e16aa..f18ce867f 100644
--- a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java
+++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java
@@ -55,7 +55,7 @@ void testSingleSyncMode(@TempDir Path tempDir) throws IOException {
       String[] args = new String[] {"--datasetConfig", configFile.getPath()};
       RunSync.main(args);
       Path icebergMetadataPath = Paths.get(URI.create(table.getBasePath() + "/metadata"));
-      waitForNumIcebergCommits(icebergMetadataPath, 2);
+      waitForNumIcebergCommits(icebergMetadataPath, 3);
     }
   }
 
@@ -64,7 +64,7 @@ void testContinuousSyncMode(@TempDir Path tempDir) throws IOException {
     ExecutorService runner = Executors.newSingleThreadExecutor();
     String tableName = "test-table";
     try (GenericTable table =
-        TestJavaHudiTable.forStandardSchema(
+        TestJavaHudiTable.forStandardSchemaWithFieldIds(
            tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
       table.insertRows(20);
       File configFile = writeConfigFile(tempDir, table, tableName);
@@ -78,11 +78,16 @@ void testContinuousSyncMode(@TempDir Path tempDir) throws IOException {
             }
           });
       Path icebergMetadataPath = Paths.get(URI.create(table.getBasePath() + "/metadata"));
-      waitForNumIcebergCommits(icebergMetadataPath, 2);
+      waitForNumIcebergCommits(icebergMetadataPath, 3);
+    }
+    try (GenericTable table =
+        TestJavaHudiTable.withAdditionalColumnsAndFieldIds(
+            tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
       // write more data now that table is initialized and data is synced
       table.insertRows(20);
-      waitForNumIcebergCommits(icebergMetadataPath, 3);
-      assertEquals(3, numIcebergMetadataJsonFiles(icebergMetadataPath));
+      Path icebergMetadataPath = Paths.get(URI.create(table.getBasePath() + "/metadata"));
+      waitForNumIcebergCommits(icebergMetadataPath, 6);
+      assertEquals(6, numIcebergMetadataJsonFiles(icebergMetadataPath));
     } finally {
       runner.shutdownNow();
     }