diff --git a/cumin.js b/cumin.js index b6c5db1..b8b13ba 100644 --- a/cumin.js +++ b/cumin.js @@ -113,12 +113,19 @@ module.exports = function(port, host, options) { nonBlockingClient.publish("cumin.enqueued", message, done); }, - listen: function(queueName, handler) { + listen: function(queueName, autoReconnect, handler) { + + // Do the argument shuffle + if (!handler) { + handler = autoReconnect; + autoReconnect = false; + } + if(!queueName) { throw new Error(consolePrefix, "Queue name must be provided. eg. 'emailQueue'."); } - if(!handler) { + if(!handler || typeof handler !== 'function') { throw new Error(consolePrefix, "You must provide a hander to .listen."); } @@ -141,6 +148,30 @@ module.exports = function(port, host, options) { } continueListening("cumin." + queueName, handler); + + + if (autoReconnect) { + + blockingClient.on('end', function(err) { + alreadyListening = false; + }) + + blockingClient.on('error', function(err) { + console.warn('Blocking client disconnect.'); + }); + + nonBlockingClient.on('error', function(err) { + console.warn('Non-blocking client disconnect.'); + }); + + blockingClient.on('ready', function() { + if (!alreadyListening) { + alreadyListening = true; + continueListening('cumin.' + queueName, handler); + } + }); + + } } } }