Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate;
import org.hypertrace.core.documentstore.postgres.internal.BulkUpdateSubDocsInternalResult;
import org.hypertrace.core.documentstore.postgres.model.DocumentAndId;
import org.hypertrace.core.documentstore.postgres.registry.PostgresColumnRegistry;
import org.hypertrace.core.documentstore.postgres.subdoc.PostgresSubDocumentUpdater;
import org.hypertrace.core.documentstore.postgres.utils.PostgresUtils;
import org.postgresql.util.PSQLException;
Expand All @@ -101,19 +102,36 @@ public class PostgresCollection implements Collection {

private final PostgresClient client;
private final PostgresTableIdentifier tableIdentifier;
private final PostgresColumnRegistry columnRegistry;
private final PostgresSubDocumentUpdater subDocUpdater;
private final PostgresQueryExecutor queryExecutor;
private final UpdateValidator updateValidator;

public PostgresCollection(final PostgresClient client, final String collectionName) {
this(client, PostgresTableIdentifier.parse(collectionName));
this(client, PostgresTableIdentifier.parse(collectionName), null);
}

public PostgresCollection(
final PostgresClient client,
final String collectionName,
final PostgresColumnRegistry columnRegistry) {
this(client, PostgresTableIdentifier.parse(collectionName), columnRegistry);
}

PostgresCollection(final PostgresClient client, final PostgresTableIdentifier tableIdentifier) {
this(client, tableIdentifier, null);
}

PostgresCollection(
final PostgresClient client,
final PostgresTableIdentifier tableIdentifier,
final PostgresColumnRegistry columnRegistry) {
this.client = client;
this.tableIdentifier = tableIdentifier;
this.columnRegistry = columnRegistry;
this.subDocUpdater =
new PostgresSubDocumentUpdater(new PostgresQueryBuilder(this.tableIdentifier));
new PostgresSubDocumentUpdater(
new PostgresQueryBuilder(this.tableIdentifier, this.columnRegistry));
this.queryExecutor = new PostgresQueryExecutor(this.tableIdentifier);
this.updateValidator = new CommonUpdateValidator();
}
Expand Down Expand Up @@ -488,15 +506,16 @@ private CloseableIterator<Document> search(Query query, boolean removeDocumentId
@Override
public CloseableIterator<Document> find(
final org.hypertrace.core.documentstore.query.Query query) {
return queryExecutor.execute(client.getConnection(), query);
return queryExecutor.execute(client.getConnection(), query, null, columnRegistry);
}

@Override
public CloseableIterator<Document> query(
final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions) {
String flatStructureCollectionName =
client.getCustomParameters().get(FLAT_STRUCTURE_COLLECTION_KEY);
return queryExecutor.execute(client.getConnection(), query, flatStructureCollectionName);
return queryExecutor.execute(
client.getConnection(), query, flatStructureCollectionName, columnRegistry);
}

@Override
Expand All @@ -510,7 +529,7 @@ public Optional<Document> update(
try (final Connection connection = client.getPooledConnection()) {
org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser parser =
new org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser(
tableIdentifier, query);
tableIdentifier, query, columnRegistry);
final String selectQuery = parser.buildSelectQueryForUpdate();

try (final PreparedStatement preparedStatement =
Expand Down Expand Up @@ -545,7 +564,7 @@ public Optional<Document> update(
.build();

try (final CloseableIterator<Document> iterator =
queryExecutor.execute(connection, findByIdQuery)) {
queryExecutor.execute(connection, findByIdQuery, null, columnRegistry)) {
returnDocument = getFirstDocument(iterator).orElseThrow();
}
} else if (updateOptions.getReturnDocumentType() == NONE) {
Expand Down Expand Up @@ -1568,4 +1587,32 @@ private static Optional<JsonNode> mapValueToJsonNode(int columnType, String colu
return Optional.empty();
}
}

/**
* Iterator that extracts clean documents from the 'document' column for queries without
* selections, removing the wrapper structure.
*/
static class PostgresResultIteratorCleanDocument extends PostgresResultIterator {

public PostgresResultIteratorCleanDocument(ResultSet resultSet) {
super(resultSet, true);
}

@Override
protected Document prepareDocument() throws SQLException, IOException {
// For queries without selections, extract the document content directly
String documentString = resultSet.getString(DOCUMENT);
if (documentString != null) {
ObjectNode jsonNode = (ObjectNode) MAPPER.readTree(documentString);
// Remove document ID if needed
if (shouldRemoveDocumentId()) {
jsonNode.remove(DOCUMENT_ID);
}
return new JSONDocument(MAPPER.writeValueAsString(jsonNode));
}

// Fallback to empty document if no document column
return new JSONDocument("{}");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.hypertrace.core.documentstore.Collection;
Expand All @@ -24,6 +25,9 @@
import org.hypertrace.core.documentstore.model.config.ConnectionConfig;
import org.hypertrace.core.documentstore.model.config.DatastoreConfig;
import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig;
import org.hypertrace.core.documentstore.postgres.registry.PostgresColumnRegistry;
import org.hypertrace.core.documentstore.postgres.registry.PostgresColumnRegistryImpl;
import org.hypertrace.core.documentstore.postgres.registry.PostgresColumnType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,6 +41,9 @@ public class PostgresDatastore implements Datastore {
private final String database;
private final DocStoreMetricProvider docStoreMetricProvider;

// Cache for PostgreSQL column registries per table
private final Map<String, PostgresColumnRegistry> registryCache = new ConcurrentHashMap<>();

public PostgresDatastore(@NonNull final DatastoreConfig datastoreConfig) {
final ConnectionConfig connectionConfig = datastoreConfig.connectionConfig();

Expand Down Expand Up @@ -148,7 +155,81 @@ public Collection getCollection(String collectionName) {
if (!tables.contains(collectionName)) {
createCollection(collectionName, null);
}
return new PostgresCollection(client, collectionName);

// Create or get cached registry for this collection
PostgresColumnRegistry registry = createOrGetRegistry(collectionName);

return new PostgresCollection(client, collectionName, registry);
}

/**
* Creates or retrieves a cached PostgresColumnRegistry for the specified collection. The registry
* is cached to avoid repeated database schema queries.
*
* @param collectionName the collection name to create/get registry for
* @return the PostgresColumnRegistry for the collection
*/
private PostgresColumnRegistry createOrGetRegistry(String collectionName) {
return registryCache.computeIfAbsent(
collectionName,
tableName -> {
try {
PostgresColumnRegistry registry =
new PostgresColumnRegistryImpl(client.getConnection(), tableName);

LOGGER.debug(
"Created PostgresColumnRegistry for collection '{}' with {} first-class columns",
tableName,
registry.getAllFirstClassColumns().size());

return registry;
} catch (SQLException e) {
LOGGER.warn(
"Failed to create PostgresColumnRegistry for collection '{}': {}. "
+ "Falling back to JSONB-only behavior.",
tableName,
e.getMessage());

// Return an empty registry that treats all fields as JSONB
return createEmptyRegistry(tableName);
}
});
}

/**
* Creates an empty registry that treats all fields as JSONB fields. This is used as a fallback
* when registry creation fails.
*
* @param tableName the table name
* @return an empty registry
*/
private PostgresColumnRegistry createEmptyRegistry(String tableName) {
return new PostgresColumnRegistry() {
@Override
public boolean isFirstClassColumn(String fieldName) {
return false; // All fields are treated as JSONB
}

@Override
public Optional<PostgresColumnType> getColumnType(String fieldName) {
return Optional.empty();
}

@Override
public Optional<String> getColumnName(String fieldName) {
return Optional.empty();
}

@Override
public Set<String> getAllFirstClassColumns() {
return Set.of(); // No first-class columns
}

@Override
public String getTableName() {
return tableName;
}
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.hypertrace.core.documentstore.model.subdoc.UpdateOperator;
import org.hypertrace.core.documentstore.postgres.Params.Builder;
import org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser;
import org.hypertrace.core.documentstore.postgres.registry.PostgresColumnRegistry;
import org.hypertrace.core.documentstore.postgres.update.parser.PostgresAddToListIfAbsentParser;
import org.hypertrace.core.documentstore.postgres.update.parser.PostgresAddValueParser;
import org.hypertrace.core.documentstore.postgres.update.parser.PostgresAppendToListParser;
Expand All @@ -44,12 +45,14 @@ public class PostgresQueryBuilder {
entry(APPEND_TO_LIST, new PostgresAppendToListParser()));

@Getter private final PostgresTableIdentifier tableIdentifier;
private final PostgresColumnRegistry columnRegistry;

public String getSubDocUpdateQuery(
final Query query,
final Collection<SubDocumentUpdate> updates,
final Params.Builder paramBuilder) {
final PostgresQueryParser baseQueryParser = new PostgresQueryParser(tableIdentifier, query);
final PostgresQueryParser baseQueryParser =
new PostgresQueryParser(tableIdentifier, query, columnRegistry);
String selectQuery =
String.format(
"(SELECT %s, %s FROM %s AS t0 %s)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import lombok.extern.slf4j.Slf4j;
import org.hypertrace.core.documentstore.CloseableIterator;
import org.hypertrace.core.documentstore.Document;
import org.hypertrace.core.documentstore.postgres.PostgresCollection.PostgresResultIterator;
import org.hypertrace.core.documentstore.postgres.PostgresCollection.PostgresResultIteratorWithMetaData;
import org.hypertrace.core.documentstore.postgres.query.v1.transformer.PostgresQueryTransformer;
import org.hypertrace.core.documentstore.postgres.registry.PostgresColumnRegistry;
import org.hypertrace.core.documentstore.query.Query;

@Slf4j
Expand All @@ -21,27 +21,38 @@ public class PostgresQueryExecutor {
private final PostgresTableIdentifier tableIdentifier;

public CloseableIterator<Document> execute(final Connection connection, final Query query) {
return execute(connection, query, null);
return execute(connection, query, null, null);
}

public CloseableIterator<Document> execute(
final Connection connection, final Query query, String flatStructureCollectionName) {
return execute(connection, query, flatStructureCollectionName, null);
}

public CloseableIterator<Document> execute(
final Connection connection,
final Query query,
String flatStructureCollectionName,
PostgresColumnRegistry columnRegistry) {
final org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser queryParser =
new org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser(
tableIdentifier, transformAndLog(query), flatStructureCollectionName);
tableIdentifier, transformAndLog(query), flatStructureCollectionName, columnRegistry);
final String sqlQuery = queryParser.parse();
try {
final PreparedStatement preparedStatement =
buildPreparedStatement(sqlQuery, queryParser.getParamsBuilder().build(), connection);
log.debug("Executing executeQueryV1 sqlQuery:{}", preparedStatement.toString());
final ResultSet resultSet = preparedStatement.executeQuery();

if ((tableIdentifier.getTableName().equals(flatStructureCollectionName))) {
return new PostgresCollection.PostgresResultIteratorWithBasicTypes(resultSet);
// For queries with selections, use PostgresResultIteratorWithMetaData
// as it properly handles nested field decoding
if (query.getSelections().size() > 0) {
return new PostgresResultIteratorWithMetaData(resultSet);
}
return query.getSelections().size() > 0
? new PostgresResultIteratorWithMetaData(resultSet)
: new PostgresResultIterator(resultSet);

// For queries without selections, use a custom iterator that extracts clean documents
// from the 'document' column instead of returning wrapped results
return new PostgresCollection.PostgresResultIteratorCleanDocument(resultSet);
} catch (SQLException e) {
log.error(
"SQLException querying documents. original query: " + query + ", sql query:" + sqlQuery,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.hypertrace.core.documentstore.postgres.query.v1.vistors.PostgresSelectTypeExpressionVisitor;
import org.hypertrace.core.documentstore.postgres.query.v1.vistors.PostgresSortTypeExpressionVisitor;
import org.hypertrace.core.documentstore.postgres.query.v1.vistors.PostgresUnnestFilterTypeExpressionVisitor;
import org.hypertrace.core.documentstore.postgres.registry.PostgresColumnRegistry;
import org.hypertrace.core.documentstore.query.Pagination;
import org.hypertrace.core.documentstore.query.Query;

Expand All @@ -29,6 +30,7 @@ public class PostgresQueryParser {
@Getter private final PostgresTableIdentifier tableIdentifier;
@Getter private final Query query;
@Getter private final String flatStructureCollectionName;
@Getter private final PostgresColumnRegistry columnRegistry;

@Setter String finalTableName;
@Getter private final Builder paramsBuilder = Params.newBuilder();
Expand All @@ -47,15 +49,29 @@ public class PostgresQueryParser {

public PostgresQueryParser(
PostgresTableIdentifier tableIdentifier, Query query, String flatStructureCollectionName) {
this(tableIdentifier, query, flatStructureCollectionName, null);
}

public PostgresQueryParser(
PostgresTableIdentifier tableIdentifier,
Query query,
String flatStructureCollectionName,
PostgresColumnRegistry columnRegistry) {
this.tableIdentifier = tableIdentifier;
this.query = query;
this.flatStructureCollectionName = flatStructureCollectionName;
this.columnRegistry = columnRegistry;
this.finalTableName = tableIdentifier.toString();
toPgColumnTransformer = new FieldToPgColumnTransformer(this);
}

public PostgresQueryParser(PostgresTableIdentifier tableIdentifier, Query query) {
this(tableIdentifier, query, null);
this(tableIdentifier, query, null, null);
}

public PostgresQueryParser(
PostgresTableIdentifier tableIdentifier, Query query, PostgresColumnRegistry columnRegistry) {
this(tableIdentifier, query, null, columnRegistry);
}

public String parse() {
Expand Down
Loading