Skip to content

Commit 10fccbc

Browse files
committed
add a streaming archiver
1 parent c6bb589 commit 10fccbc

File tree

1 file changed

+49
-0
lines changed

1 file changed

+49
-0
lines changed

dev/streamArchive.ts

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import cassandra from '../store/cassandra';
2+
import db from '../store/db';
3+
import { doArchiveFromLegacy } from '../store/queries';
4+
5+
let i = 0;
6+
cassandra
7+
.stream('select match_id, version from matches', [], {
8+
prepare: true,
9+
autoPage: true,
10+
})
11+
.on('readable', async function () {
12+
// readable is emitted as soon a row is received and parsed
13+
let row;
14+
//@ts-ignore
15+
while ((row = this.read())) {
16+
// process row
17+
i += 1;
18+
console.log(i, row.match_id);
19+
if (row.version) {
20+
await db.raw(
21+
'INSERT INTO parsed_matches(match_id) VALUES(?) ON CONFLICT DO NOTHING',
22+
[row.match_id]
23+
);
24+
await doArchiveFromLegacy(row.match_id.toString());
25+
} else {
26+
await deleteFromStore(row.match_id.toString());
27+
}
28+
}
29+
})
30+
.on('end', function () {
31+
// emitted when all rows have been retrieved and read
32+
console.log('finished');
33+
process.exit(0);
34+
})
35+
.on('error', function (e) {
36+
console.error(e);
37+
process.exit(1);
38+
});
39+
40+
async function deleteFromStore(id: string) {
41+
await Promise.all([
42+
cassandra.execute('DELETE from player_matches where match_id = ?', [id], {
43+
prepare: true,
44+
}),
45+
cassandra.execute('DELETE from matches where match_id = ?', [id], {
46+
prepare: true,
47+
}),
48+
]);
49+
}

0 commit comments

Comments
 (0)