forked from share/sharedb-postgres
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
277 lines (263 loc) · 7.41 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
var DB = require("sharedb").DB;
var pg = require("pg");
// Postgres-backed ShareDB database
function PostgresDB(options) {
if (!(this instanceof PostgresDB)) return new PostgresDB(options);
DB.call(this, options);
this.closed = false;
this.pool = new pg.Pool(options);
}
module.exports = PostgresDB;
PostgresDB.prototype = Object.create(DB.prototype);
PostgresDB.prototype.close = function (callback) {
this.closed = true;
this.pool.end();
if (callback) callback();
};
function rollback(client, done) {
client.query("ROLLBACK", function (err) {
return done(err);
});
}
// Persists an op and snapshot if it is for the next version. Calls back with
// callback(err, succeeded)
PostgresDB.prototype.commit = function (
collection,
id,
op,
snapshot,
options,
callback,
) {
/*
* op: CreateOp {
* src: '24545654654646',
* seq: 1,
* v: 0,
* create: { type: 'http://sharejs.org/types/JSONv0', data: { ... } },
* m: { ts: 12333456456 } }
* }
* snapshot: PostgresSnapshot
*/
this.pool.connect(function (err, client, done) {
if (err) {
done(client);
callback(err);
return;
}
function commit() {
client.query("COMMIT", function (err) {
done(err);
if (err) {
callback(err);
} else {
callback(null, true);
}
});
}
client.query(
"SELECT max(version) AS max_version FROM ops WHERE collection = $1 AND doc_id = $2",
[collection, id],
function (err, res) {
var max_version = res.rows[0].max_version;
if (max_version == null) max_version = 0;
if (snapshot.v !== max_version + 1) {
return callback(null, false);
}
client.query("BEGIN", function (err) {
client.query(
"INSERT INTO ops (collection, doc_id, version, operation) VALUES ($1, $2, $3, $4)",
[collection, id, snapshot.v, op],
function (err, res) {
if (err) {
// TODO: if err is "constraint violation", callback(null, false) instead
rollback(client, done);
callback(err);
return;
}
if (snapshot.v === 1) {
client.query(
"INSERT INTO snapshots (collection, doc_id, doc_type, version, data) VALUES ($1, $2, $3, $4, $5)",
[collection, id, snapshot.type, snapshot.v, snapshot.data],
function (err, res) {
// TODO:
// if the insert was successful and did insert, callback(null, true)
// if the insert was successful and did not insert, callback(null, false)
// if there was an error, rollback and callback(error)
if (err) {
rollback(client, done);
callback(err);
return;
}
commit();
},
);
} else {
client.query(
"UPDATE snapshots SET doc_type = $3, version = $4, data = $5 WHERE collection = $1 AND doc_id = $2 AND version = ($4 - 1)",
[collection, id, snapshot.type, snapshot.v, snapshot.data],
function (err, res) {
// TODO:
// if any rows were updated, success
// if 0 rows were updated, rollback and not success
// if error, rollback and not success
if (err) {
rollback(client, done);
callback(err);
return;
}
commit();
},
);
}
},
);
});
},
);
});
};
// Get the named document from the database. The callback is called with (err,
// snapshot). A snapshot with a version of zero is returned if the docuemnt
// has never been created in the database.
PostgresDB.prototype.getSnapshot = function (
collection,
id,
fields,
options,
callback,
) {
this.pool.connect(function (err, client, done) {
if (err) {
done(client);
callback(err);
return;
}
client.query(
"SELECT version, data, doc_type FROM snapshots WHERE collection = $1 AND doc_id = $2 LIMIT 1",
[collection, id],
function (err, res) {
done();
if (err) {
callback(err);
return;
}
if (res.rows.length) {
var row = res.rows[0];
var snapshot = new PostgresSnapshot(
id,
row.version,
row.doc_type,
row.data,
undefined, // TODO: metadata
);
callback(null, snapshot);
} else {
var snapshot = new PostgresSnapshot(
id,
0,
null,
undefined,
undefined,
);
callback(null, snapshot);
}
},
);
});
};
// Get operations between [from, to) noninclusively. (Ie, the range should
// contain start but not end).
//
// If end is null, this function should return all operations from start onwards.
//
// The operations that getOps returns don't need to have a version: field.
// The version will be inferred from the parameters if it is missing.
//
// Callback should be called as callback(error, [list of ops]);
PostgresDB.prototype.getOps = function (
collection,
id,
from,
to,
options,
callback,
) {
this.pool.connect(function (err, client, done) {
if (err) {
done(client);
callback(err);
return;
}
client.query(
"SELECT version, operation FROM ops WHERE collection = $1 AND doc_id = $2 AND version >= $3 AND version < $4",
[collection, id, from, to],
function (err, res) {
done();
if (err) {
callback(err);
return;
}
callback(
null,
res.rows.map(function (row) {
return row.operation;
}),
);
},
);
});
};
PostgresDB.prototype.query = function (
collectionName,
inputQuery,
fields,
options,
callback,
) {
this.pool.connect(function (err, client, done) {
if (err) {
done(client);
callback(err);
return;
}
// TODO: more consistent parse
const limit = inputQuery["$limit"];
const sort = inputQuery["$sort"];
const sortKey = Object.keys(sort)[0];
const sortDirection = sort[sortKey] === -1 ? "DESC" : "ASC";
client.query(
`SELECT version, data, doc_type FROM snapshots WHERE collection = $1 ORDER BY data->>$3 ${sortDirection} LIMIT $2`,
[collectionName, limit, sortKey],
function (err, res) {
done();
if (err) {
callback(err);
return;
}
if (res.rows.length) {
var snapshots = [];
for (var i = 0; i < res.rows.length; i++) {
const row = res.rows[i];
var snapshot = new PostgresSnapshot(
row.data.documentId,
row.version,
row.doc_type,
row.data,
undefined, // TODO: metadata
);
snapshots.push(snapshot);
}
callback(null, snapshots);
}
},
);
});
};
function PostgresSnapshot(id, version, type, data, meta) {
this.id = id;
this.v = version;
this.type = type;
this.data = data;
this.m = meta;
}