Skip to content

New features and enhancements #92

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[*]
charset = utf-8
end_of_line = lf
insert_final_newline = true
trim_trailing_whitespace = true
indent_style = space
indent_size = 2

[Makefile]
indent_style = tab

[*.md]
indent_size = 4
trim_trailing_whitespace = false
3 changes: 0 additions & 3 deletions .env.tpl

This file was deleted.

6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ node_modules

# generated doc output
docs/
<<<<<<< HEAD
### VisualStudio template
## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons.
Expand Down Expand Up @@ -416,3 +417,8 @@ com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
=======

# To do
TO-DO.md
>>>>>>> New features and enehancement
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was an unsuccessful merge

45 changes: 10 additions & 35 deletions .npmignore
Original file line number Diff line number Diff line change
@@ -1,39 +1,14 @@
# Logs
logs
*.log

# Runtime data
pids
*.pid
*.seed

# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov

# Coverage directory used by tools like istanbul
# test
test
coverage

# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files)
.grunt

# node-waf configuration
.lock-wscript

# Compiled binary addons (http://nodejs.org/api/addons.html)
build/Release

# Dependency directory
# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git
node_modules
.env

# documentation
docs
samples
test
# tools
sonar-project.properties
.eslintrc
.eslintignore
.gitignore
.jshintignore
.npmignore
CONTRIBUTING.md
Makefile
sonar-project.properties

#editor settings
.idea
.editorconfig
58 changes: 58 additions & 0 deletions examples/benchmarks/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
const Benchmark = require('benchmark');
const { producer, consumer, connection } = require('../../index')({
hostname: process.env.HOSTNAME || 'consumer',
poolSize: parseInt(process.env.AMQP_POOL_SIZE, 10) || 5000,
timeout: parseInt(process.env.AMQP_TIMEOUT, 10) || 1000,
rpcTimeout: parseInt(process.env.AMQP_RPC_TIMEOUT, 10) || 2000,
host: process.env.AMQP_URL || 'amqp://localhost'
});

const LOOP_INTERVAL = parseInt(process.env.LOOP_INTERVAL, 10) || 10;
const MAX_MESSAGES = parseInt(process.env.MAX_MESSAGES, 10) || 100;
const suite = new Benchmark.Suite;

process.on('unhandledRejection', error => {
// Will print "unhandledRejection err is not dewfined"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo in the last word

console.error('unhandledRejection', error);
});

connection.connect()
.then(() => {
consumer.consume('test:queue', (msg) => {
// console.log(msg);
return Promise.resolve(true);
})
.then(() => {
// add tests
suite
.add('with RPC', {
defer: true,
repeat: 10,
fn: function (deferred) {
// avoid test inlining
producer.produce('test:queue', { message: 'message', duration: 1000 }, { rpc: true })
.then(() => deferred.resolve())
.catch(() => deferred.resolve());
}
})
.add('without RPC', {
defer: true,
repeat: 10,
fn: function (deferred) {
// avoid test inlining
producer.produce('test:queue', { message: 'message', duration: 1000 }, { rpc: false })
.then(() => deferred.resolve())
.catch(() => deferred.resolve());
}
})
// // add listeners
.on('cycle', function (event) {
console.log(String(event.target));
})
.on('complete', function () {
console.log('Fastest is ' + this.filter('fastest').map('name'));
})
// run async
.run({ 'async': true });
});
});
11 changes: 11 additions & 0 deletions examples/stress-test/consumer/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM dialonce/nodejs:latest

WORKDIR /usr/src/app

COPY . /usr/src/app

RUN npm i --production

EXPOSE 3000

CMD [ "node", "./examples/stress-test/consumer/consumer.js" ]
15 changes: 15 additions & 0 deletions examples/stress-test/consumer/consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
const { consumer, producer, connection } = require('../../../index')({
hostname: process.env.HOSTNAME || 'consumer',
poolSize: parseInt(process.env.AMQP_POOL_SIZE, 10) || 100,
timeout: parseInt(process.env.AMQP_TIMEOUT, 10) || 1000,
host: process.env.AMQP_URL || 'amqp://localhost',
prefetch: 100,
enableCompression: true
});

connection.connect()
.then(() => {
consumer.consume('queue:heartbeat', (msg) => Promise.resolve(true));
consumer.consume('queue:check-permissions', (msg) => Promise.resolve(true));
consumer.consume('queue:update-paths', (msg) => Promise.resolve(true));
});
43 changes: 43 additions & 0 deletions examples/stress-test/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
version: '3'

services:
rabbitmq_server:
image: rabbitmq:3.6-management-alpine
ports:
- 15672:15672
- 5672:5672

consumer_service:
build:
context: ../../
dockerfile: ./examples/stress-test/consumer/Dockerfile
environment:
- AMQP_URL=amqp://rabbitmq_server
- AMQP_DEBUG=true
- AMQP_TIMEOUT=1000
- AMQP_POOL_SIZE=5
- AMQP_PREFETCH=100
- DEBUG=*
links:
- rabbitmq_server
depends_on:
- rabbitmq_server

