Skip to content

Commit 9ced54d

Browse files
authored
Merge pull request #599 from powersync-ja/rust-core-sync
Add sync implementation from core extension
2 parents 7ad0c1c + 3e5394d commit 9ced54d

File tree

14 files changed

+833
-362
lines changed

14 files changed

+833
-362
lines changed

.changeset/heavy-turkeys-end.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
---
2+
'@powersync/common': minor
3+
'@powersync/node': minor
4+
'@powersync/web': minor
5+
'@powersync/react-native': minor
6+
---
7+
8+
This adds a new (and currently experimental) sync client implementation
9+
implemented natively in the PowerSync SQLite extension.
10+
11+
This implementation will eventually become the default, but we encourage
12+
interested users to try it out. In particular, we expect that it can improve
13+
sync performance (especially on platforms with challenging JS performance,
14+
like React Native).
15+
16+
On all our JavaScript SDKs, the new implementation can be enabled with a
17+
sync option entry when connecting:
18+
19+
```JS
20+
await db.connect(new MyConnector(), {
21+
clientImplementation: SyncClientImplementation.RUST
22+
});
23+
```
24+
25+
Since the new client implements the same protocol, you can also migrate back
26+
to the JavaScript client later by removing the `clientImplementation` option.
27+
28+
__However__: After enabling the `RUST` client, you cannot downgrade your
29+
PowerSync SDK below this version. When enabled for the first time, databases
30+
will be migrated. The JavaScript sync client from this and later SDK versions
31+
understands the new format, but the client from an older SDK version will not!

demos/example-node/src/main.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
import { once } from 'node:events';
22
import repl_factory from 'node:repl';
33

