-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSyncConnector.js
116 lines (98 loc) · 4.77 KB
/
SyncConnector.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
const SyncManager = require('./SyncManager');
const EventEmitter = require('events');
const { ObjectID } = require('mongodb');
module.exports = (options)=> {
return new Promise( async (resolve, reject)=> {
// Multithread mode is UNSTABLE yet. Currently always turn it off.
options.multiThreaded = false;
// Caller will interact with SyncManager via SyncWorker
if (options.multiThreaded) {
const { Worker } = require('worker_threads');
const SyncWorker = new Worker(__dirname + '/SyncWorker.js', { workerData: options });
SyncWorker.on("message", async message=> {
if (message == "init-ready") {
class SyncerClass extends EventEmitter {
getCollection(collectionName) {
return new Promise(async resolve=> {
this.collection = await SyncManager.collection(options, collectionName);
resolve();
});
}
constructor(collection, indexes, uniqueIndexes, syncInterval) {
super();
this.collectionName = collection;
this.worker = SyncWorker;
this.worker.postMessage(
JSON.stringify({
action: "new",
collection, indexes, uniqueIndexes, syncInterval
})
);
this.worker.on("message", message=> {
switch (message.action) {
/*
case "on-inserted":
this.emit("inserted", message.id, message.fullDocument);
break;
case "on-updated":
this.emit("updated", message.id, message.updatedFields, message.fullDocument);
break;
*/
case "on-error":
console.log ("SyncWorker error: " + message.message);
break;
}
});
}
create(obj) {
// We need to create _id here -- otherwise, an update maybe called before _id was assigned
obj._id = new ObjectID();
this.worker.postMessage(
JSON.stringify({
action: "create",
obj
})
);
}
update(obj, index, property, value, oldValue) {
this.worker.postMessage(
JSON.stringify({
action: "update",
obj: JSON.stringify(obj),
index: JSON.stringify(index),
property,
value: JSON.stringify(value),
oldValue: JSON.stringify(oldValue)
})
);
}
unset(obj, index, property) {
this.worker.postMessage(
JSON.stringify({
action: "unset",
obj, index, property
})
);
}
ensureIndexes(indexes, uniqueIndexes, sparse) {
this.worker.postMessage(
JSON.stringify({
action: "ensureIndexes",
indexes, uniqueIndexes, sparse
})
);
}
}
// Resolves from getCollection method, since it can be called later
resolve (SyncerClass);
}
});
}
else {
// Caller interacts directly with SyncManager class.
// A single-threaded Syncer class
const Syncer = await SyncManager.init(options);
resolve(Syncer);
}
});
}