Skip to content

Commit e5ebe0a

Browse files
committed
Timeout all acknowledgements for now
1 parent d28c16d commit e5ebe0a

File tree

5 files changed

+29
-8
lines changed

5 files changed

+29
-8
lines changed

src/main/java/com/sonian/elasticsearch/zookeeper/discovery/ZooKeeperClusterState.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.common.io.stream.StreamInput;
3333
import org.elasticsearch.common.io.stream.StreamOutput;
3434
import org.elasticsearch.common.settings.Settings;
35+
import org.elasticsearch.discovery.Discovery;
3536
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
3637

3738
import java.io.IOException;
@@ -83,7 +84,13 @@ public ZooKeeperClusterState(Settings settings, ZooKeeperEnvironment environment
8384
*
8485
* @throws InterruptedException
8586
*/
86-
public void publish(ClusterState state) throws ElasticSearchException, InterruptedException {
87+
public void publish(ClusterState state, Discovery.AckListener ackListener) throws ElasticSearchException, InterruptedException {
88+
// TODO: Add ack logic
89+
publish(state/*, new AckClusterStatePublishResponseHandler(state.nodes().size() - 1, ackListener)*/ );
90+
ackListener.onTimeout();
91+
}
92+
93+
private void publish(ClusterState state/*, final ClusterStatePublishResponseHandler publishResponseHandler*/) throws ElasticSearchException, InterruptedException {
8794
publishingLock.lock();
8895
try {
8996
logger.trace("Publishing new cluster state version [{}]", state.version());

src/main/java/com/sonian/elasticsearch/zookeeper/discovery/ZooKeeperDiscovery.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -667,9 +667,9 @@ public ZooKeeperStatePublisher(Settings settings, ZooKeeperEnvironment environme
667667
@Override public void publish(ClusterState clusterState, AckListener ackListener) {
668668
try {
669669
// ignore the ack. rely on zk to handle distribution.
670-
zooKeeperClusterState.publish(clusterState);
670+
zooKeeperClusterState.publish(clusterState, ackListener);
671671
} catch (InterruptedException ex) {
672-
// Ignore
672+
Thread.currentThread().interrupt();
673673
}
674674

675675
}

src/test/java/com/sonian/elasticsearch/zookeeper/discovery/AbstractZooKeeperTests.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,16 @@
2323
import com.sonian.elasticsearch.zookeeper.discovery.embedded.EmbeddedZooKeeperService;
2424
import org.elasticsearch.cluster.ClusterName;
2525
import org.elasticsearch.cluster.ClusterState;
26+
import org.elasticsearch.cluster.node.DiscoveryNode;
2627
import org.elasticsearch.cluster.node.DiscoveryNodes;
2728
import org.elasticsearch.cluster.routing.*;
29+
import org.elasticsearch.common.Nullable;
2830
import org.elasticsearch.common.logging.ESLogger;
2931
import org.elasticsearch.common.logging.Loggers;
3032
import org.elasticsearch.common.network.NetworkUtils;
3133
import org.elasticsearch.common.settings.ImmutableSettings;
3234
import org.elasticsearch.common.settings.Settings;
35+
import org.elasticsearch.discovery.Discovery;
3336
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
3437
import org.elasticsearch.env.Environment;
3538
import org.elasticsearch.index.shard.ShardId;
@@ -251,4 +254,15 @@ protected String clusterStateVersion() {
251254
}
252255
}
253256

257+
258+
protected static class NoOpAckListener implements Discovery.AckListener {
259+
@Override
260+
public void onNodeAck(DiscoveryNode node, @Nullable Throwable t) {
261+
}
262+
263+
@Override
264+
public void onTimeout() {
265+
}
266+
}
267+
254268
}

src/test/java/com/sonian/elasticsearch/zookeeper/discovery/ZooKeeperClusterStateTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public void testClusterStatePublishing() throws Exception {
6060

6161
zkState.start();
6262

63-
zkState.publish(initialState);
63+
zkState.publish(initialState, new NoOpAckListener());
6464

6565
final CountDownLatch latch = new CountDownLatch(1);
6666

@@ -79,7 +79,7 @@ public void onNewClusterState(ClusterState clusterState) {
7979
.version(1235L)
8080
.build();
8181

82-
zkState.publish(secondVersion);
82+
zkState.publish(secondVersion, new NoOpAckListener());
8383

8484
retrievedState = zkState.retrieve(null);
8585

@@ -101,7 +101,7 @@ public void testClusterStatePublishingWithNewVersion() throws Exception {
101101

102102
zkStateOld.start();
103103

104-
zkStateOld.publish(initialState);
104+
zkStateOld.publish(initialState, new NoOpAckListener());
105105

106106
zkStateOld.stop();
107107

@@ -124,7 +124,7 @@ public void testClusterStatePublishingWithNewVersion() throws Exception {
124124
zkStateNew.syncClusterState();
125125

126126
// Make sure that new start can be published now
127-
zkStateNew.publish(initialState);
127+
zkStateNew.publish(initialState, new NoOpAckListener());
128128

129129
zkStateNew.stop();
130130

src/test/java/com/sonian/elasticsearch/zookeeper/discovery/ZooKeeperDiscoveryTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ public void shutdownZooKeeper() {
236236
ClusterState initialState = testClusterState(routingTable, nodes);
237237
ZooKeeperClusterState zkStateOld = buildZooKeeperClusterState(nodes, "0.0.1");
238238
zkStateOld.start();
239-
zkStateOld.publish(initialState);
239+
zkStateOld.publish(initialState, new NoOpAckListener());
240240
zkStateOld.stop();
241241

242242
// Create a client node

0 commit comments

Comments
 (0)