1
1
import { getOwner , setOwner } from '@ember/owner' ;
2
+ import { debounce } from '@ember/runloop' ;
2
3
import Service , { service } from '@ember/service' ;
3
4
import { isTesting } from '@embroider/macros' ;
4
5
@@ -16,6 +17,7 @@ import {
16
17
CommandContextStamp ,
17
18
getClass ,
18
19
identifyCard ,
20
+ delay ,
19
21
} from '@cardstack/runtime-common' ;
20
22
21
23
import type MatrixService from '@cardstack/host/services/matrix-service' ;
@@ -48,6 +50,7 @@ export default class CommandService extends Service {
48
50
@service declare private realmServer : RealmServerService ;
49
51
currentlyExecutingCommandRequestIds = new TrackedSet < string > ( ) ;
50
52
private commandProcessingEventQueue : string [ ] = [ ] ;
53
+ private flushCommandProcessingQueue : Promise < void > | undefined ;
51
54
52
55
private commands : Map <
53
56
string ,
@@ -63,7 +66,7 @@ export default class CommandService extends Service {
63
66
return name ;
64
67
}
65
68
66
- public async queueEventForCommandProcessing ( event : Partial < IEvent > ) {
69
+ public queueEventForCommandProcessing ( event : Partial < IEvent > ) {
67
70
let eventId = event . event_id ;
68
71
if ( event . content ?. [ 'm.relates_to' ] ?. rel_type === 'm.replace' ) {
69
72
eventId = event . content ?. [ 'm.relates_to' ] ! . event_id ;
@@ -85,24 +88,52 @@ export default class CommandService extends Service {
85
88
}
86
89
87
90
this . commandProcessingEventQueue . push ( compoundKey ) ;
88
- let roomResource = this . matrixService . roomResources . get ( event . room_id ! ) ;
89
- await roomResource ?. loading ;
90
91
91
- this . drainCommandProcessingQueue ( ) ;
92
+ debounce ( this , this . drainCommandProcessingQueue , 100 ) ;
92
93
}
93
94
94
95
private async drainCommandProcessingQueue ( ) {
95
- while ( this . commandProcessingEventQueue . length > 0 ) {
96
- let [ roomId , eventId ] = this . commandProcessingEventQueue
97
- . shift ( ) !
98
- . split ( '|' ) ;
96
+ await this . flushCommandProcessingQueue ;
97
+
98
+ let finishedProcessingCommands : ( ) => void ;
99
+ this . flushCommandProcessingQueue = new Promise (
100
+ ( res ) => ( finishedProcessingCommands = res ) ,
101
+ ) ;
102
+
103
+ let commandSpecs = [ ...this . commandProcessingEventQueue ] ;
104
+ this . commandProcessingEventQueue = [ ] ;
105
+
106
+ while ( commandSpecs . length > 0 ) {
107
+ let [ roomId , eventId ] = commandSpecs . shift ( ) ! . split ( '|' ) ;
99
108
100
109
let roomResource = this . matrixService . roomResources . get ( roomId ! ) ;
101
110
if ( ! roomResource ) {
102
111
throw new Error (
103
112
`Room resource not found for room id ${ roomId } , this should not happen` ,
104
113
) ;
105
114
}
115
+ let timeout = Date . now ( ) + 60_000 ; // reset the timer to avoid a long wait if the room resource is processing
116
+ let currentRoomProcessingTimestamp = roomResource . processingLastStarteAt ;
117
+ while (
118
+ roomResource . isProcessing &&
119
+ currentRoomProcessingTimestamp ===
120
+ roomResource . processingLastStarteAt &&
121
+ Date . now ( ) < timeout
122
+ ) {
123
+ // wait for the room resource to finish processing
124
+ await delay ( 100 ) ;
125
+ }
126
+ if (
127
+ roomResource . isProcessing &&
128
+ currentRoomProcessingTimestamp === roomResource . processingLastStarteAt
129
+ ) {
130
+ // room seems to be stuck processing, so we will log and skip this event
131
+ console . error (
132
+ `Room resource for room ${ roomId } seems to be stuck processing, skipping event ${ eventId } ` ,
133
+ ) ;
134
+ continue ;
135
+ }
136
+
106
137
let message = roomResource . messages . find ( ( m ) => m . eventId === eventId ) ;
107
138
if ( ! message ) {
108
139
continue ;
@@ -127,6 +158,7 @@ export default class CommandService extends Service {
127
158
}
128
159
}
129
160
}
161
+ finishedProcessingCommands ! ( ) ;
130
162
}
131
163
132
164
get commandContext ( ) : CommandContext {
0 commit comments