Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,21 +163,34 @@ public ClientRegistryClient clientRegistryClient(@Qualifier("customBuilder") Web
}

@Bean
public LockedUsersRepository lockedUsersRepository(DbOptions dbOptions) {
return new LockedUsersRepository(pgPool(dbOptions));
public LockedUsersRepository lockedUsersRepository(@Qualifier("readWriteClient") PgPool pgPool) {
return new LockedUsersRepository(pgPool);
}

@Bean
public PgPool pgPool(DbOptions dbOptions) {
@Bean("readWriteClient")
public PgPool readWriteClient(DbOptions dbOptions) {
PgConnectOptions connectOptions = new PgConnectOptions()
.setPort(dbOptions.getPort())
.setHost(dbOptions.getHost())
.setDatabase(dbOptions.getSchema())
.setUser(dbOptions.getUser())
.setPassword(dbOptions.getPassword());

PoolOptions poolOptions = new PoolOptions()
.setMaxSize(dbOptions.getPoolSize());
PoolOptions poolOptions = new PoolOptions().setMaxSize(dbOptions.getPoolSize());

return PgPool.pool(connectOptions, poolOptions);
}

@Bean("readOnlyClient")
public PgPool readOnlyClient(DbOptions dbOptions) {
PgConnectOptions connectOptions = new PgConnectOptions()
.setPort(dbOptions.getReplica().getPort())
.setHost(dbOptions.getReplica().getHost())
.setDatabase(dbOptions.getSchema())
.setUser(dbOptions.getReplica().getUser())
.setPassword(dbOptions.getReplica().getPassword());

PoolOptions poolOptions = new PoolOptions().setMaxSize(dbOptions.getReplica().getPoolSize());

return PgPool.pool(connectOptions, poolOptions);
}
Expand Down Expand Up @@ -344,7 +357,6 @@ ReactiveRedisOperations<String, String> stringReactiveRedisOperations(
return new ReactiveRedisTemplate<>(factory, context);
}


@ConditionalOnProperty(value = "consentmanager.cacheMethod", havingValue = "guava", matchIfMissing = true)
@Bean({"cacheForReplayAttack"})
public CacheAdapter<String, LocalDateTime> stringLocalDateTimeCacheAdapter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@
public class ConsentConfiguration {

@Bean
public ConsentRequestRepository consentRequestRepository(PgPool pgPool) {
return new ConsentRequestRepository(pgPool);
public ConsentRequestRepository consentRequestRepository(@Qualifier("readWriteClient") PgPool readWriteClient,
@Qualifier("readOnlyClient") PgPool readOnlyClient) {
return new ConsentRequestRepository(readWriteClient, readOnlyClient);
}

@Bean
public ConsentArtefactRepository consentArtefactRepository(PgPool pgPool) {
public ConsentArtefactRepository consentArtefactRepository(@Qualifier("readWriteClient") PgPool pgPool) {
return new ConsentArtefactRepository(pgPool);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import io.vertx.pgclient.PgPool;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.Tuple;
import lombok.AllArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
Expand All @@ -27,24 +27,28 @@
import static in.projecteka.consentmanager.consent.model.ConsentStatus.GRANTED;
import static in.projecteka.library.common.Serializer.from;
import static in.projecteka.library.common.Serializer.to;
import static io.vertx.sqlclient.Tuple.of;
import static reactor.core.publisher.Mono.create;

@AllArgsConstructor
public class ConsentRequestRepository {
private static final Logger logger = LoggerFactory.getLogger(ConsentRequestRepository.class);
private static final String SELECT_CONSENT_REQUEST_BY_ID_AND_STATUS;
private static final String SELECT_CONSENT_REQUEST_BY_ID;
private static final String SELECT_CONSENT_REQUESTS_BY_STATUS;
private static final String SELECT_CONSENT_DETAILS_FOR_PATIENT;
private static final String SELECT_CONSENT_REQUEST_COUNT = "SELECT COUNT(*) FROM consent_request " +
"WHERE LOWER(patient_id) = $1 and status != $3 and (status=$2 OR $2 IS NULL)";
"WHERE LOWER(patient_id) = $1 and status != $3 and (status= $2 OR $2 IS NULL)";
private static final String INSERT_CONSENT_REQUEST_QUERY = "INSERT INTO consent_request " +
"(request_id, patient_id, status, details) VALUES ($1, $2, $3, $4)";
private static final String UPDATE_CONSENT_REQUEST_STATUS_QUERY = "UPDATE consent_request SET status=$1, " +
"date_modified=$2 WHERE request_id=$3";
private static final String UPDATE_CONSENT_REQUEST_STATUS_QUERY = "UPDATE consent_request SET status = $1, " +
"date_modified= $2 WHERE request_id= $3";
private static final String FAILED_TO_SAVE_CONSENT_REQUEST = "Failed to save consent request";
private static final String UNKNOWN_ERROR_OCCURRED = "Unknown error occurred";
private static final String FAILED_TO_GET_CONSENT_REQUESTS_BY_STATUS = "Failed to get consent requests by status";

private final PgPool dbClient;
private final PgPool readWriteClient;
private final PgPool readOnlyClient;

static {
String s = "SELECT request_id, status, details, date_created, date_modified FROM consent_request " +
Expand All @@ -57,14 +61,10 @@ public class ConsentRequestRepository {
SELECT_CONSENT_REQUESTS_BY_STATUS = s + "status=$1";
}

public ConsentRequestRepository(PgPool dbClient) {
this.dbClient = dbClient;
}

public Mono<Void> insert(RequestedDetail requestedDetail, UUID requestId) {
return Mono.create(monoSink ->
dbClient.preparedQuery(INSERT_CONSENT_REQUEST_QUERY)
.execute(Tuple.of(requestId.toString(),
return create(monoSink ->
readWriteClient.preparedQuery(INSERT_CONSENT_REQUEST_QUERY)
.execute(of(requestId.toString(),
requestedDetail.getPatient().getId(),
ConsentStatus.REQUESTED.name(),
new JsonObject(from(requestedDetail))),
Expand All @@ -82,22 +82,21 @@ public Mono<ListResult<List<ConsentRequestDetail>>> requestsForPatient(String pa
int limit,
int offset,
String status) {
return Mono.create(monoSink -> dbClient.preparedQuery(SELECT_CONSENT_DETAILS_FOR_PATIENT)
.execute(Tuple.of(patientId.toLowerCase(), limit, offset, status, GRANTED.toString()),
return create(monoSink -> readOnlyClient.preparedQuery(SELECT_CONSENT_DETAILS_FOR_PATIENT)
.execute(of(patientId.toLowerCase(), limit, offset, status, GRANTED.toString()),
handler -> {
List<ConsentRequestDetail> requestList = getConsentRequestDetails(handler);
dbClient.preparedQuery(SELECT_CONSENT_REQUEST_COUNT)
.execute(Tuple.of(patientId, status, GRANTED.toString()), counter -> {
readOnlyClient.preparedQuery(SELECT_CONSENT_REQUEST_COUNT)
.execute(of(patientId, status, GRANTED.toString()),
counter -> {
if (handler.failed()) {
logger.error(handler.cause().getMessage(), handler.cause());
monoSink.error(new DbOperationError());
return;
}
Integer count = counter.result().iterator()
.next().getInteger("count");
var count = counter.result().iterator().next().getInteger("count");
monoSink.success(new ListResult<>(requestList, count));
}
);
});
}));
}

Expand All @@ -115,15 +114,13 @@ private List<ConsentRequestDetail> getConsentRequestDetails(AsyncResult<RowSet<R
}

public Mono<ConsentRequestDetail> requestOf(String requestId, String status, String patientId) {
return Mono.create(monoSink -> dbClient.preparedQuery(SELECT_CONSENT_REQUEST_BY_ID_AND_STATUS)
.execute(Tuple.of(requestId, status, patientId),
consentRequestHandler(monoSink)));
return create(monoSink -> readOnlyClient.preparedQuery(SELECT_CONSENT_REQUEST_BY_ID_AND_STATUS)
.execute(of(requestId, status, patientId), consentRequestHandler(monoSink)));
}

public Mono<ConsentRequestDetail> requestOf(String requestId) {
return Mono.create(monoSink -> dbClient.preparedQuery(SELECT_CONSENT_REQUEST_BY_ID)
.execute(Tuple.of(requestId),
consentRequestHandler(monoSink)));
return create(monoSink -> readOnlyClient.preparedQuery(SELECT_CONSENT_REQUEST_BY_ID)
.execute(of(requestId), consentRequestHandler(monoSink)));
}

private Handler<AsyncResult<RowSet<Row>>> consentRequestHandler(MonoSink<ConsentRequestDetail> monoSink) {
Expand Down Expand Up @@ -162,8 +159,8 @@ private ConsentRequestDetail mapToConsentRequestDetail(Row result) {
}

public Mono<Void> updateStatus(String id, ConsentStatus status) {
return Mono.create(monoSink -> dbClient.preparedQuery(UPDATE_CONSENT_REQUEST_STATUS_QUERY)
.execute(Tuple.of(status.toString(), LocalDateTime.now(ZoneOffset.UTC), id),
return create(monoSink -> readWriteClient.preparedQuery(UPDATE_CONSENT_REQUEST_STATUS_QUERY)
.execute(of(status.toString(), LocalDateTime.now(ZoneOffset.UTC), id),
updateHandler -> {
if (updateHandler.failed()) {
monoSink.error(new Exception("Failed to update status"));
Expand All @@ -178,8 +175,8 @@ private ConsentStatus getConsentStatus(String status) {
}

public Flux<ConsentRequestDetail> getConsentsByStatus(ConsentStatus status) {
return Flux.create(fluxSink -> dbClient.preparedQuery(SELECT_CONSENT_REQUESTS_BY_STATUS)
.execute(Tuple.of(status.toString()),
return Flux.create(fluxSink -> readOnlyClient.preparedQuery(SELECT_CONSENT_REQUESTS_BY_STATUS)
.execute(of(status.toString()),
handler -> {
if (handler.failed()) {
logger.error(handler.cause().getMessage(), handler.cause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public DataFlowRequester dataRequest(@Qualifier("customBuilder") WebClient.Build
}

@Bean
public DataFlowRequestRepository dataRequestRepository(PgPool pgPool) {
public DataFlowRequestRepository dataRequestRepository(@Qualifier("readWriteClient") PgPool pgPool) {
return new DataFlowRequestRepository(pgPool);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@
import in.projecteka.consentmanager.clients.DiscoveryServiceClient;
import in.projecteka.consentmanager.clients.LinkServiceClient;
import in.projecteka.consentmanager.clients.UserServiceClient;
import in.projecteka.consentmanager.clients.properties.GatewayServiceProperties;
import in.projecteka.consentmanager.clients.properties.LinkServiceProperties;
import in.projecteka.consentmanager.common.CentralRegistry;
import in.projecteka.consentmanager.common.ServiceAuthentication;
import in.projecteka.consentmanager.common.cache.CacheAdapter;
import in.projecteka.consentmanager.common.cache.LoadingCacheAdapter;
import in.projecteka.consentmanager.common.cache.RedisCacheAdapter;
import in.projecteka.consentmanager.link.discovery.Discovery;
import in.projecteka.consentmanager.link.discovery.DiscoveryRepository;
import in.projecteka.consentmanager.link.hiplink.UserAuthInitAction;
Expand Down Expand Up @@ -36,12 +43,12 @@
public class LinkConfiguration {

@Bean
public DiscoveryRepository discoveryRepository(PgPool pgPool) {
public DiscoveryRepository discoveryRepository(@Qualifier("readWriteClient") PgPool pgPool) {
return new DiscoveryRepository(pgPool);
}

@Bean
public LinkRepository linkRepository(PgPool pgPool) {
public LinkRepository linkRepository(@Qualifier("readWriteClient") PgPool pgPool) {
return new LinkRepository(pgPool);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,27 @@ public class DbOptions {
private final String user;
private final String password;
private final int poolSize;
private final boolean replicaReadEnabled;
private final Replica replica;

public Replica getReplica() {
return replica != null && replicaReadEnabled
? replica
: new Replica(host, port, user, password, getReadPoolSize());
}

private int getReadPoolSize() {
return poolSize / 2 + poolSize % 2;
}

public int getPoolSize() {
return replica != null && replicaReadEnabled
? poolSize
: poolSize / 2;
}

public in.projecteka.library.common.DbOptions toHeartBeat() {
return new in.projecteka.library.common.DbOptions(host, port);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,16 @@
import lombok.AllArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;


@Repository
@AllArgsConstructor
public class OtpAttemptRepository {

private final static Logger logger = LoggerFactory.getLogger(OtpAttemptRepository.class);


private static final String INSERT_OTP_ATTEMPT = "INSERT INTO " +
"otp_attempt (session_id ,cm_id, identifier_type, identifier_value, status, action) VALUES ($1,$2,$3,$4,$5,$6)";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import in.projecteka.consentmanager.user.model.OtpAttempt;
import in.projecteka.library.clients.model.ClientError;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

import java.time.LocalDateTime;
Expand Down Expand Up @@ -55,8 +54,8 @@ public Mono<Void> removeMatchingAttempts(OtpAttempt otpAttempt) {

public <T> Mono<T> handleInvalidOTPError(ClientError error, OtpAttempt attempt) {
Mono<T> invalidOTPError = Mono.error(error);
if (error.getErrorCode().equals(OTP_INVALID)) {
return saveOTPAttempt(attempt.toBuilder().attemptStatus(FAILURE).build()).then(invalidOTPError);
if (error.getErrorCode().equals(ErrorCode.OTP_INVALID)) {
return saveOTPAttempt(attempt.toBuilder().attemptStatus(OtpAttempt.AttemptStatus.FAILURE).build()).then(invalidOTPError);
}
return invalidOTPError;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public UserService userService(UserRepository userRepository,
}

@Bean
public UserRepository userRepository(PgPool pgPool) {
public UserRepository userRepository(@Qualifier("readWriteClient") PgPool pgPool) {
return new UserRepository(pgPool);
}

Expand Down Expand Up @@ -145,7 +145,7 @@ public SessionService sessionService(
}

@Bean
public TransactionPinRepository transactionPinRepository(PgPool dbClient) {
public TransactionPinRepository transactionPinRepository(@Qualifier("readWriteClient") PgPool dbClient) {
return new TransactionPinRepository(dbClient);
}

Expand Down Expand Up @@ -173,4 +173,15 @@ public TransactionPinService transactionPinService(TransactionPinRepository tran
public ProfileService profileService(UserService userService, TransactionPinService transactionPinService) {
return new ProfileService(userService, transactionPinService);
}

@Bean
public OtpAttemptRepository otpAttemptRepository(@Qualifier("readWriteClient") PgPool readWriteClient) {
return new OtpAttemptRepository(readWriteClient);
}

@Bean
public OtpAttemptService otpAttemptService(OtpAttemptRepository otpAttemptRepository,
UserServiceProperties userServiceProperties) {
return new OtpAttemptService(otpAttemptRepository, userServiceProperties);
}
}
9 changes: 8 additions & 1 deletion consent/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,14 @@ consentmanager:
schema: ${CONSENT_MANAGER_DB_NAME}
user: ${POSTGRES_USER}
password: ${POSTGRES_PASSWORD}
poolSize: 5
poolSize: ${MASTER_POOL_SIZE:5}
replica-read-enabled: {REPLICA_READ_ENABLED:false}
replica:
host: ${POSTGRES_HOST}
port: ${POSTGRES_PORT:5432}
user: ${POSTGRES_USER}
password: ${POSTGRES_PASSWORD}
poolSize: ${REPLICA_POOL_SIZE:3}
dataflow:
consentmanager:
url: ${CONSENT_MANAGER_URL}
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/in/projecteka/consentmanager/Replica.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package in.projecteka.consentmanager;

import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor
@Getter
public class Replica {
private final String host;
private final int port;
private final String user;
private final String password;
private final int poolSize;
}