diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/BaseDialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/BaseDialect.java new file mode 100644 index 00000000..a31472ec --- /dev/null +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/BaseDialect.java @@ -0,0 +1,166 @@ +package com.gruelbox.transactionoutbox; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Stream; + +public abstract class BaseDialect implements Dialect { + private final Map migrations = new TreeMap<>(); + + public BaseDialect() { + migrations.put( + 1, + new Migration( + 1, + "Create outbox table", + "CREATE TABLE TXNO_OUTBOX (\n" + + " id VARCHAR(36) PRIMARY KEY,\n" + + " invocation TEXT,\n" + + " nextAttemptTime TIMESTAMP(6),\n" + + " attempts INT,\n" + + " blacklisted BOOLEAN,\n" + + " version INT\n" + + ")")); + migrations.put( + 2, + new Migration( + 2, + "Add unique request id", + "ALTER TABLE TXNO_OUTBOX ADD COLUMN uniqueRequestId VARCHAR(100) NULL UNIQUE")); + migrations.put( + 3, + new Migration( + 3, "Add processed flag", "ALTER TABLE TXNO_OUTBOX ADD COLUMN processed BOOLEAN")); + migrations.put( + 4, + new Migration( + 4, + "Add flush index", + "CREATE INDEX IX_TXNO_OUTBOX_1 ON TXNO_OUTBOX (processed, blacklisted, nextAttemptTime)")); + migrations.put( + 5, + new Migration( + 5, + "Increase size of uniqueRequestId", + "ALTER TABLE TXNO_OUTBOX MODIFY COLUMN uniqueRequestId VARCHAR(250)")); + migrations.put( + 6, + new Migration( + 6, + "Rename column blacklisted to blocked", + "ALTER TABLE TXNO_OUTBOX CHANGE COLUMN blacklisted blocked VARCHAR(250)")); + migrations.put( + 7, + new Migration( + 7, + "Add lastAttemptTime column to outbox", + "ALTER TABLE TXNO_OUTBOX ADD COLUMN lastAttemptTime TIMESTAMP(6) NULL AFTER invocation")); + migrations.put( + 8, + new Migration( + 8, + "Update length of invocation column on outbox for MySQL dialects only.", + "ALTER TABLE TXNO_OUTBOX MODIFY COLUMN invocation MEDIUMTEXT")); + migrations.put( + 9, + new Migration( + 9, + "Add topic", + "ALTER TABLE TXNO_OUTBOX ADD COLUMN topic VARCHAR(250) NOT NULL DEFAULT '*'")); + migrations.put( + 10, + new Migration(10, "Add sequence", "ALTER TABLE TXNO_OUTBOX ADD COLUMN seq BIGINT NULL")); + migrations.put( + 11, + new Migration( + 11, + "Add sequence table", + "CREATE TABLE TXNO_SEQUENCE (topic VARCHAR(250) NOT NULL, seq BIGINT NOT NULL, PRIMARY KEY (topic, seq))")); + migrations.put( + 12, + new Migration( + 12, + "Add flush index to support ordering", + "CREATE INDEX IX_TXNO_OUTBOX_2 ON TXNO_OUTBOX (topic, processed, seq)")); + } + + @Override + public String getName() { + return "BaseDialect"; + } + + @Override + public String getDelete() { + return "DELETE FROM {{table}} WHERE id = ? and version = ?"; + } + + @Override + public String getDeleteExpired() { + return "DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false" + + " LIMIT {{batchSize}}"; + } + + @Override + public String getSelectBatch() { + return "SELECT {{allFields}} FROM {{table}} WHERE nextAttemptTime < ? " + + "AND blocked = false AND processed = false AND topic = '*' LIMIT {{batchSize}}"; + } + + @Override + public String getLock() { + return "SELECT id, invocation FROM {{table}} WHERE id = ? AND version = ? FOR UPDATE"; + } + + @Override + public String getCheckSql() { + return "SELECT 1"; + } + + @Override + public String getFetchNextInAllTopics() { + return "SELECT {{allFields}} FROM {{table}} a" + + " WHERE processed = false AND topic <> '*' AND nextAttemptTime < ?" + + " AND seq = (" + + "SELECT MIN(seq) FROM {{table}} b WHERE b.topic=a.topic AND b.processed = false" + + ") LIMIT {{batchSize}}"; + } + + @Override + public String getFetchCurrentVersion() { + return "SELECT version FROM TXNO_VERSION FOR UPDATE"; + } + + @Override + public String getFetchNextSequence() { + return "SELECT seq FROM TXNO_SEQUENCE WHERE topic = ? FOR UPDATE"; + } + + @Override + public String booleanValue(boolean criteriaValue) { + return criteriaValue ? Boolean.TRUE.toString() : Boolean.FALSE.toString(); + } + + @Override + public void createVersionTableIfNotExists(Connection connection) throws SQLException { + try (Statement s = connection.createStatement()) { + s.execute( + "CREATE TABLE IF NOT EXISTS TXNO_VERSION (id INT DEFAULT 0, version INT, PRIMARY KEY (id))"); + } + } + + @Override + public Stream getMigrations() { + return migrations.values().stream(); + } + + void changeMigration(int version, String sql) { + this.migrations.put(version, this.migrations.get(version).withSql(sql)); + } + + void disableMigration(int version) { + this.migrations.put(version, this.migrations.get(version).withSql(null)); + } +} diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java deleted file mode 100644 index caa70d87..00000000 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java +++ /dev/null @@ -1,207 +0,0 @@ -package com.gruelbox.transactionoutbox; - -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Collection; -import java.util.Map; -import java.util.TreeMap; -import java.util.function.Function; -import java.util.stream.Stream; -import lombok.AccessLevel; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.Setter; -import lombok.experimental.Accessors; - -@AllArgsConstructor(access = AccessLevel.PRIVATE) -class DefaultDialect implements Dialect { - - static Builder builder(String name) { - return new Builder(name); - } - - @Getter private final String name; - @Getter private final String deleteExpired; - @Getter private final String delete; - @Getter private final String selectBatch; - @Getter private final String lock; - @Getter private final String checkSql; - @Getter private final String fetchNextInAllTopics; - @Getter private final String fetchCurrentVersion; - @Getter private final String fetchNextSequence; - private final Collection migrations; - - @Override - public String booleanValue(boolean criteriaValue) { - return criteriaValue ? Boolean.TRUE.toString() : Boolean.FALSE.toString(); - } - - @Override - public void createVersionTableIfNotExists(Connection connection) throws SQLException { - try (Statement s = connection.createStatement()) { - s.execute( - "CREATE TABLE IF NOT EXISTS TXNO_VERSION (id INT DEFAULT 0, version INT, PRIMARY KEY (id))"); - } - } - - @Override - public String toString() { - return name; - } - - @Override - public Stream getMigrations() { - return migrations.stream(); - } - - @Setter - @Accessors(fluent = true) - static final class Builder { - private final String name; - private String delete = "DELETE FROM {{table}} WHERE id = ? and version = ?"; - private String deleteExpired = - "DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false" - + " LIMIT {{batchSize}}"; - private String selectBatch = - "SELECT {{allFields}} FROM {{table}} WHERE nextAttemptTime < ? " - + "AND blocked = false AND processed = false AND topic = '*' LIMIT {{batchSize}}"; - private String lock = - "SELECT id, invocation FROM {{table}} WHERE id = ? AND version = ? FOR UPDATE"; - private String checkSql = "SELECT 1"; - private Map migrations; - private Function booleanValueFrom; - private SQLAction createVersionTableBy; - private String fetchNextInAllTopics = - "SELECT {{allFields}} FROM {{table}} a" - + " WHERE processed = false AND topic <> '*' AND nextAttemptTime < ?" - + " AND seq = (" - + "SELECT MIN(seq) FROM {{table}} b WHERE b.topic=a.topic AND b.processed = false" - + ") LIMIT {{batchSize}}"; - private String fetchCurrentVersion = "SELECT version FROM TXNO_VERSION FOR UPDATE"; - private String fetchNextSequence = "SELECT seq FROM TXNO_SEQUENCE WHERE topic = ? FOR UPDATE"; - - Builder(String name) { - this.name = name; - this.migrations = new TreeMap<>(); - migrations.put( - 1, - new Migration( - 1, - "Create outbox table", - "CREATE TABLE TXNO_OUTBOX (\n" - + " id VARCHAR(36) PRIMARY KEY,\n" - + " invocation TEXT,\n" - + " nextAttemptTime TIMESTAMP(6),\n" - + " attempts INT,\n" - + " blacklisted BOOLEAN,\n" - + " version INT\n" - + ")")); - migrations.put( - 2, - new Migration( - 2, - "Add unique request id", - "ALTER TABLE TXNO_OUTBOX ADD COLUMN uniqueRequestId VARCHAR(100) NULL UNIQUE")); - migrations.put( - 3, - new Migration( - 3, "Add processed flag", "ALTER TABLE TXNO_OUTBOX ADD COLUMN processed BOOLEAN")); - migrations.put( - 4, - new Migration( - 4, - "Add flush index", - "CREATE INDEX IX_TXNO_OUTBOX_1 ON TXNO_OUTBOX (processed, blacklisted, nextAttemptTime)")); - migrations.put( - 5, - new Migration( - 5, - "Increase size of uniqueRequestId", - "ALTER TABLE TXNO_OUTBOX MODIFY COLUMN uniqueRequestId VARCHAR(250)")); - migrations.put( - 6, - new Migration( - 6, - "Rename column blacklisted to blocked", - "ALTER TABLE TXNO_OUTBOX CHANGE COLUMN blacklisted blocked VARCHAR(250)")); - migrations.put( - 7, - new Migration( - 7, - "Add lastAttemptTime column to outbox", - "ALTER TABLE TXNO_OUTBOX ADD COLUMN lastAttemptTime TIMESTAMP(6) NULL AFTER invocation")); - migrations.put( - 8, - new Migration( - 8, - "Update length of invocation column on outbox for MySQL dialects only.", - "ALTER TABLE TXNO_OUTBOX MODIFY COLUMN invocation MEDIUMTEXT")); - migrations.put( - 9, - new Migration( - 9, - "Add topic", - "ALTER TABLE TXNO_OUTBOX ADD COLUMN topic VARCHAR(250) NOT NULL DEFAULT '*'")); - migrations.put( - 10, - new Migration(10, "Add sequence", "ALTER TABLE TXNO_OUTBOX ADD COLUMN seq BIGINT NULL")); - migrations.put( - 11, - new Migration( - 11, - "Add sequence table", - "CREATE TABLE TXNO_SEQUENCE (topic VARCHAR(250) NOT NULL, seq BIGINT NOT NULL, PRIMARY KEY (topic, seq))")); - migrations.put( - 12, - new Migration( - 12, - "Add flush index to support ordering", - "CREATE INDEX IX_TXNO_OUTBOX_2 ON TXNO_OUTBOX (topic, processed, seq)")); - } - - Builder setMigration(Migration migration) { - this.migrations.put(migration.getVersion(), migration); - return this; - } - - Builder changeMigration(int version, String sql) { - return setMigration(this.migrations.get(version).withSql(sql)); - } - - Builder disableMigration(@SuppressWarnings("SameParameterValue") int version) { - return setMigration(this.migrations.get(version).withSql(null)); - } - - Dialect build() { - return new DefaultDialect( - name, - deleteExpired, - delete, - selectBatch, - lock, - checkSql, - fetchNextInAllTopics, - fetchCurrentVersion, - fetchNextSequence, - migrations.values()) { - @Override - public String booleanValue(boolean criteriaValue) { - if (booleanValueFrom != null) { - return booleanValueFrom.apply(criteriaValue); - } - return super.booleanValue(criteriaValue); - } - - @Override - public void createVersionTableIfNotExists(Connection connection) throws SQLException { - if (createVersionTableBy != null) { - createVersionTableBy.doAction(connection); - } else { - super.createVersionTableIfNotExists(connection); - } - } - }; - } - } -} diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultMigrationManager.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultMigrationManager.java index 8372ecb3..51868a55 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultMigrationManager.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultMigrationManager.java @@ -70,7 +70,7 @@ static void writeSchema(Writer writer, Dialect dialect) { printWriter.print(": "); printWriter.println(migration.getName()); if (migration.getSql() == null || migration.getSql().isEmpty()) { - printWriter.println("-- Nothing for " + dialect); + printWriter.println("-- Nothing for " + dialect.getName()); } else { printWriter.println(migration.getSql()); } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java index 62fc6d9d..34601f94 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java @@ -2,11 +2,12 @@ import java.sql.Connection; import java.sql.SQLException; -import java.sql.Statement; import java.util.stream.Stream; /** The SQL dialects supported by {@link DefaultPersistor}. */ public interface Dialect { + String getName(); + String getDelete(); /** @@ -32,173 +33,10 @@ public interface Dialect { Stream getMigrations(); - Dialect MY_SQL_5 = DefaultDialect.builder("MY_SQL_5").build(); - Dialect MY_SQL_8 = - DefaultDialect.builder("MY_SQL_8") - .fetchNextInAllTopics( - "WITH raw AS(SELECT {{allFields}}, (ROW_NUMBER() OVER(PARTITION BY topic ORDER BY seq)) as rn" - + " FROM {{table}} WHERE processed = false AND topic <> '*')" - + " SELECT * FROM raw WHERE rn = 1 AND nextAttemptTime < ? LIMIT {{batchSize}}") - .deleteExpired( - "DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false" - + " LIMIT {{batchSize}}") - .selectBatch( - "SELECT {{allFields}} FROM {{table}} WHERE nextAttemptTime < ? " - + "AND blocked = false AND processed = false AND topic = '*' LIMIT {{batchSize}} FOR UPDATE " - + "SKIP LOCKED") - .lock( - "SELECT id, invocation FROM {{table}} WHERE id = ? AND version = ? FOR " - + "UPDATE SKIP LOCKED") - .build(); - Dialect POSTGRESQL_9 = - DefaultDialect.builder("POSTGRESQL_9") - .fetchNextInAllTopics( - "WITH raw AS(SELECT {{allFields}}, (ROW_NUMBER() OVER(PARTITION BY topic ORDER BY seq)) as rn" - + " FROM {{table}} WHERE processed = false AND topic <> '*')" - + " SELECT * FROM raw WHERE rn = 1 AND nextAttemptTime < ? LIMIT {{batchSize}}") - .deleteExpired( - "DELETE FROM {{table}} WHERE id IN " - + "(SELECT id FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false LIMIT {{batchSize}})") - .selectBatch( - "SELECT {{allFields}} FROM {{table}} WHERE nextAttemptTime < ? " - + "AND blocked = false AND processed = false AND topic = '*' LIMIT " - + "{{batchSize}} FOR UPDATE SKIP LOCKED") - .lock( - "SELECT id, invocation FROM {{table}} WHERE id = ? AND version = ? FOR " - + "UPDATE SKIP LOCKED") - .changeMigration( - 5, "ALTER TABLE TXNO_OUTBOX ALTER COLUMN uniqueRequestId TYPE VARCHAR(250)") - .changeMigration(6, "ALTER TABLE TXNO_OUTBOX RENAME COLUMN blacklisted TO blocked") - .changeMigration(7, "ALTER TABLE TXNO_OUTBOX ADD COLUMN lastAttemptTime TIMESTAMP(6)") - .disableMigration(8) - .build(); - - Dialect H2 = - DefaultDialect.builder("H2") - .changeMigration(5, "ALTER TABLE TXNO_OUTBOX ALTER COLUMN uniqueRequestId VARCHAR(250)") - .changeMigration(6, "ALTER TABLE TXNO_OUTBOX RENAME COLUMN blacklisted TO blocked") - .disableMigration(8) - .build(); - Dialect ORACLE = - DefaultDialect.builder("ORACLE") - .fetchNextInAllTopics( - "WITH cte1 AS (SELECT {{allFields}}, (ROW_NUMBER() OVER(PARTITION BY topic ORDER BY seq)) as rn" - + " FROM {{table}} WHERE processed = 0 AND topic <> '*')" - + " SELECT * FROM cte1 WHERE rn = 1 AND nextAttemptTime < ? AND ROWNUM <= {{batchSize}}") - .deleteExpired( - "DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = 1 AND blocked = 0 " - + "AND ROWNUM <= {{batchSize}}") - .selectBatch( - "SELECT {{allFields}} FROM {{table}} WHERE nextAttemptTime < ? " - + "AND blocked = 0 AND processed = 0 AND topic = '*' AND ROWNUM <= {{batchSize}} FOR UPDATE " - + "SKIP LOCKED") - .lock( - "SELECT id, invocation FROM {{table}} WHERE id = ? AND version = ? FOR " - + "UPDATE SKIP LOCKED") - .checkSql("SELECT 1 FROM DUAL") - .changeMigration( - 1, - "CREATE TABLE TXNO_OUTBOX (\n" - + " id VARCHAR2(36) PRIMARY KEY,\n" - + " invocation CLOB,\n" - + " nextAttemptTime TIMESTAMP(6),\n" - + " attempts NUMBER,\n" - + " blacklisted NUMBER(1),\n" - + " version NUMBER\n" - + ")") - .changeMigration( - 2, "ALTER TABLE TXNO_OUTBOX ADD uniqueRequestId VARCHAR(100) NULL UNIQUE") - .changeMigration(3, "ALTER TABLE TXNO_OUTBOX ADD processed NUMBER(1)") - .changeMigration(5, "ALTER TABLE TXNO_OUTBOX MODIFY uniqueRequestId VARCHAR2(250)") - .changeMigration(6, "ALTER TABLE TXNO_OUTBOX RENAME COLUMN blacklisted TO blocked") - .changeMigration(7, "ALTER TABLE TXNO_OUTBOX ADD lastAttemptTime TIMESTAMP(6)") - .disableMigration(8) - .changeMigration(9, "ALTER TABLE TXNO_OUTBOX ADD topic VARCHAR(250) DEFAULT '*' NOT NULL") - .changeMigration(10, "ALTER TABLE TXNO_OUTBOX ADD seq NUMBER") - .changeMigration( - 11, - "CREATE TABLE TXNO_SEQUENCE (topic VARCHAR(250) NOT NULL, seq NUMBER NOT NULL, CONSTRAINT PK_TXNO_SEQUENCE PRIMARY KEY (topic, seq))") - .booleanValueFrom(v -> v ? "1" : "0") - .createVersionTableBy( - connection -> { - try (Statement s = connection.createStatement()) { - try { - s.execute("CREATE TABLE TXNO_VERSION (version NUMBER)"); - } catch (SQLException e) { - // oracle code for name already used by an existing object - if (!e.getMessage().contains("955")) { - throw e; - } - } - } - }) - .build(); - - Dialect MS_SQL_SERVER = - DefaultDialect.builder("MS_SQL_SERVER") - .lock( - "SELECT id, invocation FROM {{table}} WITH (UPDLOCK, ROWLOCK, READPAST) WHERE id = ? AND version = ?") - .selectBatch( - "SELECT TOP ({{batchSize}}) {{allFields}} FROM {{table}} " - + "WITH (UPDLOCK, ROWLOCK, READPAST) WHERE nextAttemptTime < ? AND topic = '*' " - + "AND blocked = 0 AND processed = 0") - .delete("DELETE FROM {{table}} WITH (ROWLOCK, READPAST) WHERE id = ? and version = ?") - .deleteExpired( - "DELETE TOP ({{batchSize}}) FROM {{table}} " - + "WHERE nextAttemptTime < ? AND processed = 1 AND blocked = 0") - .fetchCurrentVersion("SELECT version FROM TXNO_VERSION WITH (UPDLOCK, ROWLOCK, READPAST)") - .fetchNextInAllTopics( - "SELECT TOP {{batchSize}} {{allFields}} FROM {{table}} a" - + " WHERE processed = 0 AND topic <> '*' AND nextAttemptTime < ?" - + " AND seq = (" - + "SELECT MIN(seq) FROM {{table}} b WHERE b.topic=a.topic AND b.processed = 0" - + ")") - .fetchNextSequence( - "SELECT seq FROM TXNO_SEQUENCE WITH (UPDLOCK, ROWLOCK, READPAST) WHERE topic = ?") - .booleanValueFrom(v -> v ? "1" : "0") - .changeMigration( - 1, - "CREATE TABLE TXNO_OUTBOX (\n" - + " id VARCHAR(36) PRIMARY KEY,\n" - + " invocation NVARCHAR(MAX),\n" - + " nextAttemptTime DATETIME2(6),\n" - + " attempts INT,\n" - + " blocked BIT,\n" - + " version INT,\n" - + " uniqueRequestId VARCHAR(250),\n" - + " processed BIT,\n" - + " lastAttemptTime DATETIME2(6),\n" - + " topic VARCHAR(250) DEFAULT '*' NOT NULL,\n" - + " seq INT\n" - + ")") - .disableMigration(2) - .disableMigration(3) - .changeMigration( - 4, - "CREATE INDEX IX_TXNO_OUTBOX_1 ON TXNO_OUTBOX (processed, blocked, nextAttemptTime)") - .disableMigration(5) - .disableMigration(6) - .disableMigration(7) - .changeMigration( - 8, - "CREATE UNIQUE INDEX UX_TXNO_OUTBOX_uniqueRequestId ON TXNO_OUTBOX (uniqueRequestId) WHERE uniqueRequestId IS NOT NULL") - .disableMigration(9) - .disableMigration(10) - .changeMigration( - 11, - "CREATE TABLE TXNO_SEQUENCE (topic VARCHAR(250) NOT NULL, seq INT NOT NULL, CONSTRAINT " - + "PK_TXNO_SEQUENCE PRIMARY KEY (topic, seq))") - .createVersionTableBy( - connection -> { - try (Statement s = connection.createStatement()) { - s.execute( - "IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'TXNO_VERSION')\n" - + "BEGIN\n" - + " CREATE TABLE TXNO_VERSION (\n" - + " version INT\n" - + " );" - + "END"); - } - }) - .build(); + Dialect MY_SQL_5 = new MySQL5Dialect(); + Dialect MY_SQL_8 = new MySQL8Dialect(); + Dialect POSTGRESQL_9 = new Postgresql9Dialect(); + Dialect H2 = new H2Dialect(); + Dialect ORACLE = new OracleDialect(); + Dialect MS_SQL_SERVER = new MsSQLServerDialect(); } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/H2Dialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/H2Dialect.java new file mode 100644 index 00000000..017f6d94 --- /dev/null +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/H2Dialect.java @@ -0,0 +1,15 @@ +package com.gruelbox.transactionoutbox; + +public class H2Dialect extends BaseDialect { + public H2Dialect() { + super(); + changeMigration(5, "ALTER TABLE TXNO_OUTBOX ALTER COLUMN uniqueRequestId VARCHAR(250)"); + changeMigration(6, "ALTER TABLE TXNO_OUTBOX RENAME COLUMN blacklisted TO blocked"); + disableMigration(8); + } + + @Override + public String getName() { + return "H2"; + } +} diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/MsSQLServerDialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/MsSQLServerDialect.java new file mode 100644 index 00000000..41241f9c --- /dev/null +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/MsSQLServerDialect.java @@ -0,0 +1,107 @@ +package com.gruelbox.transactionoutbox; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; + +public class MsSQLServerDialect extends BaseDialect { + public MsSQLServerDialect() { + super(); + changeMigration( + 1, + "CREATE TABLE TXNO_OUTBOX (\n" + + " id VARCHAR(36) PRIMARY KEY,\n" + + " invocation NVARCHAR(MAX),\n" + + " nextAttemptTime DATETIME2(6),\n" + + " attempts INT,\n" + + " blocked BIT,\n" + + " version INT,\n" + + " uniqueRequestId VARCHAR(250),\n" + + " processed BIT,\n" + + " lastAttemptTime DATETIME2(6),\n" + + " topic VARCHAR(250) DEFAULT '*' NOT NULL,\n" + + " seq INT\n" + + ")"); + disableMigration(2); + disableMigration(3); + changeMigration( + 4, "CREATE INDEX IX_TXNO_OUTBOX_1 ON TXNO_OUTBOX (processed, blocked, nextAttemptTime)"); + disableMigration(5); + disableMigration(6); + disableMigration(7); + changeMigration( + 8, + "CREATE UNIQUE INDEX UX_TXNO_OUTBOX_uniqueRequestId ON TXNO_OUTBOX (uniqueRequestId) WHERE uniqueRequestId IS NOT NULL"); + disableMigration(9); + disableMigration(10); + changeMigration( + 11, + "CREATE TABLE TXNO_SEQUENCE (topic VARCHAR(250) NOT NULL, seq INT NOT NULL, CONSTRAINT " + + "PK_TXNO_SEQUENCE PRIMARY KEY (topic, seq))"); + } + + @Override + public String getName() { + return "MS_SQL_SERVER"; + } + + @Override + public String getLock() { + return "SELECT id, invocation FROM {{table}} WITH (UPDLOCK, ROWLOCK, READPAST) WHERE id = ? AND version = ?"; + } + + @Override + public String getSelectBatch() { + return "SELECT TOP ({{batchSize}}) {{allFields}} FROM {{table}} " + + "WITH (UPDLOCK, ROWLOCK, READPAST) WHERE nextAttemptTime < ? AND topic = '*' " + + "AND blocked = 0 AND processed = 0"; + } + + @Override + public String getDelete() { + return "DELETE FROM {{table}} WITH (ROWLOCK, READPAST) WHERE id = ? and version = ?"; + } + + @Override + public String getDeleteExpired() { + return "DELETE TOP ({{batchSize}}) FROM {{table}} " + + "WHERE nextAttemptTime < ? AND processed = 1 AND blocked = 0"; + } + + @Override + public String getFetchCurrentVersion() { + return "SELECT version FROM TXNO_VERSION WITH (UPDLOCK, ROWLOCK, READPAST)"; + } + + @Override + public String getFetchNextSequence() { + return "SELECT seq FROM TXNO_SEQUENCE WITH (UPDLOCK, ROWLOCK, READPAST) WHERE topic = ?"; + } + + @Override + public String booleanValue(boolean criteriaValue) { + return criteriaValue ? "1" : "0"; + } + + @Override + public void createVersionTableIfNotExists(Connection connection) throws SQLException { + try (Statement s = connection.createStatement()) { + s.execute( + "IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'TXNO_VERSION')\n" + + "BEGIN\n" + + " CREATE TABLE TXNO_VERSION (\n" + + " version INT\n" + + " );" + + "END"); + } + } + + @Override + public String getFetchNextInAllTopics() { + return "SELECT TOP {{batchSize}} {{allFields}} FROM {{table}} a" + + " WHERE processed = 0 AND topic <> '*' AND nextAttemptTime < ?" + + " AND seq = (" + + "SELECT MIN(seq) FROM {{table}} b WHERE b.topic=a.topic AND b.processed = 0" + + ")"; + } +} diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/MySQL5Dialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/MySQL5Dialect.java new file mode 100644 index 00000000..2605523d --- /dev/null +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/MySQL5Dialect.java @@ -0,0 +1,8 @@ +package com.gruelbox.transactionoutbox; + +public class MySQL5Dialect extends BaseDialect { + @Override + public String getName() { + return "MY_SQL_5"; + } +} diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/MySQL8Dialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/MySQL8Dialect.java new file mode 100644 index 00000000..c1c68e77 --- /dev/null +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/MySQL8Dialect.java @@ -0,0 +1,28 @@ +package com.gruelbox.transactionoutbox; + +public class MySQL8Dialect extends BaseDialect { + @Override + public String getName() { + return "MY_SQL_8"; + } + + @Override + public String getFetchNextInAllTopics() { + return "WITH raw AS(SELECT {{allFields}}, (ROW_NUMBER() OVER(PARTITION BY topic ORDER BY seq)) as rn" + + " FROM {{table}} WHERE processed = false AND topic <> '*')" + + " SELECT * FROM raw WHERE rn = 1 AND nextAttemptTime < ? LIMIT {{batchSize}}"; + } + + @Override + public String getSelectBatch() { + return "SELECT {{allFields}} FROM {{table}} WHERE nextAttemptTime < ? " + + "AND blocked = false AND processed = false AND topic = '*' LIMIT {{batchSize}} FOR UPDATE " + + "SKIP LOCKED"; + } + + @Override + public String getLock() { + return "SELECT id, invocation FROM {{table}} WHERE id = ? AND version = ? FOR " + + "UPDATE SKIP LOCKED"; + } +} diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/OracleDialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/OracleDialect.java new file mode 100644 index 00000000..475acabe --- /dev/null +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/OracleDialect.java @@ -0,0 +1,87 @@ +package com.gruelbox.transactionoutbox; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; + +public class OracleDialect extends BaseDialect { + public OracleDialect() { + super(); + changeMigration( + 1, + "CREATE TABLE TXNO_OUTBOX (\n" + + " id VARCHAR2(36) PRIMARY KEY,\n" + + " invocation CLOB,\n" + + " nextAttemptTime TIMESTAMP(6),\n" + + " attempts NUMBER,\n" + + " blacklisted NUMBER(1),\n" + + " version NUMBER\n" + + ")"); + changeMigration(2, "ALTER TABLE TXNO_OUTBOX ADD uniqueRequestId VARCHAR(100) NULL UNIQUE"); + changeMigration(3, "ALTER TABLE TXNO_OUTBOX ADD processed NUMBER(1)"); + changeMigration(5, "ALTER TABLE TXNO_OUTBOX MODIFY uniqueRequestId VARCHAR2(250)"); + changeMigration(6, "ALTER TABLE TXNO_OUTBOX RENAME COLUMN blacklisted TO blocked"); + changeMigration(7, "ALTER TABLE TXNO_OUTBOX ADD lastAttemptTime TIMESTAMP(6)"); + disableMigration(8); + changeMigration(9, "ALTER TABLE TXNO_OUTBOX ADD topic VARCHAR(250) DEFAULT '*' NOT NULL"); + changeMigration(10, "ALTER TABLE TXNO_OUTBOX ADD seq NUMBER"); + changeMigration( + 11, + "CREATE TABLE TXNO_SEQUENCE (topic VARCHAR(250) NOT NULL, seq NUMBER NOT NULL, CONSTRAINT PK_TXNO_SEQUENCE PRIMARY KEY (topic, seq))"); + } + + @Override + public String getName() { + return "ORACLE"; + } + + @Override + public String getFetchNextInAllTopics() { + return "WITH cte1 AS (SELECT {{allFields}}, (ROW_NUMBER() OVER(PARTITION BY topic ORDER BY seq)) as rn" + + " FROM {{table}} WHERE processed = 0 AND topic <> '*')" + + " SELECT * FROM cte1 WHERE rn = 1 AND nextAttemptTime < ? AND ROWNUM <= {{batchSize}}"; + } + + @Override + public String getLock() { + return "SELECT id, invocation FROM {{table}} WHERE id = ? AND version = ? FOR " + + "UPDATE SKIP LOCKED"; + } + + @Override + public String getCheckSql() { + return "SELECT 1 FROM DUAL"; + } + + @Override + public String getDeleteExpired() { + return "DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = 1 AND blocked = 0 " + + "AND ROWNUM <= {{batchSize}}"; + } + + @Override + public String getSelectBatch() { + return "SELECT {{allFields}} FROM {{table}} WHERE nextAttemptTime < ? " + + "AND blocked = 0 AND processed = 0 AND topic = '*' AND ROWNUM <= {{batchSize}} FOR UPDATE " + + "SKIP LOCKED"; + } + + @Override + public void createVersionTableIfNotExists(Connection connection) throws SQLException { + try (Statement s = connection.createStatement()) { + try { + s.execute("CREATE TABLE TXNO_VERSION (version NUMBER)"); + } catch (SQLException e) { + // oracle code for name already used by an existing object + if (!e.getMessage().contains("955")) { + throw e; + } + } + } + } + + @Override + public String booleanValue(boolean criteriaValue) { + return criteriaValue ? "1" : "0"; + } +} diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Postgresql9Dialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Postgresql9Dialect.java new file mode 100644 index 00000000..2b2f5f0f --- /dev/null +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Postgresql9Dialect.java @@ -0,0 +1,42 @@ +package com.gruelbox.transactionoutbox; + +public class Postgresql9Dialect extends BaseDialect { + public Postgresql9Dialect() { + super(); + changeMigration(5, "ALTER TABLE TXNO_OUTBOX ALTER COLUMN uniqueRequestId TYPE VARCHAR(250)"); + changeMigration(6, "ALTER TABLE TXNO_OUTBOX RENAME COLUMN blacklisted TO blocked"); + changeMigration(7, "ALTER TABLE TXNO_OUTBOX ADD COLUMN lastAttemptTime TIMESTAMP(6)"); + disableMigration(8); + } + + @Override + public String getName() { + return "POSTGRESQL_9"; + } + + @Override + public String getDeleteExpired() { + return "DELETE FROM {{table}} WHERE id IN " + + "(SELECT id FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false LIMIT {{batchSize}})"; + } + + @Override + public String getSelectBatch() { + return "SELECT {{allFields}} FROM {{table}} WHERE nextAttemptTime < ? " + + "AND blocked = false AND processed = false AND topic = '*' LIMIT " + + "{{batchSize}} FOR UPDATE SKIP LOCKED"; + } + + @Override + public String getLock() { + return "SELECT id, invocation FROM {{table}} WHERE id = ? AND version = ? FOR " + + "UPDATE SKIP LOCKED"; + } + + @Override + public String getFetchNextInAllTopics() { + return "WITH raw AS(SELECT {{allFields}}, (ROW_NUMBER() OVER(PARTITION BY topic ORDER BY seq)) as rn" + + " FROM {{table}} WHERE processed = false AND topic <> '*')" + + " SELECT * FROM raw WHERE rn = 1 AND nextAttemptTime < ? LIMIT {{batchSize}}"; + } +} diff --git a/transactionoutbox-spring/src/test/java/com/gruelbox/transactionoutbox/spring/example/TransactionOutboxSpringDemoApplication.java b/transactionoutbox-spring/src/test/java/com/gruelbox/transactionoutbox/spring/example/TransactionOutboxSpringDemoApplication.java index 3f2eaa49..4e20b415 100644 --- a/transactionoutbox-spring/src/test/java/com/gruelbox/transactionoutbox/spring/example/TransactionOutboxSpringDemoApplication.java +++ b/transactionoutbox-spring/src/test/java/com/gruelbox/transactionoutbox/spring/example/TransactionOutboxSpringDemoApplication.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.gruelbox.transactionoutbox.DefaultPersistor; import com.gruelbox.transactionoutbox.Dialect; +import com.gruelbox.transactionoutbox.H2Dialect; import com.gruelbox.transactionoutbox.Persistor; import com.gruelbox.transactionoutbox.TransactionOutbox; import com.gruelbox.transactionoutbox.jackson.JacksonInvocationSerializer; @@ -22,16 +23,22 @@ public static void main(String[] args) { SpringApplication.run(TransactionOutboxSpringDemoApplication.class, args); } + @Bean + Dialect dialect() { + return new H2Dialect(); + } + @Bean @Lazy - Persistor persistor(TransactionOutboxProperties properties, ObjectMapper objectMapper) { + Persistor persistor( + TransactionOutboxProperties properties, ObjectMapper objectMapper, Dialect dialect) { if (properties.isUseJackson()) { return DefaultPersistor.builder() .serializer(JacksonInvocationSerializer.builder().mapper(objectMapper).build()) - .dialect(Dialect.H2) + .dialect(dialect) .build(); } else { - return Persistor.forDialect(Dialect.H2); + return Persistor.forDialect(dialect); } }