Skip to content

Commit a479799

Browse files
authored
Merge pull request #681 from streamich/json-crdt-improvements
JSON CRDT improvements
2 parents 5ab4105 + 808a99c commit a479799

File tree

4 files changed

+125
-40
lines changed

4 files changed

+125
-40
lines changed

src/json-crdt-patch/Patch.ts

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import * as operations from './operations';
2-
import {ITimestampStruct, ts, printTs} from './clock';
3-
import {SESSION} from './constants';
2+
import {ITimestampStruct, ts, printTs, Timestamp} from './clock';
43
import {printTree} from 'tree-dump/lib/printTree';
54
import {encode, decode} from './codec/binary';
65
import type {Printable} from 'tree-dump/lib/types';
@@ -121,7 +120,8 @@ export class Patch implements Printable {
121120
for (let i = 0; i < length; i++) {
122121
const op = ops[i];
123122
if (op instanceof operations.DelOp) patchOps.push(new operations.DelOp(ts(op.id), ts(op.obj), op.what));
124-
else if (op instanceof operations.NewConOp) patchOps.push(new operations.NewConOp(ts(op.id), op.val));
123+
else if (op instanceof operations.NewConOp)
124+
patchOps.push(new operations.NewConOp(ts(op.id), op.val instanceof Timestamp ? ts(op.val) : op.val));
125125
else if (op instanceof operations.NewVecOp) patchOps.push(new operations.NewVecOp(ts(op.id)));
126126
else if (op instanceof operations.NewValOp) patchOps.push(new operations.NewValOp(ts(op.id)));
127127
else if (op instanceof operations.NewObjOp) patchOps.push(new operations.NewObjOp(ts(op.id)));
@@ -144,36 +144,44 @@ export class Patch implements Printable {
144144
op.data.map(([key, value]) => [key, ts(value)]),
145145
),
146146
);
147+
else if (op instanceof operations.InsVecOp)
148+
patchOps.push(
149+
new operations.InsVecOp(
150+
ts(op.id),
151+
ts(op.obj),
152+
op.data.map(([key, value]) => [key, ts(value)]),
153+
),
154+
);
147155
else if (op instanceof operations.NopOp) patchOps.push(new operations.NopOp(ts(op.id), op.len));
148156
}
149157
return patch;
150158
}
151159

