Skip to content

Commit 91d6790

Browse files
Hooked redis into the core, cleaned up tests that depended on old ObservableCollection, made sure reload happens when connection to redis is resumed
1 parent 8d6dd33 commit 91d6790

18 files changed

+53
-689
lines changed

lib/cache/ObservableCollection.js

Lines changed: 11 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import { DiffSequence } from 'meteor/diff-sequence';
22
import { _ } from 'meteor/underscore';
33
import { LocalCollection, Minimongo } from 'meteor/minimongo';
4-
import cloneDeep from 'lodash.clonedeep';
54
import fieldProjectionIsExclusion from './lib/fieldProjectionIsExclusion';
65
import getChannels from './lib/getChannels';
76
import extractFieldsFromFilters from './lib/extractFieldsFromFilters';
87
import { MongoIDMap } from './mongoIdMap';
8+
import { EJSON } from 'meteor/ejson';
99
import isRemovedNonExistent from '../utils/isRemovedNonExistent';
1010
import getStrategy from '../processors/getStrategy';
1111

@@ -24,19 +24,20 @@ const { Matcher } = Minimongo;
2424

2525
export default class ObservableCollection {
2626
/**
27-
* @param observer
28-
* @param cursor
29-
* @param config
27+
* Instantiate the collection
28+
* @param {*} param
3029
*/
31-
constructor(multiplexer, matcher, sorter, cursorDescription) {
30+
constructor({ multiplexer, matcher, sorter, cursorDescription }) {
3231
this.multiplexer = multiplexer;
32+
this.matcher = matcher;
3333
this.cursorDescription = cursorDescription;
3434

3535
this.collectionName = this.cursorDescription.collectionName;
3636
this.collection = Mongo.Collection.__getCollectionByName(
3737
cursorDescription.collectionName
3838
);
3939

40+
// Here we apply the logic of changing the cursor based on the collection-level configuration
4041
if (this.collection._redisOplog) {
4142
const { cursor } = this.collection._redisOplog;
4243
if (cursor) {
@@ -102,7 +103,6 @@ export default class ObservableCollection {
102103
}
103104

104105
this.channels = getChannels(this.collectionName, this.options);
105-
this.testDocEligibility = this._createTestDocEligibility();
106106
this.fieldsOfInterest = this._getFieldsOfInterest();
107107
this.__isInitialized = false;
108108

@@ -128,8 +128,8 @@ export default class ObservableCollection {
128128
* @returns {*}
129129
*/
130130
isEligible(doc) {
131-
if (this.testDocEligibility) {
132-
return this.testDocEligibility(doc);
131+
if (this.matcher) {
132+
return this.matcher.documentMatches(doc).result;
133133
}
134134

135135
return true;
@@ -140,7 +140,7 @@ export default class ObservableCollection {
140140
* @returns {boolean}
141141
*/
142142
isEligibleByDB(_id) {
143-
if (this.testDocEligibility) {
143+
if (this.matcher) {
144144
return !!this.collection.findOne(
145145
_.extend({}, this.selector, { _id }),
146146
{ fields: { _id: 1 } }
@@ -191,7 +191,7 @@ export default class ObservableCollection {
191191
* @param safe {Boolean} If this is set to true, it assumes that the object is cleaned
192192
*/
193193
add(doc, safe = false) {
194-
doc = cloneDeep(doc);
194+
doc = EJSON.clone(doc);
195195

196196
if (!safe) {
197197
if (this.fieldsArray) {
@@ -257,7 +257,7 @@ export default class ObservableCollection {
257257
}
258258

259259
let storedDoc = this.store.get(docId);
260-
let oldDoc = cloneDeep(storedDoc);
260+
let oldDoc = EJSON.clone(storedDoc);
261261

262262
LocalCollection._modify(storedDoc, modifier);
263263

@@ -303,42 +303,6 @@ export default class ObservableCollection {
303303
return false;
304304
}
305305

306-
/**
307-
* Used at initialization
308-
*
309-
* Creates the function that checks if the document is valid
310-
*
311-
* @returns {null}
312-
* @private
313-
*/
314-
_createTestDocEligibility() {
315-
const self = this;
316-
317-
if (_.keys(this.selector).length) {
318-
try {
319-
const matcher = new Matcher(this.selector);
320-
321-
return function(object) {
322-
return matcher.documentMatches(object).result;
323-
};
324-
} catch (e) {
325-
// The logic here is that if our matcher is too complex for minimongo
326-
// We put our matching function to query db
327-
if (
328-
e.toString().indexOf('Unrecognized logical operator') >= 0
329-
) {
330-
return function(object) {
331-
return self.isEligibleByDB(object._id);
332-
};
333-
} else {
334-
throw e;
335-
}
336-
}
337-
}
338-
339-
return null;
340-
}
341-
342306
/**
343307
* Used at initialization
344308
*

lib/cache/testing/ObservableCollection.test.js

Lines changed: 0 additions & 86 deletions
This file was deleted.

lib/cache/testing/index.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1 @@
1-
import './ObservableCollection.test';
21
import './filterFieldsForFetching.test';

lib/init.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import RedisSubscriptionManager from './redis/RedisSubscriptionManager';
55
import PubSubManager from './redis/PubSubManager';
66
import { getRedisListener } from './redis/getRedisClient';
77
import deepExtend from 'deep-extend';
8+
import reload from './processors/actions/reload';
89

910
let isInitialized = false;
1011

@@ -29,6 +30,11 @@ export default (config = {}) => {
2930
onConnect() {
3031
// this will be executed initially, but since there won't be any observable collections, nothing will happen
3132
// PublicationFactory.reloadAll();
33+
RedisSubscriptionManager.getAllRedisSubscribers().forEach(
34+
redisSubscriber => {
35+
reload(redisSubscriber.observableCollection);
36+
}
37+
);
3238
},
3339
});
3440

lib/mongo/RedisOplogObserveDriver.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@ export default class RedisOplogObserveDriver {
2626
);
2727

2828
// TODO send by object
29-
this.observableCollection = new ObservableCollection(
29+
this.observableCollection = new ObservableCollection({
3030
multiplexer,
3131
matcher,
3232
sorter,
33-
cursorDescription
34-
);
33+
cursorDescription,
34+
});
3535

3636
if (strategy === Strategy.DEDICATED_CHANNELS) {
3737
let oc = this.observableCollection;

lib/mongo/extendObserveChanges.js

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import { diff } from 'deep-diff';
2-
import cloneDeep from 'lodash.clonedeep';
31
import { DDP } from 'meteor/ddp';
42
import isRemovedNonExistent from '../utils/isRemovedNonExistent';
53
import { Mongo, MongoInternals } from 'meteor/mongo';

lib/mongo/lib/dispatchers.js

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,8 @@ import getDedicatedChannel from '../../utils/getDedicatedChannel';
88
import Config from '../../config';
99
import OptimisticInvocation from '../OptimisticInvocation';
1010

11-
const getWriteFence = function(optimistic) {
12-
if (optimistic && DDPServer._CurrentWriteFence) {
13-
return DDPServer._CurrentWriteFence.get();
14-
}
15-
return null;
16-
};
17-
18-
const dispatchEvents = function(fence, collectionName, channels, events) {
19-
if (fence) {
11+
const dispatchEvents = function(optimistic, collectionName, channels, events) {
12+
if (optimistic) {
2013
OptimisticInvocation.withValue(true, () => {
2114
events.forEach(event => {
2215
const docId = event[RedisPipe.DOC]._id;
@@ -58,37 +51,40 @@ const dispatchUpdate = function(
5851
docIds,
5952
fields
6053
) {
61-
const fence = getWriteFence(optimistic);
62-
const uid = fence ? RedisSubscriptionManager.uid : null;
54+
const uid = optimistic ? RedisSubscriptionManager.uid : null;
55+
6356
const events = docIds.map(docId => ({
6457
[RedisPipe.EVENT]: Events.UPDATE,
6558
[RedisPipe.FIELDS]: fields,
6659
[RedisPipe.DOC]: { _id: docId },
6760
[RedisPipe.UID]: uid,
6861
}));
69-
dispatchEvents(fence, collectionName, channels, events);
62+
63+
dispatchEvents(optimistic, collectionName, channels, events);
7064
};
7165

7266
const dispatchRemove = function(optimistic, collectionName, channels, docIds) {
73-
const fence = getWriteFence(optimistic);
74-
const uid = fence ? RedisSubscriptionManager.uid : null;
67+
const uid = optimistic ? RedisSubscriptionManager.uid : null;
68+
7569
const events = docIds.map(docId => ({
7670
[RedisPipe.EVENT]: Events.REMOVE,
7771
[RedisPipe.DOC]: { _id: docId },
7872
[RedisPipe.UID]: uid,
7973
}));
80-
dispatchEvents(fence, collectionName, channels, events);
74+
75+
dispatchEvents(optimistic, collectionName, channels, events);
8176
};
8277

8378
const dispatchInsert = function(optimistic, collectionName, channels, docId) {
84-
const fence = getWriteFence(optimistic);
85-
const uid = fence ? RedisSubscriptionManager.uid : null;
79+
const uid = optimistic ? RedisSubscriptionManager.uid : null;
80+
8681
const event = {
8782
[RedisPipe.EVENT]: Events.INSERT,
8883
[RedisPipe.DOC]: { _id: docId },
8984
[RedisPipe.UID]: uid,
9085
};
91-
dispatchEvents(fence, collectionName, channels, [event]);
86+
87+
dispatchEvents(optimistic, collectionName, channels, [event]);
9288
};
9389

9490
export { dispatchInsert, dispatchUpdate, dispatchRemove };

lib/mongo/observeChanges.js

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ export default function(cursorDescription, ordered, callbacks) {
1212
callbacks
1313
);
1414
}
15+
1516
// You may not filter out _id when observing changes, because the id is a core
1617
// part of the observeChanges API.
1718

@@ -64,11 +65,7 @@ export default function(cursorDescription, ordered, callbacks) {
6465
// At a bare minimum, using the oplog requires us to have an oplog, to
6566
// want unordered callbacks, and to not want a callback on the polls
6667
// that won't happen.
67-
return (
68-
self._oplogHandle &&
69-
!ordered &&
70-
!callbacks._testOnlyPollCallback
71-
);
68+
return !ordered && !callbacks._testOnlyPollCallback;
7269
},
7370
function() {
7471
// We need to be able to compile the selector. Fall back to polling for

0 commit comments

Comments
 (0)