-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsparqlGraphConnection.js
84 lines (73 loc) · 2.53 KB
/
sparqlGraphConnection.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import httpCall from './httpCall.js';
import { Buffer } from 'node:buffer';
const DEFAULT_BUFFER_SIZE = 5000000;
const DEFAULT_MAX_PARALLEL_CALLS = 10;
export default class SparqlGraphConnection {
constructor(url, options = {}) {
this.options = options;
this.url = url;
this.buffer = Buffer.alloc(options.bufferSize || DEFAULT_BUFFER_SIZE);
this.bufferPosition = 0;
this.maxCalls = options.maxCalls || DEFAULT_MAX_PARALLEL_CALLS;
this.promises = [];
this.error = null;
}
_checkError() {
if (this.error) {
throw new Error(this.error);
}
}
async _waitTillWaitingCallsAre(numMaxWaitingCalls) {
this._checkError();
while(this.promises.length > numMaxWaitingCalls) {
await new Promise(resolve => setTimeout(resolve));
this._checkError();
}
}
async delete() {
try {
await httpCall(this.url, {
method: 'DELETE'
});
} catch(e) {
// do nothing if error is given because the graph does not exist
}
}
async post(turtleStr) {
this._checkError();
const newDataBuffer = Buffer.from(turtleStr);
const writtenBytes = newDataBuffer.copy(this.buffer, this.bufferPosition);
this.bufferPosition += writtenBytes;
if (writtenBytes < newDataBuffer.length) {
await this._flush(newDataBuffer.toString('utf-8', writtenBytes));
}
}
async _flush(lastTurtleStr) {
const payload = (this.options.preamble || '') +
this.buffer.toString('utf-8', 0, this.bufferPosition) +
(lastTurtleStr || '');
this.buffer.fill();
this.bufferPosition = 0;
await this._waitTillWaitingCallsAre(this.maxCalls);
// console.log('Calling POST to ' + this.url + ' with content: ' + payload);
const callPromise = httpCall(this.url, {
method: 'POST',
headers: {'Content-Type': 'text/turtle'},
body: payload
});
this.promises.push(callPromise);
callPromise.then(() => {
this.promises.splice(this.promises.indexOf(callPromise), 1);
}, (error) => {
this.error = error;
})
}
async sync() {
// console.log('Flushing connection... ');
await this._flush();
// console.log('Done!');
// console.log('Waiting for responses...');
await this._waitTillWaitingCallsAre(0);
// console.log('Done!');
}
}