Skip to content

WIP: Add sync implementation from core extension #599

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions demos/example-node/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { once } from 'node:events';
import repl_factory from 'node:repl';

import { createBaseLogger, createLogger, PowerSyncDatabase, SyncStreamConnectionMethod } from '@powersync/node';
import {
createBaseLogger,
createLogger,
PowerSyncDatabase,
SyncClientImplementation,
SyncStreamConnectionMethod
} from '@powersync/node';
import { exit } from 'node:process';
import { AppSchema, DemoConnector } from './powersync.js';

Expand All @@ -26,7 +32,16 @@ const main = async () => {
});
console.log(await db.get('SELECT powersync_rs_version();'));

await db.connect(new DemoConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
db.registerListener({
statusChanged(status) {
console.log('status changed', status);
}
});

await db.connect(new DemoConnector(), {
connectionMethod: SyncStreamConnectionMethod.HTTP,
clientImplementation: SyncClientImplementation.RUST
});
await db.waitForFirstSync();
console.log('First sync complete!');

Expand Down
1 change: 0 additions & 1 deletion packages/common/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
"async-mutex": "^0.4.0",
"bson": "^6.6.0",
"buffer": "^6.0.3",
"can-ndjson-stream": "^1.0.2",
"cross-fetch": "^4.0.0",
"event-iterator": "^2.0.0",
"rollup": "4.14.3",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,16 @@ export interface BucketStorageListener extends BaseListener {

export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener>, Disposable {
init(): Promise<void>;
saveSyncData(batch: SyncDataBatch): Promise<void>;
saveSyncData(batch: SyncDataBatch, fixedKeyFormat: boolean): Promise<void>;
removeBuckets(buckets: string[]): Promise<void>;
setTargetCheckpoint(checkpoint: Checkpoint): Promise<void>;

startSession(): void;

getBucketStates(): Promise<BucketState[]>;
getBucketOperationProgress(): Promise<BucketOperationProgress>;
hasMigratedSubkeys(): Promise<boolean>;
migrateToFixedSubkeys(): Promise<void>;

syncLocalDatabase(
checkpoint: Checkpoint,
Expand Down Expand Up @@ -101,4 +103,9 @@ export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener
* Get an unique client id.
*/
getClientId(): Promise<string>;

/**
* Invokes the `powersync_control` function for the sync client.
*/
control(op: string, payload: string | ArrayBuffer | null): Promise<string>;
}
12 changes: 7 additions & 5 deletions packages/common/src/client/sync/bucket/OplogEntry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export interface OplogEntryJSON {
object_type?: string;
op_id: string;
op: OpTypeJSON;
subkey?: string | object;
subkey?: string;
}

export class OplogEntry {
Expand All @@ -17,7 +17,7 @@ export class OplogEntry {
row.op_id,
OpType.fromJSON(row.op),
row.checksum,
typeof row.subkey == 'string' ? row.subkey : JSON.stringify(row.subkey),
row.subkey,
row.object_type,
row.object_id,
row.data
Expand All @@ -28,21 +28,23 @@ export class OplogEntry {
public op_id: OpId,
public op: OpType,
public checksum: number,
public subkey: string,
public subkey?: string,
public object_type?: string,
public object_id?: string,
public data?: string
) {}

toJSON(): OplogEntryJSON {
toJSON(fixedKeyEncoding = false): OplogEntryJSON {
return {
op_id: this.op_id,
op: this.op.toJSON(),
object_type: this.object_type,
object_id: this.object_id,
checksum: this.checksum,
data: this.data,
subkey: JSON.stringify(this.subkey)
// Older versions of the JS SDK used to always JSON.stringify here. That has always been wrong,
// but we need to migrate gradually to not break existing databases.
subkey: fixedKeyEncoding ? this.subkey : JSON.stringify(this.subkey)
};
}
}
30 changes: 28 additions & 2 deletions packages/common/src/client/sync/bucket/SqliteBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
return Object.fromEntries(rows.map((r) => [r.name, { atLast: r.count_at_last, sinceLast: r.count_since_last }]));
}

async saveSyncData(batch: SyncDataBatch) {
async saveSyncData(batch: SyncDataBatch, fixedKeyFormat: boolean) {
await this.writeTransaction(async (tx) => {
let count = 0;
for (const b of batch.buckets) {
const result = await tx.execute('INSERT INTO powersync_operations(op, data) VALUES(?, ?)', [
'save',
JSON.stringify({ buckets: [b.toJSON()] })
JSON.stringify({ buckets: [b.toJSON(fixedKeyFormat)] })
]);
this.logger.debug('saveSyncData', JSON.stringify(result));
count += b.data.length;
Expand Down Expand Up @@ -413,6 +413,32 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
async setTargetCheckpoint(checkpoint: Checkpoint) {
// No-op for now
}

async control(op: string, payload: string | ArrayBuffer | null): Promise<string> {
return await this.writeTransaction(async (tx) => {
const [[raw]] = await tx.executeRaw('SELECT powersync_control(?, ?)', [op, payload]);
return raw;
});
}

async hasMigratedSubkeys(): Promise<boolean> {
const { r } = await this.db.get<{ r: number }>('SELECT EXISTS(SELECT * FROM ps_kv WHERE key = ?) as r', [
SqliteBucketStorage._subkeyMigrationKey
]);
return r != 0;
}

async migrateToFixedSubkeys(): Promise<void> {
await this.writeTransaction(async (tx) => {
await tx.execute('UPDATE ps_oplog SET key = powersync_remove_duplicate_key_encoding(key);');
await tx.execute('INSERT OR REPLACE INTO ps_kv (key, value) VALUES (?, ?);', [
SqliteBucketStorage._subkeyMigrationKey,
'1'
]);
});
}

static _subkeyMigrationKey = 'powersync_js_migrated_subkeys';
}

function hasMatchingPriority(priority: number, bucket: BucketChecksum) {
Expand Down
4 changes: 2 additions & 2 deletions packages/common/src/client/sync/bucket/SyncDataBucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ export class SyncDataBucket {
public next_after?: OpId
) {}

toJSON(): SyncDataBucketJSON {
toJSON(fixedKeyEncoding = false): SyncDataBucketJSON {
return {
bucket: this.bucket,
has_more: this.has_more,
after: this.after,
next_after: this.next_after,
data: this.data.map((entry) => entry.toJSON())
data: this.data.map((entry) => entry.toJSON(fixedKeyEncoding))
};
}
}
Loading
Loading