Skip to content

Commit ec34bc9

Browse files
committed
feat: allow worker thread persist
1 parent 254f4e9 commit ec34bc9

File tree

5 files changed

+86
-5
lines changed

5 files changed

+86
-5
lines changed

src/api.js

+2
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ export class Api {
118118
this.redisWorkerGroupName = this.prefix + ':worker'
119119
this.workerSetName = `${this.prefix}:worker:${this.consumername}:idset`
120120
this._destroyed = false
121+
/** @type {import('worker_threads').Worker | null} */
122+
this.persistWorker = null
121123

122124
const addScript = WORKER_DISABLED
123125
? 'redis.call("XADD", KEYS[1], "*", "m", ARGV[1])'

src/index.js

+1
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ export * from './server.js'
33
export * from './storage.js'
44
export * from './api.js'
55
export * from './subscriber.js'
6+
export * from './persist-worker-thread.js'
67
export * from './y-socket-io/index.js'

src/persist-worker-thread.js

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import * as Y from 'yjs'
2+
import * as logging from 'lib0/logging'
3+
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads'
4+
import path from 'path'
5+
6+
export class PersistWorkerThread {
7+
/**
8+
* @private
9+
* @readonly
10+
*/
11+
log = logging.createModuleLogger('@y/persist-worker-thread')
12+
13+
/**
14+
* @param {import('./storage.js').AbstractStorage} store
15+
*/
16+
constructor(store) {
17+
if (isMainThread) {
18+
this.log('persist worker cannot run on main thread')
19+
return
20+
}
21+
this.store = store
22+
parentPort?.on('message', this.persist)
23+
}
24+
25+
/**
26+
* @param {{ room: string, docstate: SharedArrayBuffer }} props
27+
*/
28+
persist = async ({ room, docstate }) => {
29+
const state = new Uint8Array(docstate)
30+
const doc = new Y.Doc()
31+
Y.applyUpdateV2(doc, state)
32+
await this.store?.persistDoc(room, 'index', doc)
33+
doc.destroy()
34+
}
35+
}
36+
37+
/**
38+
* @param {import('./storage.js').AbstractStorage} store
39+
*/
40+
export function createPersistWorkerThread(store) {
41+
if (isMainThread) throw new Error('cannot create persist worker in main thread')
42+
return new PersistWorkerThread(store)
43+
}

src/socketio.js

+3-2
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,10 @@ class YSocketIOServer {
3737
* @param {string} [conf.redisPrefix]
3838
* @param {string} [conf.redisUrl]
3939
* @param {import('./y-socket-io/y-socket-io.js').YSocketIOConfiguration['authenticate']} conf.authenticate
40+
* @param {import('worker_threads').Worker=} [conf.persistWorker]
4041
*/
41-
export const registerYSocketIOServer = async (io, store, { authenticate, redisUrl, redisPrefix }) => {
42+
export const registerYSocketIOServer = async (io, store, { authenticate, redisUrl, redisPrefix, persistWorker }) => {
4243
const app = new YSocketIO(io, { authenticate })
43-
const { client, subscriber } = await app.initialize(store, { redisUrl, redisPrefix })
44+
const { client, subscriber } = await app.initialize(store, { redisUrl, redisPrefix, persistWorker })
4445
return new YSocketIOServer(app, client, subscriber)
4546
}

src/y-socket-io/y-socket-io.js

+37-3
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,12 @@ export class YSocketIO {
136136
* @readonly
137137
*/
138138
namespacePersistentMap = new Map()
139+
/**
140+
* @type {Map<string, () => void>}
141+
* @private
142+
* @readonly
143+
*/
144+
awaitingPersistMap = new Map()
139145

140146
/**
141147
* YSocketIO constructor.
@@ -156,16 +162,20 @@ export class YSocketIO {
156162
*
157163
* It also starts socket connection listeners.
158164
* @param {import('../storage.js').AbstractStorage} store
159-
* @param {{ redisPrefix?: string, redisUrl?: string }=} opts
165+
* @param {{ redisPrefix?: string, redisUrl?: string, persistWorker?: import('worker_threads').Worker }=} opts
160166
* @public
161167
*/
162-
async initialize (store, { redisUrl, redisPrefix = 'y' } = {}) {
168+
async initialize (store, { redisUrl, redisPrefix = 'y', persistWorker } = {}) {
163169
const [client, subscriber] = await promise.all([
164170
api.createApiClient(store, { redisUrl, redisPrefix }),
165171
createSubscriber(store, { redisUrl, redisPrefix })
166172
])
167173
this.client = client
168174
this.subscriber = subscriber
175+
if (persistWorker) {
176+
this.client.persistWorker = persistWorker
177+
this.registerPersistWorkerResolve()
178+
}
169179

170180
this.nsp = this.io.of(/^\/yjs\|.*$/)
171181

@@ -475,7 +485,24 @@ export class YSocketIO {
475485
assert(this.client)
476486
const doc = this.debouncedPersistDocMap.get(namespace)
477487
if (!doc) return
478-
await this.client.store.persistDoc(namespace, 'index', doc)
488+
if (this.client.persistWorker) {
489+
/** @type {Promise<void>} */
490+
const promise = new Promise((res) => {
491+
assert(this.client?.persistWorker)
492+
this.awaitingPersistMap.set(namespace, res)
493+
494+
const docState = Y.encodeStateAsUpdateV2(doc)
495+
const buf = new Uint8Array(new SharedArrayBuffer(docState.length))
496+
buf.set(docState)
497+
this.client.persistWorker.postMessage({
498+
room: namespace,
499+
docstate: buf
500+
})
501+
})
502+
await promise
503+
} else {
504+
await this.client.store.persistDoc(namespace, 'index', doc)
505+
}
479506
await this.client.trimRoomStream(namespace, 'index', true)
480507
this.debouncedPersistDocMap.delete(namespace)
481508
this.debouncedPersistMap.delete(namespace)
@@ -569,4 +596,11 @@ export class YSocketIO {
569596
console.error(e)
570597
}
571598
}
599+
600+
registerPersistWorkerResolve () {
601+
if (!this.client?.persistWorker) return
602+
this.client.persistWorker.on('message', ({ event, room }) => {
603+
if (event === 'persisted') this.awaitingPersistMap.get(room)?.()
604+
})
605+
}
572606
}

0 commit comments

Comments
 (0)