Skip to content
Draft
23 changes: 20 additions & 3 deletions mod-source-record-storage-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@
<version>5.1.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.10.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.folio</groupId>
<artifactId>data-import-processing-core</artifactId>
Expand Down Expand Up @@ -280,7 +286,7 @@
<testcontainers.version>1.18.3</testcontainers.version>
<basedir>${project.parent.basedir}</basedir>
<ramlfiles_path>${project.parent.basedir}/ramls</ramlfiles_path>
<jooq.version>3.16.19</jooq.version>
<jooq.version>3.16.23</jooq.version>
<vertx-jooq.version>6.5.5</vertx-jooq.version>
<aspectj.version>1.9.19</aspectj.version>
<postgres.version>42.7.2</postgres.version>
Expand Down Expand Up @@ -488,12 +494,23 @@
<name>org.jooq.meta.postgres.PostgresDatabase</name>
<includes>.*</includes>
<excludes>
databasechangelog|databasechangeloglock|marc_indexers.*
databasechangelog|databasechangeloglock|marc_indexers_.*
</excludes>
<inputSchema>public</inputSchema>
<outputSchemaToDefault>true</outputSchemaToDefault>
<unsignedTypes>false</unsignedTypes>
<forcedTypes />
<forcedTypes>
<forcedType>
<userType>io.vertx.core.json.JsonObject</userType>
<includeExpression>upsert_marc_record.content</includeExpression>
<converter>io.github.jklingsporn.vertx.jooq.shared.postgres.JSONBToJsonObjectConverter</converter>
</forcedType>
<forcedType>
<userType>io.vertx.core.json.JsonObject</userType>
<includeExpression>update_marc_record.new_content</includeExpression>
<converter>io.github.jklingsporn.vertx.jooq.shared.postgres.JSONBToJsonObjectConverter</converter>
</forcedType>
</forcedTypes>
</database>
<generate>
<daos>true</daos>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package io.github.jklingsporn.vertx.jooq.classic.reactivepg;


import io.vertx.core.Future;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlClient;
import io.vertx.sqlclient.Transaction;
import org.jooq.Configuration;
import org.jooq.Query;
import org.jooq.SQLDialect;
import org.jooq.conf.ParamType;

import java.util.function.Function;

