Skip to content

Commit 7e0dffc

Browse files
committed
Implements aggregation support with Elasticsearch, cursor.getAggregations(), cursor.getAggregation(name)
1 parent b5e7ae8 commit 7e0dffc

File tree

7 files changed

+159
-12
lines changed

7 files changed

+159
-12
lines changed

packages/easysearch:core/lib/core/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class Index {
4444
permission: () => true,
4545
defaultSearchOptions: {},
4646
countUpdateIntervalMs: 2000,
47+
aggsUpdateIntervalMs: 10000
4748
};
4849
}
4950

packages/easysearch:core/lib/core/search-collection.js

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ class SearchCollection {
1717
*
1818
* @constructor
1919
*/
20-
constructor(indexConfiguration, engine) {
20+
constructor(indexConfiguration, engine, mongoCount = true) {
2121
check(indexConfiguration, Object);
2222
check(indexConfiguration.name, Match.OneOf(String, null));
2323

@@ -28,6 +28,7 @@ class SearchCollection {
2828
this._indexConfiguration = indexConfiguration;
2929
this._name = `${indexConfiguration.name}/easySearch`;
3030
this._engine = engine;
31+
this.mongoCount = mongoCount;
3132

3233
if (Meteor.isClient) {
3334
this._collection = new Mongo.Collection(this._name);
@@ -184,20 +185,47 @@ class SearchCollection {
184185
this.added(collectionName, 'searchCount' + definitionString, { count });
185186

186187
let intervalID;
187-
188188
if (collectionScope._indexConfiguration.countUpdateIntervalMs) {
189+
intervalID = Meteor.setInterval(() => {
190+
let newCount;
191+
if (this.mongoCount) {
192+
newCount = cursor.mongoCursor.count();
193+
} else {
194+
newCount = cursor.count && cursor.count() || 0
195+
}
196+
197+
this.changed(
198+
collectionName,
199+
'searchCount' + definitionString,
200+
{ count: newCount }
201+
);
202+
},
203+
collectionScope._indexConfiguration.countUpdateIntervalMs
204+
);
205+
}
206+
207+
const aggs = cursor._aggs;
208+
209+
if (aggs) {
210+
this.added(collectionName, 'aggs' + definitionString, { aggs });
211+
}
212+
213+
let intervalAggsID;
214+
215+
if (aggs && collectionScope._indexConfiguration.aggsUpdateIntervalMs) {
189216
intervalID = Meteor.setInterval(
190217
() => this.changed(
191218
collectionName,
192-
'searchCount' + definitionString,
193-
{ count: cursor.mongoCursor.count() }
219+
'aggs' + definitionString,
220+
{ aggs }
194221
),
195-
collectionScope._indexConfiguration.countUpdateIntervalMs
222+
collectionScope._indexConfiguration.aggsUpdateIntervalMs
196223
);
197224
}
198225

199226
this.onStop(function () {
200227
intervalID && Meteor.clearInterval(intervalID);
228+
intervalAggsID && Meteor.clearInterval(intervalAggsID);
201229
resultsHandle && resultsHandle.stop();
202230
});
203231

packages/easysearch:core/lib/main.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import Index from './core/index';
22
import Engine from './core/engine';
33
import ReactiveEngine from './core/reactive-engine';
4+
import SearchCollection from './core/search-collection';
45
import Cursor from './core/cursor';
56
import MongoDBEngine from './engines/mongo-db';
67
import MinimongoEngine from './engines/minimongo';
@@ -13,5 +14,6 @@ export {
1314
Cursor,
1415
MongoDBEngine,
1516
MinimongoEngine,
16-
MongoTextIndexEngine
17+
MongoTextIndexEngine,
18+
SearchCollection
1719
};
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import { Cursor } from 'meteor/easysearch:core';
2+
3+
/**
4+
* A Cursor that extends the regular EasySearch cursor. This cursor is Elasticsearch specific.
5+
*
6+
* @type {ESCursor}
7+
*/
8+
class ESCursor extends Cursor {
9+
/**
10+
* Constructor
11+
*
12+
* @param {Mongo.Cursor} hitsCursor Referenced mongo cursor to the regular hits field
13+
* @param {Number} count Count of all documents found in regular hits field
14+
* @param {Object} aggs Raw aggragtion data
15+
* @param {Boolean} isReady Cursor is ready
16+
* @param {Object} publishHandle Publish handle to stop if on client
17+
*
18+
* @constructor
19+
*
20+
*/
21+
constructor(cursor, count, isReady = true, publishHandle = null, aggs = {}) {
22+
check(cursor.fetch, Function);
23+
check(count, Number);
24+
check(aggs, Match.Optional(Object));
25+
26+
super(cursor, count, isReady, publishHandle);
27+
28+
this._aggs = aggs;
29+
}
30+
31+
getAggregation(path) {
32+
return this._aggs[path];
33+
}
34+
35+
getAggregations() {
36+
return this._aggs;
37+
}
38+
}
39+
40+
export default ESCursor;

packages/easysearch:elasticsearch/lib/engine.js

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import ElasticSearchDataSyncer from './data-syncer'
2+
import ESCursor from './cursor'
3+
import ESSearchCollection from './search-collection'
24

35
if (Meteor.isServer) {
46
var Future = Npm.require('fibers/future'),
@@ -125,7 +127,12 @@ if (Meteor.isServer) {
125127
* @param {Object} indexConfig Index configuration
126128
*/
127129
onIndexCreate(indexConfig) {
128-
super.onIndexCreate(indexConfig);
130+
if (!indexConfig.allowedFields) {
131+
indexConfig.allowedFields = indexConfig.fields;
132+
}
133+
134+
indexConfig.searchCollection = new ESSearchCollection(indexConfig, this);
135+
indexConfig.mongoCollection = indexConfig.searchCollection._collection;
129136

130137
if (Meteor.isServer) {
131138
indexConfig.elasticSearchClient = new elasticsearch.Client(this.config.client);
@@ -170,7 +177,7 @@ if (Meteor.isServer) {
170177
return;
171178
}
172179

173-
let { total, ids } = this.getCursorData(data),
180+
let { total, ids, aggs } = this.getCursorData(data),
174181
cursor;
175182

176183
if (ids.length > 0) {
@@ -180,10 +187,10 @@ if (Meteor.isServer) {
180187
})
181188
}, { limit: options.search.limit });
182189
} else {
183-
cursor = EasySearch.Cursor.emptyCursor;
190+
cursor = ESCursor.emptyCursor;
184191
}
185192

186-
fut['return'](new EasySearch.Cursor(cursor, total));
193+
fut['return'](new ESCursor(cursor, total, true, null, aggs));
187194
}));
188195

189196
return fut.wait();
@@ -198,8 +205,9 @@ if (Meteor.isServer) {
198205
*/
199206
getCursorData(data) {
200207
return {
201-
ids : _.map(data.hits.hits, (resultSet) => resultSet._id),
202-
total: data.hits.total
208+
ids: _.map(data.hits.hits, (resultSet) => resultSet._id),
209+
total: data.hits.total,
210+
aggs: data.aggregations || {}
203211
};
204212
}
205213
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import { SearchCollection } from 'meteor/easysearch:core';
2+
import ESCursor from './cursor';
3+
4+
/**
5+
* A search collection represents a reactive collection on the client,
6+
* which is used by the ReactiveEngine for searching using Elasticsearch.
7+
*
8+
* @type {ESSearchCollection}
9+
*/
10+
class ESSearchCollection extends SearchCollection {
11+
/**
12+
* Constructor
13+
*
14+
* @param {Object} indexConfiguration Index configuration
15+
* @param {ReactiveEngine} engine Reactive Engine
16+
*
17+
* @constructor
18+
*/
19+
constructor() {
20+
super(...[].push.call(arguments, false));
21+
}
22+
23+
/**
24+
* Find documents on the client.
25+
*
26+
* @param {Object} searchDefinition Search definition
27+
* @param {Object} options Options
28+
*
29+
* @returns {ESCursor}
30+
*/
31+
find(searchDefinition, options) {
32+
if (!Meteor.isClient) {
33+
throw new Error('find can only be used on client');
34+
}
35+
36+
let publishHandle = Meteor.subscribe(this.name, searchDefinition, options);
37+
38+
let count = this._getCount(searchDefinition);
39+
let aggs = this._getAggregation(searchDefinition);
40+
let mongoCursor = this._getMongoCursor(searchDefinition, options);
41+
42+
if (!_.isNumber(count)) {
43+
return new ESCursor(mongoCursor, 0, false, null, aggs);
44+
}
45+
46+
return new ESCursor(mongoCursor, count, true, publishHandle, aggs);
47+
}
48+
49+
/**
50+
* Get the aggregations linked to the search
51+
*
52+
* @params {Object} searchDefinition Search definition
53+
*
54+
* @private
55+
*/
56+
_getAggregation(searchDefinition) {
57+
const aggsDoc = this._collection.findOne('aggs' + JSON.stringify(searchDefinition));
58+
if (aggsDoc) {
59+
return aggsDoc.aggs;
60+
}
61+
return {};
62+
}
63+
64+
}
65+
66+
export default ESSearchCollection;

packages/easysearch:elasticsearch/package.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ Package.onUse(function(api) {
2020
api.addFiles([
2121
'lib/data-syncer.js',
2222
'lib/engine.js',
23+
'lib/cursor.js',
24+
'lib/search-collection.js'
2325
]);
2426

2527
api.export('EasySearch');

0 commit comments

Comments
 (0)