Skip to content

Commit bbec36f

Browse files
committed
index @timesstamp field added
1 parent 47e73fc commit bbec36f

File tree

2 files changed

+6
-0
lines changed

2 files changed

+6
-0
lines changed

lib/sink/ElasticSinkConnector.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class ElasticSinkConnector extends SinkConnector {
2323
index: this.properties.restSink.index,
2424
type: this.properties.restSink.type,
2525
mode: this.properties.restSink.mode,
26+
indexTimestampFieldName: this.properties.restSink.indexTimestampFieldName,
2627
};
2728

2829
callback(null, taskConfig);

lib/sink/ElasticSinkTask.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@ class ElasticSinkTask extends SinkTask {
1515
index,
1616
type,
1717
batchSize,
18+
indexTimestampFieldName,
1819
} = this.properties;
1920

2021
this.esClient = esClient;
2122
this.idProperty = idProperty;
2223
this.index = index;
2324
this.type = type;
2425
this.batchSize = batchSize;
26+
this.indexTimestampFieldName = indexTimestampFieldName;
2527

2628
this.upserts = new Map();
2729
this.deletes = new Set();
@@ -57,6 +59,9 @@ class ElasticSinkTask extends SinkTask {
5759
let id = record[this.idProperty];
5860
body.push({ "update": {"_id": id, "_index": this.index}});
5961
delete record[this.idProperty];
62+
if (this.indexTimestampFieldName && record.doc) {
63+
record.doc[this.indexTimestampFieldName] = (new Date()).getTime();
64+
}
6065
body.push(record);
6166
}
6267
}

0 commit comments

Comments
 (0)