Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 92 additions & 55 deletions packages/powersync_core/lib/src/database/powersync_db_mixin.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import 'dart:async';

import 'package:async/async.dart';
import 'package:logging/logging.dart';
import 'package:meta/meta.dart';
import 'package:powersync_core/sqlite3_common.dart';
Expand Down Expand Up @@ -508,23 +509,10 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
}
final last = all[all.length - 1];
return CrudBatch(
crud: all,
haveMore: haveMore,
complete: ({String? writeCheckpoint}) async {
await writeTransaction((db) async {
await db
.execute('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]);
if (writeCheckpoint != null &&
await db.getOptional('SELECT 1 FROM ps_crud LIMIT 1') == null) {
await db.execute(
'UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name=\'\$local\'',
[writeCheckpoint]);
} else {
await db.execute(
'UPDATE ps_buckets SET target_op = $maxOpId WHERE name=\'\$local\'');
}
});
});
crud: all,
haveMore: haveMore,
complete: _crudCompletionCallback(last.clientId),
);
}

/// Get the next recorded transaction to upload.
Expand All @@ -538,46 +526,95 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
///
/// Unlike [getCrudBatch], this only returns data from a single transaction at a time.
/// All data for the transaction is loaded into memory.
Future<CrudTransaction?> getNextCrudTransaction() async {
return await readTransaction((tx) async {
final first = await tx.getOptional(
'SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT 1');
if (first == null) {
return null;
}
final txId = first['tx_id'] as int?;
List<CrudEntry> all;
if (txId == null) {
all = [CrudEntry.fromRow(first)];
} else {
final rows = await tx.getAll(
'SELECT id, tx_id, data FROM ps_crud WHERE tx_id = ? ORDER BY id ASC',
[txId]);
all = [for (var row in rows) CrudEntry.fromRow(row)];
Future<CrudTransaction?> getNextCrudTransaction() {
return getCrudTransactions().firstOrNull;
}

/// Returns a stream of completed transactions with local writes against the
/// database.
///
/// This is typically used from the [PowerSyncBackendConnector.uploadData]
/// method. Each entry emitted by the stream is a full transaction containing
/// all local writes made while that transaction was active.
///
/// Unlike [getNextCrudTransaction], which always returns the oldest
/// transaction that hasn't been [CrudTransaction.complete]d yet, this stream
/// can be used to receive multiple transactions. Calling
/// [CrudTransaction.complete] will mark that transaction and all prior
/// transactions emitted by the stream as completed.
///
/// This can be used to upload multiple transactions in a single batch, e.g.
/// with:
///
/// ```dart
/// CrudTransaction? lastTransaction;
/// final batch = <CrudEntry>[];
///
/// await for (final transaction in powersync.nextCrudTransactions()) {
/// batch.addAll(transaction.crud);
/// lastTransaction = transaction;
///
/// if (batch.length > 100) {
/// break;
/// }
/// }
///
/// if (batch.isNotEmpty) {
/// await uploadBatch(batch);
/// lastTransaction!.complete();
/// }
/// ```
///
/// If there is no local data to upload, the stream emits a single `onDone`
/// event.
Stream<CrudTransaction> getCrudTransactions() async* {
var lastCrudItemId = -1;
const sql = '''
WITH RECURSIVE crud_entries AS (
SELECT id, tx_id, data FROM ps_crud WHERE id = (SELECT min(id) FROM ps_crud WHERE id > ?)
UNION ALL
SELECT ps_crud.id, ps_crud.tx_id, ps_crud.data FROM ps_crud
INNER JOIN crud_entries ON crud_entries.id + 1 = rowid
WHERE crud_entries.tx_id = ps_crud.tx_id
)
SELECT * FROM crud_entries;
''';

while (true) {
final nextTransaction = await getAll(sql, [lastCrudItemId]);
if (nextTransaction.isEmpty) {
break;
}

final last = all[all.length - 1];

return CrudTransaction(
transactionId: txId,
crud: all,
complete: ({String? writeCheckpoint}) async {
await writeTransaction((db) async {
await db.execute(
'DELETE FROM ps_crud WHERE id <= ?', [last.clientId]);
if (writeCheckpoint != null &&
await db.getOptional('SELECT 1 FROM ps_crud LIMIT 1') ==
null) {
await db.execute(
'UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name=\'\$local\'',
[writeCheckpoint]);
} else {
await db.execute(
'UPDATE ps_buckets SET target_op = $maxOpId WHERE name=\'\$local\'');
}
});
});
});
final items = [for (var row in nextTransaction) CrudEntry.fromRow(row)];
final last = items.last;
final txId = last.transactionId;

yield CrudTransaction(
crud: items,
complete: _crudCompletionCallback(last.clientId),
transactionId: txId,
);
lastCrudItemId = last.clientId;
}
}

Future<void> Function({String? writeCheckpoint}) _crudCompletionCallback(
int lastClientId) {
return ({String? writeCheckpoint}) async {
await writeTransaction((db) async {
await db.execute('DELETE FROM ps_crud WHERE id <= ?', [lastClientId]);
if (writeCheckpoint != null &&
await db.getOptional('SELECT 1 FROM ps_crud LIMIT 1') == null) {
await db.execute(
'UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name=\'\$local\'',
[writeCheckpoint]);
} else {
await db.execute(
'UPDATE ps_buckets SET target_op = $maxOpId WHERE name=\'\$local\'');
}
});
};
}

/// Takes a read lock, without starting a transaction.
Expand Down
33 changes: 33 additions & 0 deletions packages/powersync_core/test/crud_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,39 @@ void main() {
expect(await powersync.getNextCrudTransaction(), equals(null));
});

test('nextCrudTransactions', () async {
Future<void> createTransaction(int size) {
return powersync.writeTransaction((tx) async {
for (var i = 0; i < size; i++) {
await tx.execute('INSERT INTO assets (id) VALUES (uuid())');
}
});
}

await expectLater(powersync.getCrudTransactions(), emitsDone);

await createTransaction(5);
await createTransaction(10);
await createTransaction(15);

CrudTransaction? lastTransaction;
final batch = <CrudEntry>[];
await for (final transaction in powersync.getCrudTransactions()) {
batch.addAll(transaction.crud);
lastTransaction = transaction;

if (batch.length > 10) {
break;
}
}

expect(batch, hasLength(15));
await lastTransaction!.complete();

final remainingTransaction = await powersync.getNextCrudTransaction();
expect(remainingTransaction?.crud, hasLength(15));
});

test('include metadata', () async {
await powersync.updateSchema(Schema([
Table(
Expand Down
Loading