Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ object AsyncZkClient extends Build {
val VERSION = "0.2.3"

val dependencies =
"com.typesafe.akka" % "akka-actor" % "2.0.4" ::
"org.apache.zookeeper" % "zookeeper" % "3.4.3" ::
"com.typesafe.akka" % "akka-actor" % "2.0.4" % "test" ::
"org.scalatest" %% "scalatest" % "1.8" % "test" ::
"com.github.bigtoast" %% "rokprox" % "0.2.0" % "test" :: Nil

Expand Down
44 changes: 22 additions & 22 deletions src/main/scala/AsyncZooKeeperClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import org.apache.zookeeper.data.Stat
import org.apache.zookeeper.ZooDefs.Ids
import scala.collection.JavaConversions._
import org.apache.zookeeper.AsyncCallback._
import akka.dispatch.{Await, Future, ExecutionContext, Promise}
import scala.concurrent.{Await, Future, ExecutionContext, Promise}
import org.apache.zookeeper.KeeperException.Code
import java.util
import akka.util.duration._
import scala.concurrent.duration._
import scala.util.{Success, Failure}


sealed trait AsyncResponse {
Expand Down Expand Up @@ -113,7 +114,6 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con
val assignLatch = new CountDownLatch(1)

if (zk != null) { zk.close }

zk = new ZooKeeper(servers, sessionTimeout, new Watcher {
def process(event: WatchedEvent) = {
assignLatch.await
Expand All @@ -131,7 +131,7 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con
isAliveSync
Await.result( createPath(""), 10 seconds )
} catch {
case e => {
case e :Exception => {
log.error("Could not connect to zookeeper ensemble: " + servers
+ ". Connection timed out after " + connectTimeout + " milliseconds!", e)

Expand Down Expand Up @@ -170,11 +170,11 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con

/** helper method to convert a zk response in to a client reponse and handle the errors */
def handleResponse[T]( rc :Int, path :String, p :Promise[T], stat:Stat, cxt :Option[Any] )( f : => T ) :Future[T] = {
Code.get(rc) match {
(Code.get(rc) match {
case Code.OK => p.success( f )
case error if path == null => p.failure( FailedAsyncResponse( KeeperException.create(error),Option(path), Option(stat), cxt ) )
case error => p.failure( FailedAsyncResponse( KeeperException.create(error, path ),Option(path), Option(stat), cxt ) )
}
}).future
}

/** Wrapper around the ZK exists method. Watch is hardcoded to false.
Expand All @@ -189,7 +189,7 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con
handleResponse(rc, path, p, stat, ctx ){ StatResponse(path, stat, ctx ) }
}
}, ctx )
p
p.future
}

/** Wrapper around the ZK getChildren method. Watch is hardcoded to false.
Expand All @@ -204,7 +204,7 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con
handleResponse(rc, path, p, stat, ctx ){ ChildrenResponse( children.toSeq, path, stat, ctx ) }
}
}, ctx)
p
p.future
}

/** close the underlying zk connection */
Expand All @@ -218,7 +218,7 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con
zk.exists("/", false)
true
} catch {
case e =>
case e :Exception =>
log.warn("ZK not connected in isAliveSync", e)
false
}
Expand Down Expand Up @@ -250,7 +250,7 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con
handleResponse(rc, path, p, null, ctx ){ StringResponse( name, path, ctx ) }
}
}, ctx )
p
p.future
}

/** Create a node and then return it. Under the hood this is a create followed by a get. If the stat or data is not
Expand Down Expand Up @@ -287,7 +287,7 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con
handleResponse(rc, path, p, stat, ctx ){ DataResponse(Option(data), path, stat, ctx ) }
}
}, ctx )
p
p.future
}

/** Wrapper around the zk setData method.
Expand All @@ -302,7 +302,7 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con
handleResponse(rc, path, p, stat, ctx ){ StatResponse(path, stat, ctx ) }
}
}, ctx )
p
p.future
}

/** Wrapper around zk delete method.
Expand All @@ -325,7 +325,7 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con
handleResponse(rc, path, p, null, ctx ){ VoidResponse(path, ctx) }
}
}, ctx )
p
p.future
}
}

Expand All @@ -340,7 +340,7 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con
}
} flatMap { seq =>
if ( p == path )
Promise.successful( VoidResponse( path, ctx ) )
Promise.successful( VoidResponse( path, ctx ) ).future
else
delete( p, -1, ctx ) }
}
Expand Down Expand Up @@ -376,19 +376,19 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con

case e if e == EventType.NodeCreated || e == EventType.NodeDataChanged =>
get(path, watch = ifPersist ).onComplete {
case Left( error ) =>
case Failure( error ) =>
log.error("Error on NodeCreated callback for path %s".format(mkPath(path)), error)
case Right( data ) =>
case Success( data :AsyncResponse.DataResponse ) =>
onData(mkPath(path), Some(data))
}

case EventType.NodeDeleted =>
get(path, watch = ifPersist) onComplete {
case Left( error :FailedAsyncResponse ) if error.code == Code.NONODE =>
case Failure( error :FailedAsyncResponse ) if error.code == Code.NONODE =>
onData(mkPath(path), None)
case Right( data ) =>
case Success( data :AsyncResponse.DataResponse ) =>
onData(mkPath(path), Some(data) )
case Left( error ) =>
case Failure( error ) =>
log.error("Error on NodeCreated callback for path %s".format(mkPath(path)), error)
}
}
Expand Down Expand Up @@ -417,9 +417,9 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con
def process(event: WatchedEvent) = event.getType match {
case EventType.NodeChildrenChanged =>
getChildren(p, watch = ifPersist ) onComplete {
case Left(error) =>
case Failure(error) =>
log.error("Error on NodeChildrenChanged callback for path %s".format(p), error)
case Right( kids ) =>
case Success( kids :AsyncResponse.ChildrenResponse ) =>
onKids(kids)
}
exists(p, watch = ifPersist )
Expand All @@ -446,4 +446,4 @@ class AsyncZooKeeperClient(val servers: String, val sessionTimeout: Int, val con


connect
}
}
29 changes: 29 additions & 0 deletions src/test/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@

akka {
//loglevel = DEBUG
debug {
receive = on

autoreceive = on

lifecycle = on

fsm = on

event-stream = on
}

actor {
debug {
receive = on

autoreceive = on

lifecycle = on

fsm = on

event-stream = on
}
}
}
13 changes: 13 additions & 0 deletions src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Set root logger level to DEBUG and its only appender to A1.
log4j.rootLogger=ERR, A1

# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender

# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

log4j.logger.org.apache.zookeeper=WARN
#log4j.logger.org.apache.zookeeper.server=DEBUG
#log4j.logger.org.apache.zookeeper.ClientCnxn=DEBUG
15 changes: 10 additions & 5 deletions src/test/scala/AsyncZooKeeperClientSpecs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ package com.github.bigtoast.zookeeper
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, WordSpec}
import org.scalatest.matchers.ShouldMatchers
import java.util.concurrent.{TimeUnit, CountDownLatch, Executors}
import akka.dispatch.{Await, ExecutionContext, Future}
import akka.util.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import org.apache.zookeeper.{WatchedEvent, Watcher, CreateMode}
import akka.util.Duration
import scala.concurrent.duration.Duration
import compat.Platform
import com.github.bigtoast.zookeeper.AsyncResponse.FailedAsyncResponse
import org.apache.zookeeper.KeeperException.{NoNodeException, NotEmptyException, BadVersionException}
Expand All @@ -17,8 +17,11 @@ import java.util.concurrent.atomic.AtomicInteger

class AsyncZooKeeperClientSpecs extends WordSpec with ShouldMatchers with BeforeAndAfter with BeforeAndAfterAll {

import scala.concurrent.ExecutionContext.Implicits.global

val eService = Executors.newCachedThreadPool
implicit val to = 3 second
var zkServer :EmbeddedZookeeper = new EmbeddedZookeeper("127.0.0.1:2181")
var zk :AsyncZooKeeperClient = _

class DoAwait[T]( f :Future[T] ) {
Expand All @@ -28,7 +31,7 @@ class AsyncZooKeeperClientSpecs extends WordSpec with ShouldMatchers with Before
implicit def toDoAwait[T]( f :Future[T] ) = new DoAwait[T]( f )

before {
zk = new AsyncZooKeeperClient("localhost:2181",1000,1000,"/async-client/tests", None, ExecutionContext.fromExecutorService( eService ) )
zk = new AsyncZooKeeperClient("127.0.0.1:2181",1000,1000,"/async-client/tests", None, ExecutionContext.fromExecutorService( eService ) )
}

after {
Expand All @@ -40,7 +43,9 @@ class AsyncZooKeeperClientSpecs extends WordSpec with ShouldMatchers with Before
}

override def afterAll {
eService.shutdown
eService.shutdown()

zkServer.shutdown()
}

"A relative path should have base path prepended" in {
Expand Down
64 changes: 64 additions & 0 deletions src/test/scala/EmbeddedZookeeper.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.github.bigtoast.zookeeper

import java.io.File
import java.net.InetSocketAddress
import java.util.Random
import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer}

class EmbeddedZookeeper(val connectString: String) {
import EmbeddedZookeeper._

val snapshotDir = tempDir()
val logDir = tempDir()
val tickTime = 500
val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
val port = connectString.split(":")(1).toInt
val factory = ServerCnxnFactory.createFactory(new InetSocketAddress("localhost", port), 100)
factory.startup(zookeeper)

def shutdown(): Unit = {
factory.shutdown()
rm(logDir)
rm(snapshotDir)
}


}

object EmbeddedZookeeper {

val random = new Random()

/**
* Delete file, in case of directory do that recursively
* @param file file or directory
*/
def rm(file: File): Unit = {
rm(Seq(file))
}

/**
* Delete sequence of files or directories, in case of directory do that recursively
* @param file seq of files or directories
*/
def rm(file: Seq[File]): Unit = {
file.headOption match {
case Some(f) if f.isDirectory =>
rm(f.listFiles().toSeq ++ file.drop(1))
f.delete
case Some(f) =>
f.delete
rm(file.drop(1))
case None =>
}
}


def tempDir(): File = {
val ioDir = System.getProperty("java.io.tmpdir")
val f = new File(ioDir, "kafka-" + random.nextInt(1000000))
f.mkdirs()
f.deleteOnExit()
f
}
}
Loading