
Commit b1add5a

rkistner and Copilot authored
[MongoDB Storage] Compact parameter lookups (#315)
* Refactor getParameterSets to take a ReplicationCheckpoint.
* Move getParameterSets to ReplicationCheckpoint.
* Refactor internal MongoSyncBucketStorage checkpoint lookup.
* Query parameters at the checkpoint's clusterTime.
* Add parameter compact action.
* Test consistency of compact.
* Fix snapshot implementation.
* Add maxTimeMS on parameter queries.
* Keep track of last 10k seen keys.
* Test that compacting actually has an effect.
* Add changeset.
* Add docs.
* Remove deleted parameter lookup values.
* More tests and fixes.
* Remove redundant check

Co-authored-by: Copilot <[email protected]>
1 parent 4a34a51 commit b1add5a

File tree

15 files changed (+836, -334 lines)

.changeset/angry-mice-hide.md

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
---
'@powersync/service-module-postgres-storage': minor
'@powersync/service-module-mongodb-storage': minor
'@powersync/service-core-tests': minor
'@powersync/service-core': minor
'@powersync/service-image': minor
---

[MongoDB Storage] Compact action now also compacts parameter lookup storage.

docs/parameters-lookups.md

Lines changed: 122 additions & 0 deletions
@@ -0,0 +1,122 @@
# Parameter Lookup Implementation

Most of the other docs focus on bucket data, but parameter lookup data also contains some tricky bits.

## Basic requirements

The essence of what we do when syncing data is:

1. Get the latest checkpoint.
2. Evaluate all parameter queries _at the state of the checkpoint_.
3. Return bucket data for the checkpoint.

This doc focuses on point 2.

## Current lookup implementation

We effectively store an "index" for exact lookups on parameter query tables.

The format in MongoDB storage is:

```
_id: OpId # auto-incrementing op-id, using the same sequence as checkpoints
key: {g: <sync rules group id>, t: <table id>, k: RowReplicationId} # uniquely identifies the source row
lookup: doc # lookup entry for this source row
bucket_parameters: data # results returned to the parameter query
```

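For illustration, the stored documents could be typed roughly as follows. This is a sketch inferred from the field descriptions above, not the actual `BucketParameterDocument` model; the concrete types are assumptions.

```ts
// Hypothetical shape of a parameter lookup document; the real model in the
// MongoDB storage module may differ (e.g. in how op-ids and lookups are encoded).
interface ParameterLookupDocumentSketch {
  _id: bigint; // auto-incrementing op-id, same sequence as checkpoints
  key: {
    g: number; // sync rules group id
    t: string; // table id
    k: unknown; // row replication id, uniquely identifying the source row
  };
  lookup: unknown; // lookup entry for this source row
  bucket_parameters: Record<string, unknown>[]; // parameter query results; empty when the source row was deleted
}
```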
If one row evaluates to multiple lookups, those are each stored as a separate document with the same key.

When a row is deleted, we empty `bucket_parameters` for the same (key, lookup) combinations.

To query, we do the following (a sketch follows the list):

1. Filter by sync rules version: key.g.
2. Filter by lookup.
3. Filter by checkpoint: \_id <= checkpoint.
4. Return the last parameter data for each (key, lookup) combination (highest \_id).
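As a sketch (not the actual implementation; the collection handle, types and the shape of `lookups` are simplified assumptions), these four steps map to a single aggregation:

```ts
import { mongo } from '@powersync/lib-service-mongodb';

// Sketch of the lookup query described above; the real implementation in the
// MongoDB storage module differs in details (types, snapshot read options).
async function getParameterSetsSketch(
  bucketParameters: mongo.Collection,
  groupId: number,
  lookups: unknown[],
  checkpoint: bigint
): Promise<unknown[]> {
  const docs = await bucketParameters
    .aggregate([
      // Steps 1-3: filter by sync rules version, lookup and checkpoint.
      // This matches the {'key.g': 1, lookup: 1, _id: 1} index.
      { $match: { 'key.g': groupId, lookup: { $in: lookups }, _id: { $lte: checkpoint } } },
      // Step 4: keep only the latest entry (highest _id) per (key, lookup).
      { $sort: { _id: -1 } },
      {
        $group: {
          _id: { key: '$key', lookup: '$lookup' },
          bucket_parameters: { $first: '$bucket_parameters' }
        }
      }
    ])
    .toArray();
  // Deleted rows have an empty bucket_parameters array and contribute nothing.
  return docs.flatMap((doc) => doc.bucket_parameters ?? []);
}
```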
## Compacting

In many cases, parameter query tables are updated infrequently, and compacting is not important. However, there are cases where parameter query tables are updated regularly (for example by cron jobs), and the resulting indefinite storage growth causes significant query overhead and other issues.

To handle this, we compact older data. For each (key.g, key, lookup) combination, we only need to keep the last copy (highest \_id). And if the last one is a remove operation (empty `bucket_parameters`), we can remove it completely.

One big consideration is that sync clients may still need some of that data. To cover for this, parameter lookup queries should specifically use a _snapshot_ query mode, querying at the same snapshot that was used for the checkpoint lookup. This is different from the "Future option: Snapshot queries" section below: we're not using a snapshot at the time the checkpoint was created, but rather a snapshot at the time the checkpoint was read. This means we always use a fresh snapshot.
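As a rough illustration (not the actual implementation): MongoDB 5.0+ allows snapshot reads at an explicit cluster time on `find`, `aggregate` and `distinct` outside of transactions. The raw command form is used here since drivers don't always expose `atClusterTime` directly; the `maxTimeMS` bound mirrors the one added on parameter queries in this change.

```ts
import { mongo } from '@powersync/lib-service-mongodb';

// Sketch only: read parameter lookups at the cluster time recorded for the
// checkpoint, so concurrent compacting cannot remove data out from under us.
// Requires MongoDB 5.0+; snapshot history is retained for
// minSnapshotHistoryWindowInSeconds (default 300s, i.e. 5 minutes).
async function findAtClusterTime(
  db: mongo.Db,
  clusterTime: mongo.Timestamp,
  filter: mongo.Document
): Promise<mongo.Document[]> {
  const result = await db.command({
    find: 'bucket_parameters',
    filter,
    singleBatch: true, // first batch only, for brevity
    readConcern: { level: 'snapshot', atClusterTime: clusterTime },
    maxTimeMS: 5_000 // bound query time, as done for parameter queries
  });
  return result.cursor.firstBatch;
}
```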
# Alternatives

## Future option: Incremental compacting

Right now, compacting scans through the entire collection to compact data. It should be possible to make this more incremental, only scanning through documents added since the last compact.

## Future option: Snapshot queries

If we could do a snapshot query with a snapshot matching the checkpoint, the rest of the implementation could become quite simple. We could "just" replicate the latest copy of parameter tables, and run arbitrary parameter queries on them.

Unfortunately, running snapshot queries for specific checkpoints is not that simple. Tricky parts include associating a snapshot with a specific checkpoint, and snapshots typically expiring after a short duration. Nonetheless, this remains an option to consider in the future.

To implement this with MongoDB (see the sketch at the end of this section):

1. Every time we `commit()` in the replication process, store the current clusterTime (we can use `$$CLUSTER_TIME` for this).
2. When we query for data, use that clusterTime.
3. _Make sure we commit at least once every 5 minutes_, ideally every minute.

The last point means that replication issues could also turn into query issues:

1. The replication process being down for 5 minutes means queries stop working.
2. Being more than 5 minutes behind in replication is not an issue, as long as we keep doing new commits.
3. Taking longer than 5 minutes to complete replication for a _single transaction_ will cause API failures. This includes operations such as adding or removing tables.

In theory, we could take this even further and run parameter queries directly on the _source_ database, without replicating.
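A minimal sketch of steps 1 and 2, under assumed names (`sync_rules` collection, `lastCommitClusterTime` field):

```ts
import { mongo } from '@powersync/lib-service-mongodb';

// Step 1 (sketch): record the current cluster time on each replication
// commit. An aggregation-pipeline update lets the server resolve
// $$CLUSTER_TIME itself (available on replica sets / sharded clusters).
async function recordCommitClusterTime(db: mongo.Db, groupId: number, session: mongo.ClientSession) {
  await db
    .collection('sync_rules')
    .updateOne({ _id: groupId }, [{ $set: { lastCommitClusterTime: '$$CLUSTER_TIME' } }], { session });
}

// Step 2 would then read with readConcern
// { level: 'snapshot', atClusterTime: lastCommitClusterTime },
// as in the snapshot-read sketch in the Compacting section above.
```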
## Compacting - Implementation alternatives

Instead of snapshot queries, some other alternatives are listed below. These are not used; they are just listed here in case we ever need to re-evaluate the implementation.

### 1. Last active checkpoint

Compute a "last active" checkpoint - a checkpoint that started being active at least 5 minutes ago, meaning that we can clean up data only used for checkpoints older than that.

The issues here are:

1. We don't store older checkpoints, so it can be tricky to find an older checkpoint without waiting 5 minutes.
2. It is difficult to build in hard guarantees for parameter queries here, without relying on time-based heuristics.
3. Keeping track of checkpoints used in the API service can be quite tricky.

### 2. Merge / invalidate lookups

Instead of deleting older parameter lookup records, we can merge them.

Say we have two records with the same key and lookup, and \_id of A and B (A < B). The above approach would just delete A, if A < lastActiveCheckpoint.

What we can do instead is merge into:

```
_id: A
bucket_parameters: B.bucket_parameters
not_valid_before: B
```

The key here is the `not_valid_before` field: when we query for parameter data, we filter by \_id as usual. But if `checkpoint < not_valid_before`, we need to discard that checkpoint (see the sketch at the end of this subsection).

Now we still need to try to avoid merging recent parameter lookup records, otherwise we may keep on invalidating checkpoints as fast as we generate them. But this could function as a final safety check, giving us proper consistency guarantees.

This roughly matches the design of `target_op` in MOVE operations.

This still does not cover deleted data: with this approach alone, we can never fully remove records after the source row was deleted, since we need that `not_valid_before` field. So this is not a complete solution.
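A tiny sketch of the read-side check this scheme would imply (field names as above; op-ids assumed to deserialize as `bigint`):

```ts
// Sketch: with merged records, a checkpoint is only usable if no matched
// record was merged past it. Otherwise the client needs a fresh checkpoint.
function checkpointStillValid(docs: { not_valid_before?: bigint }[], checkpoint: bigint): boolean {
  return docs.every((doc) => doc.not_valid_before == null || checkpoint >= doc.not_valid_before);
}
```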
### 3. Globally invalidate checkpoints

Another alternative is to globally invalidate checkpoints when compacting. So:

1. We pick a `lastActiveCheckpoint`.
2. Persist `noCheckpointBefore: lastActiveCheckpoint` in the sync_rules collection.
3. At some point between doing the parameter lookups and sending a `checkpoint_complete` message, we look up the `noCheckpointBefore` checkpoint, and invalidate the checkpoint if required.

This allows us to cleanly delete older checkpoints, at the expense of needing to run another query.

This could also replace the current logic we have for `target_op` in MOVE operations.

To do the lookup very efficiently, we can apply some workarounds (a sketch follows the list):

1. For each parameter query (and data query?), store the clusterTime of the results.
2. Right before sending checkpointComplete, query for the noCheckpointBefore value, using `afterClusterTime`.
3. _We can cache those results_, re-using them for other clients. As long as the `afterClusterTime` condition is satisfied, we can use the cached value.
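A sketch of that cached lookup, assuming the value lives on the sync_rules document (all names illustrative):

```ts
import { mongo } from '@powersync/lib-service-mongodb';

// Sketch: cached causal read of noCheckpointBefore. A read with
// afterClusterTime is guaranteed to observe all writes up to that time,
// so a cached result taken at a later cluster time can be safely re-used.
let cached: { clusterTime: mongo.Timestamp; noCheckpointBefore: bigint } | null = null;

async function getNoCheckpointBefore(
  db: mongo.Db,
  groupId: number,
  afterClusterTime: mongo.Timestamp
): Promise<bigint> {
  if (cached != null && cached.clusterTime.gte(afterClusterTime)) {
    return cached.noCheckpointBefore;
  }
  const result = await db.command({
    find: 'sync_rules',
    filter: { _id: groupId },
    projection: { noCheckpointBefore: 1 },
    singleBatch: true,
    readConcern: { level: 'majority', afterClusterTime }
  });
  const doc = result.cursor.firstBatch[0];
  // Command responses on replica sets include the cluster time at which they executed.
  cached = { clusterTime: result.$clusterTime.clusterTime, noCheckpointBefore: doc.noCheckpointBefore };
  return cached.noCheckpointBefore;
}
```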
Lines changed: 105 additions & 0 deletions
@@ -0,0 +1,105 @@
```ts
import { logger } from '@powersync/lib-services-framework';
import { bson, CompactOptions, InternalOpId } from '@powersync/service-core';
import { LRUCache } from 'lru-cache';
import { PowerSyncMongo } from './db.js';
import { mongo } from '@powersync/lib-service-mongodb';
import { BucketParameterDocument } from './models.js';

/**
 * Compacts parameter lookup data (the bucket_parameters collection).
 *
 * This scans through the entire collection to find data to compact.
 *
 * For background, see the `/docs/parameters-lookups.md` file.
 */
export class MongoParameterCompactor {
  constructor(
    private db: PowerSyncMongo,
    private group_id: number,
    private checkpoint: InternalOpId,
    private options: CompactOptions
  ) {}

  async compact() {
    logger.info(`Compacting parameters for group ${this.group_id} up to checkpoint ${this.checkpoint}`);
    // This is the currently-active checkpoint.
    // We do not remove any data that may be used by this checkpoint.
    // Snapshot queries ensure that if any clients are still using older checkpoints,
    // they would not be affected by this compaction.
    const checkpoint = this.checkpoint;

    // Index on {'key.g': 1, lookup: 1, _id: 1}
    // In theory, we could let MongoDB do more of the work here, by grouping by (key, lookup)
    // in MongoDB already. However, that risks running into cases where MongoDB needs to process
    // very large amounts of data before returning results, which could lead to timeouts.
    const cursor = this.db.bucket_parameters.find(
      {
        'key.g': this.group_id
      },
      {
        sort: { lookup: 1, _id: 1 },
        batchSize: 10_000,
        projection: { _id: 1, key: 1, lookup: 1, bucket_parameters: 1 }
      }
    );

    // The index doesn't cover sorting by key, so we keep our own cache of the last seen key.
    let lastByKey = new LRUCache<string, InternalOpId>({
      max: this.options.compactParameterCacheLimit ?? 10_000
    });
    let removeIds: InternalOpId[] = [];
    let removeDeleted: mongo.AnyBulkWriteOperation<BucketParameterDocument>[] = [];

    const flush = async (force: boolean) => {
      if (removeIds.length >= 1000 || (force && removeIds.length > 0)) {
        const results = await this.db.bucket_parameters.deleteMany({ _id: { $in: removeIds } });
        logger.info(`Removed ${results.deletedCount} (${removeIds.length}) superseded parameter entries`);
        removeIds = [];
      }

      if (removeDeleted.length > 10 || (force && removeDeleted.length > 0)) {
        const results = await this.db.bucket_parameters.bulkWrite(removeDeleted);
        logger.info(`Removed ${results.deletedCount} (${removeDeleted.length}) deleted parameter entries`);
        removeDeleted = [];
      }
    };

    while (await cursor.hasNext()) {
      const batch = cursor.readBufferedDocuments();
      for (let doc of batch) {
        if (doc._id >= checkpoint) {
          continue;
        }
        const uniqueKey = (
          bson.serialize({
            k: doc.key,
            l: doc.lookup
          }) as Buffer
        ).toString('base64');
        const previous = lastByKey.get(uniqueKey);
        if (previous != null && previous < doc._id) {
          // We have a newer entry for the same key, so we can remove the old one.
          removeIds.push(previous);
        }
        lastByKey.set(uniqueKey, doc._id);

        if (doc.bucket_parameters?.length == 0) {
          // This is a delete operation, so we can remove it completely.
          // We cannot just remove the operation itself: there may still be an earlier
          // operation with the same key and lookup that we don't have in the cache,
          // due to cache size limits. So we need to explicitly remove all earlier operations.
          removeDeleted.push({
            deleteMany: {
              filter: { 'key.g': doc.key.g, lookup: doc.lookup, _id: { $lte: doc._id }, key: doc.key }
            }
          });
        }
      }

      await flush(false);
    }

    await flush(true);
    logger.info('Parameter compaction completed');
  }
}
```
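For context, a hypothetical invocation from the compact action could look like the following; the actual wiring in the MongoDB storage module's `compact()` entry point may differ.

```ts
// Hypothetical usage: compact parameter lookups for one sync rules group,
// up to the currently-active checkpoint.
const compactor = new MongoParameterCompactor(db, group_id, checkpoint, {
  compactParameterCacheLimit: 10_000
});
await compactor.compact();
```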