152160
/**
153-
* The .rebase() operation is meant to work only with patch that use
154-
* the server clock. When receiving a patch from a client, the starting
155-
* ID of the patch can be out of sync with the server clock. For example,
156-
* if some other user has in the meantime pushed operations to the server.
161+
* The `.rebase()` operation is meant to be applied to patches which have not
162+
* yet been advertised to the server (other peers), or when
163+
* the server clock is used and concurrent change on the server happened.
157164
*
158165
* The .rebase() operation returns a new `Patch` with the IDs recalculated
159-
* such that the first operation has ID of the patch is equal to the
160-
* actual server time tip.
166+
* such that the first operation has the `time` equal to `newTime`.
161167
*
162-
* @param serverTime Real server time tip (ID of the next expected operation).
168+
* @param newTime Time where the patch ID should begin (ID of the first operation).
169+
* @param transformAfter Time after (and including) which the IDs should be
170+
* transformed. If not specified, equals to the time of the first operation.
163171
*/
164-
public rebase(serverTime: number, transformHorizon: number): Patch {
172+
public rebase(newTime: number, transformAfter?: number): Patch {
165173
const id = this.getId();
166174
if (!id) throw new Error('EMPTY_PATCH');
175+
const sid = id.sid;
167176
const patchStartTime = id.time;
168-
if (patchStartTime === serverTime) return this;
169-
const delta = serverTime - patchStartTime;
177+
transformAfter ??= patchStartTime;
178+
if (patchStartTime === newTime) return this;
179+
const delta = newTime - patchStartTime;
170180
return this.rewriteTime((id: ITimestampStruct): ITimestampStruct => {
171-
const sessionId = id.sid;
172-
const isServerTimestamp = sessionId === SESSION.SERVER;
173-
if (!isServerTimestamp) return id;
181+
if (id.sid !== sid) return id;
174182
const time = id.time;
175-
if (time < transformHorizon) return id;
176-
return ts(SESSION.SERVER, time + delta);
183+
if (time < transformAfter) return id;
184+
return ts(sid, time + delta);
177185
});
178186
}
179187

src/json-crdt-patch/__tests__/Patch.spec.ts

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import {Model} from '../../json-crdt/model';
2+
import {s} from '../builder/schema';
13
import {LogicalClock, ts} from '../clock';
24
import {SESSION} from '../constants';
35
import {InsArrOp} from '../operations';
@@ -13,7 +15,7 @@ describe('.rebase()', () => {
1315
expect(patch.ops[0].id.time).toBe(10);
1416
});
1517

16-
test('does not rewrite references, which are commited on the server', () => {
18+
test('does not rewrite references, which are committed on the server', () => {
1719
const builder = new PatchBuilder(new LogicalClock(SESSION.SERVER, 5));
1820
builder.insArr(ts(SESSION.SERVER, 3), ts(SESSION.SERVER, 3), [ts(0, 10)]);
1921
expect((builder.patch.ops[0] as InsArrOp).ref.time).toBe(3);
@@ -28,5 +30,45 @@ describe('.rebase()', () => {
2830
const patch = builder.patch.rebase(10, 5);
2931
expect((patch.ops[0] as InsArrOp).ref.time).toBe(12);
3032
});
33+
34+
test('can advance patch ID', () => {
35+
const model = Model.create();
36+
model.api.root({
37+
foo: 'bar',
38+
num: s.con(123),
39+
arr: [null],
40+
vec: s.vec(s.con(1), s.con(2)),
41+
id: s.con(ts(4, 5)),
42+
val: s.val(s.con('asdf')),
43+
bin: s.bin(new Uint8Array([1, 2, 3])),
44+
});
45+
const patch1 = model.api.flush();
46+
const patch2 = patch1.rebase(1000);
47+
expect(patch1.getId()!.time).not.toBe(1000);
48+
expect(patch2.getId()!.time).toBe(1000);
49+
expect(patch2.getId()!.sid).toBe(patch1.getId()!.sid);
50+
const model2 = Model.create();
51+
model2.applyPatch(patch2);
52+
expect(model2.view()).toEqual(model.view());
53+
});
54+
55+
test('transforms "con" ID values, if they share the patch SID', () => {
56+
const model = Model.create();
57+
const sid = model.clock.sid;
58+
model.api.root({
59+
id1: s.con(ts(4, 5)),
60+
id2: s.con(ts(model.clock.sid, 5)),
61+
});
62+
const patch1 = model.api.flush();
63+
const base = patch1.getId()!.time;
64+
const patch2 = patch1.rebase(1000);
65+
expect(patch1.getId()!.time).not.toBe(1000);
66+
expect(patch2.getId()!.time).toBe(1000);
67+
expect(patch2.getId()!.sid).toBe(patch1.getId()!.sid);
68+
const model2 = Model.create();
69+
model2.applyPatch(patch2);
70+
expect((model2.view() as any).id1).toEqual(ts(4, 5));
71+
expect((model2.view() as any).id2).toEqual(ts(sid, 1000 - base + 5));
72+
});
3173
});
3274
});

src/json-crdt/model/Model.ts

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,24 @@ export class Model<N extends JsonNode = JsonNode<any>> implements Printable {
3434
*/
3535
public static readonly sid = randomSessionId;
3636

37+
/**
38+
* Use this method to generate a random session ID for an existing document.
39+
* It checks for the uniqueness of the session ID given the current peers in
40+
* the document. This reduces the chance of collision substantially.
41+
*
42+
* @returns A random session ID that is not used by any peer in the current
43+
* document.
44+
*/
45+
public rndSid(): number {
46+
const clock = this.clock;
47+
const sid = clock.sid;
48+
const peers = clock.peers;
49+
while (true) {
50+
const candidate = randomSessionId();
51+
if (sid !== candidate && !peers.has(candidate)) return candidate;
52+
}
53+
}
54+
3755
/**
3856
* Create a CRDT model which uses logical clock. Logical clock assigns a
3957
* logical timestamp to every node and operation. Logical timestamp consists
@@ -46,7 +64,7 @@ export class Model<N extends JsonNode = JsonNode<any>> implements Printable {
4664
* @deprecated Use `Model.create()` instead.
4765
*/
4866
public static readonly withLogicalClock = (clockOrSessionId?: clock.ClockVector | number): Model => {
49-
return Model.create(undefined, clockOrSessionId);
67+
return Model.create(void 0, clockOrSessionId);
5068
};
5169

5270
/**
@@ -62,7 +80,7 @@ export class Model<N extends JsonNode = JsonNode<any>> implements Printable {
6280
* @deprecated Use `Model.create()` instead: `Model.create(undefined, SESSION.SERVER)`.
6381
*/
6482
public static readonly withServerClock = (time: number = 1): Model => {
65-
return Model.create(undefined, new clock.ServerClockVector(SESSION.SERVER, time));
83+
return Model.create(void 0, new clock.ServerClockVector(SESSION.SERVER, time));
6684
};
6785

6886
/**
@@ -193,7 +211,7 @@ export class Model<N extends JsonNode = JsonNode<any>> implements Printable {
193211
const first = patches[0];
194212
const sid = first.getId()!.sid;
195213
if (!sid) throw new Error('NO_SID');
196-
const model = Model.withLogicalClock(sid);
214+
const model = Model.create(void 0, sid);
197215
model.applyBatch(patches);
198216
return model;
199217
}
@@ -436,7 +454,7 @@ export class Model<N extends JsonNode = JsonNode<any>> implements Printable {
436454
* @param sessionId Session ID to use for the new model.
437455
* @returns A copy of this model with a new session ID.
438456
*/
439-
public fork(sessionId: number = Model.sid()): Model<N> {
457+
public fork(sessionId: number = this.rndSid()): Model<N> {
440458
const copy = Model.fromBinary(this.toBinary()) as unknown as Model<N>;
441459
if (copy.clock.sid !== sessionId && copy.clock instanceof clock.ClockVector)
442460
copy.clock = copy.clock.fork(sessionId);

src/json-crdt/model/__tests__/Model.cloning.spec.ts

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import {Model} from '../Model';
55

66
describe('clone()', () => {
77
test('can clone a simple document', () => {
8-
const doc1 = Model.withLogicalClock();
8+
const doc1 = Model.create();
99
const builder1 = new PatchBuilder(doc1.clock);
1010
const obj = builder1.json({foo: 'bar', gg: [123]});
1111
builder1.root(obj);
@@ -17,7 +17,7 @@ describe('clone()', () => {
1717
});
1818

1919
test('can modify the cloned copy independently', () => {
20-
const doc1 = Model.withLogicalClock();
20+
const doc1 = Model.create();
2121
const builder1 = new PatchBuilder(doc1.clock);
2222
const obj = builder1.json({foo: 'bar', hh: true});
2323
builder1.root(obj);
@@ -34,7 +34,7 @@ describe('clone()', () => {
3434
});
3535

3636
test('can clone a document with string edits', () => {
37-
const doc1 = Model.withLogicalClock();
37+
const doc1 = Model.create();
3838
doc1.api.root({
3939
foo: 'abc',
4040
});
@@ -48,7 +48,7 @@ describe('clone()', () => {
4848
});
4949

5050
test('can clone a document with string deletes', () => {
51-
const doc1 = Model.withLogicalClock();
51+
const doc1 = Model.create();
5252
doc1.api.root({
5353
foo: 'abc',
5454
});
@@ -63,7 +63,7 @@ describe('clone()', () => {
6363
});
6464

6565
test('can clone a document with object edits', () => {
66-
const doc1 = Model.withLogicalClock();
66+
const doc1 = Model.create();
6767
doc1.api.root({
6868
foo: {
6969
a: 1,
@@ -90,7 +90,7 @@ describe('clone()', () => {
9090
});
9191

9292
test('can clone array with edits', () => {
93-
const doc1 = Model.withLogicalClock();
93+
const doc1 = Model.create();
9494
doc1.api.root({
9595
foo: {
9696
a: [1],
@@ -110,7 +110,7 @@ describe('clone()', () => {
110110
});
111111

112112
test('can clone an empty model', () => {
113-
const doc1 = Model.withLogicalClock();
113+
const doc1 = Model.create();
114114
const doc2 = doc1.clone();
115115
expect(doc1.clock.sid === doc2.clock.sid).toBe(true);
116116
expect(doc1.view()).toBe(undefined);
@@ -126,7 +126,7 @@ describe('clone()', () => {
126126

127127
describe('fork()', () => {
128128
test('can fork a simple document', () => {
129-
const doc1 = Model.withLogicalClock();
129+
const doc1 = Model.create();
130130
const builder1 = new PatchBuilder(doc1.clock);
131131
const obj = builder1.json([1, 2, 'lol']);
132132
builder1.root(obj);
@@ -138,7 +138,7 @@ describe('fork()', () => {
138138
});
139139

140140
test('forked document has a different session ID', () => {
141-
const doc1 = Model.withLogicalClock();
141+
const doc1 = Model.create();
142142
const builder1 = new PatchBuilder(doc1.clock);
143143
const obj = builder1.json([1, 2, 'lol']);
144144
builder1.root(obj);
@@ -149,7 +149,7 @@ describe('fork()', () => {
149149
});
150150

151151
test('can modify the cloned copy independently', () => {
152-
const doc1 = Model.withLogicalClock();
152+
const doc1 = Model.create();
153153
const builder1 = new PatchBuilder(doc1.clock);
154154
const arr = builder1.json([1, 2, 'lol']);
155155
builder1.root(arr);
@@ -165,12 +165,29 @@ describe('fork()', () => {
165165
expect(doc2.view()).toEqual([true, 1, 2, 'lol']);
166166
expect(doc1.clock.sid !== doc2.clock.sid).toBe(true);
167167
});
168+
169+
test('does not reuse existing session IDs when forking', () => {
170+
const rnd = Math.random;
171+
let i = 0;
172+
Math.random = () => {
173+
i++;
174+
return i < 20 ? 0.5 : i < 24 ? 0.1 : i < 30 ? 0.5 : rnd();
175+
};
176+
const model = Model.create();
177+
model.api.root(123);
178+
const model2 = model.fork();
179+
const model3 = model2.fork();
180+
expect(model.clock.sid).not.toBe(model2.clock.sid);
181+
expect(model3.clock.sid).not.toBe(model2.clock.sid);
182+
expect(model3.clock.sid).not.toBe(model.clock.sid);
183+
Math.random = rnd;
184+
});
168185
});
169186

170187
describe('reset()', () => {
171188
test('resets model state', () => {
172-
const doc1 = Model.withLogicalClock();
173-
const doc2 = Model.withLogicalClock();
189+
const doc1 = Model.create();
190+
const doc2 = Model.create();
174191
doc1.api.root({foo: 123});
175192
doc2.api.root({
176193
text: 'hello',
@@ -190,8 +207,8 @@ describe('reset()', () => {
190207
});
191208

192209
test('models can be edited separately', () => {
193-
const doc1 = Model.withLogicalClock();
194-
const doc2 = Model.withLogicalClock();
210+
const doc1 = Model.create();
211+
const doc2 = Model.create();
195212
doc1.api.root({foo: 123});
196213
doc2.api.root({
197214
text: 'hello',
@@ -207,8 +224,8 @@ describe('reset()', () => {
207224
});
208225

209226
test('emits change event on reset', async () => {
210-
const doc1 = Model.withLogicalClock();
211-
const doc2 = Model.withLogicalClock();
227+
const doc1 = Model.create();
228+
const doc2 = Model.create();
212229
doc1.api.root({foo: 123});
213230
doc2.api.root({
214231
text: 'hello',
@@ -222,7 +239,7 @@ describe('reset()', () => {
222239
});
223240

224241
test('preserves API nodes when model is reset', async () => {
225-
const doc1 = Model.withLogicalClock().setSchema(
242+
const doc1 = Model.create().setSchema(
226243
schema.obj({
227244
text: schema.str('hell'),
228245
}),

0 commit comments

Comments
 (0)