@@ -55,7 +55,7 @@ type MongoShell struct {
55
55
ProcessOutBuf []byte
56
56
OutBuf []byte
57
57
Cmd string
58
- Chan chan []byte
58
+ BufChan chan []byte
59
59
Pid int
60
60
StopChan chan struct {}
61
61
MongoHost MongoHost
@@ -70,7 +70,7 @@ func NewMongoShellFromParm(p *QueryParams) *MongoShell {
70
70
setName = p .SetName
71
71
}
72
72
return & MongoShell {
73
- Chan : make (chan []byte , 102400 ),
73
+ BufChan : make (chan []byte , 2 ),
74
74
StopChan : make (chan struct {}, 1 ),
75
75
MongoVersion : p .Version ,
76
76
MongoHost : MongoHost {
@@ -119,11 +119,14 @@ func buildArgs(r *MongoShell) (argv []string, err error) {
119
119
return nil , fmt .Errorf ("invalid version string" )
120
120
}
121
121
122
- r .ShellBin = "mongosh"
123
122
// 4.2 之前的版本,使用 mongo
124
123
isLowerVersion := major < 4 || (minor < 2 && major == 4 )
124
+ isLowerVersion = true // 高版本打算用mongosh的,但搭配mongosh跑不起来
125
+
125
126
if isLowerVersion {
126
127
r .ShellBin = "mongo"
128
+ } else {
129
+ r .ShellBin = "mongosh" //
127
130
}
128
131
129
132
evalJs := ""
@@ -149,6 +152,41 @@ func buildArgs(r *MongoShell) (argv []string, err error) {
149
152
return argv , nil
150
153
}
151
154
155
+ func pumpStdout (outr * os.File , bufChan chan []byte , procCtx context.Context , wg * sync.WaitGroup , logger * slog.Logger ) {
156
+ // pumpStdout
157
+ // 从 outr 读取数据,发送到 bufChan
158
+ logger .Info ("pumpStdout: always read from outr, and send to BufChan" )
159
+ defer wg .Done ()
160
+ var buf = make ([]byte , 102400 )
161
+ var err error
162
+ for {
163
+ select {
164
+ case <- procCtx .Done ():
165
+ logger .Info ("pumpStdout stop, because procCtx.Done" )
166
+ goto done
167
+ default :
168
+ // ctx, _ := context.WithTimeout(context.Background(), 1*time.Second
169
+ // 阻塞读取 outr
170
+ n , readErr := outr .Read (buf )
171
+ logger .Info ("readMsg" , slog .Int ("n" , n ), slog .String ("buf" , string (buf [:n ])),
172
+ slog .Any ("err" , readErr ),
173
+ )
174
+ if n > 0 {
175
+ bufChan <- buf [:n ]
176
+ }
177
+ if err != nil {
178
+ logger .Error ("outr.Read" , slog .Any ("err" , readErr ))
179
+ goto done
180
+ }
181
+ }
182
+ }
183
+ done:
184
+
185
+ logger .Info ("close chan" , slog .String ("func" , "pumpStdout" ))
186
+ close (bufChan )
187
+
188
+ }
189
+
152
190
// Run starts the MongoShell process.
153
191
// 如果返回Error,表示进程启动失败,startWg.Done() 不会被调用
154
192
func (r * MongoShell ) Run (startWg * sync.WaitGroup , logger * slog.Logger ) error {
@@ -189,8 +227,8 @@ func (r *MongoShell) Run(startWg *sync.WaitGroup, logger *slog.Logger) error {
189
227
190
228
}
191
229
192
- // 启动进程,启动后,将进程的Pid出发送到 Chan
193
- // 如果进程退出,关闭 Chan
230
+ // 启动进程,启动后,将进程的Pid出发送到 BufChan
231
+ // 如果进程退出,关闭 BufChan
194
232
pidChan := make (chan int )
195
233
procCtx , procCancel := context .WithCancel (context .Background ())
196
234
@@ -209,7 +247,7 @@ func (r *MongoShell) Run(startWg *sync.WaitGroup, logger *slog.Logger) error {
209
247
r .logger .Error ("os.StartProcess" , slog .Any ("err" , err ))
210
248
}
211
249
pidChan <- proc .Pid
212
- // 等待进程结束, 进程结束后,关闭 Chan
250
+ // 等待进程结束, 进程结束后,关闭 BufChan
213
251
state , err := proc .Wait ()
214
252
r .logger .Info ("proc.exited" , slog .String ("state" , state .String ()), slog .Any ("err" , err ))
215
253
@@ -228,85 +266,99 @@ func (r *MongoShell) Run(startWg *sync.WaitGroup, logger *slog.Logger) error {
228
266
pid = <- pidChan
229
267
r .Pid = pid
230
268
time .Sleep (2 )
231
- r .logger .Info ("startProcess" , slog .String ("cmdPath" , argv [0 ]), slog .Any ("argv" , argv ),
269
+ r .logger .Info ("startProcess" ,
270
+ slog .String ("cmdPath" , argv [0 ]), slog .Any ("argv" , argv ),
232
271
slog .Int ("pid" , r .Pid ), slog .Any ("err" , err ))
233
272
startWg .Done () // signal to main goroutine
234
273
235
274
wg := sync.WaitGroup {}
236
275
wg .Add (1 )
237
276
go func () {
238
277
// pumpStdout
239
- // 从 outr 读取数据,发送到 Chan
240
- // 如果进程结束,关闭 Chan
241
- r .logger .Info ("pumpStdout: always read from outr, and send to Chan " )
278
+ // 从 outr 读取数据,发送到 BufChan
279
+ // 如果进程结束,关闭 BufChan
280
+ r .logger .Info ("pumpStdout: always read from outr, and send to BufChan " )
242
281
defer wg .Done ()
243
- var buf = make ([]byte , 102400 )
282
+ var buf = make ([]byte , 1024 )
244
283
for {
245
284
select {
246
285
case <- procCtx .Done ():
247
286
r .logger .Info ("pumpStdout stop, because procCtx.Done" )
248
- // r.Chan <- []byte("exit\n")
287
+ // r.BufChan <- []byte("exit\n")
249
288
goto done
250
289
default :
251
- // ctx, _ := context.WithTimeout(context.Background(), 1*time.Second
252
290
// 阻塞读取 outr
253
291
n , readErr := outr .Read (buf )
254
- r .logger .Info ("readMsg " ,
255
- slog .Int ("readByte " , n ),
256
- slog .String ("buf " , string (buf [:n ])),
292
+ r .logger .Info ("readFromOutr " ,
293
+ slog .Int ("n " , n ),
294
+ slog .String ("data " , string (buf [:n ])),
257
295
slog .Any ("err" , readErr ),
258
296
)
259
297
if err != nil {
260
298
r .logger .Error ("outr.Read" , slog .Any ("err" , readErr ))
261
299
goto done
262
300
}
263
301
if n > 0 {
264
- r .Chan <- buf [:n ]
302
+ // 发送到 BufChan
303
+ r .logger .Info ("sendToBufChan" , slog .Int ("n" , n ),
304
+ slog .String ("data" , string (buf [:n ])))
305
+ var tmpBuf = make ([]byte , n )
306
+ copy (tmpBuf , buf [:n ])
307
+ r .BufChan <- tmpBuf
308
+
265
309
}
266
310
}
267
311
}
268
312
done:
269
313
270
314
r .logger .Info ("close chan" , slog .String ("func" , "pumpStdout" ))
271
- close (r .Chan )
315
+ close (r .BufChan )
272
316
}()
273
317
274
- r .logger .Info ("pumpStdout is running" )
275
318
wg .Wait ()
276
319
r .logger .Info ("pumpStdout is done" )
277
320
return nil
278
321
}
279
322
280
323
func (r * MongoShell ) ReceiveMsg (timeout int64 ) (out []byte , err error ) {
281
- buf := make ([]byte , 0 , 32 * 1024 * 1024 )
324
+ maxRespSize := 10 * 1024 * 1024 // 32M
325
+ buf := make ([]byte , 0 , maxRespSize )
282
326
msg := bytes .NewBuffer (buf )
283
- lastUpdate := time .Now ().UnixMilli ()
284
327
ctxTimeout , _ := context .WithTimeout (context .Background (), time .Duration (timeout )* time .Second )
285
328
checkCone := time .Tick (100 * time .Millisecond )
329
+ checkConeCount := 0
330
+ bytesTotal := 0
286
331
for {
287
332
select {
288
- case v , ok := <- r .Chan :
333
+ case v , ok := <- r .BufChan :
334
+ r .logger .Info ("readFromBufChan" , slog .Bool ("isResponseEnd" , isResponseEnd (v )),
335
+ slog .String ("v" , string (v )))
336
+
289
337
if ! ok {
290
338
r .logger .Info ("chan closed" , slog .String ("v" , string (v )))
291
339
return msg .Bytes (), fmt .Errorf ("chan closed" )
292
340
}
293
- _ , werr := msg .Write (v )
341
+
342
+ n , werr := msg .Write (v )
343
+ bytesTotal += n
344
+ // 超过了bufSize
294
345
if werr != nil {
295
346
return msg .Bytes (), werr
296
347
}
297
348
298
- lastUpdate = time .Now ().UnixMilli ()
299
- if isResponseEnd (msg .Bytes ()) {
300
- r .logger .Info ("response end" , slog .String ("v" , string (v )))
301
- return msg .Bytes (), nil
302
- } else {
303
- r .logger .Info ("receive msg" , slog .String ("v" , string (v )))
349
+ if bytesTotal > maxRespSize {
350
+ r .logger .Info ("excess data size" , slog .Int ("bytesTotal" , bytesTotal ))
351
+ return nil , fmt .Errorf ("excess data size" )
304
352
}
353
+
354
+ checkConeCount = 0
355
+
305
356
case <- ctxTimeout .Done ():
306
357
return msg .Bytes (), fmt .Errorf ("timeout" ) // 返回超时或取消原因
307
358
case <- checkCone :
308
- if msg .Len () > 0 && time .Now ().UnixMilli ()- lastUpdate > 800 {
309
- r .logger .Info ("ReceiveMsg because of timeout" , slog .Int ("msgLen" , msg .Len ()))
359
+ checkConeCount += 1
360
+ if msg .Len () > 0 && checkConeCount > 4 {
361
+ r .logger .Info ("timeout" , slog .Int ("msgLen" , msg .Len ()))
310
362
return msg .Bytes (), nil
311
363
}
312
364
}
@@ -321,9 +373,9 @@ func (r *MongoShell) Stop() {
321
373
r .logger .Info ("stopped" )
322
374
}
323
375
324
- func ( r * MongoShell ) precheckInput ( msg []byte ) ([]byte , error ) {
376
+ func precheckInput ( ShellBin string , msg []byte ) ([]byte , error ) {
325
377
// 如果是 mongosh,不需要加 print
326
- if r . ShellBin == "mongosh" {
378
+ if ShellBin == "mongosh" {
327
379
return msg , nil
328
380
}
329
381
@@ -348,7 +400,7 @@ func (r *MongoShell) precheckInput(msg []byte) ([]byte, error) {
348
400
349
401
// SendMsg sends a message to process
350
402
func (r * MongoShell ) SendMsg (msg []byte ) (n int , err error ) {
351
- msg , err = r . precheckInput (msg )
403
+ msg , err = precheckInput (r . ShellBin , msg )
352
404
if err != nil {
353
405
return 0 , errors .Wrap (err , "precheckInput" )
354
406
}
0 commit comments