Skip to content

Update #1642

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: 5.0
Choose a base branch
from
Draft

Update #1642

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,14 @@
*/
package org.neo4j.driver.internal.adaptedbolt;

import java.time.Duration;
import java.util.Map;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import org.neo4j.bolt.connection.AccessMode;
import org.neo4j.bolt.connection.AuthInfo;
import org.neo4j.bolt.connection.AuthTokens;
import org.neo4j.bolt.connection.BoltConnection;
import org.neo4j.bolt.connection.BoltConnectionState;
import org.neo4j.bolt.connection.BoltProtocolVersion;
import org.neo4j.bolt.connection.BoltServerAddress;
import org.neo4j.bolt.connection.DatabaseName;
import org.neo4j.bolt.connection.NotificationConfig;
import org.neo4j.bolt.connection.TelemetryApi;
import org.neo4j.bolt.connection.TransactionType;
import org.neo4j.driver.Value;
import org.neo4j.bolt.connection.message.Message;
import org.neo4j.driver.internal.value.BoltValueFactory;

final class AdaptingDriverBoltConnection implements DriverBoltConnection {
Expand All @@ -49,141 +39,15 @@ final class AdaptingDriverBoltConnection implements DriverBoltConnection {
}

@Override
public <T> CompletionStage<T> onLoop(Supplier<T> supplier) {
return connection.onLoop(supplier);
}

@Override
public CompletionStage<DriverBoltConnection> route(
DatabaseName databaseName, String impersonatedUser, Set<String> bookmarks) {
return connection
.route(databaseName, impersonatedUser, bookmarks)
.exceptionally(errorMapper::mapAndTrow)
.thenApply(ignored -> this);
}

@Override
public CompletionStage<DriverBoltConnection> beginTransaction(
DatabaseName databaseName,
AccessMode accessMode,
String impersonatedUser,
Set<String> bookmarks,
TransactionType transactionType,
Duration txTimeout,
Map<String, Value> txMetadata,
String txType,
NotificationConfig notificationConfig) {
return connection
.beginTransaction(
databaseName,
accessMode,
impersonatedUser,
bookmarks,
transactionType,
txTimeout,
boltValueFactory.toBoltMap(txMetadata),
txType,
notificationConfig)
.exceptionally(errorMapper::mapAndTrow)
.thenApply(ignored -> this);
}

@Override
public CompletionStage<DriverBoltConnection> runInAutoCommitTransaction(
DatabaseName databaseName,
AccessMode accessMode,
String impersonatedUser,
Set<String> bookmarks,
String query,
Map<String, Value> parameters,
Duration txTimeout,
Map<String, Value> txMetadata,
NotificationConfig notificationConfig) {
return connection
.runInAutoCommitTransaction(
databaseName,
accessMode,
impersonatedUser,
bookmarks,
query,
boltValueFactory.toBoltMap(parameters),
txTimeout,
boltValueFactory.toBoltMap(txMetadata),
notificationConfig)
.exceptionally(errorMapper::mapAndTrow)
.thenApply(ignored -> this);
}

@Override
public CompletionStage<DriverBoltConnection> run(String query, Map<String, Value> parameters) {
return connection
.run(query, boltValueFactory.toBoltMap(parameters))
.exceptionally(errorMapper::mapAndTrow)
.thenApply(ignored -> this);
}

@Override
public CompletionStage<DriverBoltConnection> pull(long qid, long request) {
return connection
.pull(qid, request)
.exceptionally(errorMapper::mapAndTrow)
.thenApply(ignored -> this);
}

@Override
public CompletionStage<DriverBoltConnection> discard(long qid, long number) {
public CompletionStage<Void> writeAndFlush(DriverResponseHandler handler, List<Message> messages) {
return connection
.discard(qid, number)
.exceptionally(errorMapper::mapAndTrow)
.thenApply(ignored -> this);
}

@Override
public CompletionStage<DriverBoltConnection> commit() {
return connection.commit().exceptionally(errorMapper::mapAndTrow).thenApply(ignored -> this);
}

@Override
public CompletionStage<DriverBoltConnection> rollback() {
return connection.rollback().exceptionally(errorMapper::mapAndTrow).thenApply(ignored -> this);
}

@Override
public CompletionStage<DriverBoltConnection> reset() {
return connection.reset().exceptionally(errorMapper::mapAndTrow).thenApply(ignored -> this);
}

@Override
public CompletionStage<DriverBoltConnection> logoff() {
return connection.logoff().exceptionally(errorMapper::mapAndTrow).thenApply(ignored -> this);
}

@Override
public CompletionStage<DriverBoltConnection> logon(Map<String, Value> authMap) {
return connection
.logon(AuthTokens.custom(boltValueFactory.toBoltMap(authMap)))
.exceptionally(errorMapper::mapAndTrow)
.thenApply(ignored -> this);
}

@Override
public CompletionStage<DriverBoltConnection> telemetry(TelemetryApi telemetryApi) {
return connection
.telemetry(telemetryApi)
.exceptionally(errorMapper::mapAndTrow)
.thenApply(ignored -> this);
}

@Override
public CompletionStage<DriverBoltConnection> clear() {
return connection.clear().exceptionally(errorMapper::mapAndTrow).thenApply(ignored -> this);
.writeAndFlush(new AdaptingDriverResponseHandler(handler, errorMapper, boltValueFactory), messages)
.exceptionally(errorMapper::mapAndTrow);
}

@Override
public CompletionStage<Void> flush(DriverResponseHandler handler) {
return connection
.flush(new AdaptingDriverResponseHandler(handler, errorMapper, boltValueFactory))
.exceptionally(errorMapper::mapAndTrow);
public CompletionStage<Void> write(List<Message> messages) {
return connection.write(messages).exceptionally(errorMapper::mapAndTrow);
}

@Override
Expand All @@ -196,11 +60,6 @@ public CompletionStage<Void> close() {
return connection.close().exceptionally(errorMapper::mapAndTrow);
}

@Override
public BoltConnectionState state() {
return connection.state();
}

@Override
public CompletionStage<AuthInfo> authData() {
return connection.authInfo().exceptionally(errorMapper::mapAndTrow);
Expand Down Expand Up @@ -230,4 +89,9 @@ public boolean telemetrySupported() {
public boolean serverSideRoutingEnabled() {
return connection.serverSideRoutingEnabled();
}

@Override
public BoltValueFactory valueFactory() {
return boltValueFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,80 +16,29 @@
*/
package org.neo4j.driver.internal.adaptedbolt;

import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import org.neo4j.bolt.connection.AccessMode;
import org.neo4j.bolt.connection.AuthInfo;
import org.neo4j.bolt.connection.BoltConnectionState;
import org.neo4j.bolt.connection.BoltProtocolVersion;
import org.neo4j.bolt.connection.BoltServerAddress;
import org.neo4j.bolt.connection.DatabaseName;
import org.neo4j.bolt.connection.NotificationConfig;
import org.neo4j.bolt.connection.TelemetryApi;
import org.neo4j.bolt.connection.TransactionType;
import org.neo4j.driver.Value;
import org.neo4j.bolt.connection.message.Message;
import org.neo4j.driver.internal.value.BoltValueFactory;

public interface DriverBoltConnection {
<T> CompletionStage<T> onLoop(Supplier<T> supplier);
default CompletionStage<Void> writeAndFlush(DriverResponseHandler handler, Message messages) {
return writeAndFlush(handler, List.of(messages));
}

CompletionStage<DriverBoltConnection> route(
DatabaseName databaseName, String impersonatedUser, Set<String> bookmarks);
CompletionStage<Void> writeAndFlush(DriverResponseHandler handler, List<Message> messages);

CompletionStage<DriverBoltConnection> beginTransaction(
DatabaseName databaseName,
AccessMode accessMode,
String impersonatedUser,
Set<String> bookmarks,
TransactionType transactionType,
Duration txTimeout,
Map<String, Value> txMetadata,
String txType,
NotificationConfig notificationConfig);

CompletionStage<DriverBoltConnection> runInAutoCommitTransaction(
DatabaseName databaseName,
AccessMode accessMode,
String impersonatedUser,
Set<String> bookmarks,
String query,
Map<String, Value> parameters,
Duration txTimeout,
Map<String, Value> txMetadata,
NotificationConfig notificationConfig);

CompletionStage<DriverBoltConnection> run(String query, Map<String, Value> parameters);

CompletionStage<DriverBoltConnection> pull(long qid, long request);

CompletionStage<DriverBoltConnection> discard(long qid, long number);

CompletionStage<DriverBoltConnection> commit();

CompletionStage<DriverBoltConnection> rollback();

CompletionStage<DriverBoltConnection> reset();

CompletionStage<DriverBoltConnection> logoff();

CompletionStage<DriverBoltConnection> logon(Map<String, Value> authMap);

CompletionStage<DriverBoltConnection> telemetry(TelemetryApi telemetryApi);

CompletionStage<DriverBoltConnection> clear();

CompletionStage<Void> flush(DriverResponseHandler handler);
CompletionStage<Void> write(List<Message> messages);

CompletionStage<Void> forceClose(String reason);

CompletionStage<Void> close();

// ----- MUTABLE DATA -----

BoltConnectionState state();

CompletionStage<AuthInfo> authData();

// ----- IMMUTABLE DATA -----
Expand All @@ -103,4 +52,8 @@ CompletionStage<DriverBoltConnection> runInAutoCommitTransaction(
boolean telemetrySupported();

boolean serverSideRoutingEnabled();

// ----- EXTRAS -----

BoltValueFactory valueFactory();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
*/
package org.neo4j.driver.internal.async;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import org.neo4j.bolt.connection.message.Message;
import org.neo4j.driver.AuthTokenManager;
import org.neo4j.driver.exceptions.SecurityException;
import org.neo4j.driver.exceptions.SecurityRetryableException;
Expand All @@ -35,8 +37,8 @@ public BoltConnectionWithAuthTokenManager(DriverBoltConnection delegate, AuthTok
}

@Override
public CompletionStage<Void> flush(DriverResponseHandler handler) {
return delegate.flush(new ErrorMappingResponseHandler(handler, this::mapSecurityError));
public CompletionStage<Void> writeAndFlush(DriverResponseHandler handler, List<Message> messages) {
return delegate.writeAndFlush(new ErrorMappingResponseHandler(handler, this::mapSecurityError), messages);
}

private Throwable mapSecurityError(Throwable throwable) {
Expand Down
Loading