diff --git a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java index 91946d7c..d6696dbf 100644 --- a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java +++ b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java @@ -21,6 +21,10 @@ import java.lang.management.ManagementFactory; import java.net.URI; import java.net.URISyntaxException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -297,11 +301,60 @@ public synchronized void start(long sinceSCN) } } } + + // If we get to here and the SCN is still 0 or -1 + // go get the current position from the DB + if(sinceSCNToUse <= 0) { + sinceSCNToUse = getCurrentMaxSCN(); + } _producerThread = new EventProducerThread(_physicalSourceName, sinceSCNToUse); _producerThread.start(); } - public class EventProducerThread extends DatabusThreadBase implements TransactionProcessor + /** + * Gets the current Max SCN from the DB. Used when there isn't one specified + * @return + */ + private long getCurrentMaxSCN() { + Connection connection = null; + try { + Class.forName("com.mysql.jdbc.Driver").newInstance(); + connection = DriverManager.getConnection("jdbc:mysql://"+ _or.getHost() + ":" + _or.getPort() + "/?" + + "user="+ _or.getUser() + "&password=" + _or.getPassword()); + ResultSet rs = connection.prepareStatement("show master status").executeQuery(); + String filename = null; + int position = 4; + if(rs.next()) + { + filename = rs.getString(1); + position = rs.getInt(2); + } + rs.close(); + int fileId = Integer.parseInt(filename.split("\\.")[1]); + return ORListener.scn(fileId, position); + + } + catch (Exception e) + { + _log.error("Unable to load mysql driver to get SCN:" + e.getMessage(), e); + return 0; + } + finally + { + if (connection != null) + { + try { + connection.close(); + } + catch (SQLException e) + { + _log.error("Unable to close mysql connection" + e.getMessage(), e); + } + } + } +} + +public class EventProducerThread extends DatabusThreadBase implements TransactionProcessor { // The scn with which the event buffer is started private final AtomicLong _startPrevScn = new AtomicLong(-1);