4-
import { createBaseLogger, createLogger, PowerSyncDatabase, SyncStreamConnectionMethod } from '@powersync/node';
4+
import {
5+
createBaseLogger,
6+
createLogger,
7+
PowerSyncDatabase,
8+
SyncClientImplementation,
9+
SyncStreamConnectionMethod
10+
} from '@powersync/node';
511
import { exit } from 'node:process';
612
import { AppSchema, DemoConnector } from './powersync.js';
713
import { enableUncidiDiagnostics } from './UndiciDiagnostics.js';
@@ -34,7 +40,8 @@ const main = async () => {
3440
console.log(await db.get('SELECT powersync_rs_version();'));
3541

3642
await db.connect(new DemoConnector(), {
37-
connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET
43+
connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET,
44+
clientImplementation: SyncClientImplementation.RUST
3845
});
3946
// Example using a proxy agent for more control over the connection:
4047
// const proxyAgent = new (await import('undici')).ProxyAgent({

packages/common/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
"async-mutex": "^0.4.0",
5252
"bson": "^6.6.0",
5353
"buffer": "^6.0.3",
54-
"can-ndjson-stream": "^1.0.2",
5554
"cross-fetch": "^4.0.0",
5655
"event-iterator": "^2.0.0",
5756
"rollup": "4.14.3",

packages/common/src/client/sync/bucket/BucketStorageAdapter.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,20 +59,31 @@ export enum PSInternalTable {
5959
UNTYPED = 'ps_untyped'
6060
}
6161

62+
export enum PowerSyncControlCommand {
63+
PROCESS_TEXT_LINE = 'line_text',
64+
PROCESS_BSON_LINE = 'line_binary',
65+
STOP = 'stop',
66+
START = 'start',
67+
NOTIFY_TOKEN_REFRESHED = 'refreshed_token',
68+
NOTIFY_CRUD_UPLOAD_COMPLETED = 'completed_upload'
69+
}
70+
6271
export interface BucketStorageListener extends BaseListener {
6372
crudUpdate: () => void;
6473
}
6574

6675
export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener>, Disposable {
6776
init(): Promise<void>;
68-
saveSyncData(batch: SyncDataBatch): Promise<void>;
77+
saveSyncData(batch: SyncDataBatch, fixedKeyFormat?: boolean): Promise<void>;
6978
removeBuckets(buckets: string[]): Promise<void>;
7079
setTargetCheckpoint(checkpoint: Checkpoint): Promise<void>;
7180

7281
startSession(): void;
7382

7483
getBucketStates(): Promise<BucketState[]>;
7584
getBucketOperationProgress(): Promise<BucketOperationProgress>;
85+
hasMigratedSubkeys(): Promise<boolean>;
86+
migrateToFixedSubkeys(): Promise<void>;
7687

7788
syncLocalDatabase(
7889
checkpoint: Checkpoint,
@@ -101,4 +112,9 @@ export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener
101112
* Get an unique client id.
102113
*/
103114
getClientId(): Promise<string>;
115+
116+
/**
117+
* Invokes the `powersync_control` function for the sync client.
118+
*/
119+
control(op: PowerSyncControlCommand, payload: string | ArrayBuffer | null): Promise<string>;
104120
}

packages/common/src/client/sync/bucket/OplogEntry.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ export interface OplogEntryJSON {
88
object_type?: string;
99
op_id: string;
1010
op: OpTypeJSON;
11-
subkey?: string | object;
11+
subkey?: string;
1212
}
1313

1414
export class OplogEntry {
@@ -17,7 +17,7 @@ export class OplogEntry {
1717
row.op_id,
1818
OpType.fromJSON(row.op),
1919
row.checksum,
20-
typeof row.subkey == 'string' ? row.subkey : JSON.stringify(row.subkey),
20+
row.subkey,
2121
row.object_type,
2222
row.object_id,
2323
row.data
@@ -28,21 +28,23 @@ export class OplogEntry {
2828
public op_id: OpId,
2929
public op: OpType,
3030
public checksum: number,
31-
public subkey: string,
31+
public subkey?: string,
3232
public object_type?: string,
3333
public object_id?: string,
3434
public data?: string
3535
) {}
3636

37-
toJSON(): OplogEntryJSON {
37+
toJSON(fixedKeyEncoding = false): OplogEntryJSON {
3838
return {
3939
op_id: this.op_id,
4040
op: this.op.toJSON(),
4141
object_type: this.object_type,
4242
object_id: this.object_id,
4343
checksum: this.checksum,
4444
data: this.data,
45-
subkey: JSON.stringify(this.subkey)
45+
// Older versions of the JS SDK used to always JSON.stringify here. That has always been wrong,
46+
// but we need to migrate gradually to not break existing databases.
47+
subkey: fixedKeyEncoding ? this.subkey : JSON.stringify(this.subkey)
4648
};
4749
}
4850
}

packages/common/src/client/sync/bucket/SqliteBucketStorage.ts

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
BucketStorageAdapter,
1111
BucketStorageListener,
1212
Checkpoint,
13+
PowerSyncControlCommand,
1314
PSInternalTable,
1415
SyncLocalDatabaseResult
1516
} from './BucketStorageAdapter.js';
@@ -99,13 +100,13 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
99100
return Object.fromEntries(rows.map((r) => [r.name, { atLast: r.count_at_last, sinceLast: r.count_since_last }]));
100101
}
101102

102-
async saveSyncData(batch: SyncDataBatch) {
103+
async saveSyncData(batch: SyncDataBatch, fixedKeyFormat: boolean = false) {
103104
await this.writeTransaction(async (tx) => {
104105
let count = 0;
105106
for (const b of batch.buckets) {
106107
const result = await tx.execute('INSERT INTO powersync_operations(op, data) VALUES(?, ?)', [
107108
'save',
108-
JSON.stringify({ buckets: [b.toJSON()] })
109+
JSON.stringify({ buckets: [b.toJSON(fixedKeyFormat)] })
109110
]);
110111
this.logger.debug('saveSyncData', JSON.stringify(result));
111112
count += b.data.length;
@@ -413,6 +414,32 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
413414
async setTargetCheckpoint(checkpoint: Checkpoint) {
414415
// No-op for now
415416
}
417+
418+
async control(op: PowerSyncControlCommand, payload: string | ArrayBuffer | null): Promise<string> {
419+
return await this.writeTransaction(async (tx) => {
420+
const [[raw]] = await tx.executeRaw('SELECT powersync_control(?, ?)', [op, payload]);
421+
return raw;
422+
});
423+
}
424+
425+
async hasMigratedSubkeys(): Promise<boolean> {
426+
const { r } = await this.db.get<{ r: number }>('SELECT EXISTS(SELECT * FROM ps_kv WHERE key = ?) as r', [
427+
SqliteBucketStorage._subkeyMigrationKey
428+
]);
429+
return r != 0;
430+
}
431+
432+
async migrateToFixedSubkeys(): Promise<void> {
433+
await this.writeTransaction(async (tx) => {
434+
await tx.execute('UPDATE ps_oplog SET key = powersync_remove_duplicate_key_encoding(key);');
435+
await tx.execute('INSERT OR REPLACE INTO ps_kv (key, value) VALUES (?, ?);', [
436+
SqliteBucketStorage._subkeyMigrationKey,
437+
'1'
438+
]);
439+
});
440+
}
441+
442+
static _subkeyMigrationKey = 'powersync_js_migrated_subkeys';
416443
}
417444

418445
function hasMatchingPriority(priority: number, bucket: BucketChecksum) {

packages/common/src/client/sync/bucket/SyncDataBucket.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,13 @@ export class SyncDataBucket {
3737
public next_after?: OpId
3838
) {}
3939

40-
toJSON(): SyncDataBucketJSON {
40+
toJSON(fixedKeyEncoding = false): SyncDataBucketJSON {
4141
return {
4242
bucket: this.bucket,
4343
has_more: this.has_more,
4444
after: this.after,
4545
next_after: this.next_after,
46-
data: this.data.map((entry) => entry.toJSON())
46+
data: this.data.map((entry) => entry.toJSON(fixedKeyEncoding))
4747
};
4848
}
4949
}

0 commit comments

Comments
 (0)