/**
* This class was moved to this package so that we can access the constructor that accepts a transaction. The constructor
* is only accessible from this package.
*/
public class CustomReactiveQueryExecutor extends ReactiveClassicGenericQueryExecutor {
private final String tenantId;
private static final String pattern = "(?<![:=]):(?![:=])";

public CustomReactiveQueryExecutor(Configuration configuration, SqlClient delegate, String tenantId) {
this(configuration, delegate, null, tenantId);
}

CustomReactiveQueryExecutor(Configuration configuration, SqlClient delegate, Transaction transaction, String tenantId) {
super(configuration, delegate, transaction);
this.tenantId = tenantId;
}

/**
* This helps to retain the tenant context for the query executor
*/
public String getTenantId() {
return tenantId;
}

@Override
protected Function<Transaction, ? extends CustomReactiveQueryExecutor> newInstance(SqlClient connection) {
return transaction -> new CustomReactiveQueryExecutor(configuration(), connection, transaction, tenantId);
}

@SuppressWarnings("unchecked")
public <U> Future<U> customTransaction(Function<? super CustomReactiveQueryExecutor, Future<U>> transaction){
return this.transaction((Function<ReactiveClassicGenericQueryExecutor, Future<U>>) transaction);
}

/**
* This method was copied from the super class. The only difference is the pattern used to replace characters
* in the named query. The pattern in the super class did not handle some cases.
*/
@Override
public String toPreparedQuery(Query query) {
if (SQLDialect.POSTGRES.supports(configuration().dialect())) {
String namedQuery = query.getSQL(ParamType.NAMED);
return namedQuery.replaceAll(pattern, "\\$");
}
// mysql works with the standard string
return query.getSQL();
}



/**
* This is a hack to expose the underlying vertx sql client because vertx-jooq does not support batch operations.
*/
public Future<RowSet<Row>> getDelegate(Function<SqlClient, Future<RowSet<Row>>> delegateFunction){
return delegateFunction.apply(delegate);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import io.github.jklingsporn.vertx.jooq.classic.reactivepg.CustomReactiveQueryExecutor;
import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor;
import io.vertx.core.json.JsonObject;
import io.vertx.pgclient.PgConnectOptions;
Expand Down Expand Up @@ -72,7 +73,7 @@ public class PostgresClientFactory {

private final Vertx vertx;

private static Class<? extends ReactiveClassicGenericQueryExecutor> reactiveClassicGenericQueryExecutorProxyClass;
private static Class<? extends CustomReactiveQueryExecutor> reactiveClassicGenericQueryExecutorProxyClass;

@Value("${srs.db.reactive.numRetries:3}")
private Integer numOfRetries;
Expand Down Expand Up @@ -123,13 +124,13 @@ public void close() {
* @param tenantId tenant id
* @return reactive query executor
*/
public ReactiveClassicGenericQueryExecutor getQueryExecutor(String tenantId) {
public CustomReactiveQueryExecutor getQueryExecutor(String tenantId) {
if (reactiveClassicGenericQueryExecutorProxyClass == null) setupProxyExecutorClass();
ReactiveClassicGenericQueryExecutor queryExecutorProxy;
CustomReactiveQueryExecutor queryExecutorProxy;
try {
queryExecutorProxy = reactiveClassicGenericQueryExecutorProxyClass
.getDeclaredConstructor(Configuration.class, SqlClient.class)
.newInstance(configuration, getCachedPool(this.vertx, tenantId).getDelegate());
.getDeclaredConstructor(Configuration.class, SqlClient.class, String.class)
.newInstance(configuration, getCachedPool(this.vertx, tenantId).getDelegate(), tenantId);
} catch (Exception e) {
throw new RuntimeException("Something happened while creating proxied reactiveClassicGenericQueryExecutor", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.folio.dao;

import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor;
import io.github.jklingsporn.vertx.jooq.classic.reactivepg.CustomReactiveQueryExecutor;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
Expand Down Expand Up @@ -56,13 +56,15 @@ public static void setRetryDelay(long delay) {
*
* @return the generated subclass
*/
public static Class<? extends ReactiveClassicGenericQueryExecutor> generateClass() {
public static Class<? extends CustomReactiveQueryExecutor> generateClass() {
return new ByteBuddy()
.subclass(ReactiveClassicGenericQueryExecutor.class)
.subclass(CustomReactiveQueryExecutor.class)
.constructor(ElementMatchers.any()) // Match all constructors
.intercept(SuperMethodCall.INSTANCE) // Call the original constructor
.method(ElementMatchers.named("transaction")) // For transaction method
.intercept(MethodDelegation.to(QueryExecutorInterceptor.class))
.method(ElementMatchers.named("customTransaction")) // For custom transaction method
.intercept(MethodDelegation.to(QueryExecutorInterceptor.class))
.method(ElementMatchers.named("query")) // For query method
.intercept(MethodDelegation.to(QueryExecutorInterceptor.class))
.make()
Expand All @@ -80,7 +82,7 @@ public static Class<? extends ReactiveClassicGenericQueryExecutor> generateClass
public static <U> Future<U> query(
@net.bytebuddy.implementation.bind.annotation.SuperCall Callable<Future<U>> superCall
) {
LOGGER.trace("query method of ReactiveClassicGenericQueryExecutor proxied");
LOGGER.trace("query method of CustomReactiveQueryExecutor proxied");
return retryOf(() -> {
try {
return superCall.call();
Expand All @@ -101,7 +103,28 @@ public static <U> Future<U> query(
public static <U> Future<U> transaction(
@net.bytebuddy.implementation.bind.annotation.SuperCall Callable<Future<U>> superCall
) {
LOGGER.trace("transaction method of ReactiveClassicGenericQueryExecutor proxied");
LOGGER.trace("transaction method of CustomReactiveQueryExecutor proxied");
return retryOf(() -> {
try {
return superCall.call();
} catch (Throwable e) {
LOGGER.error("Something happened while attempting to make proxied call for transaction method", e);
return Future.failedFuture(e);
}
}, numRetries);
}

/**
* Interceptor for the transaction method, with retry functionality.
*
* @param superCall the original method call
* @return the result of the transaction operation
*/
@SuppressWarnings("unused")
public static <U> Future<U> customTransaction(
@net.bytebuddy.implementation.bind.annotation.SuperCall Callable<Future<U>> superCall
) {
LOGGER.trace("customTransaction method of CustomReactiveQueryExecutor proxied");
return retryOf(() -> {
try {
return superCall.call();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Optional;
import java.util.function.Function;

import io.github.jklingsporn.vertx.jooq.classic.reactivepg.CustomReactiveQueryExecutor;
import io.vertx.sqlclient.Row;
import org.folio.dao.util.IdType;
import org.folio.dao.util.RecordType;
Expand Down Expand Up @@ -194,13 +195,13 @@ Future<RecordsIdentifiersCollection> getMatchedRecordsIdentifiers(MatchField mat
Future<Record> saveRecord(Record record, String tenantId);

/**
* Saves {@link Record} to the db using {@link ReactiveClassicGenericQueryExecutor}
* Saves {@link Record} to the db using {@link CustomReactiveQueryExecutor}
*
* @param txQE query executor
* @param record Record to save
* @return future with saved Record
*/
Future<Record> saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record);
Future<Record> saveRecord(CustomReactiveQueryExecutor txQE, Record record);

/**
* Saves {@link RecordCollection} to the db
Expand Down Expand Up @@ -372,7 +373,7 @@ Future<RecordsIdentifiersCollection> getMatchedRecordsIdentifiers(MatchField mat
* @param oldRecord old Record that has to be marked as "old"
* @return future with new "updated" Record
*/
Future<Record> saveUpdatedRecord(ReactiveClassicGenericQueryExecutor txQE, Record newRecord, Record oldRecord);
Future<Record> saveUpdatedRecord(CustomReactiveQueryExecutor txQE, Record newRecord, Record oldRecord);

/**
* Change suppress from discovery flag for record by external relation id
Expand All @@ -393,7 +394,7 @@ Future<RecordsIdentifiersCollection> getMatchedRecordsIdentifiers(MatchField mat
* @param tenantId tenant id
* @return future with generic type
*/
<T> Future<T> executeInTransaction(Function<ReactiveClassicGenericQueryExecutor, Future<T>> action, String tenantId);
<T> Future<T> executeInTransaction(Function<CustomReactiveQueryExecutor, Future<T>> action, String tenantId);

/**
* Search for non-existent mark bib ids in the system
Expand Down
Loading