Skip to content
This repository was archived by the owner on Jan 22, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 79 additions & 17 deletions lib/cassandra.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ var Client = function(host) {
var pair = host.split(/:/);
this.host = pair[0];
this.port = pair[1];
this.ready = false;
this.keyspace = null;
this.keyspace_ready = false;
// default consistency level
this._notready = [];
this.queue = [];
this.defaultCL = {
read: ttype.ConsistencyLevel.QUORUM,
write: ttype.ConsistencyLevel.QUORUM
Expand All @@ -38,6 +43,7 @@ sys.inherits(Client, process.EventEmitter);

/**
* Connect to Cassandra cluster
* emits ready when connection is established
*
* @param keyspace keyspace name to use
* @param credential if given, try login into cassandra
Expand All @@ -48,6 +54,7 @@ Client.prototype.connect = function() {

var keyspace_or_credential = args.shift();
var credential = args.shift();

if (keyspace_or_credential instanceof String ||
typeof keyspace_or_credential === 'string') {
// if first argument is string, then it is keyspace name
Expand All @@ -57,7 +64,7 @@ Client.prototype.connect = function() {
}

this.ready = false;
this.queue = [];
// list of not ready columnfamilies

this.connection = thrift.createConnection(this.host, this.port);
this.connection.on('error', function(err) {
Expand All @@ -84,10 +91,12 @@ Client.prototype.connect = function() {

// only when login is success, emit connected event
self.ready = true;
self.emit("connected")
self.dispatch();
});
} else {
self.ready = true;
self.emit("connected")
self.dispatch();
}
});
Expand All @@ -98,6 +107,7 @@ Client.prototype.connect = function() {
if (err) {
self.emit('error', err);
}
self.checkReady();
});
}
};
Expand Down Expand Up @@ -136,9 +146,14 @@ Client.prototype.use = function(keyspace, callback) {
self.thrift_client.set_keyspace(self.keyspace, function(err) {
if (err) {
selt.emit('error', err);
callback(err, this);
return;
}
self.keyspace_ready = true;
self.emit('keyspaceSet', self.column_families_);
self.checkReady();
if(callback)
callback(err, this);
});
});
};
Expand Down Expand Up @@ -204,6 +219,7 @@ Client.prototype.getColumnFamily = function(name) {
* Close connection
*/
Client.prototype.close = function() {
this.ready = false;
this.connection.end();
};

Expand All @@ -212,14 +228,29 @@ Client.prototype.close = function() {
*/
Client.prototype.dispatch = function() {
if (this.ready) {
if (this.queue.length > 0) {
while(this.queue.length > 0) {
var next = this.queue.shift();
next[0].apply(this, next[1]);
this.dispatch();
}
}
};

/**
* Check if all ColumnFamilies are ready and fire signal
*
* @api {private}
*/

Client.prototype.checkReady = function() {
if(this._notready.length == 0) {
this.emit("ready", this);
}
}


/**
* @api {private}

/**
*
* @param client Client
Expand All @@ -231,28 +262,54 @@ var ColumnFamily = function(client, name) {
this.queue = [];
this.ready = false;
this.client_ = client;

if(client.ready == false) {
console.log("not ready", name);
this.client_._notready.push(this);
}
var self = this;
this.client_.on('keyspaceSet', function(cfdef) {
this.client_.on('keyspaceSet', function(cdef) { self._parsecdef(cdef) });
// if the client is already ready, he got the keyspace definition we can use
if(this.client_.keyspace_ready) {
this._parsecdef(this.client_.column_families_);
}
};
sys.inherits(ColumnFamily, process.EventEmitter);

/**
* Parse columnfamily definition
*
* @param cfdef columnfamily definition
*
* @api {private}
*/

ColumnFamily.prototype._parsecdef = function(cfdef) {
// check to see if column name is valid
var cf = cfdef[self.name];
var cf = cfdef[this.name];
if (!cf) {
// column family does not exist
self.client_.emit('error', new Error('Column Family ' + self.name + ' does not exist.'));
// reset ready in case of keyspace switching
this.ready = false;
this.client_.emit('error', new Error('Column Family ' + this.name + ' does not exist.'));
}

// copy all cfdef properties
for (var prop in cf) {
if (cf.hasOwnProperty(prop)) {
self[prop] = cf[prop];
this[prop] = cf[prop];
}
}
self.isSuper = self.column_type === 'Super';
self.ready = true;
this.isSuper = this.column_type === 'Super';
this.ready = true;

self.dispatch();
});
};
var lpos = this.client_._notready.indexOf(this);
if(lpos != -1) {
this.client_._notready.splice(lpos, 1);
this.client_.checkReady();
}
this.emit("ready", this);
this.dispatch();
}

/**
* Get data from cassandra
Expand Down Expand Up @@ -389,6 +446,14 @@ ColumnFamily.prototype.slice = function() {
/**
* set (insert or update) data
*/
/**
* Get column count from cassandra
*
* @param key row key to fetch
* @param values optional. values to set, {}
* @param options optional. valid params are consistencyLevel
* @param callback callback function which called after data retrieval.
*/
ColumnFamily.prototype.set = function() {
var args = Array.prototype.slice.call(arguments);
if (!this.ready) {
Expand All @@ -400,7 +465,6 @@ ColumnFamily.prototype.set = function() {
if (typeof args[args.length - 1] === 'function') {
callback = args.pop();
}

var key = args.shift();
var values = args.shift() || {};
var options = args.shift() || {};
Expand Down Expand Up @@ -453,7 +517,6 @@ ColumnFamily.prototype.set = function() {
var mutation_map = {};
mutation_map[key] = {};
mutation_map[key][this.name] = mutations;

this.client_.thrift_client.batch_mutate(mutation_map, cl, callback);
};

Expand Down Expand Up @@ -517,10 +580,9 @@ ColumnFamily.prototype.truncate = function() {
**/
ColumnFamily.prototype.dispatch = function() {
if (this.ready) {
if (this.queue.length > 0) {
while(this.queue.length > 0) {
var next = this.queue.shift();
next[0].apply(this, next[1]);
this.dispatch();
}
}
};
Expand Down
49 changes: 35 additions & 14 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ var assert = require('assert'),

// number of tests

var log = console.log;

module.exports = {

'test if ConsistencyLevel is exported properly': function() {
// connect to cassandra
var client = new cassandra.Client('127.0.0.1:9160');

// make sure all consistency levels are exported
var CL = cassandra.ConsistencyLevel;
assert.deepEqual({
Expand Down Expand Up @@ -47,18 +48,23 @@ module.exports = {
}, client.consistencyLevel());
},

'test connecting keyspace that does not exist throws error': function() {
'test connecting keyspace that does not exist throws error': function(beforeExit) {
// connect to cassandra
var client = new cassandra.Client('127.0.0.1:9160');
var lerr;
client.on('error', function(err) {
assert.isNotNull(err);
client.close();
lerr = err;
});
client.connect('NonExistKeySpace');
beforeExit(function() {
assert.isNotNull(lerr);
client.close();
});
},

'test if accessing ColumnFamily that does not exist throws error': function() {
// connect to cassandra

var client = new cassandra.Client('127.0.0.1:9160');
client.on('error', function(err) {
assert.isNotNull(err);
Expand All @@ -67,6 +73,7 @@ module.exports = {
});
client.connect('node_cassandra_test');
client.getColumnFamily('NotExistCF');
client.connect('node_cassandra_test');
},

/*
Expand All @@ -88,16 +95,37 @@ module.exports = {
});
});
});
},
,
*/
'test ready signal on connect': function(beforeExit) {
var client = new cassandra.Client('127.0.0.1:9160');
var standard = client.getColumnFamily('Standard');
var cf_ready = false;
standard.on("ready", function(cf) { cf_ready = true; });
var got_ready = false;
client.connect('node_cassandra_test');
var superCF = client.getColumnFamily('Super');
client.on('ready', function() {
got_ready = true;
})
beforeExit(function() {
assert.ok(got_ready, "ready signal not fired");
assert.ok(standard.ready, "ready on standard not set");
assert.ok(superCF.ready, "ready on super not set");
assert.ok(cf_ready, "columnfamily did not send ready");
client.close();
});

'test if operations on client works properly': function(beforeExit) {
},
'test if operations on client works properly': function() {
// connect to cassandra
var client = new cassandra.Client('127.0.0.1:9160');
client.on('error', function(err) {
log("got error", err);
})
client.connect('node_cassandra_test');
// or login if needed
//client.connect('node_cassandra_test', {username: 'foo', password: 'bar'});

var standard = client.getColumnFamily('Standard');
var superCF = client.getColumnFamily('Super');

Expand All @@ -112,7 +140,6 @@ module.exports = {
}, function(err) {
assert.isNull(err);
});

// make sure it is seted.
standard.get('todd', function(err, res) {
assert.isNull(err);
Expand All @@ -123,7 +150,6 @@ module.exports = {
age: 24,
}, res);
});

// if you query for the key that doesn't exist, you will get empty object.
standard.get('notexist', function(err, res) {
assert.isNull(err);
Expand All @@ -135,7 +161,6 @@ module.exports = {
first_name: 'Jesse',
last_name: 'Pitman'
});

standard.get(['todd', 'jesse'], function(err, res) {
assert.isNull(err);
assert.deepEqual({
Expand All @@ -152,7 +177,6 @@ module.exports = {
}
}, res);
});

// read operation with options.
// valid options are:
// start: SliceRange start
Expand Down Expand Up @@ -185,7 +209,6 @@ module.exports = {
age: '24',
}, res);
});

// counting
// let's count number of cols
standard.count('todd', function(err, res) {
Expand Down Expand Up @@ -269,7 +292,6 @@ module.exports = {
city: 'Madison'
}, res);
});

// remove
standard.remove('todd', 'id', function(err) {
assert.isNull(err);
Expand Down Expand Up @@ -302,7 +324,6 @@ module.exports = {
assert.isNull(err);
assert.deepEqual({}, res);
});

superCF.remove('edgar');
superCF.get('edgar', function(err, res) {
assert.isNull(err);
Expand Down