@@ -164,7 +164,22 @@ private void initializeTableIfRequired(InternalTable internalTable) {
@Override
public void syncSchema(InternalSchema schema) {
Schema latestSchema = schemaExtractor.toIceberg(schema);
schemaSync.sync(transaction.table().schema(), latestSchema, transaction);
if (!transaction.table().schema().sameSchema(latestSchema)) {
boolean hasFieldIds =
schema.getAllFields().stream().anyMatch(field -> field.getFieldId() != null);
if (hasFieldIds) {
// There is no clean way to sync the schema with the provided field IDs using the
// transaction API so we commit the current transaction and interact directly with
// the operations API.
transaction.commitTransaction();
schemaSync.syncWithProvidedIds(latestSchema, table);
// Start a new transaction for remaining operations
table.refresh();
transaction = table.newTransaction();
} else {
schemaSync.sync(transaction.table().schema(), latestSchema, transaction);
}
}
}

@Override
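
When the source schema carries explicit field IDs, syncSchema now commits the in-flight transaction, applies the schema directly through the table operations API, and opens a fresh transaction for the remaining operations; any updates already queued on the old transaction are therefore committed at that point rather than at the end of the overall sync.
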
@@ -28,7 +28,10 @@
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

import org.apache.iceberg.BaseTable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.types.Types;
@@ -58,6 +61,14 @@ public void sync(Schema current, Schema latest, Transaction transaction) {
}
}

public void syncWithProvidedIds(Schema latest, Table table) {
BaseTable baseTable = ((BaseTable) table);
TableMetadata current = baseTable.operations().current();
TableMetadata updated =
TableMetadata.buildFrom(current).setCurrentSchema(latest, latest.highestFieldId()).build();
baseTable.operations().commit(current, updated);
}

/**
* Return a mapping of fieldId in the latest schema to an update action to perform. This allows
* updates to happen in the same order as the source system.
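
A minimal sketch of how the new method might be called, assuming an IcebergSchemaSync instance (schemaSync) and a loaded Iceberg Table are already in scope; the field names and IDs below are illustrative only:

    // Illustrative only: a schema whose field IDs were assigned by the source system.
    // syncWithProvidedIds writes it into the table metadata as-is (the table must be a
    // BaseTable, since the method reaches TableOperations through a cast) and uses
    // latest.highestFieldId() as the new last-column-id.
    Schema latest =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "name", Types.StringType.get()),
            Types.NestedField.optional(5, "added_later", Types.IntegerType.get()));
    schemaSync.syncWithProvidedIds(latest, table);
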
@@ -29,11 +29,13 @@

import org.apache.hadoop.conf.Configuration;

import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -75,24 +77,34 @@ Table getOrCreateTable(
return getTable(catalogConfig, tableIdentifier, basePath);
} else {
try {
return getCatalog(catalogConfig)
.map(
catalog ->
catalog.createTable(
tableIdentifier,
schema,
partitionSpec,
basePath,
getDefaultMappingProperties(schema)))
.orElseGet(
() ->
getHadoopTables()
.create(
schema,
partitionSpec,
SortOrder.unsorted(),
getDefaultMappingProperties(schema),
basePath));
// initialize the table with an empty schema, then manually set the schema to prevent the
// Iceberg API from remapping the field IDs.
Table tableWithEmptySchema =
getCatalog(catalogConfig)
.map(
catalog ->
catalog.createTable(
tableIdentifier,
new Schema(),
PartitionSpec.unpartitioned(),
basePath,
getDefaultMappingProperties(schema)))
.orElseGet(
() ->
getHadoopTables()
.create(
new Schema(),
PartitionSpec.unpartitioned(),
getDefaultMappingProperties(schema),
basePath));
// set the schema with the provided field IDs
TableOperations operations = ((BaseTable) tableWithEmptySchema).operations();
TableMetadata tableMetadata = operations.current();
TableMetadata.Builder builder = TableMetadata.buildFrom(tableMetadata);
builder.setCurrentSchema(schema, schema.highestFieldId());
builder.setDefaultPartitionSpec(partitionSpec);
operations.commit(tableMetadata, builder.build());
return getTable(catalogConfig, tableIdentifier, basePath);
} catch (AlreadyExistsException ex) {
log.info("Table {} not created since it already exists", tableIdentifier);
return getTable(catalogConfig, tableIdentifier, basePath);
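
For context, a rough sketch of the behavior the two-step creation works around (an assumption about Iceberg's default create path, not code from this change); catalog and tableIdentifier are assumed to be in scope as in the method above:

    // Illustrative only: a source schema with pre-assigned, non-sequential field IDs.
    Schema source =
        new Schema(
            Types.NestedField.required(10, "id", Types.LongType.get()),
            Types.NestedField.optional(20, "name", Types.StringType.get()));
    Table direct = catalog.createTable(tableIdentifier, source);
    // direct.schema().findField("id").fieldId() would typically come back as 1, not 10,
    // because table creation assigns fresh IDs. Creating with new Schema() and then
    // committing TableMetadata.buildFrom(current).setCurrentSchema(source,
    // source.highestFieldId()) as done above keeps 10 and 20 as provided.
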
@@ -590,7 +590,10 @@ protected static Schema addTopLevelField(Schema schema) {

@SneakyThrows
protected HoodieTableMetaClient getMetaClient(
TypedProperties keyGenProperties, HoodieTableType hoodieTableType, Configuration conf) {
TypedProperties keyGenProperties,
HoodieTableType hoodieTableType,
Configuration conf,
boolean populateMetaFields) {
LocalFileSystem fs = (LocalFileSystem) FSUtils.getFs(basePath, conf);
// Enforce checksum such that fs.open() is consistent to DFS
fs.setVerifyChecksum(true);
@@ -614,6 +617,7 @@ protected HoodieTableMetaClient getMetaClient(
.setPayloadClass(OverwriteWithLatestAvroPayload.class)
.setCommitTimezone(HoodieTimelineTimeZone.UTC)
.setBaseFileFormat(HoodieFileFormat.PARQUET.toString())
.setPopulateMetaFields(populateMetaFields)
.build();
return HoodieTableMetaClient.initTableAndGetMetaClient(conf, this.basePath, properties);
}
xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java (49 additions & 7 deletions)
@@ -66,6 +66,7 @@ public class TestJavaHudiTable extends TestAbstractHudiTable {
private HoodieJavaWriteClient<HoodieAvroPayload> writeClient;

private final Configuration conf;
private final boolean addFieldIds;

/**
* Create a test table instance for general testing. The table is created with the schema defined
@@ -83,7 +84,13 @@ public class TestJavaHudiTable extends TestAbstractHudiTable {
public static TestJavaHudiTable forStandardSchema(
String tableName, Path tempDir, String partitionConfig, HoodieTableType tableType) {
return new TestJavaHudiTable(
tableName, BASIC_SCHEMA, tempDir, partitionConfig, tableType, null);
tableName, BASIC_SCHEMA, tempDir, partitionConfig, tableType, null, false);
}

public static TestJavaHudiTable forStandardSchemaWithFieldIds(
String tableName, Path tempDir, String partitionConfig, HoodieTableType tableType) {
return new TestJavaHudiTable(
tableName, BASIC_SCHEMA, tempDir, partitionConfig, tableType, null, true);
}

public static TestJavaHudiTable forStandardSchema(
Expand All @@ -93,7 +100,7 @@ public static TestJavaHudiTable forStandardSchema(
HoodieTableType tableType,
HoodieArchivalConfig archivalConfig) {
return new TestJavaHudiTable(
tableName, BASIC_SCHEMA, tempDir, partitionConfig, tableType, archivalConfig);
tableName, BASIC_SCHEMA, tempDir, partitionConfig, tableType, archivalConfig, false);
}

/**
@@ -119,7 +126,20 @@ public static TestJavaHudiTable withAdditionalColumns(
tempDir,
partitionConfig,
tableType,
null);
null,
false);
}

public static TestJavaHudiTable withAdditionalColumnsAndFieldIds(
String tableName, Path tempDir, String partitionConfig, HoodieTableType tableType) {
return new TestJavaHudiTable(
tableName,
addSchemaEvolutionFieldsToBase(BASIC_SCHEMA),
tempDir,
partitionConfig,
tableType,
null,
true);
}

public static TestJavaHudiTable withAdditionalTopLevelField(
@@ -129,7 +149,13 @@ public static TestJavaHudiTable withAdditionalTopLevelField(
HoodieTableType tableType,
Schema previousSchema) {
return new TestJavaHudiTable(
tableName, addTopLevelField(previousSchema), tempDir, partitionConfig, tableType, null);
tableName,
addTopLevelField(previousSchema),
tempDir,
partitionConfig,
tableType,
null,
false);
}

public static TestJavaHudiTable withSchema(
@@ -138,7 +164,8 @@ public static TestJavaHudiTable withSchema(
String partitionConfig,
HoodieTableType tableType,
Schema schema) {
return new TestJavaHudiTable(tableName, schema, tempDir, partitionConfig, tableType, null);
return new TestJavaHudiTable(
tableName, schema, tempDir, partitionConfig, tableType, null, false);
}

private TestJavaHudiTable(
@@ -147,10 +174,12 @@ private TestJavaHudiTable(
Path tempDir,
String partitionConfig,
HoodieTableType hoodieTableType,
HoodieArchivalConfig archivalConfig) {
HoodieArchivalConfig archivalConfig,
boolean addFieldIds) {
super(name, schema, tempDir, partitionConfig);
this.conf = new Configuration();
this.conf.set("parquet.avro.write-old-list-structure", "false");
this.addFieldIds = addFieldIds;
try {
this.metaClient = initMetaClient(hoodieTableType, typedProperties);
} catch (IOException ex) {
@@ -297,13 +326,14 @@ private List<HoodieRecord<HoodieAvroPayload>> copyRecords(

private HoodieTableMetaClient initMetaClient(
HoodieTableType hoodieTableType, TypedProperties keyGenProperties) throws IOException {
return getMetaClient(keyGenProperties, hoodieTableType, conf);
return getMetaClient(keyGenProperties, hoodieTableType, conf, !addFieldIds);
}

private HoodieJavaWriteClient<HoodieAvroPayload> initJavaWriteClient(
Schema schema, TypedProperties keyGenProperties, HoodieArchivalConfig archivalConfig) {
HoodieWriteConfig writeConfig =
HoodieWriteConfig.newBuilder()
.withPopulateMetaFields(!addFieldIds)
.withProperties(generateWriteConfig(schema, keyGenProperties).getProps())
.withClusteringConfig(
HoodieClusteringConfig.newBuilder()
Expand All @@ -321,6 +351,18 @@ private HoodieJavaWriteClient<HoodieAvroPayload> initJavaWriteClient(
.withArchivalConfig(archivalConfig)
.build();
}
if (addFieldIds) {
writeConfig
.getProps()
.put(
"hoodie.avro.write.support.class",
"org.apache.xtable.hudi.extensions.HoodieAvroWriteSupportWithFieldIds");
writeConfig
.getProps()
.put(
"hoodie.client.init.callback.classes",
"org.apache.xtable.hudi.extensions.AddFieldIdsClientInitCallback");
}
HoodieEngineContext context = new HoodieJavaEngineContext(conf);
return new HoodieJavaWriteClient<>(context, writeConfig);
}
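
A sketch of how the new factory might be used from a test; the table name, temp directory, and partition config below are placeholders:

    // Illustrative usage of the field-ID-aware factory. Per the constructor above, the
    // flag disables Hudi meta fields (populateMetaFields = false) and registers the
    // XTable write-support and client-init-callback extensions on the write config.
    TestJavaHudiTable table =
        TestJavaHudiTable.forStandardSchemaWithFieldIds(
            "some_table", tempDir, "level:SIMPLE", HoodieTableType.COPY_ON_WRITE);
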
@@ -271,6 +271,6 @@ private SparkRDDWriteClient<HoodieAvroPayload> initSparkWriteClient(

private HoodieTableMetaClient initMetaClient(
JavaSparkContext jsc, HoodieTableType hoodieTableType, TypedProperties keyGenProperties) {
return getMetaClient(keyGenProperties, hoodieTableType, jsc.hadoopConfiguration());
return getMetaClient(keyGenProperties, hoodieTableType, jsc.hadoopConfiguration(), true);
}
}
@@ -364,7 +364,7 @@ private DataFile generateTestDataFile(int partition, Table table, String filePat
.schema(csSchema)
.createWriterFunc(GenericParquetWriter::buildWriter)
.overwrite()
.withSpec(csPartitionSpec)
.withSpec(table.spec())
.withPartition(partitionInfo)
.build();

@@ -18,6 +18,7 @@

package org.apache.xtable.iceberg;

import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -29,9 +30,12 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

import org.apache.iceberg.BaseTable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.types.Type;
@@ -335,6 +339,25 @@ void testAddMapFieldComment() {
verify(mockUpdateSchema).commit();
}

@Test
void testSyncWithProvidedIds() {
BaseTable mockBaseTable = Mockito.mock(BaseTable.class, RETURNS_DEEP_STUBS);
TableMetadata mockCurrent = Mockito.mock(TableMetadata.class);
when(mockBaseTable.operations().current()).thenReturn(mockCurrent);
try (MockedStatic<TableMetadata> tableMetadataMockedStatic =
Mockito.mockStatic(TableMetadata.class)) {
TableMetadata.Builder mockBuilder = Mockito.mock(TableMetadata.Builder.class);
tableMetadataMockedStatic
.when(() -> TableMetadata.buildFrom(mockCurrent))
.thenReturn(mockBuilder);
when(mockBuilder.setCurrentSchema(SCHEMA, SCHEMA.highestFieldId())).thenReturn(mockBuilder);
TableMetadata mockUpdated = Mockito.mock(TableMetadata.class);
when(mockBuilder.build()).thenReturn(mockUpdated);
schemaSync.syncWithProvidedIds(SCHEMA, mockBaseTable);
verify(mockBaseTable.operations()).commit(mockCurrent, mockUpdated);
}
}

private Schema addColumnToDefault(Schema schema, Types.NestedField field, Integer parentId) {
List<Types.NestedField> fields = new ArrayList<>();
for (Types.NestedField existingField : schema.columns()) {