@@ -55,7 +55,7 @@ type MongoShell struct {
55
55
ProcessOutBuf []byte
56
56
OutBuf []byte
57
57
Cmd string
58
- Chan chan []byte
58
+ BufChan chan []byte
59
59
Pid int
60
60
StopChan chan struct {}
61
61
MongoHost MongoHost
@@ -70,7 +70,7 @@ func NewMongoShellFromParm(p *QueryParams) *MongoShell {
70
70
setName = p .SetName
71
71
}
72
72
return & MongoShell {
73
- Chan : make (chan []byte , 102400 ),
73
+ BufChan : make (chan []byte , 2 ),
74
74
StopChan : make (chan struct {}, 1 ),
75
75
MongoVersion : p .Version ,
76
76
MongoHost : MongoHost {
@@ -119,11 +119,14 @@ func buildArgs(r *MongoShell) (argv []string, err error) {
119
119
return nil , fmt .Errorf ("invalid version string" )
120
120
}
121
121
122
- r .ShellBin = "mongosh"
123
122
// 4.2 之前的版本,使用 mongo
124
123
isLowerVersion := major < 4 || (minor < 2 && major == 4 )
124
+ isLowerVersion = true // 高版本打算用mongosh的,但搭配mongosh跑不起来
125
+
125
126
if isLowerVersion {
126
127
r .ShellBin = "mongo"
128
+ } else {
129
+ r .ShellBin = "mongosh" //
127
130
}
128
131
129
132
evalJs := ""
@@ -149,6 +152,41 @@ func buildArgs(r *MongoShell) (argv []string, err error) {
149
152
return argv , nil
150
153
}
151
154
155
+ func pumpStdout (outr * os.File , bufChan chan []byte , procCtx context.Context , wg * sync.WaitGroup , logger * slog.Logger ) {
156
+ // pumpStdout
157
+ // 从 outr 读取数据,发送到 bufChan
158
+ logger .Info ("pumpStdout: always read from outr, and send to BufChan" )
159
+ defer wg .Done ()
160
+ var buf = make ([]byte , 102400 )
161
+ var err error
162
+ for {
163
+ select {
164
+ case <- procCtx .Done ():
165
+ logger .Info ("pumpStdout stop, because procCtx.Done" )
166
+ goto done
167
+ default :
168
+ // ctx, _ := context.WithTimeout(context.Background(), 1*time.Second
169
+ // 阻塞读取 outr
170
+ n , readErr := outr .Read (buf )
171
+ logger .Info ("readMsg" , slog .Int ("n" , n ), slog .String ("buf" , string (buf [:n ])),
172
+ slog .Any ("err" , readErr ),
173
+ )
174
+ if n > 0 {
175
+ bufChan <- buf [:n ]
176
+ }
177
+ if err != nil {
178
+ logger .Error ("outr.Read" , slog .Any ("err" , readErr ))
179
+ goto done
180
+ }
181
+ }
182
+ }
183
+ done:
184
+
185
+ logger .Info ("close chan" , slog .String ("func" , "pumpStdout" ))
186
+ close (bufChan )
187
+
188
+ }
189
+
152
190
// Run starts the MongoShell process.
153
191
// 如果返回Error,表示进程启动失败,startWg.Done() 不会被调用
154
192
func (r * MongoShell ) Run (startWg * sync.WaitGroup , logger * slog.Logger ) error {
@@ -189,8 +227,8 @@ func (r *MongoShell) Run(startWg *sync.WaitGroup, logger *slog.Logger) error {
189
227
190
228
}
191
229
192
- // 启动进程,启动后,将进程的Pid出发送到 Chan
193
- // 如果进程退出,关闭 Chan
230
+ // 启动进程,启动后,将进程的Pid出发送到 BufChan
231
+ // 如果进程退出,关闭 BufChan
194
232
pidChan := make (chan int )
195
233
procCtx , procCancel := context .WithCancel (context .Background ())
196
234
@@ -209,7 +247,7 @@ func (r *MongoShell) Run(startWg *sync.WaitGroup, logger *slog.Logger) error {
209
247
r .logger .Error ("os.StartProcess" , slog .Any ("err" , err ))
210
248
}
211
249
pidChan <- proc .Pid
212
- // 等待进程结束, 进程结束后,关闭 Chan
250
+ // 等待进程结束, 进程结束后,关闭 BufChan
213
251
state , err := proc .Wait ()
214
252
r .logger .Info ("proc.exited" , slog .String ("state" , state .String ()), slog .Any ("err" , err ))
215
253
@@ -228,85 +266,96 @@ func (r *MongoShell) Run(startWg *sync.WaitGroup, logger *slog.Logger) error {
228
266
pid = <- pidChan
229
267
r .Pid = pid
230
268
time .Sleep (2 )
231
- r .logger .Info ("startProcess" , slog .String ("cmdPath" , argv [0 ]), slog .Any ("argv" , argv ),
269
+ r .logger .Info ("startProcess" ,
270
+ slog .String ("cmdPath" , argv [0 ]), slog .Any ("argv" , argv ),
232
271
slog .Int ("pid" , r .Pid ), slog .Any ("err" , err ))
233
272
startWg .Done () // signal to main goroutine
234
273
235
274
wg := sync.WaitGroup {}
236
275
wg .Add (1 )
237
276
go func () {
238
- // pumpStdout
239
- // 从 outr 读取数据,发送到 Chan
240
- // 如果进程结束,关闭 Chan
241
- r .logger .Info ("pumpStdout: always read from outr, and send to Chan" )
277
+ // read outr -> write BufChan
278
+ r .logger .Info ("pumpStdout: always read from outr, and send to BufChan" )
242
279
defer wg .Done ()
243
- var buf = make ([]byte , 102400 )
280
+ var buf = make ([]byte , 1024 )
244
281
for {
245
282
select {
246
283
case <- procCtx .Done ():
247
284
r .logger .Info ("pumpStdout stop, because procCtx.Done" )
248
- // r.Chan <- []byte("exit\n")
285
+ // r.BufChan <- []byte("exit\n")
249
286
goto done
250
287
default :
251
- // ctx, _ := context.WithTimeout(context.Background(), 1*time.Second
252
288
// 阻塞读取 outr
253
289
n , readErr := outr .Read (buf )
254
- r .logger .Info ("readMsg " ,
255
- slog .Int ("readByte " , n ),
256
- slog .String ("buf " , string (buf [:n ])),
290
+ r .logger .Info ("readFromOutr " ,
291
+ slog .Int ("n " , n ),
292
+ slog .String ("data " , string (buf [:n ])),
257
293
slog .Any ("err" , readErr ),
258
294
)
259
295
if err != nil {
260
296
r .logger .Error ("outr.Read" , slog .Any ("err" , readErr ))
261
297
goto done
262
298
}
263
299
if n > 0 {
264
- r .Chan <- buf [:n ]
300
+ // 发送到 BufChan
301
+ r .logger .Info ("sendToBufChan" , slog .Int ("n" , n ),
302
+ slog .String ("data" , string (buf [:n ])))
303
+ var tmpBuf = make ([]byte , n )
304
+ copy (tmpBuf , buf [:n ])
305
+ r .BufChan <- tmpBuf
265
306
}
266
307
}
267
308
}
268
309
done:
269
310
270
311
r .logger .Info ("close chan" , slog .String ("func" , "pumpStdout" ))
271
- close (r .Chan )
312
+ close (r .BufChan )
272
313
}()
273
314
274
- r .logger .Info ("pumpStdout is running" )
275
315
wg .Wait ()
276
316
r .logger .Info ("pumpStdout is done" )
277
317
return nil
278
318
}
279
319
280
320
func (r * MongoShell ) ReceiveMsg (timeout int64 ) (out []byte , err error ) {
281
- buf := make ([]byte , 0 , 32 * 1024 * 1024 )
321
+ maxRespSize := 10 * 1024 * 1024 // 32M
322
+ buf := make ([]byte , 0 , maxRespSize )
282
323
msg := bytes .NewBuffer (buf )
283
- lastUpdate := time .Now ().UnixMilli ()
284
324
ctxTimeout , _ := context .WithTimeout (context .Background (), time .Duration (timeout )* time .Second )
285
325
checkCone := time .Tick (100 * time .Millisecond )
326
+ checkConeCount := 0
327
+ bytesTotal := 0
286
328
for {
287
329
select {
288
- case v , ok := <- r .Chan :
330
+ case v , ok := <- r .BufChan :
331
+ r .logger .Info ("readFromBufChan" , slog .Bool ("isResponseEnd" , isResponseEnd (v )),
332
+ slog .String ("v" , string (v )))
333
+
289
334
if ! ok {
290
335
r .logger .Info ("chan closed" , slog .String ("v" , string (v )))
291
336
return msg .Bytes (), fmt .Errorf ("chan closed" )
292
337
}
293
- _ , werr := msg .Write (v )
338
+
339
+ n , werr := msg .Write (v )
340
+ bytesTotal += n
341
+ // 超过了bufSize
294
342
if werr != nil {
295
343
return msg .Bytes (), werr
296
344
}
297
345
298
- lastUpdate = time .Now ().UnixMilli ()
299
- if isResponseEnd (msg .Bytes ()) {
300
- r .logger .Info ("response end" , slog .String ("v" , string (v )))
301
- return msg .Bytes (), nil
302
- } else {
303
- r .logger .Info ("receive msg" , slog .String ("v" , string (v )))
346
+ if bytesTotal > maxRespSize {
347
+ r .logger .Info ("excess data size" , slog .Int ("bytesTotal" , bytesTotal ))
348
+ return nil , fmt .Errorf ("excess data size" )
304
349
}
350
+
351
+ checkConeCount = 0
352
+
305
353
case <- ctxTimeout .Done ():
306
354
return msg .Bytes (), fmt .Errorf ("timeout" ) // 返回超时或取消原因
307
355
case <- checkCone :
308
- if msg .Len () > 0 && time .Now ().UnixMilli ()- lastUpdate > 800 {
309
- r .logger .Info ("ReceiveMsg because of timeout" , slog .Int ("msgLen" , msg .Len ()))
356
+ checkConeCount += 1
357
+ if msg .Len () > 0 && checkConeCount > 4 {
358
+ r .logger .Info ("timeout" , slog .Int ("msgLen" , msg .Len ()))
310
359
return msg .Bytes (), nil
311
360
}
312
361
}
@@ -321,9 +370,9 @@ func (r *MongoShell) Stop() {
321
370
r .logger .Info ("stopped" )
322
371
}
323
372
324
- func ( r * MongoShell ) precheckInput ( msg []byte ) ([]byte , error ) {
373
+ func precheckInput ( ShellBin string , msg []byte ) ([]byte , error ) {
325
374
// 如果是 mongosh,不需要加 print
326
- if r . ShellBin == "mongosh" {
375
+ if ShellBin == "mongosh" {
327
376
return msg , nil
328
377
}
329
378
@@ -348,7 +397,7 @@ func (r *MongoShell) precheckInput(msg []byte) ([]byte, error) {
348
397
349
398
// SendMsg sends a message to process
350
399
func (r * MongoShell ) SendMsg (msg []byte ) (n int , err error ) {
351
- msg , err = r . precheckInput (msg )
400
+ msg , err = precheckInput (r . ShellBin , msg )
352
401
if err != nil {
353
402
return 0 , errors .Wrap (err , "precheckInput" )
354
403
}
0 commit comments