@@ -55,11 +55,12 @@ type MongoShell struct {
55
55
ProcessOutBuf []byte
56
56
OutBuf []byte
57
57
Cmd string
58
- Chan chan []byte
58
+ BufChan chan []byte
59
59
Pid int
60
60
StopChan chan struct {}
61
61
MongoHost MongoHost
62
62
MongoVersion string
63
+ ShellBin string // mongo or mongosh
63
64
}
64
65
65
66
// NewMongoShellFromParm create a new MongoShell instance
@@ -69,7 +70,7 @@ func NewMongoShellFromParm(p *QueryParams) *MongoShell {
69
70
setName = p .SetName
70
71
}
71
72
return & MongoShell {
72
- Chan : make (chan []byte , 102400 ),
73
+ BufChan : make (chan []byte , 2 ),
73
74
StopChan : make (chan struct {}, 1 ),
74
75
MongoVersion : p .Version ,
75
76
MongoHost : MongoHost {
@@ -118,11 +119,14 @@ func buildArgs(r *MongoShell) (argv []string, err error) {
118
119
return nil , fmt .Errorf ("invalid version string" )
119
120
}
120
121
121
- shellPath := "mongosh"
122
122
// 4.2 之前的版本,使用 mongo
123
123
isLowerVersion := major < 4 || (minor < 2 && major == 4 )
124
+ isLowerVersion = true // 高版本打算用mongosh的,但搭配mongosh跑不起来. 就先用mongo
125
+
124
126
if isLowerVersion {
125
- shellPath = "mongo"
127
+ r .ShellBin = "mongo"
128
+ } else {
129
+ r .ShellBin = "mongosh"
126
130
}
127
131
128
132
evalJs := ""
@@ -140,7 +144,7 @@ func buildArgs(r *MongoShell) (argv []string, err error) {
140
144
}
141
145
}
142
146
143
- cmdPath , err := exec .LookPath (shellPath )
147
+ cmdPath , err := exec .LookPath (r . ShellBin )
144
148
if err != nil {
145
149
return nil , fmt .Errorf ("internal error, exec.LookPath failed" )
146
150
}
@@ -188,8 +192,8 @@ func (r *MongoShell) Run(startWg *sync.WaitGroup, logger *slog.Logger) error {
188
192
189
193
}
190
194
191
- // 启动进程,启动后,将进程的Pid出发送到 Chan
192
- // 如果进程退出,关闭 Chan
195
+ // 启动进程,启动后,将进程的Pid出发送到 BufChan
196
+ // 如果进程退出,关闭 BufChan
193
197
pidChan := make (chan int )
194
198
procCtx , procCancel := context .WithCancel (context .Background ())
195
199
@@ -208,7 +212,7 @@ func (r *MongoShell) Run(startWg *sync.WaitGroup, logger *slog.Logger) error {
208
212
r .logger .Error ("os.StartProcess" , slog .Any ("err" , err ))
209
213
}
210
214
pidChan <- proc .Pid
211
- // 等待进程结束, 进程结束后,关闭 Chan
215
+ // 等待进程结束, 进程结束后,关闭 BufChan
212
216
state , err := proc .Wait ()
213
217
r .logger .Info ("proc.exited" , slog .String ("state" , state .String ()), slog .Any ("err" , err ))
214
218
@@ -227,82 +231,93 @@ func (r *MongoShell) Run(startWg *sync.WaitGroup, logger *slog.Logger) error {
227
231
pid = <- pidChan
228
232
r .Pid = pid
229
233
time .Sleep (2 )
230
- r .logger .Info ("startProcess" , slog .String ("cmdPath" , argv [0 ]), slog .Any ("argv" , argv ),
234
+ r .logger .Info ("startProcess" ,
235
+ slog .String ("cmdPath" , argv [0 ]), slog .Any ("argv" , argv ),
231
236
slog .Int ("pid" , r .Pid ), slog .Any ("err" , err ))
232
237
startWg .Done () // signal to main goroutine
233
238
234
239
wg := sync.WaitGroup {}
235
240
wg .Add (1 )
236
241
go func () {
237
- // pumpStdout
238
- // 从 outr 读取数据,发送到 Chan
239
- // 如果进程结束,关闭 Chan
240
- r .logger .Info ("pumpStdout: always read from outr, and send to Chan" )
242
+ // read outr -> write BufChan
243
+ r .logger .Info ("pumpStdout: always read from outr, and send to BufChan" )
241
244
defer wg .Done ()
242
- var buf = make ([]byte , 102400 )
245
+ var buf = make ([]byte , 1024 )
243
246
for {
244
247
select {
245
248
case <- procCtx .Done ():
246
249
r .logger .Info ("pumpStdout stop, because procCtx.Done" )
247
- // r.Chan <- []byte("exit\n")
250
+ // r.BufChan <- []byte("exit\n")
248
251
goto done
249
252
default :
250
- // ctx, _ := context.WithTimeout(context.Background(), 1*time.Second
251
253
// 阻塞读取 outr
252
254
n , readErr := outr .Read (buf )
253
- r .logger .Info ("readMsg " ,
254
- slog .Int ("readByte " , n ),
255
- slog .String ("buf " , string (buf [:n ])),
255
+ r .logger .Info ("readFromOutr " ,
256
+ slog .Int ("n " , n ),
257
+ slog .String ("data " , string (buf [:n ])),
256
258
slog .Any ("err" , readErr ),
257
259
)
258
260
if err != nil {
259
261
r .logger .Error ("outr.Read" , slog .Any ("err" , readErr ))
260
262
goto done
261
263
}
262
264
if n > 0 {
263
- r .Chan <- buf [:n ]
265
+ // 发送到 BufChan
266
+ r .logger .Info ("sendToBufChan" , slog .Int ("n" , n ),
267
+ slog .String ("data" , string (buf [:n ])))
268
+ var tmpBuf = make ([]byte , n )
269
+ copy (tmpBuf , buf [:n ])
270
+ r .BufChan <- tmpBuf
264
271
}
265
272
}
266
273
}
267
274
done:
268
275
269
276
r .logger .Info ("close chan" , slog .String ("func" , "pumpStdout" ))
270
- close (r .Chan )
277
+ close (r .BufChan )
271
278
}()
272
279
273
- r .logger .Info ("pumpStdout is running" )
274
280
wg .Wait ()
275
281
r .logger .Info ("pumpStdout is done" )
276
282
return nil
277
283
}
278
284
285
+ // ReceiveMsg receives a message from the process
279
286
func (r * MongoShell ) ReceiveMsg (timeout int64 ) (out []byte , err error ) {
280
- buf := make ([]byte , 0 , 32 * 1024 * 1024 )
287
+ buf := make ([]byte , 0 , maxRespSize )
281
288
msg := bytes .NewBuffer (buf )
282
- lastUpdate := time .Now ().UnixMilli ()
283
289
ctxTimeout , _ := context .WithTimeout (context .Background (), time .Duration (timeout )* time .Second )
284
290
checkCone := time .Tick (100 * time .Millisecond )
291
+ checkConeCount := 0
292
+ bytesTotal := 0
285
293
for {
286
294
select {
287
- case v , ok := <- r .Chan :
295
+ case v , ok := <- r .BufChan :
296
+ r .logger .Info ("readFromBufChan" , slog .Bool ("isResponseEnd" , isResponseEnd (v )),
297
+ slog .String ("v" , string (v )))
298
+
288
299
if ! ok {
289
300
r .logger .Info ("chan closed" , slog .String ("v" , string (v )))
290
301
return msg .Bytes (), fmt .Errorf ("chan closed" )
291
302
}
292
- _ , werr := msg .Write (v )
303
+ n , werr := msg .Write (v )
304
+ bytesTotal += n
305
+ // 超过了bufSize
293
306
if werr != nil {
294
307
return msg .Bytes (), werr
295
308
}
296
-
297
- lastUpdate = time .Now ().UnixMilli ()
298
- if isResponseEnd (msg .Bytes ()) {
299
- return msg .Bytes (), nil
309
+ if bytesTotal > maxRespSize {
310
+ r .logger .Info ("excess data size" , slog .Int ("bytesTotal" , bytesTotal ))
311
+ return nil , fmt .Errorf ("excess data size" )
300
312
}
313
+ checkConeCount = 0
314
+
301
315
case <- ctxTimeout .Done ():
302
316
return msg .Bytes (), fmt .Errorf ("timeout" ) // 返回超时或取消原因
303
317
case <- checkCone :
304
- if msg .Len () > 0 && time .Now ().UnixMilli ()- lastUpdate > 800 {
305
- r .logger .Info ("ReceiveMsg because of timeout" , slog .Int ("msgLen" , msg .Len ()))
318
+ checkConeCount += 1
319
+ if msg .Len () > 0 && checkConeCount > 4 {
320
+ r .logger .Info ("timeout" , slog .Int ("msgLen" , msg .Len ()))
306
321
return msg .Bytes (), nil
307
322
}
308
323
}
@@ -317,7 +332,12 @@ func (r *MongoShell) Stop() {
317
332
r .logger .Info ("stopped" )
318
333
}
319
334
320
- func precheckInput (msg []byte ) ([]byte , error ) {
335
+ func precheckInput (ShellBin string , msg []byte ) ([]byte , error ) {
336
+ // 如果是 mongosh,不需要加 print
337
+ if ShellBin == "mongosh" {
338
+ return msg , nil
339
+ }
340
+
321
341
// append "\n" to the end of msg
322
342
if len (msg ) == 0 || msg [len (msg )- 1 ] != '\n' {
323
343
msg = append (msg , []byte ("\n " )... )
@@ -330,7 +350,7 @@ func precheckInput(msg []byte) ([]byte, error) {
330
350
if reIt .Match (msg ) || reUse .Match (msg ) {
331
351
// use xxx
332
352
// it;
333
- // 一定会有返回,不需要加 print, 其它的就不一定了.
353
+ // 一定会有返回,不需要加 print, 其它的可能没有返回
334
354
} else {
335
355
msg = append (msg , []byte ("print('')\n " )... )
336
356
}
@@ -340,7 +360,7 @@ func precheckInput(msg []byte) ([]byte, error) {
340
360
341
361
// SendMsg sends a message to process
342
362
func (r * MongoShell ) SendMsg (msg []byte ) (n int , err error ) {
343
- msg , err = precheckInput (msg )
363
+ msg , err = precheckInput (r . ShellBin , msg )
344
364
if err != nil {
345
365
return 0 , errors .Wrap (err , "precheckInput" )
346
366
}
0 commit comments