diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..a38a8fc --- /dev/null +++ b/.editorconfig @@ -0,0 +1,14 @@ +[*] +charset = utf-8 +end_of_line = lf +insert_final_newline = true +trim_trailing_whitespace = true +indent_style = space +indent_size = 2 + +[Makefile] +indent_style = tab + +[*.md] +indent_size = 4 +trim_trailing_whitespace = false diff --git a/.env.tpl b/.env.tpl deleted file mode 100644 index da05319..0000000 --- a/.env.tpl +++ /dev/null @@ -1,3 +0,0 @@ -LOGS_TOKEN= -BUGS_TOKEN= -MAX_EMITTERS=20 diff --git a/.gitignore b/.gitignore index 9b5110a..6ef66ad 100644 --- a/.gitignore +++ b/.gitignore @@ -63,6 +63,7 @@ node_modules # generated doc output docs/ +<<<<<<< HEAD ### VisualStudio template ## Ignore Visual Studio temporary files, build results, and ## files generated by popular Visual Studio add-ons. @@ -416,3 +417,8 @@ com_crashlytics_export_strings.xml crashlytics.properties crashlytics-build.properties fabric.properties +======= + +# To do +TO-DO.md +>>>>>>> New features and enehancement diff --git a/.npmignore b/.npmignore index 8b9b9bf..db20b8d 100644 --- a/.npmignore +++ b/.npmignore @@ -1,39 +1,14 @@ -# Logs -logs -*.log - -# Runtime data -pids -*.pid -*.seed - -# Directory for instrumented libs generated by jscoverage/JSCover -lib-cov - -# Coverage directory used by tools like istanbul +# test +test coverage -# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) -.grunt - -# node-waf configuration -.lock-wscript - -# Compiled binary addons (http://nodejs.org/api/addons.html) -build/Release - -# Dependency directory -# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git -node_modules -.env - -# documentation -docs -samples -test +# tools +sonar-project.properties +.eslintrc +.eslintignore .gitignore -.jshintignore -.npmignore -CONTRIBUTING.md Makefile -sonar-project.properties + +#editor settings +.idea +.editorconfig diff --git a/examples/benchmarks/index.js b/examples/benchmarks/index.js new file mode 100644 index 0000000..54abe60 --- /dev/null +++ b/examples/benchmarks/index.js @@ -0,0 +1,58 @@ +const Benchmark = require('benchmark'); +const { producer, consumer, connection } = require('../../index')({ + hostname: process.env.HOSTNAME || 'consumer', + poolSize: parseInt(process.env.AMQP_POOL_SIZE, 10) || 5000, + timeout: parseInt(process.env.AMQP_TIMEOUT, 10) || 1000, + rpcTimeout: parseInt(process.env.AMQP_RPC_TIMEOUT, 10) || 2000, + host: process.env.AMQP_URL || 'amqp://localhost' +}); + +const LOOP_INTERVAL = parseInt(process.env.LOOP_INTERVAL, 10) || 10; +const MAX_MESSAGES = parseInt(process.env.MAX_MESSAGES, 10) || 100; +const suite = new Benchmark.Suite; + +process.on('unhandledRejection', error => { + // Will print "unhandledRejection err is not dewfined" + console.error('unhandledRejection', error); +}); + +connection.connect() +.then(() => { + consumer.consume('test:queue', (msg) => { + // console.log(msg); + return Promise.resolve(true); + }) + .then(() => { + // add tests + suite + .add('with RPC', { + defer: true, + repeat: 10, + fn: function (deferred) { + // avoid test inlining + producer.produce('test:queue', { message: 'message', duration: 1000 }, { rpc: true }) + .then(() => deferred.resolve()) + .catch(() => deferred.resolve()); + } + }) + .add('without RPC', { + defer: true, + repeat: 10, + fn: function (deferred) { + // avoid test inlining + producer.produce('test:queue', { message: 'message', duration: 1000 }, { rpc: false }) + .then(() => deferred.resolve()) + .catch(() => deferred.resolve()); + } + }) + // // add listeners + .on('cycle', function (event) { + console.log(String(event.target)); + }) + .on('complete', function () { + console.log('Fastest is ' + this.filter('fastest').map('name')); + }) + // run async + .run({ 'async': true }); + }); +}); diff --git a/examples/stress-test/consumer/Dockerfile b/examples/stress-test/consumer/Dockerfile new file mode 100644 index 0000000..995cb54 --- /dev/null +++ b/examples/stress-test/consumer/Dockerfile @@ -0,0 +1,11 @@ +FROM dialonce/nodejs:latest + +WORKDIR /usr/src/app + +COPY . /usr/src/app + +RUN npm i --production + +EXPOSE 3000 + +CMD [ "node", "./examples/stress-test/consumer/consumer.js" ] diff --git a/examples/stress-test/consumer/consumer.js b/examples/stress-test/consumer/consumer.js new file mode 100644 index 0000000..a11b1bf --- /dev/null +++ b/examples/stress-test/consumer/consumer.js @@ -0,0 +1,15 @@ +const { consumer, producer, connection } = require('../../../index')({ + hostname: process.env.HOSTNAME || 'consumer', + poolSize: parseInt(process.env.AMQP_POOL_SIZE, 10) || 100, + timeout: parseInt(process.env.AMQP_TIMEOUT, 10) || 1000, + host: process.env.AMQP_URL || 'amqp://localhost', + prefetch: 100, + enableCompression: true +}); + +connection.connect() +.then(() => { + consumer.consume('queue:heartbeat', (msg) => Promise.resolve(true)); + consumer.consume('queue:check-permissions', (msg) => Promise.resolve(true)); + consumer.consume('queue:update-paths', (msg) => Promise.resolve(true)); +}); diff --git a/examples/stress-test/docker-compose.yml b/examples/stress-test/docker-compose.yml new file mode 100644 index 0000000..8644640 --- /dev/null +++ b/examples/stress-test/docker-compose.yml @@ -0,0 +1,43 @@ +version: '3' + +services: + rabbitmq_server: + image: rabbitmq:3.6-management-alpine + ports: + - 15672:15672 + - 5672:5672 + + consumer_service: + build: + context: ../../ + dockerfile: ./examples/stress-test/consumer/Dockerfile + environment: + - AMQP_URL=amqp://rabbitmq_server + - AMQP_DEBUG=true + - AMQP_TIMEOUT=1000 + - AMQP_POOL_SIZE=5 + - AMQP_PREFETCH=100 + - DEBUG=* + links: + - rabbitmq_server + depends_on: + - rabbitmq_server + + producer_service: + build: + context: ../../ + dockerfile: ./examples/stress-test/producer/Dockerfile + environment: + - AMQP_URL=amqp://rabbitmq_server + - AMQP_DEBUG=true + - AMQP_TIMEOUT=1000 + - AMQP_POOL_SIZE=5000 + - AMQP_RPC_TIMEOUT=15000 + - LOOP_INTERVAL=1 + - MAX_MESSAGES=1 + - DEBUG=* + links: + - rabbitmq_server + depends_on: + - rabbitmq_server + - consumer_service diff --git a/examples/stress-test/producer/Dockerfile b/examples/stress-test/producer/Dockerfile new file mode 100644 index 0000000..9ca648f --- /dev/null +++ b/examples/stress-test/producer/Dockerfile @@ -0,0 +1,11 @@ +FROM dialonce/nodejs:latest + +WORKDIR /usr/src/app + +COPY . /usr/src/app + +RUN npm i --production + +EXPOSE 3000 + +CMD [ "node", "./examples/stress-test/producer/producer.js" ] diff --git a/examples/stress-test/producer/producer.js b/examples/stress-test/producer/producer.js new file mode 100644 index 0000000..97eaa07 --- /dev/null +++ b/examples/stress-test/producer/producer.js @@ -0,0 +1,23 @@ +const { producer, connection } = require('../../../index')({ + hostname: process.env.HOSTNAME || 'consumer', + poolSize: parseInt(process.env.AMQP_POOL_SIZE, 10) || 5000, + timeout: parseInt(process.env.AMQP_TIMEOUT, 10) || 1000, + rpcTimeout: parseInt(process.env.AMQP_RPC_TIMEOUT, 10) || 15000, + host: process.env.AMQP_URL || 'amqp://localhost', + enableCompression: true +}); + +const LOOP_INTERVAL = parseInt(process.env.LOOP_INTERVAL, 10) || 1000; +const MAX_MESSAGES = parseInt(process.env.MAX_MESSAGES, 10) || 100; + +connection.connect() +.then(() => { + setInterval(() => { + for (let i = 0; i < MAX_MESSAGES; ++i) { + producer.produce('queue:heartbeat', { message: `message`, duration: 1000 }, { rpc: true }) + .then((res1) => producer.produce('queue:check-permissions', res1, { rpc: true })) + .then((res2) => producer.produce('queue:update-paths', res2, { rpc: true })) + .catch(console.error); + } + }, LOOP_INTERVAL); +}); diff --git a/index.js b/index.js new file mode 100644 index 0000000..98316e1 --- /dev/null +++ b/index.js @@ -0,0 +1,41 @@ +require('events').EventEmitter.prototype._maxListeners = process.env.MAX_EMITTERS || 20; + +const Connection = require('./lib/connection'); +const Consumer = require('./lib/consumer'); +const Producer = require('./lib/producer'); +const uuid = require('uuid'); +const debug = require('debug'); + +let connection; + +module.exports = (config = {}) => { + const configuration = Object.assign({ + host: 'amqp://localhost', + // number of fetched messages, at once + prefetch: 5, + // requeue put back message into the broker if consumer crashes/trigger exception + requeue: true, + // requeue count, after this number of retry the message will be rejected + requeueCount: 5, + // time between two reconnect (ms) + timeout: 1000, + // default timeout for RPC calls. If set to '0' there will be none. + rpcTimeout: 15000, + consumerSuffix: '', + // generate a hostname so we can track this connection on the broker (rabbitmq management plugin) + hostname: process.env.HOSTNAME || process.env.USER || uuid.v4(), + enableCompression: process.env.AMQP_ENABLE_COMPRESSION === 'true' + // the transport to use to debug. if provided, bunnymq will show some logs + // transport: utils.emptyLogger + }, config); + + if (!connection) { + connection = new Connection(configuration, debug('[BunnyMq:connection]')); + } + + return { + consumer: new Consumer(connection, configuration, debug('[BunnyMq:consumer]')), + producer: new Producer(connection, configuration, debug('[BunnyMq:producer]')), + connection, + }; +}; diff --git a/lib/channel-pool.js b/lib/channel-pool.js new file mode 100644 index 0000000..b2db267 --- /dev/null +++ b/lib/channel-pool.js @@ -0,0 +1,64 @@ +const utils = require('./modules/utils'); +const Mutex = require('./modules/mutex'); + +let instance; +const mutex = new Mutex(); + +const MAX_POOL_SIZE = 20000; + +class channelPool { + constructor(poolSize, connection) { + if (!instance) { + instance = this; + + this.connection = connection; + this.poolSize = poolSize; + this.pool = []; + this.channelCount = 0; + // methods binding + this.create = this.create.bind(this); + this.addChannel = this.addChannel.bind(this); + this.getChannel = this.getChannel.bind(this); + this.releaseChannel = this.releaseChannel.bind(this); + } + + return instance; + } + + create() { + return Promise.all( + Array.from({ length: this.poolSize }).map(() => { + return this.addChannel(); + }) + ); + } + + addChannel() { + return this.connection.createChannel() + .then((channel) => { + ++this.channelCount; + this.pool.push(channel); + + return channel; + }); + } + + releaseChannel(channel) { + if (this.pool.length < this.poolSize) { + this.pool.push(channel); + } + } + + getChannel() { + return mutex.synchronize(() => { + if (this.pool.length) return Promise.resolve(this.pool.shift()); + + if (this.channelCount < MAX_POOL_SIZE) return this.addChannel(false); + + return utils.timeoutPromise(Math.floor(Math.random() * 10) + 1) + .then(this.getChannel); + }); + } +} + +module.exports = channelPool; diff --git a/lib/connection.js b/lib/connection.js new file mode 100644 index 0000000..55fe4f2 --- /dev/null +++ b/lib/connection.js @@ -0,0 +1,122 @@ +const amqp = require('amqplib'); +const ChannelPool = require('./channel-pool'); +const packageVersion = require('../package.json').version; +const utils = require('./modules/utils'); +const events = require('events'); + +let instance; + +class Connection { + constructor(config, debug) { + if (!instance) { + instance = this; + + this.config = config; + this.connection = undefined; + this.state = Connection.STATES.DISCONNECTED; + this.channelPool = null; + this.startedAt = new Date().toISOString(); + this.eventEmitter = new events.EventEmitter(); + this.eventEmitter.setMaxListeners(0); + this.debug = debug; + // methods binding + this.connect = this.connect.bind(this); + this.createChannel = this.createChannel.bind(this); + this.getChannel = this.getChannel.bind(this); + this.releaseChannel = this.releaseChannel.bind(this); + } + + return instance; + } + + static get STATES() { + return { + DISCONNECTED: -1, + CONNECTING: 0, + CONNECTED: 1 + }; + } + + connect() { + return new Promise((resolve, reject) => { + if (this.state === Connection.STATES.CONNECTED) { + this.eventEmitter.emit('connected', this.connection); + return resolve(this.connection); + } + + if (this.state === Connection.STATES.DISCONNECTED) { + this.state = Connection.STATES.CONNECTING; + + // set clientProperties to get more info about connection + const clientProperties = { + hostname: this.config.hostname, + bunnymq: packageVersion, + startedAt: this.startedAt, + connectedAt: new Date().toISOString() + }; + + // open amqp connection + amqp.connect(this.config.host, { clientProperties }) + .then((connection) => { + this.connection = connection; + + const pool = new ChannelPool(this.config.poolSize, this.connection); + + const promisePoolCreation = pool.create() + .then(() => { + this.channelPool = pool; + this.state = Connection.STATES.CONNECTED; + this.eventEmitter.emit('connected', this.connection); + return this.connection; + }); + + this.debug('connection openned'); + + // set listeners + this.connection.on('close', () => { + this.debug('connection closed'); + this.connection = null; + }); + + this.connection.on('error', (err) => { + this.debug(err); + return reject(err); + }); + + return resolve(promisePoolCreation); + }) + .catch(() => { + this.state = Connection.STATES.DISCONNECTED; + return resolve(utils.timeoutPromise(this.config.timeout) + .then(() => this.connect())); + }); + } + }); + } + + releaseChannel(channel) { + return this.channelPool.releaseChannel(channel); + } + + getChannel(channel) { + return this.channelPool.getChannel(channel); + } + + createChannel(resolve) { + return this.connection.createChannel() + .then((channel) => { + channel.on('close', () => { + this.debug('channel closed'); + }); + + channel.on('error', (err) => { + this.debug('channel:', err); + throw err; + }); + + return channel; + }); + } +} + +module.exports = Connection; diff --git a/lib/consumer.js b/lib/consumer.js new file mode 100644 index 0000000..7c7b6a4 --- /dev/null +++ b/lib/consumer.js @@ -0,0 +1,147 @@ +const utils = require('./modules/utils'); +const Connection = require('./connection'); +const Message = require('./message'); + +process.on('unhandledRejection', console.error); + +class Consumer { + constructor(connection, config, debug) { + this.connection = connection; + this.config = config; + this.debug = debug; + // methods binding + this.checkRPC = this.checkRPC.bind(this); + this.consume = this.consume.bind(this); + this.consumeWhenConnected = this.consumeWhenConnected.bind(this); + this.reject = this.reject.bind(this); + } + + checkRPC(channel, queue, msg, options) { + /** + * When message contains a replyTo property, we try to send the answer back + * @param {any} content the received message: + * @return {any} object, string, number... the current received message + */ + return (content) => { + if (msg.replyTo) { + const message = new Message(Object.assign(msg, { + body: content, + from: msg.to, + to: msg.from, + requeue: options.requeue, + requeueCount: options.requeueCount + })); + + return Message.serialize(message, options.enableCompression) + .then((serializedMsg) => { + this.debug(`Queueing message in queue=${queue} rpcQueue=${msg.replyTo} > `, message); + + return channel.sendToQueue(msg.replyTo, serializedMsg, { + correlationId: msg.correlationId, + persistent: true, + durable: true + }); + }); + } + + return msg; + }; + } + + consumeWhenConnected(queue, options, callback) { + return this.connection.getChannel() + .then((channel) => { + // consumer gets a suffix if one is set on the configuration, to suffix all queues names + // ex: service-something with suffix :ci becomes service-suffix:ci etc. + const suffixedQueue = `${queue}${this.config.consumerSuffix || ''}`; + + channel.prefetch(options.prefetch); + + return channel.assertQueue(suffixedQueue, options) + .then((q) => ({ validQueue: q.queue, channel })); + }) + .then(({ validQueue, channel }) => { + this.debug(`init queue=${validQueue}`); + + channel.consume(validQueue, (msg) => { + let deserializedMsg; + // main answer management chaining + // receive message, parse it, execute callback, check if should answer, ack/reject message + Message.deserialize(msg.content, msg.fields) + .then((resMsg) => { + deserializedMsg = resMsg; + + this.debug(`Receiving message in queue=${validQueue} < `, deserializedMsg); + + return Promise.resolve(callback(deserializedMsg)) + .then(this.checkRPC(channel, validQueue, deserializedMsg, options)) + .then(() => channel.ack(msg)) + }) + .catch((err) => { + // if something bad happened in the callback, reject the message so we can requeue it (or not) + this.debug(err); + return this.reject(channel, queue, deserializedMsg, msg, options); + }); + }, { noAck: false }); + + return true; + }) + .catch((err) => { + this.debug(err); + + // add timeout between retries because we don't want to overflow the CPU + return utils.timeoutPromise(this.config.timeout) + .then(() => this.consume(queue, options, callback)) + }); + } + + reject(channel, queue, deserializedMsg, msg, options) { + const headers = msg.properties.headers; + headers.requeueCount -= 1; + + channel.reject(msg, false); + if (headers.requeue && parseInt(headers.requeueCount, 10) >= 0) { + const message = new Message(deserializedMsg); + message.redelivered = true; + + return Message.serialize(message, options.enableCompression) + .then((serializedMsg) => { + options.headers = { requeueCount: headers.requeueCount, requeue: headers.requeue }; + + this.debug(`Requeueing message in queue=${queue} > `, message); + + channel.sendToQueue(queue, serializedMsg, options); + }); + } + } + + consume(queue, options, callback) { + return new Promise((resolve, reject) => { + let currentOptions = options; + let currentCallback = callback; + + if (typeof currentOptions === 'function') { + currentCallback = currentOptions; + // default message options + currentOptions = { persistent: true, durable: true }; + } + + currentOptions = Object.assign({}, this.config, currentOptions); + + if (this.connection.state !== Connection.STATES.CONNECTED) { + this.connection.eventEmitter.once('connected', (connection) => { + this.consumeWhenConnected(queue, currentOptions, currentCallback) + .then(resolve) + .catch(reject); + }); + this.connection.connect(); + } else { + this.consumeWhenConnected(queue, currentOptions, currentCallback) + .then(resolve) + .catch(reject); + } + }); + } +} + +module.exports = Consumer; diff --git a/lib/message.js b/lib/message.js new file mode 100644 index 0000000..9a34179 --- /dev/null +++ b/lib/message.js @@ -0,0 +1,81 @@ +const uuid = require('uuid'); +const zlib = require('zlib'); + +class Message { + constructor(msg) { + this.requestId = msg.requestId || uuid.v4(); + this.messageId = msg.messageId || uuid.v4(); + this.from = msg.from || 'unknown'; + this.to = msg.to || 'unknown'; + this.timestamp = new Date(); + this.priority = msg.priority || 1; + this.body = msg.body || ''; + this.correlationId = msg.correlationId; + this.replyTo = msg.replyTo; + this.ttl = msg.ttl || 0; + this.type = msg.type || 'msg'; + } + + set setMessageId(value) { + this.messageId = value; + } + + get getMessageId() { + return this.messageId; + } + + static compress(input) { + return new Promise((resolve, reject) => { + zlib.gzip(input, (err, data) => { + if (!err) return resolve(data); + + if (err.message.includes('incorrect header check')) return resolve(input); + + return reject(err); + }); + }); + } + + static decompress(input) { + return new Promise((resolve, reject) => { + zlib.gunzip(input, (err, data) => { + if (!err) return resolve(data); + + if (err.message.includes('incorrect header check')) return resolve(input); + + return reject(err); + }); + }); + } + + static serialize(message, enableCompression) { + console.log('message:', message); + return new Promise((resolve, reject) => { + const msg = Buffer.from(JSON.stringify(message), 'utf-8'); + + if (!enableCompression) return resolve(msg); + + return Message.compress(msg).then(resolve).catch(reject); + }); + } + + static deserialize(message, metadata = {}) { + return new Promise((resolve, reject) => { + return Message.decompress(message) + .then((decompressedMsg) => { + const msg = Object.assign({}, JSON.parse(decompressedMsg.toString())); + const deserializedMsg = new Message(msg); + + deserializedMsg.routingKey = metadata.routingKey; + deserializedMsg.exchange = metadata.exchange; + deserializedMsg.redelivered = msg.redelivered || metadata.redelivered; + deserializedMsg.consumerTag = metadata.consumerTag; + + return resolve(deserializedMsg); + }) + .catch(reject); + }); + } +} + +module.exports = Message; diff --git a/lib/modules/mutex.js b/lib/modules/mutex.js new file mode 100644 index 0000000..7bc0d37 --- /dev/null +++ b/lib/modules/mutex.js @@ -0,0 +1,42 @@ +/*eslint-disable*/ + +class Mutex { + constructor() { + this.queue = []; + this.busy = false; + + this.synchronize = this.synchronize.bind(this); + this.dequeue = this.dequeue.bind(this); + this.execute = this.execute.bind(this); + } + + synchronize(task) { + return new Promise((resolve, reject) => { + this.queue.push([task, resolve, reject]); + if (!this.busy) this.dequeue(); + }); + } + + dequeue() { + this.busy = true; + const next = this.queue.shift(); + + if (next) return this.execute(next); + this.busy = false; + } + + execute(record) { + const task = record[0]; + const resolve = record[1]; + const reject = record[2]; + + return task() + .then(resolve) + .then(() => { + this.dequeue(); + }) + .catch(reject); + } +} + +module.exports = Mutex; diff --git a/lib/modules/utils.js b/lib/modules/utils.js new file mode 100644 index 0000000..f6910ae --- /dev/null +++ b/lib/modules/utils.js @@ -0,0 +1,89 @@ +const serializeError = require('serialize-error'); +const deserializeError = require('deserialize-error'); + +function timeoutPromise(timer) { + return new Promise((resolve) => { + setTimeout(resolve, timer); + }); +} + +function deserializeMsg(msg) { + // if sender put a json header, we parse it to avoid the pain for the consumer + if (!msg.content) return undefined; + + if (msg.properties.contentType === 'application/json') { + try { + const content = JSON.parse(msg.content.toString()); + if (content && content.error && content.error instanceof Object) { + content.error = deserializeError(content.error); + } + + return content; + } catch (err) { + console.warn('message is not a valid json'); + return msg.content; + } + } + + if (msg.content.length) { + return msg.content.toString(); + } + + return msg.content; +} + +/* eslint-disable no-param-reassign */ +function serializeMsg(msg, options) { + const falsie = [undefined, null]; + + if (typeof msg === 'string') return Buffer.from(msg, 'utf-8'); + + if (!falsie.includes(msg)) { + if (msg.error instanceof Error) { + msg.error = serializeError(msg.error); + } + + try { + // if content is not a string, we JSONify it (JSON.parse can handle numbers, etc. so we can skip all the checks) + msg = JSON.stringify(msg); + options.contentType = 'application/json'; + + return Buffer.from(msg, 'utf-8'); + } catch (err) { + return Buffer.from([]); + } + } + + return Buffer.from([]); +} + +function completeAssign(target, ...sources) { + sources.forEach(source => { + let descriptors = Object.keys(source).reduce((descriptors, key) => { + descriptors[key] = Object.getOwnPropertyDescriptor(source, key); + return descriptors; + }, {}); + // Par défaut, Object.assign copie également + // les symboles énumérables + Object.getOwnPropertySymbols(source).forEach(sym => { + let descriptor = Object.getOwnPropertyDescriptor(source, sym); + if (descriptor.enumerable) { + descriptors[sym] = descriptor; + } + }); + Object.defineProperties(target, descriptors); + }); + return target; +} + + +class Deferred { + constructor() { + this.promise = new Promise((resolve, reject) => { + this.reject = reject; + this.resolve = resolve; + }); + } +} + +module.exports = { timeoutPromise, serializeMsg, deserializeMsg, completeAssign, Deferred }; diff --git a/lib/producer.js b/lib/producer.js new file mode 100644 index 0000000..86da7c7 --- /dev/null +++ b/lib/producer.js @@ -0,0 +1,179 @@ +const Connection = require('./connection'); +const Message = require('./message'); +const utils = require('./modules/utils'); +const uuid = require('uuid'); + +class Producer { + constructor(connection, config, debug) { + this.connection = connection; + this.rpcQueues = {}; + this.config = config; + this.debug = debug; + // method binding + this.checkAnswer = this.checkAnswer.bind(this); + this.checkRPCTimeout = this.checkRPCTimeout.bind(this); + this.createRpcQueue = this.createRpcQueue.bind(this); + this.checkRPC = this.checkRPC.bind(this); + this.produce = this.produce.bind(this); + this.produceWhenConnected = this.produceWhenConnected.bind(this); + } + + static get ERRORS() { + return { + TIMEOUT: 'Timeout reached', + PREVIOUS_SESSION: 'Receiving previous session RPC message: callback no more in memory' + }; + }; + + checkAnswer(channel, queue) { + return (msg) => { + // check the correlation ID sent by the initial message using RPC + const rpcPromise = this.rpcQueues[queue][msg.properties.correlationId]; + + this.connection.releaseChannel(channel); + + return Message.deserialize(msg.content, msg.fields) + .then((deserializedMsg) => { + this.debug(`queue=${queue} < `, deserializedMsg); + + if (rpcPromise && typeof rpcPromise.resolve === 'function') { + rpcPromise.resolve(deserializedMsg); + delete this.rpcQueues[queue][msg.properties.correlationId]; + } + }); + }; + } + + checkRPCTimeout(channel, queue, corrId, rpcTimeout) { + setTimeout(() => { + const rpcPromise = this.rpcQueues[queue][corrId]; + if (rpcPromise) { + rpcPromise.reject(new Error(Producer.ERRORS.TIMEOUT)); + delete this.rpcQueues[queue][corrId]; + this.connection.releaseChannel(channel); + } + }, rpcTimeout); + } + + publishOrSendToQueue(channel, queue, msg, options) { + const message = new Message({ + body: msg, + correlationId: options.correlationId, + replyTo: options.replyTo, + // requeue: options.requeue, + // requeueCount: options.requeueCount + }); + + return Message.serialize(message, options.enableCompression) + .then((serializedMsg) => { + this.debug(`Queueing message in queue=${queue} > `, message); + + options.headers = { requeueCount: options.requeueCount, requeue: options.requeue }; + + if (!options.routingKey) return channel.sendToQueue(queue, serializedMsg, options); + + return channel.publish(queue, options.routingKey, serializedMsg, options); + }); + } + + createRpcQueue(channel, queue) { + this.rpcQueues[queue] = this.rpcQueues[queue] || {}; + + const rpcQueue = this.rpcQueues[queue]; + + if (rpcQueue.queue) return Promise.resolve(rpcQueue.queue); + + // we create the callback queue using base queue name + appending config hostname and :res for clarity + // ie. if hostname is gateway-http and queue is service-oauth, response queue will be service-oauth:gateway-http:res + // it is important to have different hostname or no hostname on each module sending message or there will be conflicts + const resQueue = `${queue}:${this.config.hostname || uuid.v4()}:res`; + + return channel.assertQueue(resQueue, { durable: false, exclusive: true }) + .then((q) => { + rpcQueue.queue = q.queue; + // if channel is closed, we want to make sure we cleanup the queue so future calls will recreate it + channel.on('error', () => { + delete rpcQueue.queue; + this.createRpcQueue(channel, queue); + }); + + return channel.consume(q.queue, this.checkAnswer(channel, queue), { noAck: true }); + }) + .then(() => rpcQueue.queue) + .catch(() => { + delete rpcQueue.queue; + return utils.timeoutPromise(this.config.timeout) + .then(() => this.createRpcQueue(channel, queue)); + }); + } + + checkRPC(channel, queue, msg, options) { + if (!options.rpc) return this.publishOrSendToQueue(channel, queue, msg, options); + + const correlationId = uuid.v4(); + const mergedOptions = Object.assign({ correlationId }, options); + + return this.createRpcQueue(channel, queue) + .then(() => { + mergedOptions.replyTo = this.rpcQueues[queue].queue; + + return this.publishOrSendToQueue(channel, queue, msg, mergedOptions) + .then(() => { + // defered promise that will resolve when response is received + const rpcPromise = new utils.Deferred(); + this.rpcQueues[queue][correlationId] = rpcPromise; + + if (mergedOptions.rpcTimeout) { + this.checkRPCTimeout(channel, queue, correlationId, mergedOptions.rpcTimeout); + } + + return rpcPromise.promise; + }); + }); + } + + produceWhenConnected(channel, queue, msg, options) { + // undefined can't be serialized/buffered :p + const validMsg = msg || null; + + return this.checkRPC(channel, queue, validMsg, options) + .then((res) => { + this.connection.releaseChannel(channel); + return res; + }) + .catch((err) => { + this.debug(err); + + if ([Producer.ERRORS.TIMEOUT].includes(err.message)) return Promise.reject(err); + + // add timeout between retries because we don't want to overflow the CPU + return utils.timeoutPromise(this.config.timeout) + .then(() => this.produce(queue, validMsg, options)); + }); + } + + produce(queue, msg, options) { + return new Promise((resolve, reject) => { + // default options are persistent and durable because we do not want to miss any outgoing message + const mergedOptions = Object.assign({ persistent: true, durable: true }, this.config, options); + + this.connection.eventEmitter.once('connected', () => { + this.connection.getChannel() + .then((channel) => { + channel.prefetch(mergedOptions.prefetch); + + this.produceWhenConnected(channel, queue, msg, mergedOptions) + .then(resolve) + .catch(reject); + }) + .catch((err) => { + if (err.msg === 'Channel pool is empty') throw err; + }); + }); + + this.connection.connect(); + }); + } +} + +module.exports = Producer; diff --git a/package.json b/package.json index f1a0f44..dcac7d7 100644 --- a/package.json +++ b/package.json @@ -11,7 +11,7 @@ "publish", "subscribe" ], - "main": "src/index.js", + "main": "index.js", "scripts": { "doc": "jsdoc -d ./docs --readme README.md --package package.json --template node_modules/minami src/*", "test": "make test" @@ -27,21 +27,21 @@ }, "homepage": "https://github.com/dial-once/node-bunnymq#readme", "dependencies": { - "amqplib": "^0.4.1", - "deserialize-error": "0.0.3", + "amqplib": "^0.5.1", "dotenv": "^2.0.0", - "serialize-error": "^2.1.0", + "debug": "^2.6.8", "uuid": "^3.0.1" }, "devDependencies": { - "assert": "1.3.0", + "assert": "^1.4.1", + "benchmark": "^2.1.4", "child-process-promise": "^2.2.0", - "eslint": "^3.11.1", - "eslint-config-airbnb": "^13.0.0", + "eslint": "^3.18.0", + "eslint-config-airbnb": "^14.1.0", + "eslint-config-airbnb-base": "^11.1.1", "eslint-plugin-import": "^2.2.0", - "eslint-plugin-jsx-a11y": "^2.2.3", - "eslint-plugin-react": "^6.8.0", - "minami": "1.1.1" + "eslint-plugin-jsx-a11y": "^4.0.0", + "eslint-plugin-react": "^6.10.2" }, "engines": { "node": ">=6" diff --git a/samples/areas/consumer.js b/samples/areas/consumer.js deleted file mode 100644 index bee2491..0000000 --- a/samples/areas/consumer.js +++ /dev/null @@ -1,33 +0,0 @@ -/* eslint no-console: off */ -const consumer = require('../../src/index')().consumer; - -/* eslint no-param-reassign: "off" */ -consumer.consume('circleArea', (msg) => { - msg = JSON.parse(msg); - - const area = (parseInt(msg.r, 10) ** 2) * Math.PI; - - console.info('Circle area is: ', area); - - return area; -}); - -consumer.consume('squareArea', (msg) => { - msg = JSON.parse(msg); - - const area = parseInt(msg.l, 10) ** 2; - - console.info('Square area is: ', area); - - return area; -}); - -consumer.consume('triangleArea', (msg) => { - msg = JSON.parse(msg); - - const area = parseInt(msg.b, 10) * parseInt(msg.h, 10) * 0.5; - - console.info('Triangle area is: ', area); - - return area; -}); diff --git a/samples/areas/producer.js b/samples/areas/producer.js deleted file mode 100644 index 893dbbf..0000000 --- a/samples/areas/producer.js +++ /dev/null @@ -1,7 +0,0 @@ -const producer = require('../../src/index')().producer; - -producer.produce('circleArea', { toDo: 'Calculate circle area', r: 10 }); - -producer.produce('squareArea', { toDo: 'Calculate square area', l: 12 }); - -producer.produce('triangleArea', { toDo: 'Calculate triangle area', b: 2, h: 7 }); diff --git a/samples/hello-world/consumer.js b/samples/hello-world/consumer.js deleted file mode 100644 index 6078f5b..0000000 --- a/samples/hello-world/consumer.js +++ /dev/null @@ -1,12 +0,0 @@ -/* eslint no-console: off */ -const consumer = require('../../src/index')().consumer; - -consumer.connect() -.then(() => { - consumer.consume('queueName', () => - new Promise((resolve) => { - setTimeout(resolve(true), 5000); - }) - ) - .then(console.info); // true if message has been acknowledged, else false -}); diff --git a/samples/hello-world/producer.js b/samples/hello-world/producer.js deleted file mode 100644 index fe1c5bd..0000000 --- a/samples/hello-world/producer.js +++ /dev/null @@ -1,8 +0,0 @@ -/* eslint no-console: off */ -const producer = require('../../src/index')().producer; - -producer.connect() -.then(() => { - producer.produce('queueName', { message: 'hello world!' }) - .then(console.info); // true if message has been sent, else false -}); diff --git a/samples/multi-produce-consume/consumer.js b/samples/multi-produce-consume/consumer.js deleted file mode 100644 index 839ca41..0000000 --- a/samples/multi-produce-consume/consumer.js +++ /dev/null @@ -1,19 +0,0 @@ -const consumer = require('../../src/index')().consumer; - -let interval; -let i = 0; - -interval = setInterval(() => { - consumer.consume(`queueName-${i}`, msg => - new Promise((resolve) => { - setTimeout(() => { - resolve(`res:${JSON.stringify(msg)}`); - }, 3000); - }) - ); - - i += 1; - if (i >= 5) { - interval = clearInterval(interval); - } -}, 1000); diff --git a/samples/multi-produce-consume/producer.js b/samples/multi-produce-consume/producer.js deleted file mode 100644 index 1f4fc51..0000000 --- a/samples/multi-produce-consume/producer.js +++ /dev/null @@ -1,13 +0,0 @@ -const producer = require('../../src/index')().producer; - -let i = 0; -let interval; - -interval = setInterval(() => { - producer.produce(`queueName-${i}`, { message: `start-${i}` }, { rpc: true }); - - i += 1; - if (i >= 5) { - interval = clearInterval(interval); - } -}, 2000); diff --git a/samples/prefetch/consumer.js b/samples/prefetch/consumer.js deleted file mode 100644 index 766c478..0000000 --- a/samples/prefetch/consumer.js +++ /dev/null @@ -1,15 +0,0 @@ -const consumer = require('../../src/index')().consumer; - -let i = 0; -let interval; - -interval = setInterval(() => { - consumer.consume(`queue-prefetch${i}`, msg => - Promise.resolve(JSON.stringify(msg)) - ); - - i += 1; - if (i >= 100) { - interval = clearInterval(interval); - } -}, 500); diff --git a/samples/prefetch/producer.js b/samples/prefetch/producer.js deleted file mode 100644 index 5658fdb..0000000 --- a/samples/prefetch/producer.js +++ /dev/null @@ -1,15 +0,0 @@ -/* eslint no-console: off */ -const producer = require('../../src/index')().producer; - -let i = 0; -let interval; - -interval = setInterval(() => { - producer.produce(`queue-prefetch${i}`, { message: `start-${i}` }, { rpc: true }) - .then(console.info); - - i += 1; - if (i >= 100) { - interval = clearInterval(interval); - } -}, 500); diff --git a/src/classes/deferred.js b/src/classes/deferred.js deleted file mode 100644 index 0d97261..0000000 --- a/src/classes/deferred.js +++ /dev/null @@ -1,10 +0,0 @@ -class Deferred { - constructor() { - this.promise = new Promise((resolve, reject) => { - this.reject = reject; - this.resolve = resolve; - }); - } -} - -module.exports = Deferred; diff --git a/src/modules/connection.js b/src/modules/connection.js deleted file mode 100644 index 62f22a7..0000000 --- a/src/modules/connection.js +++ /dev/null @@ -1,122 +0,0 @@ -const amqp = require('amqplib'); -const assert = require('assert'); -const packageVersion = require('../../package.json').version; - -class Connection { - constructor(config) { - this._config = config; - this.connections = {}; - this.startedAt = new Date().toISOString(); - } - - /** - * Connect to the broker. We keep only 1 connection for each connection string provided in config, as advised by RabbitMQ - * @return {Promise} A promise that resolve with an amqp.node connection object - */ - getConnection() { - const url = this._config.host; - const hostname = this._config.hostname; - let connection = this.connections[url]; - - // cache handling, if connection already opened, return it - if (connection && connection.conn) { - return Promise.resolve(connection.conn); - } - // prepare the connection internal object, and reset channel if connection has been closed - connection = this.connections[url] = { - conn: null, - channel: null - }; - connection.conn = amqp.connect(url, { - clientProperties: { - hostname, - bunnymq: packageVersion, - startedAt: this.startedAt, - connectedAt: new Date().toISOString() - } - }).then((conn) => { - // on connection close, delete connection - conn.on('close', () => { - delete connection.conn; - }); - conn.on('error', this._config.transport.error); - connection.conn = conn; - return conn; - }) - .catch((e) => { - connection.conn = null; - throw e; - }); - return connection.conn; - } - - /** - * Create the channel on the broker, once connection is successfuly opened. - * Since RabbitMQ advise to open one channel by process and node is mono-core, we keep only 1 channel for the whole connection. - * @return {Promise} A promise that resolve with an amqp.node channel object - */ - getChannel() { - const url = this._config.host; - const prefetch = this._config.prefetch; - const connection = this.connections[url]; - - // cache handling, if channel already opened, return it - if (connection && connection.chann) { - return Promise.resolve(connection.chann); - } - - connection.chann = connection.conn.createChannel() - .then((channel) => { - channel.prefetch(prefetch); - - // on error we remove the channel so the next call will recreate it (auto-reconnect are handled by connection users) - channel.on('close', () => { delete connection.chann; }); - channel.on('error', this._config.transport.error); - - connection.chann = channel; - return channel; - }); - return connection.chann; - } - - /** - * Connect to AMQP and create channel - * @return {Promise} A promise that resolve with an amqp.node channel object - */ - get() { - return this.getConnection().then(() => this.getChannel()); - } - - /** - * Register an event on the amqp.node channel - * @param {string} on the channel event name to be binded with - * @param {function} func the callback function to execute when the event is called - */ - addListener(on, func) { - this.get().then((channel) => { - channel.on(on, func); - }); - } - - get config() { - return this._config; - } - - set config(value) { - this._config = value; - } -} - - -let instance; - -module.exports = (config) => { - assert(instance || config, 'Connection can not be created because config does not exist'); - assert(config.hostname); - if (!instance) { - instance = new Connection(config); - } else { - instance.config = config; - } - return instance; -}; diff --git a/src/modules/message-parsers.js b/src/modules/message-parsers.js deleted file mode 100644 index e53a533..0000000 --- a/src/modules/message-parsers.js +++ /dev/null @@ -1,48 +0,0 @@ -const serializeError = require('serialize-error'); -const deserializeError = require('deserialize-error'); -/** - * Incoming message parser - parse message based on headers - * @param {object} msg An amqp.node incoming message - * @return {any} a string, object, number to send. Something stringifiable - */ -module.exports.in = (msg) => { - // if sender put a json header, we parse it to avoid the pain for the consumer - if (msg.content) { - if (msg.properties.contentType === 'application/json') { - const content = JSON.parse(msg.content.toString()); - if (content && content.error && content.error instanceof Object) { - content.error = deserializeError(content.error); - } - return content; - } - - if (msg.content.length) { - return msg.content.toString(); - } - } - - return undefined; -}; - -/** - * Outgoing message parser - add header tags for receiver processing - * @param {any} content a string, object, number to send. Something serializable / bufferable - * @param {object} options amqp.node message options object - * @return {Buffer} node.js Buffer object, sent by amqp.node - */ - /* eslint no-param-reassign: "off" */ -module.exports.out = (content, options) => { - const falsie = [undefined, null]; - if (!falsie.includes(content) && typeof content !== 'string') { - if (content.error instanceof Error) { - content.error = serializeError(content.error); - } - // if content is not a string, we JSONify it (JSON.parse can handle numbers, etc. so we can skip all the checks) - content = JSON.stringify(content); - options.contentType = 'application/json'; - } else if (falsie.includes(content)) { - return Buffer.from([]); - } - - return Buffer.from(content, 'utf-8'); -}; diff --git a/src/modules/utils.js b/src/modules/utils.js deleted file mode 100644 index 69a294a..0000000 --- a/src/modules/utils.js +++ /dev/null @@ -1,23 +0,0 @@ -/** - * A function to generate a pause in promise chaining - * @param {number} timer How much ws to wait - * @return {Promise} A Promise that will resolve when timer is expired - */ -module.exports.timeoutPromise = timer => - new Promise((resolve) => { - setTimeout(resolve, timer); - }); - -function empty() {} - -/** - * Default logger to prevent any printing in the terminal - * @type {Object} - empty logger overwriting the console object methods - */ -module.exports.emptyLogger = { - info: empty, - debug: empty, - warn: empty, - error: empty, - log: empty -}; diff --git a/test/config-spec.js b/test/config-spec.js index e103ec4..b2a56d1 100644 --- a/test/config-spec.js +++ b/test/config-spec.js @@ -1,4 +1,3 @@ -require('dotenv').config({ silent: true }); const assert = require('assert'); const uuid = require('uuid');