producer_service:
build:
context: ../../
dockerfile: ./examples/stress-test/producer/Dockerfile
environment:
- AMQP_URL=amqp://rabbitmq_server
- AMQP_DEBUG=true
- AMQP_TIMEOUT=1000
- AMQP_POOL_SIZE=5000
- AMQP_RPC_TIMEOUT=15000
- LOOP_INTERVAL=1
- MAX_MESSAGES=1
- DEBUG=*
links:
- rabbitmq_server
depends_on:
- rabbitmq_server
- consumer_service
11 changes: 11 additions & 0 deletions examples/stress-test/producer/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM dialonce/nodejs:latest

WORKDIR /usr/src/app

COPY . /usr/src/app

RUN npm i --production

EXPOSE 3000

CMD [ "node", "./examples/stress-test/producer/producer.js" ]
23 changes: 23 additions & 0 deletions examples/stress-test/producer/producer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
const { producer, connection } = require('../../../index')({
hostname: process.env.HOSTNAME || 'consumer',
poolSize: parseInt(process.env.AMQP_POOL_SIZE, 10) || 5000,
timeout: parseInt(process.env.AMQP_TIMEOUT, 10) || 1000,
rpcTimeout: parseInt(process.env.AMQP_RPC_TIMEOUT, 10) || 15000,
host: process.env.AMQP_URL || 'amqp://localhost',
enableCompression: true
});

const LOOP_INTERVAL = parseInt(process.env.LOOP_INTERVAL, 10) || 1000;
const MAX_MESSAGES = parseInt(process.env.MAX_MESSAGES, 10) || 100;

connection.connect()
.then(() => {
setInterval(() => {
for (let i = 0; i < MAX_MESSAGES; ++i) {
producer.produce('queue:heartbeat', { message: `message`, duration: 1000 }, { rpc: true })
.then((res1) => producer.produce('queue:check-permissions', res1, { rpc: true }))
.then((res2) => producer.produce('queue:update-paths', res2, { rpc: true }))
.catch(console.error);
}
}, LOOP_INTERVAL);
});
41 changes: 41 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
require('events').EventEmitter.prototype._maxListeners = process.env.MAX_EMITTERS || 20;

const Connection = require('./lib/connection');
const Consumer = require('./lib/consumer');
const Producer = require('./lib/producer');
const uuid = require('uuid');
const debug = require('debug');

let connection;

module.exports = (config = {}) => {
const configuration = Object.assign({
host: 'amqp://localhost',
// number of fetched messages, at once
prefetch: 5,
// requeue put back message into the broker if consumer crashes/trigger exception
requeue: true,
// requeue count, after this number of retry the message will be rejected
requeueCount: 5,
// time between two reconnect (ms)
timeout: 1000,
// default timeout for RPC calls. If set to '0' there will be none.
rpcTimeout: 15000,
consumerSuffix: '',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't it be taken from LOCAL_QUEUE?

// generate a hostname so we can track this connection on the broker (rabbitmq management plugin)
hostname: process.env.HOSTNAME || process.env.USER || uuid.v4(),
enableCompression: process.env.AMQP_ENABLE_COMPRESSION === 'true'
// the transport to use to debug. if provided, bunnymq will show some logs
// transport: utils.emptyLogger
}, config);

if (!connection) {
connection = new Connection(configuration, debug('[BunnyMq:connection]'));
}

return {
consumer: new Consumer(connection, configuration, debug('[BunnyMq:consumer]')),
producer: new Producer(connection, configuration, debug('[BunnyMq:producer]')),
connection,
};
};
64 changes: 64 additions & 0 deletions lib/channel-pool.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
const utils = require('./modules/utils');
const Mutex = require('./modules/mutex');

let instance;
const mutex = new Mutex();

const MAX_POOL_SIZE = 20000;

class channelPool {
constructor(poolSize, connection) {
if (!instance) {
instance = this;

this.connection = connection;
this.poolSize = poolSize;
this.pool = [];
this.channelCount = 0;
// methods binding
this.create = this.create.bind(this);
this.addChannel = this.addChannel.bind(this);
this.getChannel = this.getChannel.bind(this);
this.releaseChannel = this.releaseChannel.bind(this);
}

return instance;
}

create() {
return Promise.all(
Array.from({ length: this.poolSize }).map(() => {
return this.addChannel();
})
);
}

addChannel() {
return this.connection.createChannel()
.then((channel) => {
++this.channelCount;
this.pool.push(channel);
Copy link
Contributor

@Spring3 Spring3 Aug 17, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can do
this.channelCount = this.pool.push(channel)
Take a look at a return value description

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe also check for poolSize and current amount of channel?


return channel;
});
}

releaseChannel(channel) {
if (this.pool.length < this.poolSize) {
this.pool.push(channel);
}
}

getChannel() {
return mutex.synchronize(() => {
if (this.pool.length) return Promise.resolve(this.pool.shift());
Copy link
Contributor

@Spring3 Spring3 Aug 17, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of optimization perspective, can you use pop() instead? The difference is that shift will force the array re-indexing, which is a costly operation if you take into consideration the possible amount of channels and frequency of the function calls, while pop() simply cuts one element from the end.


if (this.channelCount < MAX_POOL_SIZE) return this.addChannel(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addChannel does not have any parameters


return utils.timeoutPromise(Math.floor(Math.random() * 10) + 1)
.then(this.getChannel);
});
}
}

module.exports = channelPool;
Loading