@@ -235,8 +235,10 @@ export class Api {
235
235
const awareness = new awarenessProtocol . Awareness ( ydoc )
236
236
awareness . setLocalState ( null ) // we don't want to propagate awareness state
237
237
const now = performance . now ( )
238
+ if ( docstate ) { Y . applyUpdateV2 ( ydoc , docstate . doc ) }
239
+ let changed = false
240
+ ydoc . once ( 'afterTransaction' , ( tr ) => { changed = tr . changed . size > 0 } )
238
241
ydoc . transact ( ( ) => {
239
- if ( docstate ) { Y . applyUpdateV2 ( ydoc , docstate . doc ) }
240
242
docMessages ?. messages . forEach ( m => {
241
243
const decoder = decoding . createDecoder ( m )
242
244
switch ( decoding . readVarUint ( decoder ) ) {
@@ -254,7 +256,13 @@ export class Api {
254
256
} )
255
257
} )
256
258
logApi ( `took ${ performance . now ( ) - now } ms to process messages for room: ${ room } ` )
257
- return { ydoc, awareness, redisLastId : docMessages ?. lastId . toString ( ) || '0' , storeReferences : docstate ?. references || null }
259
+ return {
260
+ ydoc,
261
+ awareness,
262
+ redisLastId : docMessages ?. lastId . toString ( ) || '0' ,
263
+ storeReferences : docstate ?. references || null ,
264
+ changed
265
+ }
258
266
}
259
267
260
268
/**
@@ -299,11 +307,14 @@ export class Api {
299
307
} else {
300
308
reclaimCounts ++
301
309
const { room, docid } = decodeRedisRoomStreamName ( task . stream , this . prefix )
302
- const { ydoc, storeReferences, redisLastId } = await this . getDoc ( room , docid )
310
+ const { ydoc, storeReferences, redisLastId, changed } = await this . getDoc ( room , docid )
303
311
const lastId = math . max ( number . parseInt ( redisLastId . split ( '-' ) [ 0 ] ) , number . parseInt ( task . id . split ( '-' ) [ 0 ] ) )
304
- await this . store . persistDoc ( room , docid , ydoc )
312
+ if ( changed ) {
313
+ logWorker ( `persisting changes in room: ${ room } ` )
314
+ await this . store . persistDoc ( room , docid , ydoc )
315
+ } else logWorker ( `skip persisting room: ${ room } due to no changes` )
305
316
await promise . all ( [
306
- storeReferences ? this . store . deleteReferences ( room , docid , storeReferences ) : promise . resolve ( ) ,
317
+ storeReferences && changed ? this . store . deleteReferences ( room , docid , storeReferences ) : promise . resolve ( ) ,
307
318
this . redis . multi ( )
308
319
. xTrim ( task . stream , 'MINID' , lastId - this . redisMinMessageLifetime )
309
320
. xAdd ( this . redisWorkerStreamName , '*' , { compact : task . stream } )
0 commit comments