diff --git a/jdbc/src/main/java/tech/ydb/jdbc/YdbConst.java b/jdbc/src/main/java/tech/ydb/jdbc/YdbConst.java index 661c478..0c603f9 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/YdbConst.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/YdbConst.java @@ -53,10 +53,22 @@ public final class YdbConst { public static final String QUERY_EXPECT_RESULT_SET = "Query must return ResultSet"; public static final String QUERY_EXPECT_UPDATE = "Query must not return ResultSet"; public static final String UNABLE_TO_SET_NULL_OBJECT = "Unable to set null object, type is required"; - public static final String DIRECTION_UNSUPPORTED = "Direction is not supported: "; + public static final String RESULT_SET_MODE_UNSUPPORTED = "ResultSet mode is not supported: "; public static final String RESULT_SET_UNAVAILABLE = "ResultSet is not available at index: "; + public static final String RESULT_SET_IS_CLOSED = "ResultSet is closed"; public static final String RESULT_IS_TRUNCATED = "Result #%s was truncated to %s rows"; + public static final String RESULT_WAS_INTERRUPTED = "ResultSet reading was interrupted"; + public static final String RESULT_IS_NOT_SCROLLABLE = + "Requested scrollable ResutlSet, but this ResultSet is FORWARD_ONLY."; + public static final String RESULT_SET_TYPE_UNSUPPORTED = + "resultSetType must be ResultSet.TYPE_FORWARD_ONLY or ResultSet.TYPE_SCROLL_INSENSITIVE"; + public static final String RESULT_SET_CONCURRENCY_UNSUPPORTED = + "resultSetConcurrency must be ResultSet.CONCUR_READ_ONLY"; + public static final String RESULT_SET_HOLDABILITY_UNSUPPORTED = + "resultSetHoldability must be ResultSet.HOLD_CURSORS_OVER_COMMIT"; + + public static final String INVALID_FETCH_DIRECTION = "Fetch direction %s cannot be used when result set type is %s"; public static final String COLUMN_NOT_FOUND = "Column not found: "; public static final String COLUMN_NUMBER_NOT_FOUND = "Column is out of range: "; @@ -71,19 +83,12 @@ public final class YdbConst { public static final String METADATA_RS_UNSUPPORTED_IN_PS = "ResultSet metadata is not supported " + "in prepared statements"; public static final String CANNOT_UNWRAP_TO = "Cannot unwrap to "; - public static final String RESULT_SET_TYPE_UNSUPPORTED = - "resultSetType must be ResultSet.TYPE_FORWARD_ONLY or ResultSet.TYPE_SCROLL_INSENSITIVE"; - public static final String RESULT_SET_CONCURRENCY_UNSUPPORTED = - "resultSetConcurrency must be ResultSet.CONCUR_READ_ONLY"; - public static final String RESULT_SET_HOLDABILITY_UNSUPPORTED = - "resultSetHoldability must be ResultSet.HOLD_CURSORS_OVER_COMMIT"; public static final String READONLY_INSIDE_TRANSACTION = "Cannot change read-only attribute inside a transaction"; public static final String CHANGE_ISOLATION_INSIDE_TX = "Cannot change transaction isolation inside a transaction"; public static final String UNSUPPORTED_TRANSACTION_LEVEL = "Unsupported transaction level: "; public static final String CLOSED_CONNECTION = "Connection is closed"; public static final String DB_QUERY_DEADLINE_EXCEEDED = "DB query deadline exceeded: "; public static final String DB_QUERY_CANCELLED = "DB query cancelled: "; - public static final String DATABASE_QUERY_INTERRUPTED = "Database query interrupted"; public static final String DATABASE_UNAVAILABLE = "Database is unavailable: "; public static final String CANNOT_LOAD_DATA_FROM_IS = "Unable to load data from input stream: "; public static final String CANNOT_LOAD_DATA_FROM_READER = "Unable to load data from reader: "; diff --git a/jdbc/src/main/java/tech/ydb/jdbc/YdbQueryResult.java b/jdbc/src/main/java/tech/ydb/jdbc/YdbQueryResult.java new file mode 100644 index 0000000..6ef41e8 --- /dev/null +++ b/jdbc/src/main/java/tech/ydb/jdbc/YdbQueryResult.java @@ -0,0 +1,18 @@ +package tech.ydb.jdbc; + +import java.sql.SQLException; + +/** + * + * @author Aleksandr Gorshenin + */ +public interface YdbQueryResult { + int getUpdateCount() throws SQLException; + YdbResultSet getCurrentResultSet() throws SQLException; + YdbResultSet getGeneratedKeys() throws SQLException; + + boolean hasResultSets() throws SQLException; + boolean getMoreResults(int current) throws SQLException; + + void close() throws SQLException; +} diff --git a/jdbc/src/main/java/tech/ydb/jdbc/YdbResultSet.java b/jdbc/src/main/java/tech/ydb/jdbc/YdbResultSet.java index d26bb69..70616fc 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/YdbResultSet.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/YdbResultSet.java @@ -26,8 +26,6 @@ public interface YdbResultSet extends ResultSet { */ Value getNativeColumn(String columnLabel) throws SQLException; - // - @Override YdbResultSetMetaData getMetaData() throws SQLException; diff --git a/jdbc/src/main/java/tech/ydb/jdbc/YdbStatement.java b/jdbc/src/main/java/tech/ydb/jdbc/YdbStatement.java index a619b79..ed13055 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/YdbStatement.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/YdbStatement.java @@ -43,4 +43,19 @@ public interface YdbStatement extends Statement { @Override YdbConnection getConnection() throws SQLException; + + @Override + int getQueryTimeout(); + + @Override + boolean isPoolable(); + + @Override + int getFetchSize(); + + @Override + int getFetchDirection(); + + @Override + int getMaxRows(); } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java index 16c90eb..965d31d 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java @@ -3,22 +3,22 @@ import java.sql.SQLException; import java.time.Duration; import java.util.Collection; -import java.util.Collections; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReference; -import tech.ydb.core.Result; +import tech.ydb.core.Status; import tech.ydb.core.StatusCode; import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.jdbc.YdbConst; +import tech.ydb.jdbc.YdbQueryResult; import tech.ydb.jdbc.YdbResultSet; import tech.ydb.jdbc.YdbStatement; import tech.ydb.jdbc.YdbTracer; import tech.ydb.jdbc.common.YdbTypes; import tech.ydb.jdbc.exception.YdbRetryableException; -import tech.ydb.jdbc.impl.YdbQueryResult; -import tech.ydb.jdbc.impl.YdbStaticResultSet; +import tech.ydb.jdbc.impl.YdbQueryResultReader; +import tech.ydb.jdbc.impl.YdbQueryResultStatic; +import tech.ydb.jdbc.impl.YdbResultSetMemory; import tech.ydb.jdbc.query.QueryType; import tech.ydb.jdbc.query.YdbQuery; import tech.ydb.table.Session; @@ -66,7 +66,8 @@ protected Session createNewTableSession(YdbValidator validator) throws SQLExcept return validator.call("Get session", null, () -> tableClient.createSession(sessionTimeout)); } - protected void closeCurrentResult() throws SQLException { + @Override + public void clearState() throws SQLException { YdbQueryResult rs = currResult.get(); if (rs != null) { rs.close(); @@ -83,20 +84,18 @@ protected YdbQueryResult updateCurrentResult(YdbQueryResult result) throws SQLEx @Override public void ensureOpened() throws SQLException { - closeCurrentResult(); if (isClosed()) { throw new SQLException(YdbConst.CLOSED_CONNECTION); } } - @Override - public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params, - long timeout, boolean keepInCache) throws SQLException { + public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params) + throws SQLException { boolean insideTx = isInsideTransaction(); while (true) { try { - return executeQueryImpl(statement, query, preparedYql, params, timeout, keepInCache); + return executeQueryImpl(statement, query, preparedYql, params); } catch (YdbRetryableException ex) { if (insideTx || ex.getStatus().getCode() != StatusCode.BAD_SESSION) { throw ex; @@ -106,7 +105,7 @@ public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, S } protected abstract YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query, String preparedYql, - Params params, long timeout, boolean keepInCache) throws SQLException; + Params params) throws SQLException; @Override public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) throws SQLException { @@ -130,7 +129,7 @@ public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) tracer.close(); } - return updateCurrentResult(new StaticQueryResult(query, Collections.emptyList())); + return updateCurrentResult(new YdbQueryResultStatic(query)); } @Override @@ -152,7 +151,7 @@ public YdbQueryResult executeBulkUpsert(YdbStatement statement, YdbQuery query, tracer.close(); } - return updateCurrentResult(new StaticQueryResult(query, Collections.emptyList())); + return updateCurrentResult(new YdbQueryResultStatic(query)); } @Override @@ -164,9 +163,6 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S YdbContext ctx = statement.getConnection().getCtx(); YdbValidator validator = statement.getValidator(); Duration scanQueryTimeout = ctx.getOperationProperties().getScanQueryTimeout(); - ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder() - .withRequestTimeout(scanQueryTimeout) - .build(); String msg = QueryType.SCAN_QUERY + " >>\n" + yql; YdbTracer tracer = ctx.getTracer(); @@ -177,6 +173,10 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S if (!useStreamResultSet) { try { + ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder() + .withRequestTimeout(scanQueryTimeout) + .build(); + Collection resultSets = new LinkedBlockingQueue<>(); ctx.traceQueryByFullScanDetector(query, yql); @@ -184,43 +184,38 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S () -> session.executeScanQuery(yql, params, settings).start(resultSets::add) ); - YdbResultSet rs = new YdbStaticResultSet(types, statement, ProtoValueReaders.forResultSets(resultSets)); - return updateCurrentResult(new StaticQueryResult(query, Collections.singletonList(rs))); + YdbResultSet rs = new YdbResultSetMemory(types, statement, ProtoValueReaders.forResultSets(resultSets)); + return updateCurrentResult(new YdbQueryResultStatic(query, rs)); } finally { session.close(); tracer.close(); } } - StreamQueryResult lazy = validator.call(msg, null, () -> { - final CompletableFuture> future = new CompletableFuture<>(); - final GrpcReadStream stream = session.executeScanQuery(yql, params, settings); - final StreamQueryResult result = new StreamQueryResult(msg, types, statement, query, stream::cancel); - - stream.start((rsr) -> { - future.complete(Result.success(result)); - result.onStreamResultSet(0, rsr); - }).whenComplete((st, th) -> { + final YdbQueryResultReader reader = new YdbQueryResultReader(types, statement, query) { + @Override + public void onClose(Status status, Throwable th) { session.close(); - if (th != null) { - result.onStreamFinished(th); - future.completeExceptionally(th); tracer.trace("<-- " + th.getMessage()); } - if (st != null) { - validator.addStatusIssues(st); - result.onStreamFinished(st); - future.complete(st.isSuccess() ? Result.success(result) : Result.fail(st)); - tracer.trace("<-- " + st.toString()); + if (status != null) { + validator.addStatusIssues(status); + tracer.trace("<-- " + status.toString()); } tracer.close(); - }); - return future; - }); + super.onClose(status, th); + } + }; - return updateCurrentResult(lazy); - } + ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder() + .withRequestTimeout(scanQueryTimeout) + .setGrpcFlowControl(reader) + .build(); + GrpcReadStream stream = session.executeScanQuery(yql, params, settings); + validator.execute(msg, tracer, () -> reader.load(stream)); + return updateCurrentResult(reader); + } } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java index b020a72..b113f4c 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java @@ -4,23 +4,22 @@ import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.time.Duration; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Issue; -import tech.ydb.core.Result; +import tech.ydb.core.Status; import tech.ydb.jdbc.YdbConst; +import tech.ydb.jdbc.YdbQueryResult; import tech.ydb.jdbc.YdbResultSet; import tech.ydb.jdbc.YdbStatement; import tech.ydb.jdbc.YdbTracer; -import tech.ydb.jdbc.impl.YdbQueryResult; -import tech.ydb.jdbc.impl.YdbStaticResultSet; +import tech.ydb.jdbc.impl.YdbQueryResultExplain; +import tech.ydb.jdbc.impl.YdbQueryResultReader; +import tech.ydb.jdbc.impl.YdbQueryResultStatic; +import tech.ydb.jdbc.impl.YdbResultSetMemory; import tech.ydb.jdbc.query.QueryType; import tech.ydb.jdbc.query.YdbQuery; import tech.ydb.jdbc.settings.YdbOperationProperties; @@ -36,7 +35,6 @@ import tech.ydb.query.settings.RollbackTransactionSettings; import tech.ydb.query.tools.QueryReader; import tech.ydb.table.query.Params; -import tech.ydb.table.result.ResultSetReader; /** * @@ -75,7 +73,7 @@ protected QuerySession createNewQuerySession(YdbValidator validator) throws SQLE @Override public void close() throws SQLException { - closeCurrentResult(); + clearState(); isClosed = true; QueryTransaction old = tx.getAndSet(null); if (old != null) { @@ -136,13 +134,11 @@ public void setAutoCommit(boolean autoCommit) throws SQLException { @Override public boolean isClosed() throws SQLException { - closeCurrentResult(); return isClosed; } @Override public String txID() throws SQLException { - closeCurrentResult(); QueryTransaction localTx = tx.get(); return localTx != null ? localTx.getId() : null; } @@ -229,17 +225,17 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException } @Override - protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query, String preparedYql, Params params, - long timeout, boolean keepInCache) throws SQLException { + protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query, String preparedYql, Params params) + throws SQLException { ensureOpened(); YdbValidator validator = statement.getValidator(); - ExecuteQuerySettings.Builder builder = ExecuteQuerySettings.newBuilder(); + int timeout = statement.getQueryTimeout(); + ExecuteQuerySettings.Builder settings = ExecuteQuerySettings.newBuilder(); if (timeout > 0) { - builder = builder.withRequestTimeout(timeout, TimeUnit.SECONDS); + settings = settings.withRequestTimeout(timeout, TimeUnit.SECONDS); } - final ExecuteQuerySettings settings = builder.build(); QueryTransaction nextTx = tx.get(); while (nextTx == null) { @@ -260,67 +256,51 @@ protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query tracer.query(yql); String msg = "STREAM_QUERY >>\n" + yql; - StreamQueryResult lazy = validator.call(msg, tracer, () -> { - final CompletableFuture> future = new CompletableFuture<>(); - final QueryStream stream = localTx.createQuery(yql, isAutoCommit, params, settings); - final StreamQueryResult result = new StreamQueryResult(msg, types, statement, query, stream::cancel); - - stream.execute(new QueryStream.PartsHandler() { - @Override - public void onIssues(Issue[] issues) { - validator.addStatusIssues(Arrays.asList(issues)); - } - - @Override - public void onNextPart(QueryResultPart part) { - result.onStreamResultSet((int) part.getResultSetIndex(), part.getResultSetReader()); - future.complete(Result.success(result)); - } - }).whenComplete((res, th) -> { - if (!localTx.isActive()) { - if (tx.compareAndSet(localTx, null)) { - localTx.getSession().close(); - } - } - + final YdbQueryResultReader reader = new YdbQueryResultReader(types, statement, query) { + @Override + public void onClose(Status status, Throwable th) { if (th != null) { - future.completeExceptionally(th); - result.onStreamFinished(th); tracer.trace("<-- " + th.getMessage()); } - if (res != null) { - validator.addStatusIssues(res.getStatus()); - future.complete(res.isSuccess() ? Result.success(result) : Result.fail(res.getStatus())); - result.onStreamFinished(res.getStatus()); - tracer.trace("<-- " + res.getStatus().toString()); + if (status != null) { + validator.addStatusIssues(status); + tracer.trace("<-- " + status.toString()); } if (localTx.isActive()) { tracer.setId(localTx.getId()); } else { + if (tx.compareAndSet(localTx, null)) { + localTx.getSession().close(); + } tracer.close(); } - }); - return future; - }); + super.onClose(status, th); + } + }; - return updateCurrentResult(lazy); + settings = settings.withGrpcFlowControl(reader); + QueryStream stream = localTx.createQuery(yql, isAutoCommit, params, settings.build()); + validator.execute(msg, tracer, () -> reader.load(validator, stream)); + return updateCurrentResult(reader); } try { tracer.trace("--> data query"); tracer.query(yql); + ExecuteQuerySettings requestSettings = settings.build(); + QueryReader result = validator.call(QueryType.DATA_QUERY + " >>\n" + yql, tracer, - () -> QueryReader.readFrom(localTx.createQuery(yql, isAutoCommit, params, settings)) + () -> QueryReader.readFrom(localTx.createQuery(yql, isAutoCommit, params, requestSettings)) ); validator.addStatusIssues(result.getIssueList()); - List readers = new ArrayList<>(); - for (ResultSetReader rst: result) { - readers.add(new YdbStaticResultSet(types, statement, rst)); + YdbResultSet[] readers = new YdbResultSet[result.getResultSetCount()]; + for (int idx = 0; idx < readers.length; idx++) { + readers[idx] = new YdbResultSetMemory(types, statement, result.getResultSet(idx)); } - return updateCurrentResult(new StaticQueryResult(query, readers)); + return updateCurrentResult(new YdbQueryResultStatic(query, readers)); } finally { if (!localTx.isActive()) { if (tx.compareAndSet(localTx, null)) { @@ -361,8 +341,7 @@ public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) } } - - return updateCurrentResult(new StaticQueryResult(query, Collections.emptyList())); + return updateCurrentResult(new YdbQueryResultStatic(query)); } @Override @@ -391,9 +370,9 @@ public YdbQueryResult executeExplainQuery(YdbStatement statement, YdbQuery query throw new SQLException("No explain data"); } - return updateCurrentResult( - new StaticQueryResult(types, statement, res.getStats().getQueryAst(), res.getStats().getQueryPlan()) - ); + String ast = res.getStats().getQueryAst(); + String plan = res.getStats().getQueryPlan(); + return updateCurrentResult(new YdbQueryResultExplain(types, statement, ast, plan)); } finally { if (tx.get() == null) { tracer.close(); diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/StaticQueryResult.java b/jdbc/src/main/java/tech/ydb/jdbc/context/StaticQueryResult.java deleted file mode 100644 index 1413dc0..0000000 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/StaticQueryResult.java +++ /dev/null @@ -1,175 +0,0 @@ -package tech.ydb.jdbc.context; - -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import tech.ydb.jdbc.YdbConst; -import tech.ydb.jdbc.YdbResultSet; -import tech.ydb.jdbc.YdbStatement; -import tech.ydb.jdbc.common.FixedResultSetFactory; -import tech.ydb.jdbc.common.YdbTypes; -import tech.ydb.jdbc.impl.YdbQueryResult; -import tech.ydb.jdbc.impl.YdbStaticResultSet; -import tech.ydb.jdbc.query.QueryStatement; -import tech.ydb.jdbc.query.YdbQuery; -import tech.ydb.table.result.ResultSetReader; - -/** - * - * @author Aleksandr Gorshenin - */ -public class StaticQueryResult implements YdbQueryResult { - private static final FixedResultSetFactory EXPLAIN_RS_FACTORY = FixedResultSetFactory.newBuilder() - .addTextColumn(YdbConst.EXPLAIN_COLUMN_AST) - .addTextColumn(YdbConst.EXPLAIN_COLUMN_PLAN) - .build(); - - private static final ExpressionResult NO_UPDATED = new ExpressionResult(0); - // TODO: YDB doesn't return the count of affected rows, so we use little hach to return always 1 - private static final ExpressionResult HAS_UPDATED = new ExpressionResult(1); - - private static class ExpressionResult { - private final int updateCount; - private final YdbResultSet resultSet; - private final YdbResultSet generatedKeys; - - ExpressionResult(int updateCount) { - this.updateCount = updateCount; - this.resultSet = null; - this.generatedKeys = null; - } - - ExpressionResult(int updateCount, YdbResultSet keys) { - this.updateCount = updateCount; - this.resultSet = null; - this.generatedKeys = keys; - } - - ExpressionResult(YdbResultSet result) { - this.updateCount = -1; - this.resultSet = result; - this.generatedKeys = null; - } - } - - private final List results; - private int resultIndex; - - public StaticQueryResult(YdbQuery query, List list) { - this.results = new ArrayList<>(); - this.resultIndex = 0; - - int idx = 0; - for (QueryStatement exp: query.getStatements()) { - if (exp.isDDL()) { - results.add(NO_UPDATED); - continue; - } - - if (exp.hasUpdateWithGenerated() && idx < list.size()) { - results.add(new ExpressionResult(1, list.get(idx))); - idx++; - continue; - } - - if (exp.hasUpdateCount()) { - results.add(HAS_UPDATED); - continue; - } - - if (exp.hasResults() && idx < list.size()) { - results.add(new ExpressionResult(list.get(idx))); - idx++; - } - } - - while (idx < list.size()) { - results.add(new ExpressionResult(list.get(idx))); - idx++; - } - } - - public StaticQueryResult(YdbTypes types, YdbStatement statement, String ast, String plan) { - ResultSetReader result = EXPLAIN_RS_FACTORY.createResultSet() - .newRow() - .withTextValue(YdbConst.EXPLAIN_COLUMN_AST, ast) - .withTextValue(YdbConst.EXPLAIN_COLUMN_PLAN, plan) - .build() - .build(); - - YdbResultSet rs = new YdbStaticResultSet(types, statement, result); - this.results = Collections.singletonList(new ExpressionResult(rs)); - this.resultIndex = 0; - } - - @Override - public void close() throws SQLException { - for (ExpressionResult res: results) { - if (res.resultSet != null) { - res.resultSet.close(); - } - } - } - - @Override - public boolean hasResultSets() { - if (results == null || resultIndex >= results.size()) { - return false; - } - - return results.get(resultIndex).resultSet != null; - } - - @Override - public int getUpdateCount() { - if (results == null || resultIndex >= results.size()) { - return -1; - } - - return results.get(resultIndex).updateCount; - } - - @Override - public YdbResultSet getCurrentResultSet() { - if (results == null || resultIndex >= results.size()) { - return null; - } - return results.get(resultIndex).resultSet; - } - - @Override - public YdbResultSet getGeneratedKeys() { - if (results == null || resultIndex >= results.size()) { - return null; - } - return results.get(resultIndex).generatedKeys; - } - - @Override - public boolean getMoreResults(int current) throws SQLException { - if (results == null || resultIndex >= results.size()) { - return false; - } - - switch (current) { - case Statement.KEEP_CURRENT_RESULT: - break; - case Statement.CLOSE_CURRENT_RESULT: - results.get(resultIndex).resultSet.close(); - break; - case Statement.CLOSE_ALL_RESULTS: - for (int idx = 0; idx <= resultIndex; idx += 1) { - results.get(idx).resultSet.close(); - } - break; - default: - throw new SQLException(YdbConst.RESULT_SET_MODE_UNSUPPORTED + current); - } - - resultIndex += 1; - return hasResultSets(); - } -} diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java b/jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java deleted file mode 100644 index 0d72bc9..0000000 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java +++ /dev/null @@ -1,431 +0,0 @@ -package tech.ydb.jdbc.context; - -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.SQLFeatureNotSupportedException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Level; -import java.util.logging.Logger; - -import tech.ydb.core.Result; -import tech.ydb.core.Status; -import tech.ydb.core.UnexpectedResultException; -import tech.ydb.jdbc.YdbConst; -import tech.ydb.jdbc.YdbResultSet; -import tech.ydb.jdbc.YdbStatement; -import tech.ydb.jdbc.common.ColumnInfo; -import tech.ydb.jdbc.common.YdbTypes; -import tech.ydb.jdbc.exception.ExceptionFactory; -import tech.ydb.jdbc.impl.BaseYdbResultSet; -import tech.ydb.jdbc.impl.YdbQueryResult; -import tech.ydb.jdbc.query.QueryStatement; -import tech.ydb.jdbc.query.YdbQuery; -import tech.ydb.table.result.ResultSetReader; -import tech.ydb.table.result.ValueReader; - -/** - * - * @author Aleksandr Gorshenin - */ -public class StreamQueryResult implements YdbQueryResult { - private static final Logger LOGGER = Logger.getLogger(StreamQueryResult.class.getName()); - - private static final int DDL_EXPRESSION = -1; - private static final int UPDATE_EXPRESSION = -2; - - private final String msg; - private final YdbTypes types; - private final YdbStatement statement; - private final Runnable streamStopper; - - private final CompletableFuture streamFuture = new CompletableFuture<>(); - private final AtomicBoolean streamCancelled = new AtomicBoolean(false); - - private final int[] resultIndexes; - private final List>> resultFutures = new ArrayList<>(); - - private int resultIndex = 0; - private volatile boolean resultClosed = false; - - public StreamQueryResult(String msg, YdbTypes types, YdbStatement statement, YdbQuery query, Runnable stopper) { - this.msg = msg; - this.types = types; - this.statement = statement; - this.streamStopper = stopper; - - this.resultIndexes = new int[query.getStatements().size()]; - - int idx = 0; - for (QueryStatement exp : query.getStatements()) { - if (exp.isDDL()) { - resultIndexes[idx++] = DDL_EXPRESSION; - continue; - } - if (exp.hasUpdateCount()) { - resultIndexes[idx++] = UPDATE_EXPRESSION; - continue; - } - - if (exp.hasResults()) { - resultIndexes[idx++] = resultFutures.size(); - resultFutures.add(new CompletableFuture<>()); - } - } - } - - public void onStreamResultSet(int index, ResultSetReader rsr) { - CompletableFuture> future = resultFutures.get(index); - - if (!future.isDone()) { - ColumnInfo[] columns = ColumnInfo.fromResultSetReader(types, rsr); - LazyResultSet rs = new LazyResultSet(statement, columns); - rs.addResultSet(rsr); - if (future.complete(Result.success(rs))) { - return; - } - } - - Result res = future.join(); - if (res.isSuccess()) { - res.getValue().addResultSet(rsr); - } - } - - public void onStreamFinished(Throwable th) { - streamFuture.completeExceptionally(th); - for (CompletableFuture> future: resultFutures) { - future.completeExceptionally(th); - } - - completeAllSets(); - } - - public void onStreamFinished(Status status) { - for (CompletableFuture> future : resultFutures) { - if (status.isSuccess()) { - future.complete(Result.success(new LazyResultSet(statement, new ColumnInfo[0]), status)); - } else { - future.complete(Result.fail(status)); - } - } - streamFuture.complete(status); - - completeAllSets(); - } - - private void completeAllSets() { - for (CompletableFuture> future: resultFutures) { - if (!future.isCompletedExceptionally()) { - Result rs = future.join(); - if (rs.isSuccess()) { - rs.getValue().complete(); - } - } - } - } - - private void closeResultSet(int index) throws SQLException { - try { - CompletableFuture> future = resultFutures.get(index); - if (future != null) { - future.join().getValue().close(); - } - } catch (UnexpectedResultException ex) { - throw ExceptionFactory.createException("Cannot call '" + msg + "' with " + ex.getStatus(), ex); - } - } - - private void checkStream() { - if (!resultClosed) { - return; - } - - if (!streamFuture.isDone() && streamCancelled.compareAndSet(false, true)) { - LOGGER.log(Level.FINE, "Stream cancel"); - streamStopper.run(); - } - } - - @Override - public void close() throws SQLException { - if (streamFuture.isDone() && resultClosed) { - return; - } - - LOGGER.log(Level.FINE, "Stream closing"); - - resultClosed = true; - Status status = streamFuture.join(); - - if (streamCancelled.get()) { - LOGGER.log(Level.FINE, "Stream canceled and finished with status {0}", status); - return; - } - - for (CompletableFuture> future: resultFutures) { - if (future.isDone()) { - Result res = future.join(); - if (res.isSuccess()) { - res.getValue().close(); - } - } - } - - LOGGER.log(Level.FINE, "Stream closed with status {0}", status); - if (!status.isSuccess()) { - throw ExceptionFactory.createException("Cannot execute '" + msg + "' with " + status, - new UnexpectedResultException("Unexpected status", status) - ); - } - } - - @Override - public int getUpdateCount() throws SQLException { - if (resultIndex >= resultIndexes.length) { - return -1; - } - int index = resultIndexes[resultIndex]; - if (index == DDL_EXPRESSION) { - return 0; - } - if (index == UPDATE_EXPRESSION) { - return 1; - } - return -1; - } - - @Override - public YdbResultSet getCurrentResultSet() throws SQLException { - if (resultIndex >= resultIndexes.length) { - return null; - } - int index = resultIndexes[resultIndex]; - if (index < 0 || index >= resultFutures.size()) { - return null; - } - - try { - return resultFutures.get(index).join().getValue(); - } catch (UnexpectedResultException ex) { - throw ExceptionFactory.createException("Cannot call '" + msg + "' with " + ex.getStatus(), ex); - } - } - - @Override - public YdbResultSet getGeneratedKeys() throws SQLException { - // TODO: Implement - return null; - } - - @Override - public boolean hasResultSets() throws SQLException { - if (resultIndex >= resultIndexes.length) { - return false; - } - return resultIndexes[resultIndex] >= 0; - } - - @Override - public boolean getMoreResults(int current) throws SQLException { - if (resultFutures == null || resultIndex >= resultFutures.size()) { - return false; - } - - switch (current) { - case Statement.KEEP_CURRENT_RESULT: - break; - case Statement.CLOSE_CURRENT_RESULT: - closeResultSet(resultIndex); - break; - - case Statement.CLOSE_ALL_RESULTS: - for (int idx = 0; idx <= resultIndex; idx += 1) { - closeResultSet(resultIndex); - } - break; - default: - throw new SQLException(YdbConst.RESULT_SET_MODE_UNSUPPORTED + current); - } - - resultIndex += 1; - return hasResultSets(); - - } - - private class LazyResultSet extends BaseYdbResultSet { - private final BlockingQueue readers = new ArrayBlockingQueue<>(5); - private final AtomicLong rowsCount = new AtomicLong(); - private final CompletableFuture isCompleted = new CompletableFuture<>(); - private volatile boolean isClosed = false; - - private ResultSetReader current = null; - private int rowIndex = 0; - - LazyResultSet(YdbStatement statement, ColumnInfo[] columns) { - super(statement, columns); - } - - public void cleanQueue() { - boolean isEmpty = false; - while (!isEmpty) { - isEmpty = readers.poll() == null; - } - } - - public void addResultSet(ResultSetReader rsr) { - try { - do { - checkStream(); - } while (!readers.offer(rsr, 100, TimeUnit.MILLISECONDS)); - } catch (InterruptedException ex) { - if (streamFuture.completeExceptionally(ex)) { - LOGGER.log(Level.WARNING, "LazyResultSet offer interrupted"); - streamStopper.run(); - } - return; - } - - long total = rowsCount.addAndGet(rsr.getRowCount()); - LOGGER.log(Level.FINEST, "LazyResultSet got {0} rows", total); - - if (isClosed) { - cleanQueue(); - } - } - - @Override - protected ValueReader getValue(int columnIndex) throws SQLException { - if (current == null) { - throw new SQLException(YdbConst.INVALID_ROW + rowIndex); - } - - return current.getColumn(columnIndex); - } - - @Override - public boolean next() throws SQLException { - while (true) { - if (isClosed) { - return false; - } - - if (current != null && current.next()) { - rowIndex++; - return true; - } - - if (isCompleted.isDone() && readers.isEmpty()) { - current = null; - if (rowsCount.get() > 0) { - rowIndex = rowsCount.intValue() + 1; - } - return false; - } - - try { - current = readers.poll(100, TimeUnit.MILLISECONDS); - } catch (InterruptedException ex) { - throw new SQLException(ex); - } - } - } - - public void complete() { - isCompleted.complete(null); - } - - @Override - public void close() { - isClosed = true; - current = null; - cleanQueue(); - } - - @Override - public int getRow() throws SQLException { - return rowIndex; - } - - @Override - public boolean isClosed() throws SQLException { - return isClosed; - } - - @Override - public boolean isBeforeFirst() throws SQLException { - return rowsCount.get() > 0 && rowIndex < 1; - } - - @Override - public boolean isAfterLast() throws SQLException { - isCompleted.join(); - return rowsCount.get() > 0 && rowIndex > rowsCount.intValue(); - } - - @Override - public boolean isFirst() throws SQLException { - return rowIndex == 1; - } - - @Override - public boolean isLast() throws SQLException { - isCompleted.join(); - return rowsCount.get() > 0 && rowIndex == rowsCount.intValue(); - } - - @Override - public void beforeFirst() throws SQLException { - throw new SQLFeatureNotSupportedException(YdbConst.FORWARD_ONLY_MODE); - } - - @Override - public void afterLast() throws SQLException { - throw new SQLFeatureNotSupportedException(YdbConst.FORWARD_ONLY_MODE); - } - - @Override - public boolean first() throws SQLException { - throw new SQLFeatureNotSupportedException(YdbConst.FORWARD_ONLY_MODE); - } - - @Override - public boolean last() throws SQLException { - throw new SQLFeatureNotSupportedException(YdbConst.FORWARD_ONLY_MODE); - } - - @Override - public boolean absolute(int row) throws SQLException { - throw new SQLFeatureNotSupportedException(YdbConst.FORWARD_ONLY_MODE); - } - - @Override - public boolean relative(int rows) throws SQLException { - throw new SQLFeatureNotSupportedException(YdbConst.FORWARD_ONLY_MODE); - } - - @Override - public boolean previous() throws SQLException { - throw new SQLFeatureNotSupportedException(YdbConst.FORWARD_ONLY_MODE); - } - - @Override - public void setFetchDirection(int direction) throws SQLException { - if (direction != ResultSet.FETCH_FORWARD) { - throw new SQLFeatureNotSupportedException(YdbConst.FORWARD_ONLY_MODE); - } - } - - @Override - public int getFetchDirection() throws SQLException { - return ResultSet.FETCH_FORWARD; - } - } -} diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java index df718e9..28fe73a 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java @@ -4,15 +4,15 @@ import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.time.Duration; -import java.util.ArrayList; -import java.util.List; import tech.ydb.jdbc.YdbConst; +import tech.ydb.jdbc.YdbQueryResult; import tech.ydb.jdbc.YdbResultSet; import tech.ydb.jdbc.YdbStatement; import tech.ydb.jdbc.YdbTracer; -import tech.ydb.jdbc.impl.YdbQueryResult; -import tech.ydb.jdbc.impl.YdbStaticResultSet; +import tech.ydb.jdbc.impl.YdbQueryResultExplain; +import tech.ydb.jdbc.impl.YdbQueryResultStatic; +import tech.ydb.jdbc.impl.YdbResultSetMemory; import tech.ydb.jdbc.query.QueryType; import tech.ydb.jdbc.query.YdbQuery; import tech.ydb.jdbc.settings.YdbOperationProperties; @@ -45,7 +45,7 @@ public TableServiceExecutor(YdbContext ctx) throws SQLException { @Override public void close() throws SQLException { - closeCurrentResult(); + clearState(); tx = null; } @@ -76,13 +76,11 @@ public void setAutoCommit(boolean autoCommit) throws SQLException { @Override public boolean isClosed() throws SQLException { - closeCurrentResult(); return tx == null; } @Override public String txID() throws SQLException { - closeCurrentResult(); return tx != null ? tx.txID() : null; } @@ -162,14 +160,16 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException } } - private ExecuteDataQuerySettings dataQuerySettings(long timeout, boolean keepInCache) { + private ExecuteDataQuerySettings dataQuerySettings(YdbStatement statement) { + int timeout = statement.getQueryTimeout(); + ExecuteDataQuerySettings settings = new ExecuteDataQuerySettings(); if (timeout > 0) { settings = settings .setOperationTimeout(Duration.ofSeconds(timeout)) .setTimeout(Duration.ofSeconds(timeout + 1)); } - if (!keepInCache) { + if (!statement.isPoolable()) { settings = settings.disableQueryCache(); } @@ -191,7 +191,9 @@ public YdbQueryResult executeExplainQuery(YdbStatement statement, YdbQuery query try (Session session = createNewTableSession(validator)) { String msg = QueryType.EXPLAIN_QUERY + " >>\n" + yql; ExplainDataQueryResult res = validator.call(msg, tracer, () -> session.explainDataQuery(yql, settings)); - return updateCurrentResult(new StaticQueryResult(types, statement, res.getQueryAst(), res.getQueryPlan())); + String ast = res.getQueryAst(); + String plan = res.getQueryPlan(); + return updateCurrentResult(new YdbQueryResultExplain(types, statement, ast, plan)); } finally { if (!tx.isInsideTransaction()) { tracer.close(); @@ -200,8 +202,8 @@ public YdbQueryResult executeExplainQuery(YdbStatement statement, YdbQuery query } @Override - protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query, String preparedYql, Params params, - long timeout, boolean keepInCache) throws SQLException { + protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query, String preparedYql, Params params) + throws SQLException { ensureOpened(); YdbValidator validator = statement.getValidator(); @@ -214,11 +216,11 @@ protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query try { DataQueryResult result = validator.call( QueryType.DATA_QUERY + " >>\n" + yql, tracer, - () -> session.executeDataQuery(yql, tx.txControl(), params, dataQuerySettings(timeout, keepInCache)) + () -> session.executeDataQuery(yql, tx.txControl(), params, dataQuerySettings(statement)) ); updateState(tx.withDataQuery(session, result.getTxId())); - List readers = new ArrayList<>(); + YdbResultSet[] readers = new YdbResultSet[result.getResultSetCount()]; for (int idx = 0; idx < result.getResultSetCount(); idx += 1) { ResultSetReader rs = result.getResultSet(idx); if (failOnTruncatedResult && rs.isTruncated()) { @@ -226,10 +228,10 @@ protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query throw new SQLException(msg); } - readers.add(new YdbStaticResultSet(types, statement, rs)); + readers[idx] = new YdbResultSetMemory(types, statement, rs); } - return updateCurrentResult(new StaticQueryResult(query, readers)); + return updateCurrentResult(new YdbQueryResultStatic(query, readers)); } catch (SQLException | RuntimeException ex) { updateState(tx.withRollback(session)); throw ex; diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/TableTxExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/TableTxExecutor.java index c10bb87..e5c6683 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/TableTxExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/TableTxExecutor.java @@ -12,11 +12,11 @@ import tech.ydb.core.Status; import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; +import tech.ydb.jdbc.YdbQueryResult; import tech.ydb.jdbc.YdbStatement; import tech.ydb.jdbc.YdbTracer; import tech.ydb.jdbc.exception.ExceptionFactory; import tech.ydb.jdbc.exception.YdbConditionallyRetryableException; -import tech.ydb.jdbc.impl.YdbQueryResult; import tech.ydb.jdbc.query.YdbQuery; import tech.ydb.query.QueryStream; import tech.ydb.query.QueryTransaction; @@ -85,10 +85,10 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException } @Override - public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params, - long timeout, boolean keepInCache) throws SQLException { + public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params) + throws SQLException { try { - YdbQueryResult result = super.executeDataQuery(statement, query, preparedYql, params, timeout, keepInCache); + YdbQueryResult result = super.executeDataQuery(statement, query, preparedYql, params); isWriteTx = isInsideTransaction() && (isWriteTx || query.isWriting()); return result; } catch (YdbConditionallyRetryableException ex) { diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java index fdf1feb..50ec095 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java @@ -2,8 +2,8 @@ import java.sql.SQLException; +import tech.ydb.jdbc.YdbQueryResult; import tech.ydb.jdbc.YdbStatement; -import tech.ydb.jdbc.impl.YdbQueryResult; import tech.ydb.jdbc.query.YdbQuery; import tech.ydb.table.query.Params; import tech.ydb.table.values.ListValue; @@ -16,6 +16,7 @@ public interface YdbExecutor { void close() throws SQLException; boolean isClosed() throws SQLException; void ensureOpened() throws SQLException; + void clearState() throws SQLException; String txID() throws SQLException; int transactionLevel() throws SQLException; @@ -28,14 +29,11 @@ public interface YdbExecutor { void setReadOnly(boolean readOnly) throws SQLException; void setAutoCommit(boolean autoCommit) throws SQLException; - YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) throws SQLException; - YdbQueryResult executeBulkUpsert(YdbStatement statement, YdbQuery query, String tablePath, ListValue rows) - throws SQLException; - YdbQueryResult executeExplainQuery(YdbStatement statement, YdbQuery query) throws SQLException; - YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, String yql, Params params) - throws SQLException; - YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String yql, Params params, - long timeout, boolean poolable) throws SQLException; + YdbQueryResult executeSchemeQuery(YdbStatement st, YdbQuery query) throws SQLException; + YdbQueryResult executeBulkUpsert(YdbStatement st, YdbQuery query, String path, ListValue rows) throws SQLException; + YdbQueryResult executeExplainQuery(YdbStatement st, YdbQuery query) throws SQLException; + YdbQueryResult executeScanQuery(YdbStatement st, YdbQuery query, String yql, Params prms) throws SQLException; + YdbQueryResult executeDataQuery(YdbStatement st, YdbQuery query, String yql, Params prms) throws SQLException; void commit(YdbContext ctx, YdbValidator validator) throws SQLException; void rollback(YdbContext ctx, YdbValidator validator) throws SQLException; diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbConnectionImpl.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbConnectionImpl.java index 5d99c0f..4c72ddc 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbConnectionImpl.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbConnectionImpl.java @@ -71,6 +71,7 @@ public String nativeSQL(String sql) { @Override public void setAutoCommit(boolean autoCommit) throws SQLException { executor.ensureOpened(); + executor.clearState(); if (autoCommit == executor.isAutoCommit()) { return; } @@ -167,6 +168,7 @@ public SQLWarning getWarnings() throws SQLException { @Override public void clearWarnings() throws SQLException { executor.ensureOpened(); + executor.clearState(); validator.clearWarnings(); } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbDatabaseMetaDataImpl.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbDatabaseMetaDataImpl.java index b38c344..30c6903 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbDatabaseMetaDataImpl.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbDatabaseMetaDataImpl.java @@ -1407,12 +1407,12 @@ private TableDescription describeTable(String table) throws SQLException { private ResultSet emptyResultSet(FixedResultSetFactory factory) { YdbStatementImpl statement = new YdbStatementImpl(connection, ResultSet.TYPE_SCROLL_INSENSITIVE); - return new YdbStaticResultSet(connection.getCtx().getTypes(), statement, factory.createResultSet().build()); + return new YdbResultSetMemory(connection.getCtx().getTypes(), statement, factory.createResultSet().build()); } private ResultSet resultSet(ResultSetReader rsReader) { YdbStatementImpl statement = new YdbStatementImpl(connection, ResultSet.TYPE_SCROLL_INSENSITIVE); - return new YdbStaticResultSet(connection.getCtx().getTypes(), statement, rsReader); + return new YdbResultSetMemory(connection.getCtx().getTypes(), statement, rsReader); } private boolean isMatchedCatalog(String catalog) { diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbPreparedStatementImpl.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbPreparedStatementImpl.java index 74d9d35..ef456f6 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbPreparedStatementImpl.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbPreparedStatementImpl.java @@ -29,6 +29,7 @@ import tech.ydb.jdbc.YdbConst; import tech.ydb.jdbc.YdbParameterMetaData; import tech.ydb.jdbc.YdbPreparedStatement; +import tech.ydb.jdbc.YdbQueryResult; import tech.ydb.jdbc.YdbResultSet; import tech.ydb.jdbc.common.MappingSetters; import tech.ydb.jdbc.query.QueryType; @@ -38,7 +39,7 @@ import tech.ydb.table.query.Params; import tech.ydb.table.values.Type; -public class YdbPreparedStatementImpl extends BaseYdbStatement implements YdbPreparedStatement { +public class YdbPreparedStatementImpl extends YdbStatementBase implements YdbPreparedStatement { private static final Logger LOGGER = Logger.getLogger(YdbPreparedStatementImpl.class.getName()); private final YdbQuery query; private final YdbPreparedQuery prepared; diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResult.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResult.java deleted file mode 100644 index 8754bce..0000000 --- a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResult.java +++ /dev/null @@ -1,50 +0,0 @@ -package tech.ydb.jdbc.impl; - -import java.sql.SQLException; - -import tech.ydb.jdbc.YdbResultSet; - -/** - * - * @author Aleksandr Gorshenin - */ -public interface YdbQueryResult { - YdbQueryResult EMPTY = new YdbQueryResult() { - @Override - public int getUpdateCount() throws SQLException { - return -1; - } - - @Override - public YdbResultSet getCurrentResultSet() throws SQLException { - return null; - } - - @Override - public YdbResultSet getGeneratedKeys() throws SQLException { - return null; - } - - @Override - public boolean hasResultSets() throws SQLException { - return false; - } - - @Override - public boolean getMoreResults(int current) throws SQLException { - return false; - } - - @Override - public void close() throws SQLException { } - }; - - int getUpdateCount() throws SQLException; - YdbResultSet getCurrentResultSet() throws SQLException; - YdbResultSet getGeneratedKeys() throws SQLException; - - boolean hasResultSets() throws SQLException; - boolean getMoreResults(int current) throws SQLException; - - void close() throws SQLException; -} diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultBase.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultBase.java new file mode 100644 index 0000000..264b71a --- /dev/null +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultBase.java @@ -0,0 +1,159 @@ +package tech.ydb.jdbc.impl; + +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +import tech.ydb.jdbc.YdbQueryResult; +import tech.ydb.jdbc.YdbResultSet; +import tech.ydb.jdbc.query.QueryStatement; +import tech.ydb.jdbc.query.YdbQuery; + +/** + * + * @author Aleksandr Gorshenin + */ +public abstract class YdbQueryResultBase implements YdbQueryResult { + private static final ResultMeta NO_UPDATED = new ResultMeta(0); + // TODO: YDB doesn't return the count of affected rows, so we use little hach to return always 1 + private static final ResultMeta HAS_UPDATED = new ResultMeta(1); + + private static class ResultMeta { + private final int updateCount; + private final int resultSetIndex; + private final boolean isGeneratedKeys; + + ResultMeta(int updateCount) { + this(updateCount, -1, false); + } + + ResultMeta(int updateCount, int resultSetIndex, boolean isGeneratedKeys) { + this.updateCount = updateCount; + this.resultSetIndex = resultSetIndex; + this.isGeneratedKeys = isGeneratedKeys; + } + } + + private final List results; + private int resultIndex; + + public YdbQueryResultBase(int index) { // single query result + this.results = new ArrayList<>(); + this.resultIndex = 0; + this.results.add(new ResultMeta(-1, index, false)); + } + + public YdbQueryResultBase(YdbQuery query, int rsCount) { + this.results = new ArrayList<>(); + this.resultIndex = 0; + + int idx = 0; + for (QueryStatement exp: query.getStatements()) { + if (exp.isDDL()) { + results.add(NO_UPDATED); + continue; + } + + if (exp.hasUpdateWithGenerated() && idx < rsCount) { + results.add(new ResultMeta(1, idx, true)); + idx++; + continue; + } + + if (exp.hasUpdateCount()) { + results.add(HAS_UPDATED); + continue; + } + + if (exp.hasResults() && idx < rsCount) { + results.add(new ResultMeta(-1, idx, false)); + idx++; + } + } + + while (idx < rsCount) { + results.add(new ResultMeta(-1, idx, false)); + idx++; + } + } + + protected abstract YdbResultSet getResultSet(int index) throws SQLException; + protected abstract void closeResultSet(int index) throws SQLException; + + @Override + public void close() throws SQLException { + for (ResultMeta meta: results) { + if (meta.resultSetIndex >= 0) { + closeResultSet(meta.resultSetIndex); + } + } + } + + @Override + public boolean hasResultSets() { + if (resultIndex >= results.size()) { + return false; + } + + ResultMeta exp = results.get(resultIndex); + return !exp.isGeneratedKeys && exp.resultSetIndex >= 0; + } + + @Override + public int getUpdateCount() { + if (resultIndex >= results.size()) { + return -1; + } + + return results.get(resultIndex).updateCount; + } + + @Override + public YdbResultSet getCurrentResultSet() throws SQLException { + if (resultIndex >= results.size()) { + return null; + } + + ResultMeta meta = results.get(resultIndex); + if (meta.isGeneratedKeys || meta.resultSetIndex < 0) { + return null; + } + + return getResultSet(meta.resultSetIndex); + } + + @Override + public YdbResultSet getGeneratedKeys() throws SQLException { + if (resultIndex >= results.size()) { + return null; + } + + ResultMeta meta = results.get(resultIndex); + if (!meta.isGeneratedKeys || meta.resultSetIndex < 0) { + return null; + } + + return getResultSet(meta.resultSetIndex); + } + + @Override + public boolean getMoreResults(int current) throws SQLException { + if (resultIndex >= results.size()) { + return false; + } + + if (current == Statement.CLOSE_CURRENT_RESULT && resultIndex < results.size()) { + closeResultSet(resultIndex); + } + + if (current == Statement.CLOSE_ALL_RESULTS) { + for (int idx = 0; idx <= resultIndex && idx < results.size(); idx += 1) { + closeResultSet(idx); + } + } + + resultIndex += 1; + return hasResultSets(); + } +} diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultEmpty.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultEmpty.java new file mode 100644 index 0000000..132cbac --- /dev/null +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultEmpty.java @@ -0,0 +1,42 @@ +package tech.ydb.jdbc.impl; + +import java.sql.SQLException; + +import tech.ydb.jdbc.YdbQueryResult; +import tech.ydb.jdbc.YdbResultSet; + +/** + * + * @author Aleksandr Gorshenin + */ +public class YdbQueryResultEmpty implements YdbQueryResult { + @Override + public int getUpdateCount() throws SQLException { + return -1; + } + + @Override + public YdbResultSet getCurrentResultSet() throws SQLException { + return null; + } + + @Override + public YdbResultSet getGeneratedKeys() throws SQLException { + return null; + } + + @Override + public boolean hasResultSets() throws SQLException { + return false; + } + + @Override + public boolean getMoreResults(int current) throws SQLException { + return false; + } + + @Override + public void close() throws SQLException { + // nothing + } +} diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultExplain.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultExplain.java new file mode 100644 index 0000000..57998a0 --- /dev/null +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultExplain.java @@ -0,0 +1,47 @@ +package tech.ydb.jdbc.impl; + +import java.sql.SQLException; + +import tech.ydb.jdbc.YdbConst; +import tech.ydb.jdbc.YdbResultSet; +import tech.ydb.jdbc.YdbStatement; +import tech.ydb.jdbc.common.FixedResultSetFactory; +import tech.ydb.jdbc.common.YdbTypes; +import tech.ydb.table.result.ResultSetReader; + +/** + * + * @author Aleksandr Gorshenin + */ +public class YdbQueryResultExplain extends YdbQueryResultBase { + private static final FixedResultSetFactory EXPLAIN_RS_FACTORY = FixedResultSetFactory.newBuilder() + .addTextColumn(YdbConst.EXPLAIN_COLUMN_AST) + .addTextColumn(YdbConst.EXPLAIN_COLUMN_PLAN) + .build(); + + private final YdbResultSet rs; + + public YdbQueryResultExplain(YdbTypes types, YdbStatement statement, String ast, String plan) { + super(0); + + ResultSetReader result = EXPLAIN_RS_FACTORY.createResultSet().newRow() + .withTextValue(YdbConst.EXPLAIN_COLUMN_AST, ast) + .withTextValue(YdbConst.EXPLAIN_COLUMN_PLAN, plan) + .build().build(); + + this.rs = new YdbResultSetMemory(types, statement, result); + } + + + @Override + protected YdbResultSet getResultSet(int index) throws SQLException { + return index == 0 ? rs : null; + } + + @Override + protected void closeResultSet(int index) throws SQLException { + if (index == 0) { + rs.close(); + } + } +} diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultReader.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultReader.java new file mode 100644 index 0000000..cfd68ad --- /dev/null +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultReader.java @@ -0,0 +1,313 @@ +package tech.ydb.jdbc.impl; + + +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.IntConsumer; +import java.util.logging.Level; +import java.util.logging.Logger; + +import tech.ydb.core.Issue; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.grpc.GrpcFlowControl; +import tech.ydb.core.grpc.GrpcReadStream; +import tech.ydb.jdbc.YdbResultSet; +import tech.ydb.jdbc.YdbStatement; +import tech.ydb.jdbc.common.ColumnInfo; +import tech.ydb.jdbc.common.YdbTypes; +import tech.ydb.jdbc.context.YdbValidator; +import tech.ydb.jdbc.query.YdbQuery; +import tech.ydb.query.QueryStream; +import tech.ydb.query.result.QueryResultPart; +import tech.ydb.table.result.ResultSetReader; + +/** + * + * @author Aleksandr Gorshenin + */ +public class YdbQueryResultReader extends YdbQueryResultBase implements GrpcFlowControl { + private static final Logger LOGGER = Logger.getLogger(YdbQueryResultReader.class.getName()); + + private final YdbTypes types; + private final YdbStatement statement; + private final int fetchSize; + + private final LazyRs[] rs; + private final ReentrantLock lock = new ReentrantLock(); + private final Condition isReady = lock.newCondition(); + + private int lastRsIndex = 0; + private CallCtrl callFlow = null; + private Runnable canceller = null; + + private volatile boolean isStreamCompleted = false; + + public YdbQueryResultReader(YdbTypes types, YdbStatement statement, YdbQuery query) { + super(query, query.getStatements().size()); + this.types = types; + this.statement = statement; + this.fetchSize = statement.getFetchSize(); + this.rs = new LazyRs[query.getStatements().size()]; + for (int idx = 0; idx < rs.length; idx += 1) { + rs[idx] = new LazyRs(); + } + } + + private void waitForUpdates() throws SQLException { + lock.lock(); + try { + isReady.await(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException ex) { + throw new SQLException(ex); + } finally { + lock.unlock(); + } + } + + private void releaseWaiters() { + lock.lock(); + try { + isReady.signalAll(); + } finally { + lock.unlock(); + } + } + + @Override + public void close() throws SQLException { + super.close(); + + if (!isStreamCompleted) { + waitForUpdates(); + if (!isStreamCompleted && canceller != null) { + canceller.run(); + + // wait of completing + while (!isStreamCompleted) { + waitForUpdates(); + } + } + } + } + + public boolean onRead(int index, ResultSetReader rsr) { + int count = rsr.getRowCount(); + if (index < 0 || index >= rs.length || rs[index].isClosed) { + LOGGER.log(Level.FINEST, "Skipped {0} rows", count); + releaseWaiters(); + return fetchSize > 0; + } + + for (int prev = lastRsIndex; prev < index; prev += 1) { + rs[prev].isCompleted = true; + } + lastRsIndex = index; + + LOGGER.log(Level.FINEST, "Loaded {0} rows", count); + callFlow.loadRows(count); + rs[index].queue.offer(rsr); + releaseWaiters(); + + return fetchSize > 0 && callFlow.loaded.get() > fetchSize; + } + + public void onClose(Status status, Throwable th) { + isStreamCompleted = true; + for (int idx = 0; idx < rs.length; idx += 1) { + rs[idx].isCompleted = true; + } + releaseWaiters(); + } + + @Override + public Call newCall(IntConsumer req) { + callFlow = new CallCtrl(req); + return callFlow; + } + + @Override + protected YdbResultSet getResultSet(int index) throws SQLException { + if (index < 0 || index >= rs.length) { + return null; + } + + YdbResultSet ready = rs[index].getReady(); + + while (ready == null && !isStreamCompleted) { + ready = rs[index].getReady(); + } + + return ready; + } + + @Override + protected void closeResultSet(int index) throws SQLException { + if (index < 0 || index >= rs.length) { + return; + } + + rs[index].close(); + } + + public CompletableFuture load(GrpcReadStream stream) { + CompletableFuture resultIsReady = new CompletableFuture<>(); + canceller = stream::cancel; + + stream.start(rsr -> { + if (onRead(0, rsr)) { + resultIsReady.complete(Status.SUCCESS); + } + }).whenComplete((status, th) -> { + onClose(status, th); + + if (status != null) { + resultIsReady.complete(status); + } else { + resultIsReady.complete(Status.of(StatusCode.CLIENT_INTERNAL_ERROR, th)); + } + }); + + return resultIsReady; + } + + + public CompletableFuture load(YdbValidator validator, QueryStream stream) { + CompletableFuture resultIsReady = new CompletableFuture<>(); + canceller = stream::cancel; + + stream.execute(new QueryStream.PartsHandler() { + @Override + public void onIssues(Issue[] issues) { + validator.addStatusIssues(Arrays.asList(issues)); + } + + @Override + public void onNextPart(QueryResultPart part) { + if (onRead((int) part.getResultSetIndex(), part.getResultSetReader())) { + resultIsReady.complete(Status.SUCCESS); + } + } + }).whenComplete((result, th) -> { + Status status = result != null ? result.getStatus() : null; + onClose(status, th); + + if (status != null) { + resultIsReady.complete(status); + } else { + resultIsReady.complete(Status.of(StatusCode.CLIENT_INTERNAL_ERROR, th)); + } + }); + + return resultIsReady; + } + + private class CallCtrl implements GrpcFlowControl.Call { + private final IntConsumer request; + private final AtomicInteger loaded = new AtomicInteger(0); + private final AtomicBoolean isPaused = new AtomicBoolean(false); + + CallCtrl(IntConsumer request) { + this.request = request; + } + + @Override + public void onStart() { + request.accept(1); + } + + @Override + public void onMessageRead() { + if (fetchSize <= 0 || loaded.get() < fetchSize) { + request.accept(1); + } else { + isPaused.set(true); + } + } + + public void loadRows(int rows) { + loaded.addAndGet(rows); + } + + public void processRows(int rows) { + if (loaded.addAndGet(-rows) < fetchSize) { + if (isPaused.compareAndSet(true, false)) { + request.accept(1); + } + } + } + } + + private class LazyRs { + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + private YdbResultSet rs = null; + private boolean isClosed = false; + private boolean isCompleted = false; + + void close() throws SQLException { + if (rs != null) { + rs.close(); + } + isClosed = true; + isCompleted = true; + } + + YdbResultSet getReady() throws SQLException { + if (rs != null) { + return rs; + } + + while (!isCompleted && queue.isEmpty()) { + waitForUpdates(); + } + + if (isCompleted && fetchSize <= 0) { // can use in memory result set + rs = new YdbResultSetMemory(types, statement, queue.toArray(new ResultSetReader[0])); + return rs; + } + + if (queue.isEmpty()) { + return null; + } + + ResultSetReader first = queue.peek(); + if (first.getRowCount() == 0) { + queue.remove(); + } + + ColumnInfo[] columns = ColumnInfo.fromResultSetReader(types, Objects.requireNonNull(first)); + rs = new YdbResultSetForwardOnly(statement, columns) { + @Override + protected boolean hasNext() throws SQLException { + while (!isCompleted && queue.isEmpty()) { + waitForUpdates(); + + ResultSetReader next = queue.peek(); + if (next != null && next.getRowCount() == 0) { + queue.remove(); + } + } + + return !queue.isEmpty(); + } + + @Override + protected ResultSetReader readNext() throws SQLException { + ResultSetReader next = queue.poll(); + LOGGER.log(Level.FINEST, "Processed {0} rows", next.getRowCount()); + callFlow.processRows(next.getRowCount()); + return next; + } + }; + return rs; + } + } +} diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultStatic.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultStatic.java new file mode 100644 index 0000000..f95a7f8 --- /dev/null +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultStatic.java @@ -0,0 +1,36 @@ +package tech.ydb.jdbc.impl; + + +import java.sql.SQLException; + +import tech.ydb.jdbc.YdbResultSet; +import tech.ydb.jdbc.query.YdbQuery; + +/** + * + * @author Aleksandr Gorshenin + */ +public class YdbQueryResultStatic extends YdbQueryResultBase { + private final YdbResultSet[] rs; + + public YdbQueryResultStatic(YdbQuery query, YdbResultSet... rs) { + super(query, rs != null ? rs.length : 0); + this.rs = rs; + } + + @Override + protected YdbResultSet getResultSet(int index) throws SQLException { + if (index < 0 || index >= rs.length) { + return null; + } + return rs[index]; + } + + @Override + protected void closeResultSet(int index) throws SQLException { + if (index < 0 || index >= rs.length) { + return; + } + rs[index].close(); + } +} diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/BaseYdbResultSet.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbResultSetBase.java similarity index 99% rename from jdbc/src/main/java/tech/ydb/jdbc/impl/BaseYdbResultSet.java rename to jdbc/src/main/java/tech/ydb/jdbc/impl/YdbResultSetBase.java index 126f7b4..4982d01 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/impl/BaseYdbResultSet.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbResultSetBase.java @@ -45,7 +45,7 @@ * * @author Aleksandr Gorshenin */ -public abstract class BaseYdbResultSet implements YdbResultSet { +public abstract class YdbResultSetBase implements YdbResultSet { protected final YdbStatement statement; private final ColumnInfo[] columns; @@ -54,7 +54,7 @@ public abstract class BaseYdbResultSet implements YdbResultSet { private YdbResultSetMetaData metaData = null; private boolean wasNull = false; - protected BaseYdbResultSet(YdbStatement statement, ColumnInfo[] columns) { + protected YdbResultSetBase(YdbStatement statement, ColumnInfo[] columns) { this.statement = Objects.requireNonNull(statement); this.columns = columns; diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbResultSetForwardOnly.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbResultSetForwardOnly.java new file mode 100644 index 0000000..bcffc6d --- /dev/null +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbResultSetForwardOnly.java @@ -0,0 +1,166 @@ +package tech.ydb.jdbc.impl; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; + +import tech.ydb.jdbc.YdbConst; +import tech.ydb.jdbc.YdbStatement; +import tech.ydb.jdbc.common.ColumnInfo; +import tech.ydb.table.result.ResultSetReader; +import tech.ydb.table.result.ValueReader; + +/** + * + * @author Aleksandr Gorshenin + */ +public abstract class YdbResultSetForwardOnly extends YdbResultSetBase { + private ResultSetReader current = null; + private boolean isClosed = false; + + private int currentIndex = 0; + private int rowIndex = 0; + + public YdbResultSetForwardOnly(YdbStatement statement, ColumnInfo[] columns) { + super(statement, columns); + } + + protected abstract boolean hasNext() throws SQLException; + protected abstract ResultSetReader readNext() throws SQLException; + + @Override + public void close() { + isClosed = true; + } + + @Override + protected ValueReader getValue(int columnIndex) throws SQLException { + if (isClosed) { + throw new SQLException(YdbConst.RESULT_SET_IS_CLOSED); + } + + if (rowIndex <= 0 || current == null) { + throw new SQLException(YdbConst.INVALID_ROW + rowIndex); + } + + return current.getColumn(columnIndex); + } + + @Override + public boolean next() throws SQLException { + if (isClosed) { + return false; + } + + if (current != null && current.next()) { + rowIndex++; + currentIndex++; + return true; + } + + while (hasNext()) { + current = readNext(); + currentIndex = 0; + + if (current.next()) { + rowIndex++; + currentIndex++; + return true; + } + } + + // nothing to read, reset index like Postgres + rowIndex = 0; + currentIndex = current != null ? current.getRowCount() + 1 : 1; + return false; + } + + @Override + public int getRow() throws SQLException { + return rowIndex; + } + + @Override + public boolean isClosed() throws SQLException { + return isClosed; + } + + @Override + public boolean isBeforeFirst() throws SQLException { + if (current == null) { + return hasNext(); + } + + return rowIndex == 0 && currentIndex == 0; + } + + @Override + public boolean isAfterLast() throws SQLException { + if (current == null) { + return false; + } + + return !hasNext() && currentIndex > current.getRowCount(); + } + + @Override + public boolean isFirst() throws SQLException { + return current != null && rowIndex == 1; + } + + @Override + public boolean isLast() throws SQLException { + if (hasNext()) { + return false; + } + + return current != null && currentIndex > 0 && currentIndex == current.getRowCount(); + } + + @Override + public void beforeFirst() throws SQLException { + throw new SQLFeatureNotSupportedException(YdbConst.FORWARD_ONLY_MODE); + } + + @Override + public void afterLast() throws SQLException { + throw new SQLFeatureNotSupportedException(YdbConst.FORWARD_ONLY_MODE); + } + + @Override + public boolean first() throws SQLException { + throw new SQLFeatureNotSupportedException(YdbConst.FORWARD_ONLY_MODE); + } + + @Override + public boolean last() throws SQLException { + throw new SQLFeatureNotSupportedException(YdbConst.FORWARD_ONLY_MODE); + } + + @Override + public boolean absolute(int row) throws SQLException { + throw new SQLFeatureNotSupportedException(YdbConst.FORWARD_ONLY_MODE); + } + + @Override + public boolean relative(int rows) throws SQLException { + throw new SQLFeatureNotSupportedException(YdbConst.FORWARD_ONLY_MODE); + } + + @Override + public boolean previous() throws SQLException { + throw new SQLFeatureNotSupportedException(YdbConst.FORWARD_ONLY_MODE); + } + + @Override + public void setFetchDirection(int direction) throws SQLException { + if (direction != ResultSet.FETCH_FORWARD) { + throw new SQLFeatureNotSupportedException(YdbConst.FORWARD_ONLY_MODE); + } + } + + @Override + public int getFetchDirection() throws SQLException { + return ResultSet.FETCH_FORWARD; + } +} diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStaticResultSet.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbResultSetMemory.java similarity index 52% rename from jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStaticResultSet.java rename to jdbc/src/main/java/tech/ydb/jdbc/impl/YdbResultSetMemory.java index 7677a86..122285d 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStaticResultSet.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbResultSetMemory.java @@ -11,33 +11,57 @@ import tech.ydb.table.result.ResultSetReader; import tech.ydb.table.result.ValueReader; -public class YdbStaticResultSet extends BaseYdbResultSet { - private final ResultSetReader rsReader; - private final int rowCount; +public class YdbResultSetMemory extends YdbResultSetBase { + private final ResultSetReader[] rs; + private final int totalCount; - private int fetchDirection = ResultSet.FETCH_UNKNOWN; + private int fetchDirection; + private int globalRowIndex = 0; // before start + + private int rsIndex = 0; private int rowIndex = 0; + private boolean isClosed = false; - public YdbStaticResultSet(YdbTypes types, YdbStatement statement, ResultSetReader result) { - super(statement, ColumnInfo.fromResultSetReader(types, Objects.requireNonNull(result))); - this.rsReader = result; - this.rowCount = result.getRowCount(); + public YdbResultSetMemory(YdbTypes types, YdbStatement statement, ResultSetReader... rs) { + super(statement, ColumnInfo.fromResultSetReader(types, Objects.requireNonNull(rs[0]))); + this.fetchDirection = statement.getFetchDirection(); + this.rs = rs; + int total = 0; + for (int idx = 0; idx < rs.length; idx += 1) { + total += rs[idx].getRowCount(); + } + this.totalCount = total; } @Override protected ValueReader getValue(int columnIndex) throws SQLException { - try { - return rsReader.getColumn(columnIndex); - } catch (IllegalStateException ex) { - throw new SQLException(YdbConst.INVALID_ROW + rowIndex); + if (!isRowIndexValid()) { + throw new SQLException(YdbConst.INVALID_ROW + globalRowIndex); } + return rs[rsIndex].getColumn(columnIndex); } @Override public boolean next() { - setRowIndex(rowIndex + 1); - return isRowIndexValid(); + while (true) { + if (rsIndex >= rs.length) { + rsIndex = totalCount; + globalRowIndex = totalCount + 1; + rowIndex = 0; + return false; + } + + if (rowIndex < rs[rsIndex].getRowCount()) { + rs[rsIndex].setRowIndex(rowIndex); + globalRowIndex++; + rowIndex++; + return true; + } + + rsIndex++; + rowIndex = 0; + } } @Override @@ -47,22 +71,22 @@ public void close() { @Override public boolean isBeforeFirst() { - return rowCount != 0 && rowIndex <= 0; + return totalCount > 0 && globalRowIndex == 0; } @Override public boolean isAfterLast() { - return rowCount != 0 && rowIndex > rowCount; + return totalCount > 0 && globalRowIndex > totalCount; } @Override public boolean isFirst() { - return rowCount != 0 && rowIndex == 1; + return totalCount > 0 && globalRowIndex == 1; } @Override public boolean isLast() { - return rowCount != 0 && rowIndex == rowCount; + return totalCount > 0 && globalRowIndex == totalCount; } @Override @@ -74,7 +98,7 @@ public void beforeFirst() throws SQLException { @Override public void afterLast() throws SQLException { checkScroll(); - setRowIndex(rowCount + 1); + setRowIndex(totalCount + 1); } @Override @@ -87,13 +111,13 @@ public boolean first() throws SQLException { @Override public boolean last() throws SQLException { checkScroll(); - setRowIndex(rowCount); + setRowIndex(totalCount); return isRowIndexValid(); } @Override public int getRow() { - return rowIndex; + return isRowIndexValid() ? globalRowIndex : 0; } @Override @@ -102,7 +126,7 @@ public boolean absolute(int row) throws SQLException { if (row >= 0) { setRowIndex(row); } else { - setRowIndex(rowCount + 1 + row); + setRowIndex(totalCount + 1 + row); } return isRowIndexValid(); } @@ -111,7 +135,7 @@ public boolean absolute(int row) throws SQLException { public boolean relative(int rows) throws SQLException { checkScroll(); if (rows != 0) { - setRowIndex(rowIndex + rows); + setRowIndex(globalRowIndex + rows); } return isRowIndexValid(); } @@ -119,7 +143,7 @@ public boolean relative(int rows) throws SQLException { @Override public boolean previous() throws SQLException { checkScroll(); - setRowIndex(rowIndex - 1); + setRowIndex(globalRowIndex - 1); return isRowIndexValid(); } @@ -159,15 +183,35 @@ private void checkScroll() throws SQLException { } } - private void setRowIndex(int rowIndex) { - if (rowCount > 0) { - int actualIndex = Math.max(Math.min(rowIndex, rowCount + 1), 0); - this.rowIndex = actualIndex; - rsReader.setRowIndex(actualIndex - 1); + private void setRowIndex(int index) { + if (index <= 0) { // before first + globalRowIndex = 0; + rsIndex = 0; + rowIndex = 0; + return; } + + if (index > totalCount) { // after last + globalRowIndex = totalCount + 1; + rsIndex = rs.length; + rowIndex = 0; + return; + } + + globalRowIndex = index; + rsIndex = 0; + rowIndex = index; + int currentSize = rs[rsIndex].getRowCount(); + while (rowIndex > currentSize) { + rsIndex++; + rowIndex -= currentSize; + currentSize = rs[rsIndex].getRowCount(); + } + + rs[rsIndex].setRowIndex(rowIndex - 1); } private boolean isRowIndexValid() { - return rowIndex > 0 && rowIndex <= rowCount; + return rsIndex >= 0 && rsIndex < rs.length && globalRowIndex > 0 && globalRowIndex <= totalCount; } } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbResultSetMetaDataImpl.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbResultSetMetaDataImpl.java index ba2b650..48008a6 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbResultSetMetaDataImpl.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbResultSetMetaDataImpl.java @@ -8,9 +8,9 @@ import tech.ydb.table.values.Type; public class YdbResultSetMetaDataImpl implements YdbResultSetMetaData { - private final BaseYdbResultSet rs; + private final YdbResultSetBase rs; - public YdbResultSetMetaDataImpl(BaseYdbResultSet rs) { + public YdbResultSetMetaDataImpl(YdbResultSetBase rs) { this.rs = rs; } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/BaseYdbStatement.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStatementBase.java similarity index 84% rename from jdbc/src/main/java/tech/ydb/jdbc/impl/BaseYdbStatement.java rename to jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStatementBase.java index 91958eb..af961a0 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/impl/BaseYdbStatement.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStatementBase.java @@ -5,16 +5,15 @@ import java.sql.SQLFeatureNotSupportedException; import java.sql.SQLWarning; import java.sql.Statement; -import java.util.Collections; import java.util.Objects; import java.util.logging.Logger; import tech.ydb.jdbc.YdbConnection; import tech.ydb.jdbc.YdbConst; +import tech.ydb.jdbc.YdbQueryResult; import tech.ydb.jdbc.YdbResultSet; import tech.ydb.jdbc.YdbStatement; import tech.ydb.jdbc.context.QueryStat; -import tech.ydb.jdbc.context.StaticQueryResult; import tech.ydb.jdbc.context.YdbContext; import tech.ydb.jdbc.context.YdbValidator; import tech.ydb.jdbc.query.YdbQuery; @@ -28,21 +27,27 @@ * * @author Aleksandr Gorshenin */ -public abstract class BaseYdbStatement implements YdbStatement { +public abstract class YdbStatementBase implements YdbStatement { + private static final YdbQueryResult EMPTY_RESULT = new YdbQueryResultEmpty(); + private final YdbConnection connection; private final YdbValidator validator; private final int resultSetType; - private final int maxRows; private final FakeTxMode scanQueryTxMode; private final FakeTxMode schemeQueryTxMode; private final FakeTxMode bulkQueryTxMode; - private YdbQueryResult state = YdbQueryResult.EMPTY; + private YdbQueryResult state = EMPTY_RESULT; private int queryTimeout; private boolean isPoolable; private boolean isClosed = false; - public BaseYdbStatement(Logger logger, YdbConnection connection, int resultSetType, boolean isPoolable) { + /** @see Statement#getMaxRows() */ + private int maxRows = 0; // no limit + private int fetchSize = 0; + private int fetchDirection = ResultSet.FETCH_UNKNOWN; + + public YdbStatementBase(Logger logger, YdbConnection connection, int resultSetType, boolean isPoolable) { this.connection = Objects.requireNonNull(connection); this.validator = new YdbValidator(); this.resultSetType = resultSetType; @@ -50,14 +55,17 @@ public BaseYdbStatement(Logger logger, YdbConnection connection, int resultSetTy YdbOperationProperties props = connection.getCtx().getOperationProperties(); this.queryTimeout = (int) props.getQueryTimeout().getSeconds(); - this.maxRows = props.getMaxRows(); this.scanQueryTxMode = props.getScanQueryTxMode(); this.schemeQueryTxMode = props.getSchemeQueryTxMode(); this.bulkQueryTxMode = props.getBulkQueryTxMode(); } - private void ensureOpened() throws SQLException { + private void prepareNewExecution() throws SQLException { + if (fetchSize > 0 && (fetchDirection != ResultSet.FETCH_FORWARD && fetchDirection != ResultSet.FETCH_UNKNOWN)) { + throw new SQLException(YdbConst.RESULT_IS_NOT_SCROLLABLE); + } connection.getExecutor().ensureOpened(); + connection.getExecutor().clearState(); } @Override @@ -73,7 +81,8 @@ public YdbConnection getConnection() { @Override public void close() throws SQLException { clearBatch(); - state = YdbQueryResult.EMPTY; + connection.getExecutor().clearState(); + state = EMPTY_RESULT; isClosed = true; } @@ -89,7 +98,7 @@ public int getResultSetType() { @Override public SQLWarning getWarnings() throws SQLException { - ensureOpened(); + prepareNewExecution(); return validator.toSQLWarnings(); } @@ -125,7 +134,7 @@ public int getMaxRows() { @Override public void setMaxRows(int max) { - // has not effect + this.maxRows = max; } @Override @@ -144,17 +153,17 @@ public int getUpdateCount() throws SQLException { } protected void cleanState() throws SQLException { - state = YdbQueryResult.EMPTY; + state = EMPTY_RESULT; clearWarnings(); } protected boolean updateState(YdbQueryResult result) throws SQLException { - state = result == null ? YdbQueryResult.EMPTY : result; + state = result == null ? EMPTY_RESULT : result; return state.hasResultSets(); } protected YdbQueryResult executeBulkUpsert(YdbQuery query, String tablePath, ListValue rows) throws SQLException { - ensureOpened(); + prepareNewExecution(); if (connection.getExecutor().isInsideTransaction()) { switch (bulkQueryTxMode) { @@ -173,19 +182,19 @@ protected YdbQueryResult executeBulkUpsert(YdbQuery query, String tablePath, Lis } protected YdbQueryResult executeExplainQuery(YdbQuery query) throws SQLException { - ensureOpened(); + prepareNewExecution(); return connection.getExecutor().executeExplainQuery(this, query); } protected YdbQueryResult executeDataQuery(YdbQuery query, String yql, Params params) throws SQLException { - ensureOpened(); + prepareNewExecution(); YdbContext ctx = connection.getCtx(); if (ctx.isFullScanDetectorEnabled()) { if (QueryStat.isPrint(yql)) { ResultSetReader rsr = QueryStat.toResultSetReader(ctx.getFullScanDetectorStats()); - YdbResultSet rs = new YdbStaticResultSet(ctx.getTypes(), this, rsr); - return new StaticQueryResult(query, Collections.singletonList(rs)); + YdbResultSet rs = new YdbResultSetMemory(ctx.getTypes(), this, rsr); + return new YdbQueryResultStatic(query, rs); } if (QueryStat.isReset(yql)) { ctx.resetFullScanDetector(); @@ -194,11 +203,11 @@ protected YdbQueryResult executeDataQuery(YdbQuery query, String yql, Params par } ctx.traceQueryByFullScanDetector(query, yql); - return connection.getExecutor().executeDataQuery(this, query, yql, params, getQueryTimeout(), isPoolable()); + return connection.getExecutor().executeDataQuery(this, query, yql, params); } protected YdbQueryResult executeSchemeQuery(YdbQuery query) throws SQLException { - ensureOpened(); + prepareNewExecution(); if (connection.getExecutor().isInsideTransaction()) { switch (schemeQueryTxMode) { @@ -217,7 +226,7 @@ protected YdbQueryResult executeSchemeQuery(YdbQuery query) throws SQLException } protected YdbQueryResult executeScanQuery(YdbQuery query, String yql, Params params) throws SQLException { - ensureOpened(); + prepareNewExecution(); if (connection.getExecutor().isInsideTransaction()) { switch (scanQueryTxMode) { @@ -288,24 +297,22 @@ public boolean getMoreResults() throws SQLException { @Override public void setFetchDirection(int direction) throws SQLException { - if (direction != ResultSet.FETCH_FORWARD && direction != ResultSet.FETCH_UNKNOWN) { - throw new SQLException(YdbConst.DIRECTION_UNSUPPORTED + direction); - } + this.fetchDirection = direction; } @Override public int getFetchDirection() { - return ResultSet.FETCH_FORWARD; + return fetchDirection; } @Override public void setFetchSize(int rows) { - // has not effect + this.fetchSize = rows; } @Override public int getFetchSize() { - return getMaxRows(); + return fetchSize; } @Override diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStatementImpl.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStatementImpl.java index 58098b7..eb482d2 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStatementImpl.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStatementImpl.java @@ -11,11 +11,12 @@ import tech.ydb.jdbc.YdbConnection; import tech.ydb.jdbc.YdbConst; +import tech.ydb.jdbc.YdbQueryResult; import tech.ydb.jdbc.YdbResultSet; import tech.ydb.jdbc.query.YdbQuery; import tech.ydb.table.query.Params; -public class YdbStatementImpl extends BaseYdbStatement { +public class YdbStatementImpl extends YdbStatementBase { private static final Logger LOGGER = Logger.getLogger(YdbStatementImpl.class.getName()); private final List batch = new ArrayList<>(); diff --git a/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java b/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java index 7ad1cd6..38c4c12 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java @@ -70,8 +70,6 @@ public class YdbOperationProperties { static final YdbProperty QUERY_REWRITE_TABLE_TTL = YdbProperty.duration("queryRewriteTtl", "Name of working table to hot replacemnt of queies", "300s"); - private static final int MAX_ROWS = 1000; // TODO: how to figure out the max rows of current connection? - private final YdbValue joinDuration; private final YdbValue queryTimeout; private final YdbValue scanQueryTimeout; @@ -166,10 +164,6 @@ public boolean getForceNewDatetypes() { return forceNewDatetypes.getValue(); } - public int getMaxRows() { - return MAX_ROWS; - } - public String getTxValidationTable() { return txValidationTable.getValue(); } diff --git a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbConnectionImplTest.java b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbConnectionImplTest.java index f73c765..37c3246 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbConnectionImplTest.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbConnectionImplTest.java @@ -900,8 +900,9 @@ public void testBigBulkAndScan() throws SQLException { ps.executeBatch(); } - // SCAN all table + // Read whole table try (PreparedStatement ps = conn.prepareStatement(selectAll)) { + ps.setFetchSize(1000); // lazy reading int readed = 0; Assertions.assertTrue(ps.execute()); try (ResultSet rs = ps.getResultSet()) { @@ -914,11 +915,17 @@ public void testBigBulkAndScan() throws SQLException { Assertions.assertEquals(10000, readed); } - // Canceled scan + // Canceled lazy reading try (PreparedStatement ps = conn.prepareStatement(selectAll)) { + ps.setFetchSize(1000); // lazy reading + Assertions.assertTrue(ps.execute()); - ps.getResultSet().next(); - ps.getResultSet().close(); + + try (ResultSet rs = ps.getResultSet()) { + Assertions.assertTrue(rs.next()); + Assertions.assertNull(rs.getWarnings()); + // after ResultSet reading stream will be canceled + } SQLWarning w = ps.getWarnings(); Assertions.assertNotNull(w); diff --git a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbResultSetImplTest.java b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbResultSetImplTest.java index 51c8a31..3425fb5 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbResultSetImplTest.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbResultSetImplTest.java @@ -25,7 +25,6 @@ import java.time.Month; import java.time.ZoneOffset; import java.util.Collections; -import java.util.Locale; import java.util.UUID; import javax.sql.rowset.serial.SerialBlob; @@ -34,13 +33,13 @@ import com.google.common.io.ByteStreams; import com.google.common.io.CharStreams; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.function.Executable; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import tech.ydb.jdbc.YdbResultSet; import tech.ydb.jdbc.YdbResultSetMetaData; @@ -59,509 +58,685 @@ import tech.ydb.test.junit5.YdbHelperExtension; public class YdbResultSetImplTest { + @RegisterExtension private static final YdbHelperExtension ydb = new YdbHelperExtension(); @RegisterExtension - private static final JdbcConnectionExtention jdbc = new JdbcConnectionExtention(ydb); - - private static final SqlQueries TEST_TABLE = new SqlQueries("ydb_result_set_test"); - - static { - Locale.setDefault(Locale.US); - } + private static final JdbcConnectionExtention jdbc = new JdbcConnectionExtention(ydb) + .withArg("usePrefixPath", "rs") + .withArg("useStreamResultSets", "true"); - private Statement statement; - private ResultSet resultSet; + private static final SqlQueries SMALL = new SqlQueries("small_table"); + private static final SqlQueries BIG = new SqlQueries("big_table"); @BeforeAll - public static void initTable() throws SQLException { + public static void initTables() throws SQLException { try (Statement statement = jdbc.connection().createStatement();) { - // create test table - statement.execute(TEST_TABLE.createTableSQL()); - statement.execute(TEST_TABLE.initTableSQL()); + // create test tables + statement.execute(SMALL.createTableSQL()); + statement.execute(SMALL.initTableSQL()); + statement.execute(BIG.createTableSQL()); + } + + // BULK UPSERT + String bulkUpsertQuery = BIG.upsertOne(SqlQueries.JdbcQuery.BULK, "c_Text", null); + try (PreparedStatement ps = jdbc.connection().prepareStatement(bulkUpsertQuery)) { + for (int idx = 1; idx <= 10000; idx++) { + ps.setInt(1, idx); + ps.setString(2, "value-" + idx); + ps.addBatch(); + } + ps.executeBatch(); } - jdbc.connection().commit(); } @AfterAll public static void dropTable() throws SQLException { try (Statement statement = jdbc.connection().createStatement();) { - statement.execute(TEST_TABLE.dropTableSQL()); + statement.execute(SMALL.dropTableSQL()); + statement.execute(BIG.dropTableSQL()); } - jdbc.connection().commit(); } - @BeforeEach - public void beforeEach() throws SQLException { - statement = jdbc.connection().createStatement(); - resultSet = statement.executeQuery(TEST_TABLE.selectSQL()); + private static ResultSet selectSmall() throws SQLException { + return jdbc.connection().createStatement().executeQuery(SMALL.selectSQL()); + } + + private static void assertForwardOnly(Executable exec) { + ExceptionAssert.sqlFeatureNotSupported("ResultSet in TYPE_FORWARD_ONLY mode", exec); + } + + private static void assertCursorUpdates(Executable exec) { + ExceptionAssert.sqlFeatureNotSupported("Cursor updates are not supported", exec); } - @AfterEach - public void afterEach() throws SQLException { - resultSet.close(); - statement.close(); + private static void assertUpdateObject(Executable exec) { + ExceptionAssert.sqlFeatureNotSupported("updateObject not implemented", exec); + } - jdbc.connection().commit();// TODO: conection must be cleaned + private void assertIsEmpty(ResultSet rs) throws SQLException { + Assertions.assertFalse(rs.isBeforeFirst()); + Assertions.assertFalse(rs.isAfterLast()); + Assertions.assertFalse(rs.isFirst()); + Assertions.assertFalse(rs.isLast()); + Assertions.assertEquals(0, rs.getRow()); } @Test public void resultSetType() throws SQLException { - Assertions.assertEquals(ResultSet.TYPE_SCROLL_INSENSITIVE, resultSet.getType()); - Assertions.assertEquals(ResultSet.CONCUR_READ_ONLY, resultSet.getConcurrency()); + try (Statement st = jdbc.connection().createStatement()) { + st.setFetchSize(0); + try (ResultSet rs = st.executeQuery(SMALL.selectSQL())) { + Assertions.assertEquals(ResultSet.TYPE_SCROLL_INSENSITIVE, rs.getType()); + Assertions.assertEquals(ResultSet.CONCUR_READ_ONLY, rs.getConcurrency()); + Assertions.assertEquals(0, rs.getFetchSize()); + Assertions.assertEquals(ResultSet.FETCH_UNKNOWN, rs.getFetchDirection()); + + Assertions.assertNotNull(rs.getStatement()); + Assertions.assertSame(st, rs.getStatement()); + + Assertions.assertEquals(ResultSet.HOLD_CURSORS_OVER_COMMIT, rs.getHoldability()); + } + + st.setFetchSize(1000); + try (ResultSet rs = st.executeQuery(SMALL.selectSQL())) { + Assertions.assertEquals(ResultSet.TYPE_SCROLL_INSENSITIVE, rs.getType()); + Assertions.assertEquals(ResultSet.CONCUR_READ_ONLY, rs.getConcurrency()); + Assertions.assertEquals(1000, rs.getFetchSize()); + Assertions.assertEquals(ResultSet.FETCH_FORWARD, rs.getFetchDirection()); - Assertions.assertNotNull(resultSet.getStatement()); - Assertions.assertSame(statement, resultSet.getStatement()); + Assertions.assertNotNull(rs.getStatement()); + Assertions.assertSame(st, rs.getStatement()); - Assertions.assertEquals(ResultSet.HOLD_CURSORS_OVER_COMMIT, resultSet.getHoldability()); + Assertions.assertEquals(ResultSet.HOLD_CURSORS_OVER_COMMIT, rs.getHoldability()); + } + } } @Test public void close() throws SQLException { - Assertions.assertFalse(resultSet.isClosed()); - resultSet.close(); - Assertions.assertTrue(resultSet.isClosed()); + try (Statement st = jdbc.connection().createStatement()) { + ResultSet rs = st.executeQuery(SMALL.selectSQL()); + + Assertions.assertFalse(rs.isClosed()); + rs.close(); + Assertions.assertTrue(rs.isClosed()); + rs.close(); // double closing is allowed + Assertions.assertTrue(rs.isClosed()); + } + + Statement st = jdbc.connection().createStatement(); + ResultSet rs = st.executeQuery(SMALL.selectSQL()); + Assertions.assertFalse(rs.isClosed()); + st.close(); // statement closes current result set + Assertions.assertTrue(rs.isClosed()); + Assertions.assertTrue(st.isClosed()); } @Test public void unwrap() throws SQLException { - Assertions.assertTrue(resultSet.isWrapperFor(YdbResultSet.class)); - Assertions.assertSame(resultSet, resultSet.unwrap(YdbResultSet.class)); + try (Statement st = jdbc.connection().createStatement()) { + try (ResultSet rs = st.executeQuery(SMALL.selectSQL())) { + Assertions.assertTrue(rs.isWrapperFor(YdbResultSet.class)); + Assertions.assertSame(rs, rs.unwrap(YdbResultSet.class)); + + Assertions.assertFalse(rs.isWrapperFor(YdbStatement.class)); + ExceptionAssert.sqlException("Cannot unwrap to " + YdbStatement.class, () -> rs.unwrap(YdbStatement.class)); + } + + st.setFetchSize(0); + try (ResultSet rs = st.executeQuery(SMALL.selectSQL())) { + Assertions.assertTrue(rs.isWrapperFor(YdbResultSetMemory.class)); + Assertions.assertSame(rs, rs.unwrap(YdbResultSetMemory.class)); + Assertions.assertEquals(0, rs.getFetchSize()); + Assertions.assertEquals(ResultSet.FETCH_UNKNOWN, rs.getFetchDirection()); + } - Assertions.assertFalse(resultSet.isWrapperFor(YdbStatement.class)); - ExceptionAssert.sqlException("Cannot unwrap to " + YdbStatement.class, - () -> resultSet.unwrap(YdbStatement.class)); + st.setFetchSize(1000); + try (ResultSet rs = st.executeQuery(SMALL.selectSQL())) { + Assertions.assertTrue(rs.isWrapperFor(YdbResultSetForwardOnly.class)); + Assertions.assertSame(rs, rs.unwrap(YdbResultSetForwardOnly.class)); + Assertions.assertEquals(1000, rs.getFetchSize()); + Assertions.assertEquals(ResultSet.FETCH_FORWARD, rs.getFetchDirection()); + } + } } @Test - public void invalidColumnsTest() { - // invalid location - ExceptionAssert.sqlException("Current row index is out of bounds: 0", () -> resultSet.getString(2)); - ExceptionAssert.sqlException("Current row index is out of bounds: 0", () -> resultSet.getString("c_Text")); - - // invalid case - ExceptionAssert.sqlException("Column not found: c_text", () -> resultSet.getString("c_text")); - ExceptionAssert.sqlException("Column not found: C_TEXT", () -> resultSet.getString("C_TEXT")); - ExceptionAssert.sqlException("Current row index is out of bounds: 0", () -> resultSet.getString("c_Text")); - - // invalid name - ExceptionAssert.sqlException("Column not found: value0", () -> resultSet.getString("value0")); + public void invalidColumnsTest() throws SQLException { + try (Statement st = jdbc.connection().createStatement()) { + try (ResultSet rs = st.executeQuery(SMALL.selectSQL())) { + // invalid location + ExceptionAssert.sqlException("Current row index is out of bounds: 0", () -> rs.getString(2)); + ExceptionAssert.sqlException("Current row index is out of bounds: 0", () -> rs.getString("c_Text")); + + // invalid case + ExceptionAssert.sqlException("Column not found: c_text", () -> rs.getString("c_text")); + ExceptionAssert.sqlException("Column not found: C_TEXT", () -> rs.getString("C_TEXT")); + ExceptionAssert.sqlException("Current row index is out of bounds: 0", () -> rs.getString("c_Text")); + + // invalid name + ExceptionAssert.sqlException("Column not found: value0", () -> rs.getString("value0")); + } + } } @Test public void findColumn() throws SQLException { - Assertions.assertEquals(1, resultSet.findColumn("key")); - Assertions.assertEquals(14, resultSet.findColumn("c_Text")); - ExceptionAssert.sqlException("Column not found: value0", () -> resultSet.findColumn("value0")); + try (Statement st = jdbc.connection().createStatement()) { + try (ResultSet rs = st.executeQuery(SMALL.selectSQL())) { + Assertions.assertEquals(1, rs.findColumn("key")); + Assertions.assertEquals(14, rs.findColumn("c_Text")); + ExceptionAssert.sqlException("Column not found: value0", () -> rs.findColumn("value0")); + } + } } @Test public void warnings() throws SQLException { - Assertions.assertNull(resultSet.getWarnings()); - resultSet.clearWarnings(); - Assertions.assertNull(resultSet.getWarnings()); + try (Statement st = jdbc.connection().createStatement()) { + try (ResultSet rs = st.executeQuery("$ignored = 1; " + SMALL.selectSQL())) { + Assertions.assertNull(rs.getWarnings()); + rs.clearWarnings(); + Assertions.assertNull(rs.getWarnings()); + } + } } @Test public void getCursorName() { - ExceptionAssert.sqlFeatureNotSupported("Named cursors are not supported", () -> resultSet.getCursorName()); + ExceptionAssert.sqlFeatureNotSupported("Named cursors are not supported", () -> selectSmall().getCursorName()); } @Test public void getMetaData() throws SQLException { - ResultSetMetaData metadata = resultSet.getMetaData(); - Assertions.assertSame(metadata, resultSet.getMetaData(), "Metadata is cached"); - - Assertions.assertTrue(metadata.isWrapperFor(YdbResultSetMetaData.class)); - YdbResultSetMetaData ydbMetadata = metadata.unwrap(YdbResultSetMetaData.class); - Assertions.assertSame(metadata, ydbMetadata); - - ExceptionAssert.sqlException("Column is out of range: 995", () -> metadata.getColumnName(995)); - - Assertions.assertEquals(29, metadata.getColumnCount()); - - for (int index = 0; index < metadata.getColumnCount(); index++) { - int column = index + 1; - String name = metadata.getColumnName(column); - Assertions.assertNotNull(name); - Assertions.assertEquals(name, metadata.getColumnLabel(column)); - - Assertions.assertFalse(metadata.isAutoIncrement(column), "All columns are not isAutoIncrement"); - Assertions.assertTrue(metadata.isCaseSensitive(column), "All columns are isCaseSensitive"); - Assertions.assertFalse(metadata.isSearchable(column), "All columns are not isSearchable"); - Assertions.assertFalse(metadata.isCurrency(column), "All columns are not isCurrency"); - Assertions.assertEquals(ResultSetMetaData.columnNullable, metadata.isNullable(column), - "All columns in table are nullable, but pseudo-columns are not"); - Assertions.assertFalse(metadata.isSigned(column), "All columns are not isSigned"); - Assertions.assertEquals(0, metadata.getColumnDisplaySize(column), "No display size available"); - Assertions.assertEquals("", metadata.getSchemaName(column), "No schema available"); - Assertions.assertEquals(0, metadata.getPrecision(column), "No precision available"); - Assertions.assertEquals(0, metadata.getScale(column), "No scale available"); - Assertions.assertEquals("", metadata.getTableName(column), "No table name available"); - Assertions.assertEquals("", metadata.getCatalogName(column), "No catalog name available"); - Assertions.assertTrue(metadata.isReadOnly(column), "All columns are isReadOnly"); - Assertions.assertFalse(metadata.isWritable(column), "All columns are not isWritable"); - Assertions.assertFalse(metadata.isDefinitelyWritable(column), "All columns are not isDefinitelyWritable"); - - if (name.startsWith("c_")) { - String expectType = name.substring("c_".length()).toLowerCase(); - if (expectType.equals("decimal")) { - expectType = "decimal(22, 9)"; + try (Statement st = jdbc.connection().createStatement()) { + try (ResultSet rs = st.executeQuery(SMALL.selectSQL())) { + ResultSetMetaData metadata = rs.getMetaData(); + Assertions.assertSame(metadata, rs.getMetaData(), "Metadata is cached"); + + Assertions.assertTrue(metadata.isWrapperFor(YdbResultSetMetaData.class)); + YdbResultSetMetaData ydbMetadata = metadata.unwrap(YdbResultSetMetaData.class); + Assertions.assertSame(metadata, ydbMetadata); + + ExceptionAssert.sqlException("Column is out of range: 995", () -> metadata.getColumnName(995)); + + Assertions.assertEquals(29, metadata.getColumnCount()); + + for (int index = 0; index < metadata.getColumnCount(); index++) { + int column = index + 1; + String name = metadata.getColumnName(column); + Assertions.assertNotNull(name); + Assertions.assertEquals(name, metadata.getColumnLabel(column)); + + Assertions.assertFalse(metadata.isAutoIncrement(column), "All columns are not isAutoIncrement"); + Assertions.assertTrue(metadata.isCaseSensitive(column), "All columns are isCaseSensitive"); + Assertions.assertFalse(metadata.isSearchable(column), "All columns are not isSearchable"); + Assertions.assertFalse(metadata.isCurrency(column), "All columns are not isCurrency"); + Assertions.assertEquals(ResultSetMetaData.columnNullable, metadata.isNullable(column), + "All columns in table are nullable, but pseudo-columns are not"); + Assertions.assertFalse(metadata.isSigned(column), "All columns are not isSigned"); + Assertions.assertEquals(0, metadata.getColumnDisplaySize(column), "No display size available"); + Assertions.assertEquals("", metadata.getSchemaName(column), "No schema available"); + Assertions.assertEquals(0, metadata.getPrecision(column), "No precision available"); + Assertions.assertEquals(0, metadata.getScale(column), "No scale available"); + Assertions.assertEquals("", metadata.getTableName(column), "No table name available"); + Assertions.assertEquals("", metadata.getCatalogName(column), "No catalog name available"); + Assertions.assertTrue(metadata.isReadOnly(column), "All columns are isReadOnly"); + Assertions.assertFalse(metadata.isWritable(column), "All columns are not isWritable"); + Assertions.assertFalse(metadata.isDefinitelyWritable(column), + "All columns are not isDefinitelyWritable"); + + if (name.startsWith("c_")) { + String expectType = name.substring("c_".length()).toLowerCase(); + if (expectType.equals("decimal")) { + expectType = "decimal(22, 9)"; + } + if (expectType.equals("bigdecimal")) { + expectType = "decimal(35, 0)"; + } + if (expectType.equals("bankdecimal")) { + expectType = "decimal(31, 9)"; + } + + String actualType = metadata.getColumnTypeName(column); + Assertions.assertNotNull(actualType, "All columns have database types"); + Assertions.assertEquals(expectType, actualType.toLowerCase(), + "All column names are similar to types"); + } + + Assertions.assertTrue(metadata.getColumnType(column) != 0, + "All columns have sql type, including " + name); + // getColumnClassName is checkering already } - if (expectType.equals("bigdecimal")) { - expectType = "decimal(35, 0)"; - } - if (expectType.equals("bankdecimal")) { - expectType = "decimal(31, 9)"; - } - - String actualType = metadata.getColumnTypeName(column); - Assertions.assertNotNull(actualType, "All columns have database types"); - Assertions.assertEquals(expectType, actualType.toLowerCase(), - "All column names are similar to types"); } - - Assertions.assertTrue(metadata.getColumnType(column) != 0, - "All columns have sql type, including " + name); - // getColumnClassName is checkering already } } @Test public void fetchDirection() throws SQLException { - Assertions.assertEquals(ResultSet.FETCH_UNKNOWN, resultSet.getFetchDirection()); - resultSet.setFetchDirection(ResultSet.FETCH_REVERSE); - Assertions.assertEquals(ResultSet.FETCH_REVERSE, resultSet.getFetchDirection()); - } + try (Statement st = jdbc.connection().createStatement()) { + st.setFetchSize(0); + st.setFetchDirection(ResultSet.FETCH_UNKNOWN); + try (ResultSet rs = st.executeQuery(SMALL.selectSQL())) { + Assertions.assertEquals(ResultSet.FETCH_UNKNOWN, rs.getFetchDirection()); + rs.setFetchDirection(ResultSet.FETCH_REVERSE); + Assertions.assertEquals(ResultSet.FETCH_REVERSE, rs.getFetchDirection()); + rs.setFetchDirection(ResultSet.FETCH_FORWARD); + Assertions.assertEquals(ResultSet.FETCH_FORWARD, rs.getFetchDirection()); + } - @Test - public void fetchSize() throws SQLException { - Assertions.assertEquals(1000, resultSet.getFetchSize()); - resultSet.setFetchSize(99); // do nothing - Assertions.assertEquals(1000, resultSet.getFetchSize()); - } + st.setFetchSize(0); + st.setFetchDirection(ResultSet.FETCH_REVERSE); + try (ResultSet rs = st.executeQuery(SMALL.selectSQL())) { + Assertions.assertEquals(ResultSet.FETCH_REVERSE, rs.getFetchDirection()); + rs.setFetchDirection(ResultSet.FETCH_UNKNOWN); + Assertions.assertEquals(ResultSet.FETCH_UNKNOWN, rs.getFetchDirection()); + rs.setFetchDirection(ResultSet.FETCH_FORWARD); + Assertions.assertEquals(ResultSet.FETCH_FORWARD, rs.getFetchDirection()); + } - private void assertIsEmpty(ResultSet rs) throws SQLException { - Assertions.assertFalse(rs.isBeforeFirst()); - Assertions.assertFalse(rs.isAfterLast()); - Assertions.assertFalse(rs.isFirst()); - Assertions.assertFalse(rs.isLast()); - Assertions.assertEquals(0, rs.getRow()); + st.setFetchSize(1000); + st.setFetchDirection(ResultSet.FETCH_UNKNOWN); + try (ResultSet rs = st.executeQuery(SMALL.selectSQL())) { + Assertions.assertEquals(ResultSet.FETCH_FORWARD, rs.getFetchDirection()); + rs.setFetchDirection(ResultSet.FETCH_FORWARD); + assertForwardOnly(() -> rs.setFetchDirection(ResultSet.FETCH_REVERSE)); + assertForwardOnly(() -> rs.setFetchDirection(ResultSet.FETCH_UNKNOWN)); + Assertions.assertEquals(ResultSet.FETCH_FORWARD, rs.getFetchDirection()); + } + + st.setFetchSize(1000); + st.setFetchDirection(ResultSet.FETCH_FORWARD); + try (ResultSet rs = st.executeQuery(SMALL.selectSQL())) { + Assertions.assertEquals(ResultSet.FETCH_FORWARD, rs.getFetchDirection()); + rs.setFetchDirection(ResultSet.FETCH_FORWARD); + assertForwardOnly(() -> rs.setFetchDirection(ResultSet.FETCH_REVERSE)); + assertForwardOnly(() -> rs.setFetchDirection(ResultSet.FETCH_UNKNOWN)); + Assertions.assertEquals(ResultSet.FETCH_FORWARD, rs.getFetchDirection()); + } + + st.setFetchSize(1000); + st.setFetchDirection(ResultSet.FETCH_REVERSE); + ExceptionAssert.sqlException("Requested scrollable ResutlSet, but this ResultSet is FORWARD_ONLY.", + () -> st.executeQuery(SMALL.selectSQL()) + ); + } } @Test public void moveOnEmptyResultSet() throws SQLException { - String select = TEST_TABLE.withTableName("select * from #tableName where 1 = 0"); - try (ResultSet rs = statement.executeQuery(select)) { - assertIsEmpty(rs); - rs.beforeFirst(); - assertIsEmpty(rs); - - rs.afterLast(); - assertIsEmpty(rs); - - Assertions.assertFalse(rs.next()); - assertIsEmpty(rs); - - Assertions.assertFalse(rs.previous()); - assertIsEmpty(rs); - - Assertions.assertFalse(rs.first()); - assertIsEmpty(rs); - - Assertions.assertFalse(rs.last()); - assertIsEmpty(rs); - - Assertions.assertFalse(rs.absolute(0)); - assertIsEmpty(rs); - - Assertions.assertFalse(rs.absolute(1)); - assertIsEmpty(rs); - - Assertions.assertFalse(rs.absolute(-1)); - assertIsEmpty(rs); - - Assertions.assertFalse(rs.relative(0)); - assertIsEmpty(rs); - - Assertions.assertFalse(rs.relative(1)); - assertIsEmpty(rs); - - Assertions.assertFalse(rs.relative(-1)); - assertIsEmpty(rs); + String select = SMALL.withTableName("scan select * from #tableName where 1 = 0"); + try (Statement st = jdbc.connection().createStatement()) { + try (ResultSet rs = st.executeQuery(select)) { + assertIsEmpty(rs); + Assertions.assertFalse(rs.next()); + assertIsEmpty(rs); + } } } @Test public void closeResultSetOnExecuteNext() throws SQLException { - ResultSet rs1 = statement.executeQuery(TEST_TABLE.selectSQL()); - Assertions.assertFalse(rs1.isClosed()); - ResultSet rs2 = statement.executeQuery(TEST_TABLE.selectSQL()); - Assertions.assertTrue(rs1.isClosed()); - Assertions.assertFalse(rs2.isClosed()); - - rs2.close(); + try (Statement st = jdbc.connection().createStatement()) { + ResultSet rs1 = st.executeQuery(SMALL.selectSQL()); + Assertions.assertFalse(rs1.isClosed()); + ResultSet rs2 = st.executeQuery(SMALL.selectSQL()); + Assertions.assertTrue(rs1.isClosed()); + Assertions.assertFalse(rs2.isClosed()); + + rs2.close(); + } } @Test public void closeResultSetOnCreateStatement() throws SQLException { - ResultSet rs1 = statement.executeQuery(TEST_TABLE.selectSQL()); - Assertions.assertFalse(rs1.isClosed()); + try (Statement st = jdbc.connection().createStatement()) { + ResultSet rs1 = st.executeQuery(SMALL.selectSQL()); + Assertions.assertFalse(rs1.isClosed()); - Statement other = jdbc.connection().createStatement(); + Statement other = jdbc.connection().createStatement(); - Assertions.assertFalse(rs1.isClosed()); // new statement doesn't close current result set + Assertions.assertFalse(rs1.isClosed()); // new statement doesn't close current result set - ResultSet rs2 = other.executeQuery(TEST_TABLE.selectSQL()); - Assertions.assertTrue(rs1.isClosed()); - Assertions.assertFalse(rs2.isClosed()); + ResultSet rs2 = other.executeQuery(SMALL.selectSQL()); + Assertions.assertTrue(rs1.isClosed()); + Assertions.assertFalse(rs2.isClosed()); - other.close(); - Assertions.assertFalse(rs2.isClosed()); + other.close(); + Assertions.assertTrue(rs2.isClosed()); - rs2.close(); + rs2.close(); + } } @Test public void closeResultSetOnPrepareStatement() throws SQLException { - ResultSet rs1 = statement.executeQuery(TEST_TABLE.selectSQL()); - Assertions.assertFalse(rs1.isClosed()); + try (Statement st = jdbc.connection().createStatement()) { + ResultSet rs1 = st.executeQuery(SMALL.selectSQL()); + Assertions.assertFalse(rs1.isClosed()); - PreparedStatement ps = jdbc.connection().prepareStatement(TEST_TABLE.selectAllByKey("?")); + PreparedStatement ps = jdbc.connection().prepareStatement(SMALL.selectAllByKey("?")); - Assertions.assertFalse(rs1.isClosed()); // prepare statement doesn't close current result set - ps.setInt(1, 1); + Assertions.assertFalse(rs1.isClosed()); // prepare statement doesn't close current result set + ps.setInt(1, 1); - Assertions.assertFalse(rs1.isClosed()); // prepare statement doesn't close current result set + Assertions.assertFalse(rs1.isClosed()); // prepare statement doesn't close current result set - ResultSet rs2 = ps.executeQuery(); - Assertions.assertTrue(rs1.isClosed()); - Assertions.assertFalse(rs2.isClosed()); + ResultSet rs2 = ps.executeQuery(); + Assertions.assertTrue(rs1.isClosed()); + Assertions.assertFalse(rs2.isClosed()); - ps.close(); - Assertions.assertFalse(rs2.isClosed()); + ps.close(); + Assertions.assertTrue(rs2.isClosed()); - rs2.close(); + rs2.close(); + } } - @Test - public void next() throws SQLException { - Assertions.assertEquals(0, resultSet.getRow()); - - Assertions.assertTrue(resultSet.next()); - Assertions.assertEquals(1, resultSet.getRow()); + @ParameterizedTest + @ValueSource(ints = { -1, 0, 1000 }) + public void next(int fetchSize) throws SQLException { + try (Statement st = jdbc.connection().createStatement()) { + st.setFetchSize(fetchSize); + ResultSet rs = st.executeQuery(SMALL.selectSQL()); - Assertions.assertTrue(resultSet.next()); - Assertions.assertEquals(2, resultSet.getRow()); + Assertions.assertEquals(0, rs.getRow()); + Assertions.assertTrue(rs.isBeforeFirst()); + Assertions.assertFalse(rs.isFirst()); + Assertions.assertFalse(rs.isLast()); + Assertions.assertFalse(rs.isAfterLast()); - Assertions.assertTrue(resultSet.next()); - Assertions.assertEquals(3, resultSet.getRow()); + Assertions.assertTrue(rs.next()); - Assertions.assertTrue(resultSet.next()); - Assertions.assertEquals(4, resultSet.getRow()); + Assertions.assertEquals(1, rs.getRow()); + Assertions.assertFalse(rs.isBeforeFirst()); + Assertions.assertTrue(rs.isFirst()); + Assertions.assertFalse(rs.isLast()); + Assertions.assertFalse(rs.isAfterLast()); - Assertions.assertTrue(resultSet.next()); - Assertions.assertEquals(5, resultSet.getRow()); + Assertions.assertTrue(rs.next()); - Assertions.assertFalse(resultSet.next()); - Assertions.assertFalse(resultSet.next()); - Assertions.assertFalse(resultSet.next()); + Assertions.assertEquals(2, rs.getRow()); + Assertions.assertFalse(rs.isBeforeFirst()); + Assertions.assertFalse(rs.isFirst()); + Assertions.assertFalse(rs.isLast()); + Assertions.assertFalse(rs.isAfterLast()); - Assertions.assertEquals(6, resultSet.getRow()); - } - - @Test - public void first() throws SQLException { - Assertions.assertFalse(resultSet.isFirst()); - Assertions.assertEquals(0, resultSet.getRow()); + Assertions.assertTrue(rs.next()); - Assertions.assertTrue(resultSet.next()); - Assertions.assertTrue(resultSet.isFirst()); - Assertions.assertEquals(1, resultSet.getRow()); + Assertions.assertEquals(3, rs.getRow()); + Assertions.assertFalse(rs.isBeforeFirst()); + Assertions.assertFalse(rs.isFirst()); + Assertions.assertFalse(rs.isLast()); + Assertions.assertFalse(rs.isAfterLast()); - Assertions.assertTrue(resultSet.next()); - Assertions.assertFalse(resultSet.isFirst()); - Assertions.assertEquals(2, resultSet.getRow()); - } + Assertions.assertTrue(rs.next()); - @Test - public void last() throws SQLException { - Assertions.assertFalse(resultSet.isLast()); - Assertions.assertEquals(0, resultSet.getRow()); + Assertions.assertEquals(4, rs.getRow()); + Assertions.assertFalse(rs.isBeforeFirst()); + Assertions.assertFalse(rs.isFirst()); + Assertions.assertFalse(rs.isLast()); + Assertions.assertFalse(rs.isAfterLast()); - Assertions.assertTrue(resultSet.last()); - Assertions.assertTrue(resultSet.isLast()); - Assertions.assertEquals(5, resultSet.getRow()); + Assertions.assertTrue(rs.next()); - Assertions.assertTrue(resultSet.previous()); - Assertions.assertFalse(resultSet.isLast()); - Assertions.assertEquals(4, resultSet.getRow()); + Assertions.assertEquals(5, rs.getRow()); + Assertions.assertFalse(rs.isBeforeFirst()); + Assertions.assertFalse(rs.isFirst()); + Assertions.assertTrue(rs.isLast()); + Assertions.assertFalse(rs.isAfterLast()); - Assertions.assertTrue(resultSet.next()); - Assertions.assertTrue(resultSet.isLast()); - Assertions.assertEquals(5, resultSet.getRow()); + Assertions.assertFalse(rs.next()); - Assertions.assertFalse(resultSet.next()); - Assertions.assertFalse(resultSet.isLast()); - Assertions.assertEquals(6, resultSet.getRow()); + Assertions.assertEquals(0, rs.getRow()); // like Postgres + Assertions.assertFalse(rs.isBeforeFirst()); + Assertions.assertFalse(rs.isFirst()); + Assertions.assertFalse(rs.isLast()); + Assertions.assertTrue(rs.isAfterLast()); - Assertions.assertFalse(resultSet.next()); - Assertions.assertEquals(6, resultSet.getRow()); + Assertions.assertFalse(rs.next()); + Assertions.assertFalse(rs.next()); + } } @Test - public void beforeFirst() throws SQLException { - Assertions.assertTrue(resultSet.isBeforeFirst()); - Assertions.assertEquals(0, resultSet.getRow()); + public void forwarnOnlyUnsupportedMethods() throws SQLException { + try (Statement st = jdbc.connection().createStatement()) { + st.setFetchSize(100); + try (ResultSet rs = st.executeQuery(SMALL.selectSQL())) { + Assertions.assertEquals(0, rs.getRow()); + assertForwardOnly(() -> { rs.first(); }); + assertForwardOnly(() -> { rs.last(); }); + assertForwardOnly(() -> { rs.previous(); }); + assertForwardOnly(() -> { rs.beforeFirst(); }); + assertForwardOnly(() -> { rs.afterLast(); }); + assertForwardOnly(() -> { rs.absolute(0); }); + assertForwardOnly(() -> { rs.relative(0); }); + } + } + } - Assertions.assertTrue(resultSet.next()); - Assertions.assertFalse(resultSet.isBeforeFirst()); - Assertions.assertEquals(1, resultSet.getRow()); + @Test + public void first() throws SQLException { + try (Statement st = jdbc.connection().createStatement()) { + st.setFetchSize(0); + try (ResultSet rs = st.executeQuery(SMALL.selectSQL())) { + Assertions.assertFalse(rs.isFirst()); + Assertions.assertEquals(0, rs.getRow()); + + Assertions.assertTrue(rs.next()); + Assertions.assertTrue(rs.isFirst()); + Assertions.assertEquals(1, rs.getRow()); + + Assertions.assertTrue(rs.next()); + Assertions.assertFalse(rs.isFirst()); + Assertions.assertEquals(2, rs.getRow()); + } + } + } - resultSet.beforeFirst(); - Assertions.assertTrue(resultSet.isBeforeFirst()); - Assertions.assertEquals(0, resultSet.getRow()); + @Test + public void last() throws SQLException { + try (Statement st = jdbc.connection().createStatement()) { + st.setFetchSize(0); + try (ResultSet rs = st.executeQuery(SMALL.selectSQL())) { + Assertions.assertFalse(rs.isLast()); + Assertions.assertEquals(0, rs.getRow()); + + Assertions.assertTrue(rs.last()); + Assertions.assertTrue(rs.isLast()); + Assertions.assertEquals(5, rs.getRow()); + + Assertions.assertTrue(rs.previous()); + Assertions.assertFalse(rs.isLast()); + Assertions.assertEquals(4, rs.getRow()); + + Assertions.assertTrue(rs.next()); + Assertions.assertTrue(rs.isLast()); + Assertions.assertEquals(5, rs.getRow()); + + Assertions.assertFalse(rs.next()); + Assertions.assertFalse(rs.isLast()); + Assertions.assertEquals(0, rs.getRow()); + + Assertions.assertFalse(rs.next()); + Assertions.assertEquals(0, rs.getRow()); + } + } + } - Assertions.assertTrue(resultSet.next()); - Assertions.assertFalse(resultSet.isBeforeFirst()); - Assertions.assertEquals(1, resultSet.getRow()); + @Test + public void beforeFirst() throws SQLException { + try (Statement st = jdbc.connection().createStatement()) { + st.setFetchSize(0); + try (ResultSet rs = st.executeQuery(SMALL.selectSQL())) { + Assertions.assertTrue(rs.isBeforeFirst()); + Assertions.assertEquals(0, rs.getRow()); + + Assertions.assertTrue(rs.next()); + Assertions.assertFalse(rs.isBeforeFirst()); + Assertions.assertEquals(1, rs.getRow()); + + rs.beforeFirst(); + Assertions.assertTrue(rs.isBeforeFirst()); + Assertions.assertEquals(0, rs.getRow()); + + Assertions.assertTrue(rs.next()); + Assertions.assertFalse(rs.isBeforeFirst()); + Assertions.assertEquals(1, rs.getRow()); + } + } } @Test public void afterLast() throws SQLException { - Assertions.assertFalse(resultSet.isAfterLast()); - Assertions.assertEquals(0, resultSet.getRow()); - - resultSet.afterLast(); - Assertions.assertTrue(resultSet.isAfterLast()); - Assertions.assertEquals(6, resultSet.getRow()); - - Assertions.assertFalse(resultSet.next()); - Assertions.assertTrue(resultSet.isAfterLast()); - Assertions.assertEquals(6, resultSet.getRow()); - - Assertions.assertTrue(resultSet.previous()); - Assertions.assertFalse(resultSet.isAfterLast()); - Assertions.assertEquals(5, resultSet.getRow()); - - Assertions.assertFalse(resultSet.next()); - Assertions.assertTrue(resultSet.isAfterLast()); - Assertions.assertEquals(6, resultSet.getRow()); + try (Statement st = jdbc.connection().createStatement()) { + st.setFetchSize(0); + try (ResultSet rs = st.executeQuery(SMALL.selectSQL())) { + Assertions.assertFalse(rs.isAfterLast()); + Assertions.assertEquals(0, rs.getRow()); + + rs.afterLast(); + Assertions.assertTrue(rs.isAfterLast()); + Assertions.assertEquals(0, rs.getRow()); + + Assertions.assertFalse(rs.next()); + Assertions.assertTrue(rs.isAfterLast()); + Assertions.assertEquals(0, rs.getRow()); + + Assertions.assertTrue(rs.previous()); + Assertions.assertFalse(rs.isAfterLast()); + Assertions.assertEquals(5, rs.getRow()); + + Assertions.assertFalse(rs.next()); + Assertions.assertTrue(rs.isAfterLast()); + Assertions.assertEquals(0, rs.getRow()); + } + } } @Test public void absolute() throws SQLException { - Assertions.assertEquals(0, resultSet.getRow()); + try (Statement st = jdbc.connection().createStatement()) { + st.setFetchSize(0); + try (ResultSet rs = st.executeQuery(SMALL.selectSQL())) { + Assertions.assertEquals(0, rs.getRow()); - Assertions.assertFalse(resultSet.absolute(0)); - Assertions.assertEquals(0, resultSet.getRow()); + Assertions.assertFalse(rs.absolute(0)); + Assertions.assertEquals(0, rs.getRow()); - Assertions.assertTrue(resultSet.absolute(1)); - Assertions.assertEquals(1, resultSet.getRow()); + Assertions.assertTrue(rs.absolute(1)); + Assertions.assertEquals(1, rs.getRow()); - Assertions.assertTrue(resultSet.absolute(-1)); - Assertions.assertEquals(5, resultSet.getRow()); + Assertions.assertTrue(rs.absolute(-1)); + Assertions.assertEquals(5, rs.getRow()); - Assertions.assertTrue(resultSet.absolute(-2)); - Assertions.assertEquals(4, resultSet.getRow()); + Assertions.assertTrue(rs.absolute(-2)); + Assertions.assertEquals(4, rs.getRow()); - Assertions.assertTrue(resultSet.absolute(4)); - Assertions.assertEquals(4, resultSet.getRow()); + Assertions.assertTrue(rs.absolute(4)); + Assertions.assertEquals(4, rs.getRow()); - Assertions.assertTrue(resultSet.absolute(5)); - Assertions.assertEquals(5, resultSet.getRow()); + Assertions.assertTrue(rs.absolute(5)); + Assertions.assertEquals(5, rs.getRow()); - Assertions.assertFalse(resultSet.absolute(6)); - Assertions.assertEquals(6, resultSet.getRow()); + Assertions.assertFalse(rs.absolute(6)); + Assertions.assertEquals(0, rs.getRow()); - Assertions.assertFalse(resultSet.absolute(7)); - Assertions.assertEquals(6, resultSet.getRow()); + Assertions.assertFalse(rs.absolute(7)); + Assertions.assertEquals(0, rs.getRow()); + } + } } @Test public void relative() throws SQLException { - Assertions.assertEquals(0, resultSet.getRow()); + try (Statement st = jdbc.connection().createStatement()) { + st.setFetchSize(0); + try (ResultSet rs = st.executeQuery(SMALL.selectSQL())) { + Assertions.assertEquals(0, rs.getRow()); - Assertions.assertTrue(resultSet.relative(1)); - Assertions.assertEquals(1, resultSet.getRow()); + Assertions.assertTrue(rs.relative(1)); + Assertions.assertEquals(1, rs.getRow()); - Assertions.assertTrue(resultSet.relative(2)); - Assertions.assertEquals(3, resultSet.getRow()); + Assertions.assertTrue(rs.relative(2)); + Assertions.assertEquals(3, rs.getRow()); - Assertions.assertTrue(resultSet.relative(0)); - Assertions.assertEquals(3, resultSet.getRow()); + Assertions.assertTrue(rs.relative(0)); + Assertions.assertEquals(3, rs.getRow()); - Assertions.assertFalse(resultSet.relative(3)); - Assertions.assertEquals(6, resultSet.getRow()); + Assertions.assertFalse(rs.relative(3)); + Assertions.assertEquals(0, rs.getRow()); - Assertions.assertFalse(resultSet.relative(2)); - Assertions.assertEquals(6, resultSet.getRow()); + Assertions.assertFalse(rs.relative(2)); + Assertions.assertEquals(0, rs.getRow()); - Assertions.assertTrue(resultSet.relative(-1)); - Assertions.assertEquals(5, resultSet.getRow()); + Assertions.assertTrue(rs.relative(-1)); + Assertions.assertEquals(5, rs.getRow()); - Assertions.assertTrue(resultSet.relative(-1)); - Assertions.assertEquals(4, resultSet.getRow()); + Assertions.assertTrue(rs.relative(-1)); + Assertions.assertEquals(4, rs.getRow()); - Assertions.assertFalse(resultSet.relative(-10)); - Assertions.assertEquals(0, resultSet.getRow()); + Assertions.assertFalse(rs.relative(-10)); + Assertions.assertEquals(0, rs.getRow()); - Assertions.assertFalse(resultSet.relative(-1)); - Assertions.assertEquals(0, resultSet.getRow()); + Assertions.assertFalse(rs.relative(-1)); + Assertions.assertEquals(0, rs.getRow()); + } + } } @Test public void previous() throws SQLException { - Assertions.assertEquals(0, resultSet.getRow()); + try (Statement st = jdbc.connection().createStatement()) { + st.setFetchSize(0); + try (ResultSet rs = st.executeQuery(SMALL.selectSQL())) { + Assertions.assertEquals(0, rs.getRow()); - Assertions.assertTrue(resultSet.last()); - Assertions.assertEquals(5, resultSet.getRow()); + Assertions.assertTrue(rs.last()); + Assertions.assertEquals(5, rs.getRow()); - Assertions.assertTrue(resultSet.previous()); - Assertions.assertEquals(4, resultSet.getRow()); + Assertions.assertTrue(rs.previous()); + Assertions.assertEquals(4, rs.getRow()); - Assertions.assertTrue(resultSet.next()); - Assertions.assertEquals(5, resultSet.getRow()); + Assertions.assertTrue(rs.next()); + Assertions.assertEquals(5, rs.getRow()); - Assertions.assertFalse(resultSet.next()); - Assertions.assertEquals(6, resultSet.getRow()); + Assertions.assertFalse(rs.next()); + Assertions.assertEquals(0, rs.getRow()); - Assertions.assertFalse(resultSet.next()); - Assertions.assertEquals(6, resultSet.getRow()); + Assertions.assertFalse(rs.next()); + Assertions.assertEquals(0, rs.getRow()); - Assertions.assertTrue(resultSet.previous()); - Assertions.assertEquals(5, resultSet.getRow()); + Assertions.assertTrue(rs.previous()); + Assertions.assertEquals(5, rs.getRow()); - Assertions.assertTrue(resultSet.previous()); - Assertions.assertEquals(4, resultSet.getRow()); + Assertions.assertTrue(rs.previous()); + Assertions.assertEquals(4, rs.getRow()); - Assertions.assertTrue(resultSet.previous()); - Assertions.assertEquals(3, resultSet.getRow()); + Assertions.assertTrue(rs.previous()); + Assertions.assertEquals(3, rs.getRow()); - Assertions.assertTrue(resultSet.previous()); - Assertions.assertEquals(2, resultSet.getRow()); + Assertions.assertTrue(rs.previous()); + Assertions.assertEquals(2, rs.getRow()); - Assertions.assertTrue(resultSet.previous()); - Assertions.assertEquals(1, resultSet.getRow()); + Assertions.assertTrue(rs.previous()); + Assertions.assertEquals(1, rs.getRow()); - Assertions.assertFalse(resultSet.previous()); - Assertions.assertEquals(0, resultSet.getRow()); + Assertions.assertFalse(rs.previous()); + Assertions.assertEquals(0, rs.getRow()); - Assertions.assertFalse(resultSet.previous()); - Assertions.assertEquals(0, resultSet.getRow()); - } - - private void assertCursorUpdates(Executable exec) { - ExceptionAssert.sqlFeatureNotSupported("Cursor updates are not supported", exec); - } - - private void assertUpdateObject(Executable exec) { - ExceptionAssert.sqlFeatureNotSupported("updateObject not implemented", exec); + Assertions.assertFalse(rs.previous()); + Assertions.assertEquals(0, rs.getRow()); + } + } } @Test - public void cursorUpdatesIsNotSupportedTest() { + public void cursorUpdatesIsNotSupportedTest() throws SQLException { + ResultSet resultSet = selectSmall(); // row actions assertCursorUpdates(() -> resultSet.rowUpdated()); assertCursorUpdates(() -> resultSet.rowInserted()); @@ -719,7 +894,7 @@ public void cursorUpdatesIsNotSupportedTest() { @Test public void getString() throws SQLException { - ResultSetChecker checker = check(resultSet, ResultSet::getString, ResultSet::getString); + ResultSetChecker checker = check(selectSmall(), ResultSet::getString, ResultSet::getString); checker.nextRow() .value(1, "key", "1") @@ -861,7 +1036,7 @@ public void getString() throws SQLException { @Test public void getBoolean() throws SQLException { - ResultSetChecker checker = check(resultSet, ResultSet::getBoolean, ResultSet::getBoolean); + ResultSetChecker checker = check(selectSmall(), ResultSet::getBoolean, ResultSet::getBoolean); checker.nextRow() .value(1, "key", true) @@ -927,7 +1102,7 @@ public void getBoolean() throws SQLException { @Test public void getByte() throws SQLException { - ResultSetChecker checker = check(resultSet, ResultSet::getByte, ResultSet::getByte); + ResultSetChecker checker = check(selectSmall(), ResultSet::getByte, ResultSet::getByte); checker.nextRow() .value(1, "key", (byte) 1) @@ -1057,7 +1232,7 @@ public void getByte() throws SQLException { @Test public void getShort() throws SQLException { - ResultSetChecker checker = check(resultSet, ResultSet::getShort, ResultSet::getShort); + ResultSetChecker checker = check(selectSmall(), ResultSet::getShort, ResultSet::getShort); checker.nextRow() .value(1, "key", (short) 1) @@ -1152,7 +1327,7 @@ public void getShort() throws SQLException { @Test public void getInt() throws SQLException { - ResultSetChecker checker = check(resultSet, ResultSet::getInt, ResultSet::getInt); + ResultSetChecker checker = check(selectSmall(), ResultSet::getInt, ResultSet::getInt); checker.nextRow() .value(1, "key", 1) @@ -1315,7 +1490,7 @@ public void getInt() throws SQLException { @Test public void getLong() throws SQLException { - ResultSetChecker checker = check(resultSet, ResultSet::getLong, ResultSet::getLong); + ResultSetChecker checker = check(selectSmall(), ResultSet::getLong, ResultSet::getLong); checker.nextRow() .value(1, "key", 1L) @@ -1477,7 +1652,7 @@ public void getLong() throws SQLException { @Test public void getFloat() throws SQLException { - ResultSetChecker checker = check(resultSet, ResultSet::getFloat, ResultSet::getFloat); + ResultSetChecker checker = check(selectSmall(), ResultSet::getFloat, ResultSet::getFloat); checker.nextRow() .value(1, "key", 1f) @@ -1569,7 +1744,7 @@ public void getFloat() throws SQLException { @Test public void getDouble() throws SQLException { - ResultSetChecker checker = check(resultSet, ResultSet::getDouble, ResultSet::getDouble); + ResultSetChecker checker = check(selectSmall(), ResultSet::getDouble, ResultSet::getDouble); checker.nextRow() .value(1, "key", 1d) @@ -1661,7 +1836,7 @@ public void getDouble() throws SQLException { @Test public void getBigDecimal() throws SQLException { - ResultSetChecker checker = check(resultSet, ResultSet::getBigDecimal, ResultSet::getBigDecimal); + ResultSetChecker checker = check(selectSmall(), ResultSet::getBigDecimal, ResultSet::getBigDecimal); checker.nextRow() .value(1, "key", BigDecimal.valueOf(1)) @@ -1757,7 +1932,7 @@ private byte[] bytes(String string) { @Test public void getBytes() throws SQLException { - ResultSetChecker checker = check(resultSet, ResultSet::getBytes, ResultSet::getBytes); + ResultSetChecker checker = check(selectSmall(), ResultSet::getBytes, ResultSet::getBytes); checker.nextRow() .value(13, "c_Bytes", bytes("bytes array")) @@ -1799,7 +1974,7 @@ public void getBytes() throws SQLException { @Test public void getDate() throws SQLException { - ResultSetChecker checker = check(resultSet, ResultSet::getDate, ResultSet::getDate); + ResultSetChecker checker = check(selectSmall(), ResultSet::getDate, ResultSet::getDate); checker.nextRow() .value(3, "c_Int8", Date.valueOf(LocalDate.ofEpochDay(101))) @@ -1898,7 +2073,7 @@ public void getDate() throws SQLException { @Test public void getTime() throws SQLException { - ResultSetChecker