Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions jdbc/src/main/java/tech/ydb/jdbc/YdbConst.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,22 @@ public final class YdbConst {
public static final String QUERY_EXPECT_RESULT_SET = "Query must return ResultSet";
public static final String QUERY_EXPECT_UPDATE = "Query must not return ResultSet";
public static final String UNABLE_TO_SET_NULL_OBJECT = "Unable to set null object, type is required";
public static final String DIRECTION_UNSUPPORTED = "Direction is not supported: ";

public static final String RESULT_SET_MODE_UNSUPPORTED = "ResultSet mode is not supported: ";
public static final String RESULT_SET_UNAVAILABLE = "ResultSet is not available at index: ";
public static final String RESULT_SET_IS_CLOSED = "ResultSet is closed";
public static final String RESULT_IS_TRUNCATED = "Result #%s was truncated to %s rows";
public static final String RESULT_WAS_INTERRUPTED = "ResultSet reading was interrupted";
public static final String RESULT_IS_NOT_SCROLLABLE =
"Requested scrollable ResutlSet, but this ResultSet is FORWARD_ONLY.";
public static final String RESULT_SET_TYPE_UNSUPPORTED =
"resultSetType must be ResultSet.TYPE_FORWARD_ONLY or ResultSet.TYPE_SCROLL_INSENSITIVE";
public static final String RESULT_SET_CONCURRENCY_UNSUPPORTED =
"resultSetConcurrency must be ResultSet.CONCUR_READ_ONLY";
public static final String RESULT_SET_HOLDABILITY_UNSUPPORTED =
"resultSetHoldability must be ResultSet.HOLD_CURSORS_OVER_COMMIT";


public static final String INVALID_FETCH_DIRECTION = "Fetch direction %s cannot be used when result set type is %s";
public static final String COLUMN_NOT_FOUND = "Column not found: ";
public static final String COLUMN_NUMBER_NOT_FOUND = "Column is out of range: ";
Expand All @@ -71,19 +83,12 @@ public final class YdbConst {
public static final String METADATA_RS_UNSUPPORTED_IN_PS = "ResultSet metadata is not supported " +
"in prepared statements";
public static final String CANNOT_UNWRAP_TO = "Cannot unwrap to ";
public static final String RESULT_SET_TYPE_UNSUPPORTED =
"resultSetType must be ResultSet.TYPE_FORWARD_ONLY or ResultSet.TYPE_SCROLL_INSENSITIVE";
public static final String RESULT_SET_CONCURRENCY_UNSUPPORTED =
"resultSetConcurrency must be ResultSet.CONCUR_READ_ONLY";
public static final String RESULT_SET_HOLDABILITY_UNSUPPORTED =
"resultSetHoldability must be ResultSet.HOLD_CURSORS_OVER_COMMIT";
public static final String READONLY_INSIDE_TRANSACTION = "Cannot change read-only attribute inside a transaction";
public static final String CHANGE_ISOLATION_INSIDE_TX = "Cannot change transaction isolation inside a transaction";
public static final String UNSUPPORTED_TRANSACTION_LEVEL = "Unsupported transaction level: ";
public static final String CLOSED_CONNECTION = "Connection is closed";
public static final String DB_QUERY_DEADLINE_EXCEEDED = "DB query deadline exceeded: ";
public static final String DB_QUERY_CANCELLED = "DB query cancelled: ";
public static final String DATABASE_QUERY_INTERRUPTED = "Database query interrupted";
public static final String DATABASE_UNAVAILABLE = "Database is unavailable: ";
public static final String CANNOT_LOAD_DATA_FROM_IS = "Unable to load data from input stream: ";
public static final String CANNOT_LOAD_DATA_FROM_READER = "Unable to load data from reader: ";
Expand Down
18 changes: 18 additions & 0 deletions jdbc/src/main/java/tech/ydb/jdbc/YdbQueryResult.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package tech.ydb.jdbc;

import java.sql.SQLException;

/**
*
* @author Aleksandr Gorshenin
*/
public interface YdbQueryResult {
int getUpdateCount() throws SQLException;
YdbResultSet getCurrentResultSet() throws SQLException;
YdbResultSet getGeneratedKeys() throws SQLException;

boolean hasResultSets() throws SQLException;
boolean getMoreResults(int current) throws SQLException;

void close() throws SQLException;
}
2 changes: 0 additions & 2 deletions jdbc/src/main/java/tech/ydb/jdbc/YdbResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ public interface YdbResultSet extends ResultSet {
*/
Value<?> getNativeColumn(String columnLabel) throws SQLException;

//

@Override
YdbResultSetMetaData getMetaData() throws SQLException;

Expand Down
15 changes: 15 additions & 0 deletions jdbc/src/main/java/tech/ydb/jdbc/YdbStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,19 @@ public interface YdbStatement extends Statement {

@Override
YdbConnection getConnection() throws SQLException;

@Override
int getQueryTimeout();

@Override
boolean isPoolable();

@Override
int getFetchSize();

@Override
int getFetchDirection();

@Override
int getMaxRows();
}
77 changes: 36 additions & 41 deletions jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@
import java.sql.SQLException;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.jdbc.YdbConst;
import tech.ydb.jdbc.YdbQueryResult;
import tech.ydb.jdbc.YdbResultSet;
import tech.ydb.jdbc.YdbStatement;
import tech.ydb.jdbc.YdbTracer;
import tech.ydb.jdbc.common.YdbTypes;
import tech.ydb.jdbc.exception.YdbRetryableException;
import tech.ydb.jdbc.impl.YdbQueryResult;
import tech.ydb.jdbc.impl.YdbStaticResultSet;
import tech.ydb.jdbc.impl.YdbQueryResultReader;
import tech.ydb.jdbc.impl.YdbQueryResultStatic;
import tech.ydb.jdbc.impl.YdbResultSetMemory;
import tech.ydb.jdbc.query.QueryType;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.table.Session;
Expand Down Expand Up @@ -66,7 +66,8 @@ protected Session createNewTableSession(YdbValidator validator) throws SQLExcept
return validator.call("Get session", null, () -> tableClient.createSession(sessionTimeout));
}

protected void closeCurrentResult() throws SQLException {
@Override
public void clearState() throws SQLException {
YdbQueryResult rs = currResult.get();
if (rs != null) {
rs.close();
Expand All @@ -83,20 +84,18 @@ protected YdbQueryResult updateCurrentResult(YdbQueryResult result) throws SQLEx

@Override
public void ensureOpened() throws SQLException {
closeCurrentResult();
if (isClosed()) {
throw new SQLException(YdbConst.CLOSED_CONNECTION);
}
}


@Override
public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params,
long timeout, boolean keepInCache) throws SQLException {
public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params)
throws SQLException {
boolean insideTx = isInsideTransaction();
while (true) {
try {
return executeQueryImpl(statement, query, preparedYql, params, timeout, keepInCache);
return executeQueryImpl(statement, query, preparedYql, params);
} catch (YdbRetryableException ex) {
if (insideTx || ex.getStatus().getCode() != StatusCode.BAD_SESSION) {
throw ex;
Expand All @@ -106,7 +105,7 @@ public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, S
}

protected abstract YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query, String preparedYql,
Params params, long timeout, boolean keepInCache) throws SQLException;
Params params) throws SQLException;

@Override
public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) throws SQLException {
Expand All @@ -130,7 +129,7 @@ public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query)
tracer.close();
}

return updateCurrentResult(new StaticQueryResult(query, Collections.emptyList()));
return updateCurrentResult(new YdbQueryResultStatic(query));
}

@Override
Expand All @@ -152,7 +151,7 @@ public YdbQueryResult executeBulkUpsert(YdbStatement statement, YdbQuery query,
tracer.close();
}

return updateCurrentResult(new StaticQueryResult(query, Collections.emptyList()));
return updateCurrentResult(new YdbQueryResultStatic(query));
}

@Override
Expand All @@ -164,9 +163,6 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
YdbContext ctx = statement.getConnection().getCtx();
YdbValidator validator = statement.getValidator();
Duration scanQueryTimeout = ctx.getOperationProperties().getScanQueryTimeout();
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
.withRequestTimeout(scanQueryTimeout)
.build();
String msg = QueryType.SCAN_QUERY + " >>\n" + yql;

YdbTracer tracer = ctx.getTracer();
Expand All @@ -177,50 +173,49 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S

if (!useStreamResultSet) {
try {
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
.withRequestTimeout(scanQueryTimeout)
.build();

Collection<ResultSetReader> resultSets = new LinkedBlockingQueue<>();

ctx.traceQueryByFullScanDetector(query, yql);
validator.execute(QueryType.SCAN_QUERY + " >>\n" + yql, tracer,
() -> session.executeScanQuery(yql, params, settings).start(resultSets::add)
);

YdbResultSet rs = new YdbStaticResultSet(types, statement, ProtoValueReaders.forResultSets(resultSets));
return updateCurrentResult(new StaticQueryResult(query, Collections.singletonList(rs)));
YdbResultSet rs = new YdbResultSetMemory(types, statement, ProtoValueReaders.forResultSets(resultSets));
return updateCurrentResult(new YdbQueryResultStatic(query, rs));
} finally {
session.close();
tracer.close();
}
}

StreamQueryResult lazy = validator.call(msg, null, () -> {
final CompletableFuture<Result<StreamQueryResult>> future = new CompletableFuture<>();
final GrpcReadStream<ResultSetReader> stream = session.executeScanQuery(yql, params, settings);
final StreamQueryResult result = new StreamQueryResult(msg, types, statement, query, stream::cancel);

stream.start((rsr) -> {
future.complete(Result.success(result));
result.onStreamResultSet(0, rsr);
}).whenComplete((st, th) -> {
final YdbQueryResultReader reader = new YdbQueryResultReader(types, statement, query) {
@Override
public void onClose(Status status, Throwable th) {
session.close();

if (th != null) {
result.onStreamFinished(th);
future.completeExceptionally(th);
tracer.trace("<-- " + th.getMessage());
}
if (st != null) {
validator.addStatusIssues(st);
result.onStreamFinished(st);
future.complete(st.isSuccess() ? Result.success(result) : Result.fail(st));
tracer.trace("<-- " + st.toString());
if (status != null) {
validator.addStatusIssues(status);
tracer.trace("<-- " + status.toString());
}
tracer.close();
});

return future;
});
super.onClose(status, th);
}
};

return updateCurrentResult(lazy);
}
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
.withRequestTimeout(scanQueryTimeout)
.setGrpcFlowControl(reader)
.build();

GrpcReadStream<ResultSetReader> stream = session.executeScanQuery(yql, params, settings);
validator.execute(msg, tracer, () -> reader.load(stream));
return updateCurrentResult(reader);
}
}
Loading