diff --git a/project/Build.scala b/project/Build.scala index 7aa5c47..55cc579 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -8,8 +8,8 @@ object AsyncZkClient extends Build { val VERSION = "0.2.3" val dependencies = - "com.typesafe.akka" % "akka-actor" % "2.0.4" :: "org.apache.zookeeper" % "zookeeper" % "3.4.3" :: + "com.typesafe.akka" % "akka-actor" % "2.0.4" % "test" :: "org.scalatest" %% "scalatest" % "1.8" % "test" :: "com.github.bigtoast" %% "rokprox" % "0.2.0" % "test" :: Nil diff --git a/src/main/scala/AsyncZooKeeperClient.scala b/src/main/scala/AsyncZooKeeperClient.scala index c1e43b4..02704bf 100644 --- a/src/main/scala/AsyncZooKeeperClient.scala +++ b/src/main/scala/AsyncZooKeeperClient.scala @@ -9,10 +9,11 @@ import org.apache.zookeeper.data.Stat import org.apache.zookeeper.ZooDefs.Ids import scala.collection.JavaConversions._ import org.apache.zookeeper.AsyncCallback._ -import akka.dispatch.{Await, Future, ExecutionContext, Promise} +import scala.concurrent.{Await, Future, ExecutionContext, Promise} import org.apache.zookeeper.KeeperException.Code import java.util -import akka.util.duration._ +import scala.concurrent.duration._ +import scala.util.{Success, Failure} sealed trait AsyncResponse { @@ -113,7 +114,6 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con val assignLatch = new CountDownLatch(1) if (zk != null) { zk.close } - zk = new ZooKeeper(servers, sessionTimeout, new Watcher { def process(event: WatchedEvent) = { assignLatch.await @@ -131,7 +131,7 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con isAliveSync Await.result( createPath(""), 10 seconds ) } catch { - case e => { + case e :Exception => { log.error("Could not connect to zookeeper ensemble: " + servers + ". Connection timed out after " + connectTimeout + " milliseconds!", e) @@ -170,11 +170,11 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con /** helper method to convert a zk response in to a client reponse and handle the errors */ def handleResponse[T]( rc :Int, path :String, p :Promise[T], stat:Stat, cxt :Option[Any] )( f : => T ) :Future[T] = { - Code.get(rc) match { + (Code.get(rc) match { case Code.OK => p.success( f ) case error if path == null => p.failure( FailedAsyncResponse( KeeperException.create(error),Option(path), Option(stat), cxt ) ) case error => p.failure( FailedAsyncResponse( KeeperException.create(error, path ),Option(path), Option(stat), cxt ) ) - } + }).future } /** Wrapper around the ZK exists method. Watch is hardcoded to false. @@ -189,7 +189,7 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con handleResponse(rc, path, p, stat, ctx ){ StatResponse(path, stat, ctx ) } } }, ctx ) - p + p.future } /** Wrapper around the ZK getChildren method. Watch is hardcoded to false. @@ -204,7 +204,7 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con handleResponse(rc, path, p, stat, ctx ){ ChildrenResponse( children.toSeq, path, stat, ctx ) } } }, ctx) - p + p.future } /** close the underlying zk connection */ @@ -218,7 +218,7 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con zk.exists("/", false) true } catch { - case e => + case e :Exception => log.warn("ZK not connected in isAliveSync", e) false } @@ -250,7 +250,7 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con handleResponse(rc, path, p, null, ctx ){ StringResponse( name, path, ctx ) } } }, ctx ) - p + p.future } /** Create a node and then return it. Under the hood this is a create followed by a get. If the stat or data is not @@ -287,7 +287,7 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con handleResponse(rc, path, p, stat, ctx ){ DataResponse(Option(data), path, stat, ctx ) } } }, ctx ) - p + p.future } /** Wrapper around the zk setData method. @@ -302,7 +302,7 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con handleResponse(rc, path, p, stat, ctx ){ StatResponse(path, stat, ctx ) } } }, ctx ) - p + p.future } /** Wrapper around zk delete method. @@ -325,7 +325,7 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con handleResponse(rc, path, p, null, ctx ){ VoidResponse(path, ctx) } } }, ctx ) - p + p.future } } @@ -340,7 +340,7 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con } } flatMap { seq => if ( p == path ) - Promise.successful( VoidResponse( path, ctx ) ) + Promise.successful( VoidResponse( path, ctx ) ).future else delete( p, -1, ctx ) } } @@ -376,19 +376,19 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con case e if e == EventType.NodeCreated || e == EventType.NodeDataChanged => get(path, watch = ifPersist ).onComplete { - case Left( error ) => + case Failure( error ) => log.error("Error on NodeCreated callback for path %s".format(mkPath(path)), error) - case Right( data ) => + case Success( data :AsyncResponse.DataResponse ) => onData(mkPath(path), Some(data)) } case EventType.NodeDeleted => get(path, watch = ifPersist) onComplete { - case Left( error :FailedAsyncResponse ) if error.code == Code.NONODE => + case Failure( error :FailedAsyncResponse ) if error.code == Code.NONODE => onData(mkPath(path), None) - case Right( data ) => + case Success( data :AsyncResponse.DataResponse ) => onData(mkPath(path), Some(data) ) - case Left( error ) => + case Failure( error ) => log.error("Error on NodeCreated callback for path %s".format(mkPath(path)), error) } } @@ -417,9 +417,9 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con def process(event: WatchedEvent) = event.getType match { case EventType.NodeChildrenChanged => getChildren(p, watch = ifPersist ) onComplete { - case Left(error) => + case Failure(error) => log.error("Error on NodeChildrenChanged callback for path %s".format(p), error) - case Right( kids ) => + case Success( kids :AsyncResponse.ChildrenResponse ) => onKids(kids) } exists(p, watch = ifPersist ) @@ -446,4 +446,4 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con connect -} \ No newline at end of file +} diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf new file mode 100644 index 0000000..258734e --- /dev/null +++ b/src/test/resources/application.conf @@ -0,0 +1,29 @@ + +akka { + //loglevel = DEBUG + debug { + receive = on + + autoreceive = on + + lifecycle = on + + fsm = on + + event-stream = on + } + + actor { + debug { + receive = on + + autoreceive = on + + lifecycle = on + + fsm = on + + event-stream = on + } + } +} diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties new file mode 100644 index 0000000..6607153 --- /dev/null +++ b/src/test/resources/log4j.properties @@ -0,0 +1,13 @@ +# Set root logger level to DEBUG and its only appender to A1. +log4j.rootLogger=ERR, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +log4j.logger.org.apache.zookeeper=WARN +#log4j.logger.org.apache.zookeeper.server=DEBUG +#log4j.logger.org.apache.zookeeper.ClientCnxn=DEBUG diff --git a/src/test/scala/AsyncZooKeeperClientSpecs.scala b/src/test/scala/AsyncZooKeeperClientSpecs.scala index a52db37..98d400e 100644 --- a/src/test/scala/AsyncZooKeeperClientSpecs.scala +++ b/src/test/scala/AsyncZooKeeperClientSpecs.scala @@ -4,10 +4,10 @@ package com.github.bigtoast.zookeeper import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, WordSpec} import org.scalatest.matchers.ShouldMatchers import java.util.concurrent.{TimeUnit, CountDownLatch, Executors} -import akka.dispatch.{Await, ExecutionContext, Future} -import akka.util.duration._ +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.duration._ import org.apache.zookeeper.{WatchedEvent, Watcher, CreateMode} -import akka.util.Duration +import scala.concurrent.duration.Duration import compat.Platform import com.github.bigtoast.zookeeper.AsyncResponse.FailedAsyncResponse import org.apache.zookeeper.KeeperException.{NoNodeException, NotEmptyException, BadVersionException} @@ -17,8 +17,11 @@ import java.util.concurrent.atomic.AtomicInteger class AsyncZooKeeperClientSpecs extends WordSpec with ShouldMatchers with BeforeAndAfter with BeforeAndAfterAll { + import scala.concurrent.ExecutionContext.Implicits.global + val eService = Executors.newCachedThreadPool implicit val to = 3 second + var zkServer :EmbeddedZookeeper = new EmbeddedZookeeper("127.0.0.1:2181") var zk :AsyncZooKeeperClient = _ class DoAwait[T]( f :Future[T] ) { @@ -28,7 +31,7 @@ class AsyncZooKeeperClientSpecs extends WordSpec with ShouldMatchers with Before implicit def toDoAwait[T]( f :Future[T] ) = new DoAwait[T]( f ) before { - zk = new AsyncZooKeeperClient("localhost:2181",1000,1000,"/async-client/tests", None, ExecutionContext.fromExecutorService( eService ) ) + zk = new AsyncZooKeeperClient("127.0.0.1:2181",1000,1000,"/async-client/tests", None, ExecutionContext.fromExecutorService( eService ) ) } after { @@ -40,7 +43,9 @@ class AsyncZooKeeperClientSpecs extends WordSpec with ShouldMatchers with Before } override def afterAll { - eService.shutdown + eService.shutdown() + + zkServer.shutdown() } "A relative path should have base path prepended" in { diff --git a/src/test/scala/EmbeddedZookeeper.scala b/src/test/scala/EmbeddedZookeeper.scala new file mode 100644 index 0000000..e0121dd --- /dev/null +++ b/src/test/scala/EmbeddedZookeeper.scala @@ -0,0 +1,64 @@ +package com.github.bigtoast.zookeeper + +import java.io.File +import java.net.InetSocketAddress +import java.util.Random +import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer} + +class EmbeddedZookeeper(val connectString: String) { + import EmbeddedZookeeper._ + + val snapshotDir = tempDir() + val logDir = tempDir() + val tickTime = 500 + val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime) + val port = connectString.split(":")(1).toInt + val factory = ServerCnxnFactory.createFactory(new InetSocketAddress("localhost", port), 100) + factory.startup(zookeeper) + + def shutdown(): Unit = { + factory.shutdown() + rm(logDir) + rm(snapshotDir) + } + + +} + +object EmbeddedZookeeper { + + val random = new Random() + + /** + * Delete file, in case of directory do that recursively + * @param file file or directory + */ + def rm(file: File): Unit = { + rm(Seq(file)) + } + + /** + * Delete sequence of files or directories, in case of directory do that recursively + * @param file seq of files or directories + */ + def rm(file: Seq[File]): Unit = { + file.headOption match { + case Some(f) if f.isDirectory => + rm(f.listFiles().toSeq ++ file.drop(1)) + f.delete + case Some(f) => + f.delete + rm(file.drop(1)) + case None => + } + } + + + def tempDir(): File = { + val ioDir = System.getProperty("java.io.tmpdir") + val f = new File(ioDir, "kafka-" + random.nextInt(1000000)) + f.mkdirs() + f.deleteOnExit() + f + } +} diff --git a/src/test/scala/FaultToleranceSpecs.scala b/src/test/scala/FaultToleranceSpecs.scala index c2aaa84..b18cf9b 100644 --- a/src/test/scala/FaultToleranceSpecs.scala +++ b/src/test/scala/FaultToleranceSpecs.scala @@ -4,25 +4,28 @@ package com.github.bigtoast.zookeeper import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, WordSpec} import org.scalatest.matchers.ShouldMatchers import java.util.concurrent.{TimeUnit, CountDownLatch, Executors} -import akka.dispatch.{Await, ExecutionContext, Future} -import akka.util.duration._ -import org.apache.zookeeper.{WatchedEvent, Watcher, CreateMode} +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.duration._ +import org.apache.zookeeper.{KeeperException, WatchedEvent, Watcher, CreateMode} import Watcher.Event.KeeperState._ -import akka.util.Duration import compat.Platform import com.github.bigtoast.zookeeper.AsyncResponse.FailedAsyncResponse -import org.apache.zookeeper.KeeperException.{NoNodeException, NotEmptyException, BadVersionException} +import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NotEmptyException, BadVersionException} import AsyncZooKeeperClient._ import org.apache.zookeeper.Watcher.Event.EventType import java.util.concurrent.atomic.AtomicInteger import com.github.bigtoast.rokprox._ import akka.actor.ActorSystem +import scala.util.{Failure, Success} class FaultToleranceSpecs extends WordSpec with ShouldMatchers with BeforeAndAfter with BeforeAndAfterAll { - val eService = Executors.newCachedThreadPool + import scala.concurrent.ExecutionContext.Implicits.global + + implicit val to = 3 second + var zkServer :EmbeddedZookeeper = _ var zk :AsyncZooKeeperClient = _ var prox :RokProxy = _ @@ -34,14 +37,12 @@ class FaultToleranceSpecs extends WordSpec with ShouldMatchers with BeforeAndAft implicit def toDoAwait[T]( f :Future[T] ) = new DoAwait[T]( f ) - override def beforeAll { sys = ActorSystem("blabs") } + override def beforeAll { sys = ActorSystem("blabs") } before { - prox = RokProx.proxy("zk").from("127.0.0.1:2345").to("127.0.0.1:2181").build(sys) - - Thread.sleep(2000) - - zk = new AsyncZooKeeperClient("127.0.0.1:2345",1000,1000,"/async-client/tests", None, ExecutionContext.fromExecutorService( eService ) ) + zkServer = new EmbeddedZookeeper("127.0.0.1:22181") + prox = RokProx.proxy("zk").from("127.0.0.1:2345").to("127.0.0.1:22181").build(sys) + Thread.sleep(1000) } @@ -52,26 +53,31 @@ class FaultToleranceSpecs extends WordSpec with ShouldMatchers with BeforeAndAft case _ => zk.close } await + zk.close + prox.shutdown + zkServer.shutdown() Thread.sleep(1000) + } override def afterAll { - eService.shutdown sys.shutdown } "Breaking a connection" should { "trigger a reconnect" in { - val latch = new CountDownLatch(1) + zk = new AsyncZooKeeperClient("127.0.0.1:2345",2000,10000,"/async-client/tests", None, ExecutionContext.global ) + + val latch = new CountDownLatch(1) zk.watchConnection { - case Disconnected => - println("\n\n Got Disconnected \n\n") + case Disconnected => + //println("\n\n Got Disconnected \n\n") latch.countDown - case e => - println("\n\n Got %s \n\n" format e) + case e => + //println("\n\n Got %s \n\n" format e) } val init = zk.createAndGet("testers", Some("besters" getBytes), CreateMode.EPHEMERAL).await @@ -82,40 +88,43 @@ class FaultToleranceSpecs extends WordSpec with ShouldMatchers with BeforeAndAft val cxn = cxns.head - println("Breaking the connection") + //println("Breaking the connection") prox.breakCxn(cxn.id) - latch.await(2L, TimeUnit.SECONDS) should be (true) + latch.await(3L, TimeUnit.SECONDS) should be (true) val second = zk.get("testers").await.data.get.deser[String] should be ("besters") } - "trigger a session timeout" in { - val expiredLatch = new CountDownLatch(1) - - zk.watchConnection { - case Expired => - println("\n\n got expired \n\n") - expiredLatch.countDown - case e => - println("\n\n Got %s \n\n" format e) - } - - val init = zk.createAndGet("testers", Some("besters" getBytes), CreateMode.EPHEMERAL).await - - println("Interrupting the connection for 10 seconds") - - prox.pause() - - expiredLatch.await(10, TimeUnit.SECONDS) should be (true) - - prox.restore - - intercept[AsyncResponse.FailedAsyncResponse] { - zk.get("testers").await - } - } - } - + // This test will not work until https://github.com/bigtoast/rokprox/issues/2 will be resolved +// "trigger a session timeout" in { +// +// zk = new AsyncZooKeeperClient("127.0.0.1:2345",2000,1000,"/async-client/tests", None, ExecutionContext.global ) +// +// val expiredLatch = new CountDownLatch(1) +// +// zk.watchConnection { +// case Expired => +// println("\n\n got expired \n\n") +// expiredLatch.countDown +// case e => +// println("\n\n Got %s \n\n" format e) +// } +// +// val init = zk.createAndGet("testers", Some("besters" getBytes), CreateMode.EPHEMERAL).await +// +// println("Interrupting the connection for 10 seconds") +// +// prox.pause(5 seconds) +// +// expiredLatch.await(20, TimeUnit.SECONDS) should be (true) +// +// prox.restore +// +// intercept[AsyncResponse.FailedAsyncResponse] { +// zk.get("testers").await +// } +// } + } }