diff --git a/.npm/package/npm-shrinkwrap.json b/.npm/package/npm-shrinkwrap.json index 9b02f5ba..e727ff6d 100644 --- a/.npm/package/npm-shrinkwrap.json +++ b/.npm/package/npm-shrinkwrap.json @@ -1,26 +1,6 @@ { "lockfileVersion": 1, "dependencies": { - "assertion-error": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/assertion-error/-/assertion-error-1.1.0.tgz", - "integrity": "sha512-jgsaNduz+ndvGyFt3uSuWqvy4lCnIJiovtouQN5JZHOKCS2QuhEdbcQHFhVksz2N2U9hXJo8odG7ETyWlEeuDw==" - }, - "chai": { - "version": "4.2.0", - "resolved": "https://registry.npmjs.org/chai/-/chai-4.2.0.tgz", - "integrity": "sha512-XQU3bhBukrOsQCuwZndwGcCVQHyZi53fQ6Ys1Fym7E4olpIqqZZhhoFJoaKVvV17lWQoXYwgWN2nF5crA8J2jw==" - }, - "check-error": { - "version": "1.0.2", - "resolved": "https://registry.npmjs.org/check-error/-/check-error-1.0.2.tgz", - "integrity": "sha1-V00xLt2Iu13YkS6Sht1sCu1KrII=" - }, - "deep-eql": { - "version": "3.0.1", - "resolved": "https://registry.npmjs.org/deep-eql/-/deep-eql-3.0.1.tgz", - "integrity": "sha512-+QeIQyN5ZuO+3Uk5DYh6/1eKO0m0YmJFGNmFHGACpf1ClL1nmlV/p4gNgbl2pJGxgXb4faqo6UE+M5ACEMyVcw==" - }, "deep-extend": { "version": "0.5.0", "resolved": "https://registry.npmjs.org/deep-extend/-/deep-extend-0.5.0.tgz", @@ -31,20 +11,10 @@ "resolved": "https://registry.npmjs.org/double-ended-queue/-/double-ended-queue-2.1.0-0.tgz", "integrity": "sha1-ED01J/0xUo9AGIEwyEHv3XgmTlw=" }, - "get-func-name": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/get-func-name/-/get-func-name-2.0.0.tgz", - "integrity": "sha1-6td0q+5y4gQJQzoGY2YCPdaIekE=" - }, - "lodash.clonedeep": { - "version": "4.5.0", - "resolved": "https://registry.npmjs.org/lodash.clonedeep/-/lodash.clonedeep-4.5.0.tgz", - "integrity": "sha1-4j8/nE+Pvd6HJSnBBxhXoIblzO8=" - }, - "pathval": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/pathval/-/pathval-1.1.0.tgz", - "integrity": "sha1-uULm1L3mUwBe9rcTYd74cn0GReA=" + "lodash": { + "version": "4.17.15", + "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.15.tgz", + "integrity": "sha512-8xOcRHvCjnocdS5cpwXQXVzmmh5e5+saE2QGoeQmbKmRS6J3VQppPOIt0MnmE+4xlZoumy0GPG0D0MVIQbNA1A==" }, "redis": { "version": "2.8.0", @@ -52,19 +22,14 @@ "integrity": "sha512-M1OkonEQwtRmZv4tEWF2VgpG0JWJ8Fv1PhlgT5+B+uNq2cA3Rt1Yt/ryoR+vQNOQcIEgdCdfH0jr3bDpihAw1A==" }, "redis-commands": { - "version": "1.5.0", - "resolved": "https://registry.npmjs.org/redis-commands/-/redis-commands-1.5.0.tgz", - "integrity": "sha512-6KxamqpZ468MeQC3bkWmCB1fp56XL64D4Kf0zJSwDZbVLLm7KFkoIcHrgRvQ+sk8dnhySs7+yBg94yIkAK7aJg==" + "version": "1.6.0", + "resolved": "https://registry.npmjs.org/redis-commands/-/redis-commands-1.6.0.tgz", + "integrity": "sha512-2jnZ0IkjZxvguITjFTrGiLyzQZcTvaw8DAaCXxZq/dsHXz7KfMQ3OUJy7Tz9vnRtZRVz6VRCPDvruvU8Ts44wQ==" }, "redis-parser": { "version": "2.6.0", "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-2.6.0.tgz", "integrity": "sha1-Uu0J2srBCPGmMcB+m2mUHnoZUEs=" - }, - "type-detect": { - "version": "4.0.8", - "resolved": "https://registry.npmjs.org/type-detect/-/type-detect-4.0.8.tgz", - "integrity": "sha512-0fr/mIH1dlO+x7TlcMy+bIDqKPsw/70tVyeHW787goQjhmqaZe10uwLujubK9q9Lg6Fiho1KUKDYz0Z7k7g5/g==" } } } diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index a18b2a8b..970b2a70 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -30,7 +30,7 @@ It is also always helpful to have some context for your pull request. What was t ``` meteor create --release 1.8.1 --bare test cd test -meteor npm i --save puppeteer@1.18.1 simpl-schema +meteor npm i --save puppeteer@1.18.1 simpl-schema chai ``` ### Start Tests diff --git a/lib/config.js b/lib/config.js index dbcf4765..7bd00be0 100644 --- a/lib/config.js +++ b/lib/config.js @@ -55,6 +55,12 @@ let Config = { }, }, }, + // Max number of redis payloads we attempt to process before just triggering a requery + maxRedisEventsToProcess: 300, + // Debounce interval after which we flush the queued redis payloads + debounceInterval: 100, + // Maximum wait time for flushing queued redis payloads + maxDebounceWait: 3000 }; export default Config; diff --git a/lib/mongo/lib/dispatchers.js b/lib/mongo/lib/dispatchers.js index f49b1a72..79de10c1 100644 --- a/lib/mongo/lib/dispatchers.js +++ b/lib/mongo/lib/dispatchers.js @@ -1,5 +1,4 @@ import { Meteor } from 'meteor/meteor'; -import { DDPServer } from 'meteor/ddp-server'; import { EJSON } from 'meteor/ejson'; import { Events, RedisPipe } from '../../constants'; import RedisSubscriptionManager from '../../redis/RedisSubscriptionManager'; @@ -8,21 +7,15 @@ import getDedicatedChannel from '../../utils/getDedicatedChannel'; import Config from '../../config'; import OptimisticInvocation from '../OptimisticInvocation'; -const dispatchEvents = function(optimistic, collectionName, channels, events) { +const dispatchEvents = function dispatchEventsFn(optimistic, collectionName, channels, events) { if (optimistic) { - OptimisticInvocation.withValue(true, () => { + OptimisticInvocation.withValue(true, () => { events.forEach(event => { const docId = event[RedisPipe.DOC]._id; - const dedicatedChannel = getDedicatedChannel( - collectionName, - docId - ); - RedisSubscriptionManager.process(dedicatedChannel, event); - - channels.forEach(channelName => { - RedisSubscriptionManager.process(channelName, event); - }); + const dedicatedChannel = getDedicatedChannel(collectionName, docId); + RedisSubscriptionManager.process(dedicatedChannel, [event]); }); + channels.forEach(channel => RedisSubscriptionManager.process(channel, events)); }); } diff --git a/lib/processors/actions/requery.js b/lib/processors/actions/requery.js index e23b311e..27b77eca 100644 --- a/lib/processors/actions/requery.js +++ b/lib/processors/actions/requery.js @@ -1,15 +1,10 @@ -import { _ } from 'meteor/underscore'; -import { EJSON } from 'meteor/ejson'; -import { Events } from '../../constants'; import { MongoIDMap } from '../../cache/mongoIdMap'; /** * @param observableCollection - * @param newCommer - * @param event - * @param modifiedFields + * @param documentMap */ -export default function (observableCollection, newCommer, event, modifiedFields) { +export default function (observableCollection, documentMap) { const { store, selector, options } = observableCollection; const newStore = new MongoIDMap(); @@ -17,28 +12,27 @@ export default function (observableCollection, newCommer, event, modifiedFields) selector, { ...options, fields: { _id: 1 } }).fetch(); freshIds.forEach(doc => newStore.set(doc._id, doc)); - let added = false; store.compareWith(newStore, { + // Any documents found only on the left store + // should be removed leftOnly(docId) { observableCollection.remove(docId); }, + // Any documents found in both and with documentMap entries + // have received redis updates indicating there are changes + both(docId) { + if (documentMap[docId]) { + observableCollection.change(documentMap[docId]) + } + }, + // Any documents only present in the right store are newly + // added rightOnly(docId) { - if (newCommer && EJSON.equals(docId, newCommer._id)) { - added = true; - observableCollection.add(newCommer); + if (documentMap[docId]) { + observableCollection.add(documentMap[docId]); } else { observableCollection.addById(docId); } } }); - - // if we have an update, and we have a newcommer, that new commer may be inside the ids - // TODO: maybe refactor this in a separate action (?) - if (newCommer - && Events.UPDATE === event - && modifiedFields - && !added - && store.has(newCommer._id)) { - observableCollection.change(newCommer, modifiedFields); - } } diff --git a/lib/processors/default.js b/lib/processors/default.js index 7af80f73..4d9f5889 100644 --- a/lib/processors/default.js +++ b/lib/processors/default.js @@ -1,32 +1,13 @@ -import { Events } from '../constants'; +import { Meteor } from 'meteor/meteor'; +import Config from '../config'; +import RedisPipe, { Events } from '../constants'; +import requery from './actions/requery'; /** * @param observableCollection - * @param event * @param doc - * @param modifiedFields */ -export default function(observableCollection, event, doc, modifiedFields) { - switch (event) { - case Events.INSERT: - handleInsert(observableCollection, doc); - break; - case Events.UPDATE: - handleUpdate(observableCollection, doc, modifiedFields); - break; - case Events.REMOVE: - handleRemove(observableCollection, doc); - break; - default: - throw new Meteor.Error(`Invalid event specified: ${event}`); - } -} - -/** - * @param observableCollection - * @param doc - */ -const handleInsert = function(observableCollection, doc) { +const handleInsert = (observableCollection, doc) => { if ( !observableCollection.contains(doc._id) && observableCollection.isEligible(doc) @@ -36,30 +17,63 @@ const handleInsert = function(observableCollection, doc) { }; /** - * @param observableCollection - * @param doc - * @param modifiedFields - */ -const handleUpdate = function(observableCollection, doc, modifiedFields) { +* @param observableCollection +* @param doc +* @param modifiedFields +*/ +const handleUpdate = (observableCollection, doc, modifiedFields) => { if (observableCollection.isEligible(doc)) { if (observableCollection.contains(doc._id)) { observableCollection.change(doc, modifiedFields); } else { observableCollection.add(doc); } - } else { - if (observableCollection.contains(doc._id)) { - observableCollection.remove(doc._id); - } + } else if (observableCollection.contains(doc._id)) { + observableCollection.remove(doc._id); } }; /** - * @param observableCollection - * @param doc - */ -const handleRemove = function(observableCollection, doc) { +* @param observableCollection +* @param doc +*/ +const handleRemove = (observableCollection, doc) => { if (observableCollection.contains(doc._id)) { observableCollection.remove(doc._id); } }; + +/** + * @param observableCollection + * @param events + * @param documentMap + */ +export default function(observableCollection, events, documentMap) { + const needsRequery = events.length > Config.maxRedisEventsToProcess; + + if (needsRequery) { + requery(observableCollection, documentMap); + return; + } + + for (let i = 0; i < events.length; i++) { + const event = events[i]; + const docId = event[RedisPipe.DOC]._id; + const modifiedFields = event[RedisPipe.FIELDS]; + const doc = documentMap[docId]; + + switch (event[RedisPipe.EVENT]) { + case Events.INSERT: + handleInsert(observableCollection, doc); + break; + case Events.UPDATE: + handleUpdate(observableCollection, doc, modifiedFields); + break; + case Events.REMOVE: + handleRemove(observableCollection, doc); + break; + default: + throw new Meteor.Error(`Invalid event specified: ${event}`); + } + } +} diff --git a/lib/processors/direct.js b/lib/processors/direct.js index af54d8ac..edc433df 100644 --- a/lib/processors/direct.js +++ b/lib/processors/direct.js @@ -1,32 +1,14 @@ -import { Events } from '../constants'; +import { Meteor } from 'meteor/meteor'; +import Config from '../config'; +import RedisPipe, { Events } from '../constants'; +import requery from './actions/requery'; -/** - * @param observableCollection - * @param event - * @param doc - * @param modifiedFields - */ -export default function(observableCollection, event, doc, modifiedFields) { - switch (event) { - case Events.UPDATE: - handleUpdate(observableCollection, doc, modifiedFields); - break; - case Events.REMOVE: - handleRemove(observableCollection, doc); - break; - case Events.INSERT: - handleInsert(observableCollection, doc); - break; - default: - throw new Meteor.Error(`Invalid event specified: ${event}`); - } -} /** * @param observableCollection * @param doc */ -const handleInsert = function(observableCollection, doc) { +const handleInsert = (observableCollection, doc) => { if ( !observableCollection.contains(doc._id) && observableCollection.isEligible(doc) @@ -36,11 +18,11 @@ const handleInsert = function(observableCollection, doc) { }; /** - * @param observableCollection - * @param doc - * @param modifiedFields - */ -const handleUpdate = function(observableCollection, doc, modifiedFields) { +* @param observableCollection +* @param doc +* @param modifiedFields +*/ +const handleUpdate = (observableCollection, doc, modifiedFields) => { const otherSelectors = observableCollection.__containsOtherSelectorsThanId; if (otherSelectors) { @@ -50,24 +32,56 @@ const handleUpdate = function(observableCollection, doc, modifiedFields) { } else { observableCollection.add(doc); } - } else { - if (observableCollection.contains(doc._id)) { + } else if (observableCollection.contains(doc._id)) { observableCollection.remove(doc._id); - } } + } else if (observableCollection.contains(doc._id)) { + observableCollection.change(doc, modifiedFields); } else { - if (observableCollection.contains(doc._id)) { - observableCollection.change(doc, modifiedFields); - } else { - observableCollection.add(doc); - } + observableCollection.add(doc); } }; /** - * @param observableCollection - * @param doc - */ -const handleRemove = function(observableCollection, doc) { +* @param observableCollection +* @param doc +*/ +const handleRemove = (observableCollection, doc) => { observableCollection.remove(doc._id); }; + + +/** + * @param observableCollection + * @param events + * @param documentMap + */ +export default function(observableCollection, events, documentMap) { + const needsRequery = events.length > Config.maxRedisEventsToProcess; + + if (needsRequery) { + requery(observableCollection, documentMap); + return; + } + + for (let i = 0; i < events.length; i++) { + const event = events[i]; + const docId = event[RedisPipe.DOC]._id; + const modifiedFields = event[RedisPipe.FIELDS]; + const doc = documentMap[docId]; + + switch (event[RedisPipe.EVENT]) { + case Events.INSERT: + handleInsert(observableCollection, doc); + break; + case Events.UPDATE: + handleUpdate(observableCollection, doc, modifiedFields); + break; + case Events.REMOVE: + handleRemove(observableCollection, doc); + break; + default: + throw new Meteor.Error(`Invalid event specified: ${event}`); + } + } +} \ No newline at end of file diff --git a/lib/processors/index.js b/lib/processors/index.js index 34805acc..309d05bb 100644 --- a/lib/processors/index.js +++ b/lib/processors/index.js @@ -14,7 +14,7 @@ const StrategyProcessorMap = { export { getStrategy } /** - * @param strategy + * @param {String} strategy * @returns {*} */ export function getProcessor(strategy) { diff --git a/lib/processors/limit-sort.js b/lib/processors/limit-sort.js index a2e5af50..da90e7fc 100644 --- a/lib/processors/limit-sort.js +++ b/lib/processors/limit-sort.js @@ -1,84 +1,98 @@ -import { Events } from '../constants'; +import { Meteor } from 'meteor/meteor'; +import RedisPipe, { Events } from '../constants'; import { hasSortFields } from './lib/fieldsExist'; import requery from './actions/requery'; +import Config from '../config'; -/** - * @param observableCollection - * @param event - * @param doc - * @param modifiedFields - */ -export default function(observableCollection, event, doc, modifiedFields) { - switch (event) { - case Events.INSERT: - handleInsert(observableCollection, doc); - break; - case Events.UPDATE: - handleUpdate(observableCollection, doc, modifiedFields); - break; - case Events.REMOVE: - handleRemove(observableCollection, doc); - break; - default: - throw new Meteor.Error(`Invalid event specified: ${event}`); - } -} /** * @param observableCollection * @param doc */ -const handleInsert = function(observableCollection, doc) { +const handleInsert = (observableCollection, doc) => { if (observableCollection.isEligible(doc)) { - requery(observableCollection, doc); + return true; } + + return false; }; /** - * @param observableCollection - * @param doc - * @param modifiedFields - */ -const handleUpdate = function(observableCollection, doc, modifiedFields) { +* @param observableCollection +* @param doc +* @param modifiedFields +*/ +const handleUpdate = (observableCollection, doc, modifiedFields) => { if (observableCollection.contains(doc._id)) { if (observableCollection.isEligible(doc)) { if ( hasSortFields(observableCollection.options.sort, modifiedFields) ) { - requery( - observableCollection, - doc, - Events.UPDATE, - modifiedFields - ); - } else { - observableCollection.change(doc, modifiedFields); + return true; } + + observableCollection.change(doc, modifiedFields); } else { - requery(observableCollection); - } - } else { - if (observableCollection.isEligible(doc)) { - requery( - observableCollection, - doc, - Events.UPDATE, - modifiedFields - ); + return true; } + } else if (observableCollection.isEligible(doc)) { + return true; } + + return false; +}; + +/** +* @param observableCollection +* @param doc +*/ +const handleRemove = (observableCollection, doc) => { + if (observableCollection.contains(doc._id)) { + return true; + } else if (observableCollection.options.skip) { + return true; + } + + return false; }; /** * @param observableCollection + * @param event * @param doc + * @param modifiedFields */ -const handleRemove = function(observableCollection, doc) { - if (observableCollection.contains(doc._id)) { - requery(observableCollection, doc); - } else { - if (observableCollection.options.skip) { - requery(observableCollection, doc); +export default function(observableCollection, events, documentMap) { + let needsRequery = events.length > Config.maxRedisEventsToProcess; + + if (!needsRequery) { + for (let i = 0; i < events.length; i++) { + const event = events[i]; + const docId = event[RedisPipe.DOC]._id; + const modifiedFields = event[RedisPipe.FIELDS]; + const doc = documentMap[docId]; + + switch (event[RedisPipe.EVENT]) { + case Events.INSERT: + needsRequery = handleInsert(observableCollection, doc); + break; + case Events.UPDATE: + needsRequery = handleUpdate(observableCollection, doc, modifiedFields); + break; + case Events.REMOVE: + needsRequery = handleRemove(observableCollection, doc); + break; + default: + throw new Meteor.Error(`Invalid event specified: ${event}`); + } + + if (needsRequery) { + break; + } } } -}; + + if (needsRequery) { + requery(observableCollection, documentMap); + } +} \ No newline at end of file diff --git a/lib/redis/RedisSubscriber.js b/lib/redis/RedisSubscriber.js index e2791ea7..8e10348f 100644 --- a/lib/redis/RedisSubscriber.js +++ b/lib/redis/RedisSubscriber.js @@ -1,6 +1,5 @@ import { Strategy } from '../constants'; import { getProcessor } from '../processors'; -import { _ } from 'meteor/underscore'; import { Meteor } from 'meteor/meteor'; import extractIdsFromSelector from '../utils/extractIdsFromSelector'; import RedisSubscriptionManager from './RedisSubscriptionManager'; diff --git a/lib/redis/RedisSubscriptionManager.js b/lib/redis/RedisSubscriptionManager.js index 5ced6f27..73373f71 100644 --- a/lib/redis/RedisSubscriptionManager.js +++ b/lib/redis/RedisSubscriptionManager.js @@ -1,6 +1,7 @@ import { Meteor } from 'meteor/meteor'; import { Random } from 'meteor/random'; import { _ } from 'meteor/underscore'; +import debounce from 'lodash/debounce'; import debug from '../debug'; import { RedisPipe, Events } from '../constants'; import getFieldsOfInterestFromAll from './lib/getFieldsOfInterestFromAll'; @@ -23,7 +24,7 @@ class RedisSubscriptionManager { * Returns all RedisSubscribers regardless of channel */ getAllRedisSubscribers() { - let redisSubscribers = []; + const redisSubscribers = []; for (channel in this.store) { this.store[channel].forEach(_redisSubscriber => redisSubscribers.push(_redisSubscriber) @@ -78,12 +79,26 @@ class RedisSubscriptionManager { initializeChannel(channel) { debug(`[RedisSubscriptionManager] Subscribing to channel: ${channel}`); - // create the handler for this channel + let redisEvents = []; const self = this; - const handler = function(message) { - self.queue.queueTask(() => { - self.process(channel, message, true); - }); + + // debounce redis events so that they are processed in bulk + const flushRedisEventsForChannel = debounce( + () => { + const events = redisEvents.slice(); + redisEvents = []; + self.queue.queueTask(() => { + self.process(channel, events, true); + }); + }, + Config.debounceInterval, + { maxWait: Config.maxDebounceWait, trailing: true, leading: false } + ); + + // create the handler for this channel + const handler = (message) => { + redisEvents.push(message); + flushRedisEventsForChannel(); }; this.channelHandlers[channel] = handler; @@ -110,105 +125,110 @@ class RedisSubscriptionManager { /** * @param channel - * @param data - * @param [fromRedis=false] + * @param events + * @param fromRedis */ - process(channel, data, fromRedis) { - // messages from redis that contain our uid were handled - // optimistically, so we can drop them. - if (fromRedis && data[RedisPipe.UID] === this.uid) { - return; - } - + process(channel, events, fromRedis) { const subscribers = this.store[channel]; if (!subscribers) { return; } - let isSynthetic = data[RedisPipe.SYNTHETIC]; + const filteredEvents = []; + const syntheticEvents = []; + events.forEach(event => { + // Ignore any updates that have been processed optimistically + if (fromRedis && event[RedisPipe.UID] === this.uid) return; - debug( - `[RedisSubscriptionManager] Received ${ - isSynthetic ? 'synthetic ' : '' - }event: "${data[RedisPipe.EVENT]}" to "${channel}"` - ); - - if (subscribers.length === 0) { - return; - } - - if (!isSynthetic) { - const collection = subscribers[0].observableCollection.collection; - - let doc; - if (data[RedisPipe.EVENT] === Events.REMOVE) { - doc = data[RedisPipe.DOC]; + const isSynthetic = !!event[RedisPipe.SYNTHETIC]; + if (isSynthetic) { + syntheticEvents.push(event); } else { - doc = this.getDoc(collection, subscribers, data); + filteredEvents.push(event); } + }); - // if by any chance it was deleted after it got dispatched - // doc will be undefined - if (!doc) { - return; - } + // Determine the collection from the first observable collection + const collection = subscribers[0].observableCollection.collection; + const documentMap = this.getDocumentMapForEvents(collection, subscribers, filteredEvents); + // Process filtered events in bulk + if (filteredEvents.length) { subscribers.forEach(redisSubscriber => { try { - redisSubscriber.process( - data[RedisPipe.EVENT], - doc, - data[RedisPipe.FIELDS] - ); + redisSubscriber.process(filteredEvents, documentMap); } catch (e) { debug( `[RedisSubscriptionManager] Exception while processing event: ${e.toString()}` ); } }); - } else { - subscribers.forEach(redisSubscriber => { - try { - redisSubscriber.processSynthetic( - data[RedisPipe.EVENT], - data[RedisPipe.DOC], - data[RedisPipe.MODIFIER], - data[RedisPipe.MODIFIED_TOP_LEVEL_FIELDS] - ); - } catch (e) { - debug( - `[RedisSubscriptionManager] Exception while processing synthetic event: ${e.toString()}` - ); - } - }); + } + + // Individually process synthetic events + // TODO: process synthetic events in bulk + if (syntheticEvents.length) { + syntheticEvents.forEach(data => { + subscribers.forEach(redisSubscriber => { + try { + redisSubscriber.processSynthetic( + data[RedisPipe.EVENT], + data[RedisPipe.DOC], + data[RedisPipe.MODIFIER], + data[RedisPipe.MODIFIED_TOP_LEVEL_FIELDS] + ); + } catch (e) { + debug( + `[RedisSubscriptionManager] Exception while processing synthetic event: ${e.toString()}` + ); + } + }); + }) } } /** - * @param collection - * @param subscribers - * @param data + * Build a documentMap for the docIds in the redis events + * @param collection + * @param subscribers + * @param events */ - getDoc(collection, subscribers, data) { - const event = data[RedisPipe.EVENT]; - let doc = data[RedisPipe.DOC]; - - if (collection._redisOplog && !collection._redisOplog.protectAgainstRaceConditions) { - // If there's no protection against race conditions - // It means we have received the full doc in doc + getDocumentMapForEvents(collection, subscribers, events) { + const documentMap = {}; + const options = {}; - return doc; + // Calculate fields of interest across all subscribers and add the + // appropriate field limiting if necessary + const fieldsOfInterest = getFieldsOfInterestFromAll(subscribers); + if (fieldsOfInterest !== true) { + options.fields = fieldsOfInterest; } - const fieldsOfInterest = getFieldsOfInterestFromAll(subscribers); + const docIdsToFetch = []; + events.forEach(event => { + const doc = event[RedisPipe.DOC]; + if (collection._redisOplog && !collection._redisOplog.protectAgainstRaceConditions) { + // If there's no protection against race conditions + // It means we have received the full doc in doc + documentMap[doc._id] = doc; + } + // no need to fetch full documents for the remove event + else if (event[RedisPipe.EVENT] === Events.REMOVE) { + documentMap[doc._id] = doc; + } else { + docIdsToFetch.push(doc._id); + } + }); - if (fieldsOfInterest === true) { - doc = collection.findOne(doc._id); - } else { - doc = collection.findOne(doc._id, { fields: fieldsOfInterest }); + // Execute a single bulk fetch for all docIds that need to be fetched and store them in + // the document map + if (docIdsToFetch.length) { + collection.find({ _id: { $in: docIdsToFetch } }, options).forEach(doc => { + documentMap[doc._id] = doc; + }); } - return doc; + return documentMap; } } diff --git a/package.js b/package.js index d447275b..38b79c57 100644 --- a/package.js +++ b/package.js @@ -13,7 +13,7 @@ Package.describe({ Npm.depends({ redis: '2.8.0', 'deep-extend': '0.5.0', - 'lodash.clonedeep': '4.5.0' + 'lodash': '4.17.15' }); Package.onUse(function(api) { @@ -53,7 +53,7 @@ Package.onTest(function(api) { api.use('matb33:collection-hooks@0.8.4'); api.use('alanning:roles@1.2.16'); - api.use(['meteortesting:mocha']); + api.use(['meteortesting:mocha@1.0.0', 'meteortesting:mocha-core@1.0.1']); api.mainModule('testing/main.server.js', 'server'); api.addFiles('testing/publishComposite/boot.js', 'server'); diff --git a/testing/main.client.js b/testing/main.client.js index cbab9d3f..66648add 100644 --- a/testing/main.client.js +++ b/testing/main.client.js @@ -5,6 +5,7 @@ import './synthetic_mutators'; import './client_side_mutators'; import './publishComposite/client.test'; import './optimistic-ui/client.test'; +import './payload_batching.client'; // import './server-autorun/client'; import './transformations/client'; import './publish-counts/client'; diff --git a/testing/payload_batching.client.js b/testing/payload_batching.client.js new file mode 100644 index 00000000..29e366bb --- /dev/null +++ b/testing/payload_batching.client.js @@ -0,0 +1,145 @@ +import { Random } from 'meteor/random'; +import {Collections, config} from './boot'; +import helperGenerator from './lib/helpers'; +import Config from '../lib/config'; + +const Collection = Collections['Standard']; + +describe('Redis Payload Batching', function () { + const { + update, + createSync, + subscribe, + waitForHandleToBeReady + } = helperGenerator(config['Standard'].suffix); + + it('Should process all the updates at once since we are not waiting more than the debounceInterval between updates', async function(done) { + const docId = Random.id(); + let handle = subscribe({ _id: docId }); + + await createSync({ _id: docId, value: -1 }); + + await waitForHandleToBeReady(handle); + + let changes = 0; + const expectedChanges = 1; + Collection.find({ _id: docId }).observeChanges({ + changed() { + changes += 1; + + // ensure we don't receive more updates than expected + if (changes === expectedChanges) { + setTimeout(() => { + if (changes === expectedChanges) done(); + else throw new Error('Too many changes') + }, 200); + } + }, + }); + + // kick off several updates + for (let i = 0; i < 10; i++) { + update( + { _id: docId }, + { + $set: { + value: i, + }, + }, + { optimistic: false, pushToRedis: true } + ); + } + }); + + it('Should correctly process each update separately since we are waiting longer than the debounce interval between updates', async function(done) { + const docId = Random.id(); + let handle = subscribe({ _id: docId }); + // We wait twice the debounce interval to ensure that any payloads that were received + // and debounced by the server would have been processed + const sleepInterval = 2 * Config.debounceInterval; + // Execute multiple updates to confirm that those updates are not being batched + const numUpdates = 3; + // Since we are sleeping more than the debounce interval we expect our total number + // of changes received from the server to equal the number of updates + const expectedChanges = numUpdates; + + await createSync({ _id: docId, value: -1 }); + + await waitForHandleToBeReady(handle); + + let changes = 0; + Collection.find({ _id: docId }).observeChanges({ + changed() { + changes += 1; + + // ensure we receive the expected number of change events + if (changes === expectedChanges) { + done(); + } + }, + }); + + // kick off several updates + for (let i = 0; i < numUpdates; i++) { + update( + { _id: docId }, + { + $set: { + value: i, + }, + }, + { optimistic: false, pushToRedis: true } + ); + // wait till new debounce interval + await new Promise(resolve => setTimeout(resolve, sleepInterval)); + } + }); + + it('Should correctly use maxWait to batch changes if we exceed the first debounce window', async function(done) { + const docId = Random.id(); + let handle = subscribe({ _id: docId }); + // We set a short sleep interval here because we want to process more than one + // update in the same batch + const sleepInterval = 30; + // We execute 101 updates here so that the total execution time here is 30ms * 101 updates = 3030ms + // This ensures that our final update happens after the maxWait and should be processed in two batches + const numUpdates = 101; + // Since we should see our updates processed in two batches, we expect to receive only two changed events + // from the server + const expectedChanges = 2; + + await createSync({ _id: docId, value: -1 }); + + await waitForHandleToBeReady(handle); + + let changes = 0; + Collection.find({ _id: docId }).observeChanges({ + changed() { + changes += 1; + + // ensure we don't receive more updates than expected + if (changes === expectedChanges) { + setTimeout(() => { + if (changes === expectedChanges) done(); + else throw new Error('Too many changes') + }, 200); + } + }, + }) + + // kick off several updates + for (let i = 0; i < numUpdates; i++) { + update( + { _id: docId }, + { + $set: { + value: i, + }, + }, + { optimistic: false, pushToRedis: true } + ); + // wait till new debounce interval + await new Promise(resolve => setTimeout(resolve, sleepInterval)); + } + }).timeout(5000); +}); \ No newline at end of file