From c9563364c74a523160ccf4965a60e67fb30cde49 Mon Sep 17 00:00:00 2001 From: Daniel Poelzleithner Date: Wed, 27 Jul 2011 04:55:42 +0200 Subject: [PATCH 1/3] emit ready signal ready is emmitted when connection is established. allows to wait in tests for example --- lib/cassandra.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/cassandra.js b/lib/cassandra.js index 35921d1..1061774 100644 --- a/lib/cassandra.js +++ b/lib/cassandra.js @@ -38,6 +38,7 @@ sys.inherits(Client, process.EventEmitter); /** * Connect to Cassandra cluster + * emits ready when connection is established * * @param keyspace keyspace name to use * @param credential if given, try login into cassandra @@ -48,6 +49,7 @@ Client.prototype.connect = function() { var keyspace_or_credential = args.shift(); var credential = args.shift(); + if (keyspace_or_credential instanceof String || typeof keyspace_or_credential === 'string') { // if first argument is string, then it is keyspace name @@ -84,10 +86,12 @@ Client.prototype.connect = function() { // only when login is success, emit connected event self.ready = true; + self.emit("ready") self.dispatch(); }); } else { self.ready = true; + self.emit("ready") self.dispatch(); } }); From 8e36b4434d03fec67ebd9b2ba9c88d65911addd0 Mon Sep 17 00:00:00 2001 From: Daniel Poelzleithner Date: Sat, 30 Jul 2011 23:51:20 +0200 Subject: [PATCH 2/3] better state events better handle ready states. Now its perfectly save to create client.getColumnFamily() anytime in code. Their ready state reflect their state correctly now, except the client disconnects. add more tests --- lib/cassandra.js | 93 +++++++++++++++++++++++++++++++++++++++--------- test/test.js | 49 +++++++++++++++++-------- 2 files changed, 111 insertions(+), 31 deletions(-) diff --git a/lib/cassandra.js b/lib/cassandra.js index 1061774..9f3c3f0 100644 --- a/lib/cassandra.js +++ b/lib/cassandra.js @@ -28,7 +28,12 @@ var Client = function(host) { var pair = host.split(/:/); this.host = pair[0]; this.port = pair[1]; + this.ready = false; + this.keyspace = null; + this.keyspace_ready = false; // default consistency level + this._notready = []; + this.queue = []; this.defaultCL = { read: ttype.ConsistencyLevel.QUORUM, write: ttype.ConsistencyLevel.QUORUM @@ -59,7 +64,7 @@ Client.prototype.connect = function() { } this.ready = false; - this.queue = []; + // list of not ready columnfamilies this.connection = thrift.createConnection(this.host, this.port); this.connection.on('error', function(err) { @@ -86,12 +91,12 @@ Client.prototype.connect = function() { // only when login is success, emit connected event self.ready = true; - self.emit("ready") + self.emit("connected") self.dispatch(); }); } else { self.ready = true; - self.emit("ready") + self.emit("connected") self.dispatch(); } }); @@ -102,6 +107,7 @@ Client.prototype.connect = function() { if (err) { self.emit('error', err); } + self.checkReady(); }); } }; @@ -140,9 +146,14 @@ Client.prototype.use = function(keyspace, callback) { self.thrift_client.set_keyspace(self.keyspace, function(err) { if (err) { selt.emit('error', err); + callback(err, this); return; } + self.keyspace_ready = true; self.emit('keyspaceSet', self.column_families_); + self.checkReady(); + if(callback) + callback(err, this); }); }); }; @@ -208,6 +219,7 @@ Client.prototype.getColumnFamily = function(name) { * Close connection */ Client.prototype.close = function() { + this.ready = false; this.connection.end(); }; @@ -216,14 +228,29 @@ Client.prototype.close = function() { */ Client.prototype.dispatch = function() { if (this.ready) { - if (this.queue.length > 0) { + while(this.queue.length > 0) { var next = this.queue.shift(); next[0].apply(this, next[1]); - this.dispatch(); } } }; +/** + * Check if all ColumnFamilies are ready and fire signal + * + * @api {private} + */ + +Client.prototype.checkReady = function() { + if(this._notready.length == 0) { + this.emit("ready", this); + } +} + + +/** + * @api {private} + /** * * @param client Client @@ -235,28 +262,54 @@ var ColumnFamily = function(client, name) { this.queue = []; this.ready = false; this.client_ = client; - + if(client.ready == false) { + console.log("not ready", name); + this.client_._notready.push(this); + } var self = this; - this.client_.on('keyspaceSet', function(cfdef) { + this.client_.on('keyspaceSet', function(cdef) { self._parsecdef(cdef) }); + // if the client is already ready, he got the keyspace definition we can use + if(this.client_.keyspace_ready) { + this._parsecdef(this.client_.column_families_); + } +}; +sys.inherits(ColumnFamily, process.EventEmitter); + +/** + * Parse columnfamily definition + * + * @param cfdef columnfamily definition + * + * @api {private} + */ + +ColumnFamily.prototype._parsecdef = function(cfdef) { // check to see if column name is valid - var cf = cfdef[self.name]; + var cf = cfdef[this.name]; if (!cf) { // column family does not exist - self.client_.emit('error', new Error('Column Family ' + self.name + ' does not exist.')); + // reset ready in case of keyspace switching + this.ready = false; + this.client_.emit('error', new Error('Column Family ' + this.name + ' does not exist.')); } // copy all cfdef properties for (var prop in cf) { if (cf.hasOwnProperty(prop)) { - self[prop] = cf[prop]; + this[prop] = cf[prop]; } } - self.isSuper = self.column_type === 'Super'; - self.ready = true; + this.isSuper = this.column_type === 'Super'; + this.ready = true; - self.dispatch(); - }); -}; + var lpos = this.client_._notready.indexOf(this); + if(lpos != -1) { + this.client_._notready.splice(lpos, 1); + this.client_.checkReady(); + } + this.emit("ready", this); + this.dispatch(); +} /** * Get data from cassandra @@ -393,6 +446,14 @@ ColumnFamily.prototype.slice = function() { /** * set (insert or update) data */ +/** + * Get column count from cassandra + * + * @param key row key to fetch + * @param values optional. values to set, {} + * @param options optional. valid params are consistencyLevel + * @param callback callback function which called after data retrieval. + */ ColumnFamily.prototype.set = function() { var args = Array.prototype.slice.call(arguments); if (!this.ready) { @@ -404,7 +465,6 @@ ColumnFamily.prototype.set = function() { if (typeof args[args.length - 1] === 'function') { callback = args.pop(); } - var key = args.shift(); var values = args.shift() || {}; var options = args.shift() || {}; @@ -457,7 +517,6 @@ ColumnFamily.prototype.set = function() { var mutation_map = {}; mutation_map[key] = {}; mutation_map[key][this.name] = mutations; - this.client_.thrift_client.batch_mutate(mutation_map, cl, callback); }; diff --git a/test/test.js b/test/test.js index e4de9e8..10d9e33 100644 --- a/test/test.js +++ b/test/test.js @@ -11,12 +11,13 @@ var assert = require('assert'), // number of tests +var log = console.log; + module.exports = { 'test if ConsistencyLevel is exported properly': function() { // connect to cassandra var client = new cassandra.Client('127.0.0.1:9160'); - // make sure all consistency levels are exported var CL = cassandra.ConsistencyLevel; assert.deepEqual({ @@ -47,18 +48,23 @@ module.exports = { }, client.consistencyLevel()); }, - 'test connecting keyspace that does not exist throws error': function() { + 'test connecting keyspace that does not exist throws error': function(beforeExit) { // connect to cassandra var client = new cassandra.Client('127.0.0.1:9160'); + var lerr; client.on('error', function(err) { - assert.isNotNull(err); - client.close(); + lerr = err; }); client.connect('NonExistKeySpace'); + beforeExit(function() { + assert.isNotNull(lerr); + client.close(); + }); }, 'test if accessing ColumnFamily that does not exist throws error': function() { // connect to cassandra + var client = new cassandra.Client('127.0.0.1:9160'); client.on('error', function(err) { assert.isNotNull(err); @@ -67,6 +73,7 @@ module.exports = { }); client.connect('node_cassandra_test'); client.getColumnFamily('NotExistCF'); + client.connect('node_cassandra_test'); }, /* @@ -88,16 +95,37 @@ module.exports = { }); }); }); - }, + , */ + 'test ready signal on connect': function(beforeExit) { + var client = new cassandra.Client('127.0.0.1:9160'); + var standard = client.getColumnFamily('Standard'); + var cf_ready = false; + standard.on("ready", function(cf) { cf_ready = true; }); + var got_ready = false; + client.connect('node_cassandra_test'); + var superCF = client.getColumnFamily('Super'); + client.on('ready', function() { + got_ready = true; + }) + beforeExit(function() { + assert.ok(got_ready, "ready signal not fired"); + assert.ok(standard.ready, "ready on standard not set"); + assert.ok(superCF.ready, "ready on super not set"); + assert.ok(cf_ready, "columnfamily did not send ready"); + client.close(); + }); - 'test if operations on client works properly': function(beforeExit) { + }, + 'test if operations on client works properly': function() { // connect to cassandra var client = new cassandra.Client('127.0.0.1:9160'); + client.on('error', function(err) { + log("got error", err); + }) client.connect('node_cassandra_test'); // or login if needed //client.connect('node_cassandra_test', {username: 'foo', password: 'bar'}); - var standard = client.getColumnFamily('Standard'); var superCF = client.getColumnFamily('Super'); @@ -112,7 +140,6 @@ module.exports = { }, function(err) { assert.isNull(err); }); - // make sure it is seted. standard.get('todd', function(err, res) { assert.isNull(err); @@ -123,7 +150,6 @@ module.exports = { age: 24, }, res); }); - // if you query for the key that doesn't exist, you will get empty object. standard.get('notexist', function(err, res) { assert.isNull(err); @@ -135,7 +161,6 @@ module.exports = { first_name: 'Jesse', last_name: 'Pitman' }); - standard.get(['todd', 'jesse'], function(err, res) { assert.isNull(err); assert.deepEqual({ @@ -152,7 +177,6 @@ module.exports = { } }, res); }); - // read operation with options. // valid options are: // start: SliceRange start @@ -185,7 +209,6 @@ module.exports = { age: '24', }, res); }); - // counting // let's count number of cols standard.count('todd', function(err, res) { @@ -269,7 +292,6 @@ module.exports = { city: 'Madison' }, res); }); - // remove standard.remove('todd', 'id', function(err) { assert.isNull(err); @@ -302,7 +324,6 @@ module.exports = { assert.isNull(err); assert.deepEqual({}, res); }); - superCF.remove('edgar'); superCF.get('edgar', function(err, res) { assert.isNull(err); From 67c9e185c6f5b13a4576b06d13126267a52d189b Mon Sep 17 00:00:00 2001 From: Daniel Poelzleithner Date: Mon, 1 Aug 2011 11:03:32 +0200 Subject: [PATCH 3/3] fix another endless recursion deliver --- lib/cassandra.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/cassandra.js b/lib/cassandra.js index 9f3c3f0..ee91b08 100644 --- a/lib/cassandra.js +++ b/lib/cassandra.js @@ -580,10 +580,9 @@ ColumnFamily.prototype.truncate = function() { **/ ColumnFamily.prototype.dispatch = function() { if (this.ready) { - if (this.queue.length > 0) { + while(this.queue.length > 0) { var next = this.queue.shift(); next[0].apply(this, next[1]); - this.dispatch(); } } };