Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -627,22 +627,17 @@ public void deletePrincipalSecretsInCurrentTxn(
@Override
public @Nullable <T extends PolarisStorageConfigurationInfo>
PolarisStorageIntegration<T> createStorageIntegrationInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long entityId,
PolarisStorageConfigurationInfo polarisStorageConfigurationInfo) {
return storageIntegrationProvider.getStorageIntegrationForConfig(
polarisStorageConfigurationInfo);
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity catalog) {
return storageIntegrationProvider.getStorageIntegration(catalog);
}

/** {@inheritDoc} */
@Override
public @Nullable <T extends PolarisStorageConfigurationInfo>
PolarisStorageIntegration<T> loadPolarisStorageIntegrationInCurrentTxn(
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) {
PolarisStorageConfigurationInfo storageConfig =
BaseMetaStoreManager.extractStorageConfiguration(getDiagnostics(), entity);
return storageIntegrationProvider.getStorageIntegrationForConfig(storageConfig);
return BaseMetaStoreManager.toStorageIntegration(
getDiagnostics(), entity, storageIntegrationProvider);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1099,12 +1099,8 @@ private List<PolarisPolicyMappingRecord> fetchPolicyMappingRecords(
@Override
public <T extends PolarisStorageConfigurationInfo>
PolarisStorageIntegration<T> createStorageIntegration(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long entityId,
PolarisStorageConfigurationInfo polarisStorageConfigurationInfo) {
return storageIntegrationProvider.getStorageIntegrationForConfig(
polarisStorageConfigurationInfo);
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity catalog) {
return storageIntegrationProvider.getStorageIntegration(catalog);
}

@Override
Expand All @@ -1118,9 +1114,8 @@ public <T extends PolarisStorageConfigurationInfo> void persistStorageIntegratio
public <T extends PolarisStorageConfigurationInfo>
PolarisStorageIntegration<T> loadPolarisStorageIntegration(
@Nonnull PolarisCallContext callContext, @Nonnull PolarisBaseEntity entity) {
PolarisStorageConfigurationInfo storageConfig =
BaseMetaStoreManager.extractStorageConfiguration(callContext.getDiagServices(), entity);
return storageIntegrationProvider.getStorageIntegrationForConfig(storageConfig);
return BaseMetaStoreManager.toStorageIntegration(
callContext.getDiagServices(), entity, storageIntegrationProvider);
}

@FunctionalInterface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,13 +441,7 @@ private void revokeGrantRecord(
// storageConfigInfo's presence is needed to create a storage integration
// and the catalog should not have an internal property of storage identifier or id yet
if (storageConfigInfoStr != null && integrationIdentifierOrId == null) {
integration =
((IntegrationPersistence) ms)
.createStorageIntegration(
callCtx,
catalog.getCatalogId(),
catalog.getId(),
PolarisStorageConfigurationInfo.deserialize(storageConfigInfoStr));
integration = ((IntegrationPersistence) ms).createStorageIntegration(callCtx, catalog);
} else {
integration = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.polaris.core.persistence;

import jakarta.annotation.Nonnull;
import java.util.Map;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.entity.PolarisBaseEntity;
Expand All @@ -28,23 +27,26 @@
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.persistence.dao.entity.GenerateEntityIdResult;
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
import org.apache.polaris.core.storage.PolarisStorageIntegration;
import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider;

/** Shared basic PolarisMetaStoreManager logic for transactional and non-transactional impls. */
public abstract class BaseMetaStoreManager implements PolarisMetaStoreManager {

public static PolarisStorageConfigurationInfo extractStorageConfiguration(
@Nonnull PolarisDiagnostics diagnostics, PolarisBaseEntity reloadedEntity) {
Map<String, String> propMap = reloadedEntity.getInternalPropertiesAsMap();
String storageConfigInfoStr =
propMap.get(PolarisEntityConstants.getStorageConfigInfoPropertyName());

public static <T extends PolarisStorageConfigurationInfo>
PolarisStorageIntegration<T> toStorageIntegration(
@Nonnull PolarisDiagnostics diagnostics,
PolarisBaseEntity reloadedEntity,
PolarisStorageIntegrationProvider storageIntegrationProvider) {
PolarisStorageIntegration<T> storageIntegration =
storageIntegrationProvider.getStorageIntegration(reloadedEntity);
diagnostics.check(
storageConfigInfoStr != null,
storageIntegration != null,
"missing_storage_configuration_info",
"catalogId={}, entityId={}",
reloadedEntity.getCatalogId(),
reloadedEntity.getId());
return PolarisStorageConfigurationInfo.deserialize(storageConfigInfoStr);
return storageIntegration;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,13 @@ void deletePrincipalSecrets(
/**
* Create an in-memory storage integration
*
* @param callCtx the polaris calllctx
* @param catalogId the catalog id
* @param entityId the entity id
* @param polarisStorageConfigurationInfo the storage configuration information
* @param callCtx the polaris call context
* @param catalog the catalog entity, for which a storage integration object needs to be created
* @return a storage integration object
*/
@Nullable
<T extends PolarisStorageConfigurationInfo> PolarisStorageIntegration<T> createStorageIntegration(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long entityId,
PolarisStorageConfigurationInfo polarisStorageConfigurationInfo);
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity catalog);

/**
* Persist a storage integration in the metastore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,15 +524,9 @@ public void deletePrincipalSecrets(
@Nullable
public <T extends PolarisStorageConfigurationInfo>
PolarisStorageIntegration<T> createStorageIntegration(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long entityId,
PolarisStorageConfigurationInfo polarisStorageConfigurationInfo) {
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity catalog) {
return runInTransaction(
callCtx,
() ->
this.createStorageIntegrationInCurrentTxn(
callCtx, catalogId, entityId, polarisStorageConfigurationInfo));
callCtx, () -> this.createStorageIntegrationInCurrentTxn(callCtx, catalog));
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -981,12 +981,7 @@ private void bootstrapPolarisService(
// storageConfigInfo's presence is needed to create a storage integration
// and the catalog should not have an internal property of storage identifier or id yet
if (storageConfigInfoStr != null && integrationIdentifierOrId == null) {
integration =
ms.createStorageIntegrationInCurrentTxn(
callCtx,
catalog.getCatalogId(),
catalog.getId(),
PolarisStorageConfigurationInfo.deserialize(storageConfigInfoStr));
integration = ms.createStorageIntegrationInCurrentTxn(callCtx, catalog);
} else {
integration = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,7 @@ void deletePrincipalSecretsInCurrentTxn(
@Nullable
<T extends PolarisStorageConfigurationInfo>
PolarisStorageIntegration<T> createStorageIntegrationInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long entityId,
PolarisStorageConfigurationInfo polarisStorageConfigurationInfo);
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity catalog);

/**
* See {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,22 +518,17 @@ public void deletePrincipalSecretsInCurrentTxn(
@Override
public @Nullable <T extends PolarisStorageConfigurationInfo>
PolarisStorageIntegration<T> createStorageIntegrationInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long entityId,
PolarisStorageConfigurationInfo polarisStorageConfigurationInfo) {
return storageIntegrationProvider.getStorageIntegrationForConfig(
polarisStorageConfigurationInfo);
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity catalog) {
return storageIntegrationProvider.getStorageIntegration(catalog);
}

/** {@inheritDoc} */
@Override
public @Nullable <T extends PolarisStorageConfigurationInfo>
PolarisStorageIntegration<T> loadPolarisStorageIntegrationInCurrentTxn(
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) {
PolarisStorageConfigurationInfo storageConfig =
BaseMetaStoreManager.extractStorageConfiguration(getDiagnostics(), entity);
return storageIntegrationProvider.getStorageIntegrationForConfig(storageConfig);
return BaseMetaStoreManager.toStorageIntegration(
getDiagnostics(), entity, storageIntegrationProvider);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
package org.apache.polaris.core.storage;

import jakarta.annotation.Nullable;
import org.apache.polaris.core.entity.PolarisBaseEntity;

/**
* Factory interface that knows how to construct a {@link PolarisStorageIntegration} given a {@link
* PolarisStorageConfigurationInfo}.
*/
public interface PolarisStorageIntegrationProvider {
<T extends PolarisStorageConfigurationInfo>
@Nullable PolarisStorageIntegration<T> getStorageIntegrationForConfig(
PolarisStorageConfigurationInfo polarisStorageConfigurationInfo);
@Nullable PolarisStorageIntegration<T> getStorageIntegration(PolarisBaseEntity entity);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.polaris.core.PolarisDefaultDiagServiceImpl;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.config.PolarisConfigurationStore;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
import org.apache.polaris.core.storage.PolarisStorageIntegration;
Expand Down Expand Up @@ -64,8 +65,7 @@ public PolarisStorageIntegrationProvider storageIntegrationProvider() {
@Override
@Nullable
public <T extends PolarisStorageConfigurationInfo>
PolarisStorageIntegration<T> getStorageIntegrationForConfig(
PolarisStorageConfigurationInfo polarisStorageConfigurationInfo) {
PolarisStorageIntegration<T> getStorageIntegration(PolarisBaseEntity entity) {
return null;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.Set;
import java.util.function.Supplier;
import org.apache.polaris.core.config.RealmConfig;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisEntityConstants;
import org.apache.polaris.core.storage.AccessConfig;
import org.apache.polaris.core.storage.PolarisStorageActions;
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
Expand Down Expand Up @@ -72,14 +74,21 @@ public PolarisStorageIntegrationProviderImpl(
this.gcpCredsProvider = gcpCredsProvider;
}

@Override
@SuppressWarnings("unchecked")
@Override
@Nullable
public <T extends PolarisStorageConfigurationInfo>
@Nullable PolarisStorageIntegration<T> getStorageIntegrationForConfig(
PolarisStorageConfigurationInfo polarisStorageConfigurationInfo) {
if (polarisStorageConfigurationInfo == null) {
PolarisStorageIntegration<T> getStorageIntegration(PolarisBaseEntity entity) {
Map<String, String> propMap = entity.getInternalPropertiesAsMap();
String storageConfigInfoStr =
propMap.get(PolarisEntityConstants.getStorageConfigInfoPropertyName());
if (storageConfigInfoStr == null) {
return null;
}

PolarisStorageConfigurationInfo polarisStorageConfigurationInfo =
PolarisStorageConfigurationInfo.deserialize(storageConfigInfoStr);

PolarisStorageIntegration<T> storageIntegration;
switch (polarisStorageConfigurationInfo.getStorageType()) {
case S3:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,7 @@ public void before(TestInfo testInfo) {
(AwsStorageConfigurationInfo)
CatalogEntity.of(catalogEntity).getStorageConfigurationInfo(),
stsClient);
when(storageIntegrationProvider.getStorageIntegrationForConfig(
isA(AwsStorageConfigurationInfo.class)))
when(storageIntegrationProvider.getStorageIntegration(any()))
.thenReturn((PolarisStorageIntegration) storageIntegration);

this.catalog = initCatalog("my-catalog", ImmutableMap.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.polaris.service.catalog;

import static org.apache.iceberg.types.Types.NestedField.required;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -226,8 +227,7 @@ public void before(TestInfo testInfo) {
(AwsStorageConfigurationInfo)
CatalogEntity.of(catalogEntity).getStorageConfigurationInfo(),
stsClient);
when(storageIntegrationProvider.getStorageIntegrationForConfig(
isA(AwsStorageConfigurationInfo.class)))
when(storageIntegrationProvider.getStorageIntegration(any()))
.thenReturn((PolarisStorageIntegration) storageIntegration);

this.genericTableCatalog =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -245,8 +246,7 @@ public void before(TestInfo testInfo) {
(AwsStorageConfigurationInfo)
CatalogEntity.of(catalogEntity).getStorageConfigurationInfo(),
stsClient);
when(storageIntegrationProvider.getStorageIntegrationForConfig(
isA(AwsStorageConfigurationInfo.class)))
when(storageIntegrationProvider.getStorageIntegration(any()))
.thenReturn((PolarisStorageIntegration) storageIntegration);

this.policyCatalog = new PolicyCatalog(metaStoreManager, polarisContext, passthroughView);
Expand Down