diff --git a/pom.xml b/pom.xml
index 7aa0121..5a0f447 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
io.cdap.plugin
cdc-plugins
jar
- 2.0.1-SNAPSHOT
+ 2.0.6-SNAPSHOT
cdc-plugins
@@ -91,7 +91,7 @@
4.1.16.Final
4.11
- system:cdap-data-streams[6.0.0,7.0.0)
+ system:cdap-data-streams[6.0.0-SNAPSHOT,7.0.0)
${project.basedir}
true
diff --git a/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTInputDStream.java b/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTInputDStream.java
index 1947c23..842520f 100644
--- a/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTInputDStream.java
+++ b/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTInputDStream.java
@@ -58,14 +58,22 @@ public class CTInputDStream extends InputDStream {
private long trackingOffset;
private long failureStartTime;
private boolean isFailing;
+ private boolean forceFailure;
+ private final Set operationsList;
+ private int retentionDays;
+ private String retentionColumn;
- CTInputDStream(StreamingContext ssc, JdbcRDD.ConnectionFactory connectionFactory,
- Set tableWhitelist, long startingOffset, long maxRetrySeconds, int maxBatchSize) {
+ CTInputDStream(StreamingContext ssc, JdbcRDD.ConnectionFactory connectionFactory, Set operationsList,
+ int retentionDays, String retentionColumn, Set tableWhitelist, long startingOffset,
+ long maxRetrySeconds, int maxBatchSize) {
super(ssc, ClassTag$.MODULE$.apply(StructuredRecord.class));
this.connectionFactory = connectionFactory;
this.maxRetrySeconds = maxRetrySeconds;
this.maxBatchSize = maxBatchSize;
this.tableWhitelist = Collections.unmodifiableSet(new HashSet<>(tableWhitelist));
+ this.operationsList = Collections.unmodifiableSet(new HashSet<>(operationsList));
+ this.retentionDays = retentionDays;
+ this.retentionColumn = retentionColumn;
// if not current tracking version is given initialize it to 0
trackingOffset = startingOffset;
}
@@ -90,7 +98,7 @@ public Option> compute(Time validTime) {
private boolean shouldFail() {
long timeElapsed = nowInSeconds() - failureStartTime;
- return maxRetrySeconds == 0 || (isFailing && timeElapsed > maxRetrySeconds);
+ return forceFailure || maxRetrySeconds == 0 || (isFailing && timeElapsed > maxRetrySeconds);
}
private static long nowInSeconds() {
@@ -113,7 +121,16 @@ private Option> doCompute(Time validTime) throws Exception
// retrieve the current highest tracking version
long prev = trackingOffset;
long cur = Math.min(getCurrentTrackingVersion(connection), prev + maxBatchSize);
- LOG.info("Fetching changes from {} to {}", prev, cur);
+ if (prev != cur) {
+ LOG.info("Fetching changes from {} to {}", prev, cur);
+ }
+
+ if (cur < prev) {
+ this.forceFailure = true;
+ throw Throwables.propagate(new Exception("Current CT version is less than the previous",
+ new Error(String.format("Previous CT version: %s Current CT version: %s", prev, cur))));
+ }
+
// get all the data changes (DML) for the ct enabled tables till current tracking version
for (TableInformation tableInformation : tableInformations) {
changeRDDs.add(getChangeData(tableInformation, prev, cur));
@@ -146,16 +163,28 @@ public void stop() {
}
private JavaRDD getChangeData(TableInformation tableInformation, long prev, long cur) {
+ String retentionFilter = "";
+ if (!this.operationsList.contains("D") && this.retentionDays > 0 && this.retentionColumn != null) {
+ retentionFilter = String.format("OR [CT].[SYS_CHANGE_OPERATION] = 'D' " +
+ "AND [CT].%s > DATEADD(day, -%s, GETDATE()) ",
+ this.retentionColumn, this.retentionDays);
+ }
+
String stmt = String.format("SELECT [CT].[SYS_CHANGE_VERSION] as CHANGE_TRACKING_VERSION, " +
"[CT].[SYS_CHANGE_CREATION_VERSION], " +
"[CT].[SYS_CHANGE_OPERATION], " +
- "CURRENT_TIMESTAMP as CDC_CURRENT_TIMESTAMP, " +
+ "SYSUTCDATETIME() as CDC_CURRENT_TIMESTAMP, " +
"%s, %s FROM [%s] (nolock) " +
"as [CI] RIGHT OUTER JOIN " +
- "CHANGETABLE (CHANGES [%s], %s) as [CT] on %s where [CT]" +
- ".[SYS_CHANGE_VERSION] > ? " +
- "and [CT].[SYS_CHANGE_VERSION] <= ? ORDER BY [CT]" +
- ".[SYS_CHANGE_VERSION]",
+ "CHANGETABLE (CHANGES [%s], %s) as [CT] on %s " +
+ "where [CT].[SYS_CHANGE_VERSION] > ? " +
+ "and [CT].[SYS_CHANGE_VERSION] <= ? " +
+ "and (" +
+ "[CT].[SYS_CHANGE_OPERATION] IN " +
+ "('" + String.join("','", this.operationsList) + "') " +
+ retentionFilter +
+ ") " +
+ "ORDER BY [CT].[SYS_CHANGE_VERSION]",
getSelectColumns("CT", tableInformation.getPrimaryKeys()),
getSelectColumns("CI", tableInformation.getValueColumnNames()),
tableInformation.getName(), tableInformation.getName(), prev,
@@ -163,7 +192,8 @@ private JavaRDD getChangeData(TableInformation tableInformatio
LOG.debug("Querying for change data with statement {}", stmt);
- //TODO Currently we are not partitioning the data. We should partition it for scalability
+ //TODO Currently we are not partitioning the data. We should p
+ //53™1artition it for scalability
return JdbcRDD.create(getJavaSparkContext(), connectionFactory, stmt, prev, cur, 1,
new ResultSetToDMLRecord(tableInformation));
}
diff --git a/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServer.java b/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServer.java
index e71b7e8..47eedcb 100644
--- a/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServer.java
+++ b/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServer.java
@@ -137,8 +137,8 @@ public JavaDStream getStream(StreamingContext context) throws
LOG.info("Creating change information dstream");
ClassTag tag = ClassTag$.MODULE$.apply(StructuredRecord.class);
CTInputDStream dstream = new CTInputDStream(context.getSparkStreamingContext().ssc(), connectionFactory,
- conf.getTableWhitelist(), conf.getSequenceStartNum(),
- conf.getMaxRetrySeconds(), conf.getMaxBatchSize());
+ conf.getOperationsList(), conf.getRetentionDays(), conf.getRetentionColumn(), conf.getTableWhitelist(),
+ conf.getSequenceStartNum(), conf.getMaxRetrySeconds(), conf.getMaxBatchSize());
return JavaDStream.fromDStream(dstream, tag)
.mapToPair(structuredRecord -> new Tuple2<>("", structuredRecord))
// map the dstream with schema state store to detect changes in schema
diff --git a/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServerConfig.java b/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServerConfig.java
index 4f72bfa..99c6869 100644
--- a/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServerConfig.java
+++ b/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServerConfig.java
@@ -44,6 +44,9 @@ public class CTSQLServerConfig extends CDCReferencePluginConfig {
public static final String TABLE_WHITELIST = "tableWhitelist";
public static final String JDBC_PLUGIN_NAME = "jdbcPluginName";
public static final String CONNECTION_STRING = "connectionString";
+ public static final String OPERATIONS_LIST = "operationsList";
+ public static final String RETENTION_DAYS = "retentionDays";
+ public static final String RETENTION_COLUMN = "retentionColumn";
@Name(HOST_NAME)
@Description("SQL Server hostname. This is not required if a connection string was specified.")
@@ -104,6 +107,22 @@ public class CTSQLServerConfig extends CDCReferencePluginConfig {
@Nullable
private final String connectionString;
+ @Name(OPERATIONS_LIST)
+ @Description("The list of operations to read (INSERT,UPDATE,DELETE)")
+ @Nullable
+ private final String operationsList;
+
+ @Name(RETENTION_DAYS)
+ @Description("Retention period for a table if DELETE operation is turned off. " +
+ "In this period all deletes will be imported. Set to 0 to ignore all deletes")
+ @Nullable
+ private final int retentionDays;
+
+ @Name(RETENTION_COLUMN)
+ @Description("Timestamp column to use for retention period (should be a part of Primary Key)")
+ @Nullable
+ private final String retentionColumn;
+
public CTSQLServerConfig() {
super("");
this.hostname = null;
@@ -117,6 +136,9 @@ public CTSQLServerConfig() {
this.tableWhitelist = null;
this.jdbcPluginName = null;
this.connectionString = null;
+ this.operationsList = null;
+ this.retentionDays = 0;
+ this.retentionColumn = null;
}
public String getHostname() {
@@ -170,6 +192,21 @@ public String getConnectionString() {
return String.format("jdbc:sqlserver://%s:%s;DatabaseName=%s", hostname, port, dbName);
}
+ public Set getOperationsList() {
+ return operationsList == null ? Collections.emptySet() :
+ Arrays.stream(operationsList.split(",")).map(String::trim).collect(Collectors.toSet());
+ }
+
+
+ public int getRetentionDays() {
+ return retentionDays;
+ }
+
+ @Nullable
+ public String getRetentionColumn() {
+ return retentionColumn;
+ }
+
@Override
public void validate() {
super.validate();
@@ -194,5 +231,14 @@ public void validate() {
if (port != null && (port < 0 || port > 65535)) {
throw new InvalidConfigPropertyException("Port number should be in range 0-65535", PORT);
}
+
+ if (operationsList == null || operationsList.length() == 0) {
+ throw new InvalidConfigPropertyException("Operations list couldn't be empty", OPERATIONS_LIST);
+ }
+
+ if (retentionDays > 0 && retentionColumn == null) {
+ throw new InvalidConfigPropertyException("Please set the retention timestamp column", RETENTION_COLUMN);
+ }
+
}
}
diff --git a/widgets/CTSQLServer-streamingsource.json b/widgets/CTSQLServer-streamingsource.json
index 3ff4c9b..f26c8f4 100644
--- a/widgets/CTSQLServer-streamingsource.json
+++ b/widgets/CTSQLServer-streamingsource.json
@@ -76,6 +76,45 @@
{
"label": "Advanced",
"properties": [
+ {
+ "name": "operationsList",
+ "label": "Statements list",
+ "widget-type": "multi-select",
+ "widget-attributes": {
+ "delimiter": ",",
+ "defaultValue": [
+ "I",
+ "U"
+ ],
+ "options": [
+ {
+ "id": "I",
+ "label": "INSERT"
+ },
+ {
+ "id": "U",
+ "label": "UPDATE"
+ },
+ {
+ "id": "D",
+ "label": "DELETE"
+ }
+ ]
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Retention period (days)",
+ "name": "retentionDays",
+ "widget-attributes": {
+ "default": "0"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Retention timestamp column",
+ "name": "retentionColumn"
+ },
{
"widget-type": "textbox",
"label": "Max Retry Seconds",