Skip to content

Commit 42e9a9b

Browse files
authored
Merge pull request #20 from antlabs/dev
Dev
2 parents eb25f21 + 3626e32 commit 42e9a9b

19 files changed

+393
-195
lines changed

common_options.go

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -283,28 +283,46 @@ func WithClientOnCloseFunc(onClose func(c *Conn, err error)) ClientOption {
283283
// 18.1 配置服务端Callback相关方法在io event loop中执行
284284
func WithServerCallbackInEventLoop() ServerOption {
285285
return func(o *ConnOption) {
286-
o.callbackInEventLoop = true
286+
o.runInGoTask = "io"
287287
}
288288
}
289289

290290
// 18.2 配置服务端Callback相关方法在io event loop中执行
291291
func WithClientCallbackInEventLoop() ClientOption {
292292
return func(o *DialOption) {
293-
o.callbackInEventLoop = true
293+
o.runInGoTask = "io"
294294
}
295295
}
296296

297297
// 19.1 配置服务端使用stream模式处理请求,该模式一个连接会独占一个go程,如果你的请求对时序有要求,可以使用这个模式
298298
func WithServerStreamMode() ServerOption {
299299
return func(o *ConnOption) {
300-
o.runInGoStrategy = taskStrategyStream
300+
o.runInGoTask = "stream"
301301
}
302302
}
303303

304304
// 19.2 配置客户端使用stream模式处理请求,该模式一个连接会独占一个go程,如果你的请求对时序有要求,可以使用这个模式
305305
func WithClientStreamMode() ClientOption {
306306
return func(o *DialOption) {
307-
o.runInGoStrategy = taskStrategyStream
307+
o.runInGoTask = "stream"
308+
}
309+
}
310+
311+
// 20.1 配置自定义task, 需要确保传入的值是有效的,不然会panic
312+
func WithServerCustomTaskMode(taskName string) ServerOption {
313+
return func(o *ConnOption) {
314+
if len(taskName) > 0 {
315+
o.runInGoTask = taskName
316+
}
317+
}
318+
}
319+
320+
// 20.2 配置自定义task, 需要确保传入的值是有效的,不然会panic
321+
func WithClientCustomTaskMode(taskName string) ClientOption {
322+
return func(o *DialOption) {
323+
if len(taskName) > 0 {
324+
o.runInGoTask = taskName
325+
}
308326
}
309327
}
310328

config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@ type Config struct {
3636
maxDelayWriteDuration time.Duration // 最大延迟时间, 默认值是10ms
3737
subProtocols []string // 设置支持的子协议
3838
multiEventLoop *MultiEventLoop // 事件循环
39-
callbackInEventLoop bool // 在event loop中运行websocket OnOpen, OnMessage, OnClose 回调函数
40-
runInGoStrategy taskStrategy //运行业务OnMessage的策略
39+
runInGoTask string // 运行业务OnMessage的策略, 现在greatws集成三种OnMessage运行模式,分别是io, task
4140
}
4241

4342
// func (c *Config) useIoUring() bool {
@@ -55,8 +54,9 @@ func (c *Config) defaultSetting() {
5554
c.windowsMultipleTimesPayloadSize = 1.0
5655
c.delayWriteInitBufferSize = 8 * 1024
5756
c.maxDelayWriteDuration = 10 * time.Millisecond
58-
c.runInGoStrategy = taskStrategyBind
57+
// c.runInGoStrategy = taskStrategyBind
5958
c.tcpNoDelay = true
6059
// 对于text消息,默认不检查text是utf8字符
6160
c.utf8Check = func(b []byte) bool { return true }
61+
c.runInGoTask = "unstream"
6262
}

conn_core.go

Lines changed: 13 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"math/rand"
2424
"sync/atomic"
2525
"time"
26-
"unsafe"
2726

2827
"github.com/antlabs/wsutil/bytespool"
2928
"github.com/antlabs/wsutil/enum"
@@ -63,14 +62,14 @@ const (
6362
type conn struct {
6463
fd int64 // 文件描述符fd
6564
rbuf *[]byte // 读缓冲区
66-
rr int // rbuf读索引
67-
rw int // rbuf写索引
65+
rr int // rbuf读索引,rfc标准里面有超过4个字节的大包,所以索引只能用int类型
66+
rw int // rbuf写索引,rfc标准里面有超过4个字节的大包,所以索引只能用int类型
67+
wbuf *[]byte // 写缓冲区, 当直接Write失败时,会将数据写入缓冲区
6868
lenAndMaskSize int // payload长度和掩码的长度
69-
lastPayloadLen int64 // 上一次读取的payload长度
7069
rh frame.FrameHeader // frame头部
7170
fragmentFramePayload *[]byte // 存放分片帧的缓冲区
7271
fragmentFrameHeader *frame.FrameHeader // 存放分段帧的头部
73-
closed int32 // 是否关闭
72+
lastPayloadLen int32 // 上一次读取的payload长度, TODO启用
7473
curState frameState // 保存当前状态机的状态
7574
client bool // 客户端为true,服务端为false
7675
}
@@ -79,56 +78,14 @@ func (c *Conn) getLogger() *slog.Logger {
7978
return c.multiEventLoop.Logger
8079
}
8180

82-
// 获取当前绑定的go程
83-
func (c *Conn) getCurrBindGo() *businessGo {
84-
return (*businessGo)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&c.currBindGo))))
85-
}
86-
87-
// 设置当前绑定的go程
88-
func (c *Conn) setCurrBindGo(b *businessGo) {
89-
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&c.currBindGo)), unsafe.Pointer(b))
90-
}
91-
92-
// addTask 进行任务调度, 任务调试分为三种模式, 1. stream模式, 2. go模式, 3. io模式
93-
func (c *Conn) addTask(ts taskStrategy, f func() bool) {
81+
func (c *Conn) addTask(f func() bool) {
9482
if c.isClosed() {
9583
return
9684
}
9785

98-
if c.callbackInEventLoop {
99-
c.multiEventLoop.runInIo.addTask(ts, f)
100-
return
101-
}
102-
if ts == taskStrategyStream {
103-
c.streamGo.addTask(ts, f)
104-
return
105-
}
106-
107-
var err error
108-
retry := 2
109-
if c.parent.localTask.isFull() {
110-
retry = 1
111-
}
112-
113-
for i := 0; i < retry; i++ {
114-
err = c.parent.localTask.addTask(c, ts, f)
115-
if err == nil || retry == 1 {
116-
break
117-
}
118-
if err == ErrTaskQueueFull {
119-
if c.parent.localTask.addGoWithSteal(c.getCurrBindGo()) {
120-
c.parent.localTask.rebindGo(c)
121-
}
122-
continue
123-
}
124-
125-
}
126-
127-
if err == ErrTaskQueueFull {
128-
// 负载高走自己专用chan
129-
c.getCurrBindGo().taskChan <- f
130-
// 负载低走公共chan, TODO, 需要找一个判断高/低雷负载的条件
131-
// c.parent.localTask.public <- f
86+
err := c.task.AddTask(f)
87+
if err != nil {
88+
c.getLogger().Error("addTask", "err", err.Error())
13289
}
13390

13491
}
@@ -298,7 +255,7 @@ func (c *Conn) readPayload() (f frame.Frame2, success bool, err error) {
298255
if needRead > 0 {
299256
return
300257
}
301-
c.lastPayloadLen = c.rh.PayloadLen
258+
c.lastPayloadLen = int32(c.rh.PayloadLen)
302259
// 普通frame
303260
newBuf := GetPayloadBytes(int(c.rh.PayloadLen))
304261
copy(*newBuf, (*c.rbuf)[c.rr:c.rr+int(c.rh.PayloadLen)])
@@ -359,7 +316,7 @@ func (c *Conn) processCallback(f frame.Frame2) (err error) {
359316
c.fragmentFramePayload = nil
360317

361318
// 进入业务协程执行
362-
c.addTask(c.runInGoStrategy, func() (exit bool) {
319+
c.addTask(func() (exit bool) {
363320
if fragmentFrameHeader.GetRsv1() && decompression {
364321
tempBuf, err := decode(*fragmentFramePayload)
365322
if err != nil {
@@ -421,7 +378,7 @@ func (c *Conn) processCallback(f frame.Frame2) (err error) {
421378
// payloadPtr.Store(f.Payload)
422379

423380
// text或者binary进入业务协程执行
424-
c.addTask(c.runInGoStrategy, func() bool {
381+
c.addTask(func() bool {
425382
return c.processCallbackData(f, payload, rsv1, decompression, needMask, maskKey)
426383
})
427384

@@ -487,8 +444,7 @@ func (c *Conn) processCallback(f frame.Frame2) (err error) {
487444
// 进入业务协程执行
488445
payload := f.Payload
489446
// here
490-
c.addTask(c.runInGoStrategy, func() bool {
491-
447+
c.addTask(func() bool {
492448
return c.processPing(f, payload)
493449
})
494450
return
@@ -500,7 +456,7 @@ func (c *Conn) processCallback(f frame.Frame2) (err error) {
500456
}
501457

502458
// 进入业务协程执行
503-
c.addTask(c.runInGoStrategy, func() bool {
459+
c.addTask(func() bool {
504460
c.Callback.OnMessage(c, f.Opcode, nil)
505461
return false
506462
})

conn_unix.go

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"time"
2828
"unsafe"
2929

30+
"github.com/antlabs/greatws/task/driver"
3031
"github.com/antlabs/wsutil/bytespool"
3132
"github.com/antlabs/wsutil/enum"
3233
"golang.org/x/sys/unix"
@@ -62,17 +63,15 @@ var (
6263
type Conn struct {
6364
conn
6465

65-
wbuf *[]byte // 写缓冲区, 当直接Write失败时,会将数据写入缓冲区
66-
mu sync.Mutex // 锁
67-
*Config // 配置
68-
parent *EventLoop // event loop
69-
currBindGo *businessGo // 绑定模式下,当前绑定的go程
70-
streamGo *taskStream // stream模式下,当前绑定的go程
71-
rtime *time.Timer // 控制读超时
72-
wtime *time.Timer // 控制写超时
73-
closeConnOnce sync.Once // 关闭一次
74-
onCloseOnce Once // 保证只调用一次OnClose函数
75-
66+
mu sync.Mutex // 锁
67+
*Config // 配置
68+
parent *EventLoop // event loop
69+
task driver.TaskExecutor // 任务,该任务会进协程池里面执行
70+
rtime *time.Timer // 控制读超时
71+
wtime *time.Timer // 控制写超时
72+
closeConnOnce sync.Once // 关闭一次
73+
onCloseOnce Once // 保证只调用一次OnClose函数
74+
closed int32 // 是否关闭
7675
}
7776

7877
func newConn(fd int64, client bool, conf *Config) *Conn {
@@ -83,14 +82,10 @@ func newConn(fd int64, client bool, conf *Config) *Conn {
8382
},
8483
// 初始化不分配内存,只有在需要的时候才分配
8584
Config: conf,
86-
8785
parent: conf.multiEventLoop.getEventLoop(int(fd)),
8886
}
8987

90-
if conf.runInGoStrategy == taskStrategyStream {
91-
c.streamGo = newTaskStream()
92-
}
93-
88+
c.task = c.parent.localTask.newTask(conf.runInGoTask)
9489
if conf.readTimeout > 0 {
9590
c.setReadDeadline(time.Now().Add(conf.readTimeout))
9691
}
@@ -113,15 +108,15 @@ func (c *Conn) closeInnerWithOnClose(err error, onClose bool) {
113108
if err != nil {
114109
err = io.EOF
115110
}
116-
switch c.Config.runInGoStrategy {
117-
case taskStrategyBind:
118-
if c.getCurrBindGo() != nil {
119-
c.currBindGo.subBinConnCount()
120-
}
121-
case taskStrategyStream:
122-
c.streamGo.close()
123-
124-
}
111+
// switch c.Config.runInGoStrategy {
112+
// case taskStrategyBind:
113+
// if c.getCurrBindGo() != nil {
114+
// c.currBindGo.subBinConnCount()
115+
// }
116+
// case taskStrategyStream:
117+
// c.streamGo.close()
118+
119+
// }
125120
fd := c.getFd()
126121
c.getLogger().Debug("close conn", slog.Int64("fd", int64(fd)))
127122
c.parent.del(c)

event_loop.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ type EventLoop struct {
3434
*apiState // 每个平台对应的异步io接口/epoll/kqueue/iouring(暂时不加,除非io-uring性能超过epoll才加回来)
3535
shutdown bool
3636
parent *MultiEventLoop
37-
localTask task
37+
localTask selectTasks
3838
}
3939

4040
// 初始化函数
@@ -44,9 +44,14 @@ func CreateEventLoop(setSize int, flag evFlag, parent *MultiEventLoop) (e *Event
4444
maxFd: -1,
4545
parent: parent,
4646
}
47-
e.localTask.taskConfig = e.parent.configTask.taskConfig
48-
e.localTask.taskMode = e.parent.configTask.taskMode
49-
e.localTask.init()
47+
48+
// 初始化任务池
49+
e.localTask = newSelectTask(parent.configTask.initCount, parent.configTask.min, parent.configTask.max)
50+
51+
// TODO+
52+
// e.localTask.taskConfig = e.parent.configTask.taskConfig
53+
// e.localTask.taskMode = e.parent.configTask.taskMode
54+
// e.localTask.init()
5055
err = e.apiCreate(flag)
5156
return e, err
5257
}

multi_event_loops.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,32 @@ import (
1919
"runtime"
2020
"sync/atomic"
2121
"time"
22+
23+
_ "github.com/antlabs/greatws/task/io"
24+
_ "github.com/antlabs/greatws/task/stream"
25+
_ "github.com/antlabs/greatws/task/unstream"
2226
)
2327

28+
type taskConfig struct {
29+
initCount int // 初始化的协程数
30+
min int // 最小协程数
31+
max int // 最大协程数
32+
}
33+
2434
type multiEventLoopOption struct {
2535
numLoops int //起多少个event loop
26-
// 只是配置的作用,不是真正的任务池, fd是绑定到某个事件循环上的,
36+
37+
// 为何不设计全局池, 现在的做法是
38+
// fd是绑定到某个事件循环上的,
2739
// 任务池是绑定到某个事件循环上的,所以这里的任务池也绑定到对应的localTask上
2840
// 如果设计全局任务池,那么概念就会很乱,容易出错,也会临界区竞争
29-
configTask task
30-
taskMode taskMode
41+
configTask taskConfig
42+
// taskMode taskMode
3143
level slog.Level //控制日志等级
3244
maxEventNum int //每次epoll/kqueue返回时,一次最多处理多少事件
3345
parseInParseLoop *bool //在解析循环中运行websocket OnOpen, OnMessage, OnClose 回调函数
46+
47+
selectTask selectTasks // 任务池
3448
}
3549

3650
type MultiEventLoop struct {
@@ -39,8 +53,7 @@ type MultiEventLoop struct {
3953
loops []*EventLoop
4054
parseLoop *taskParse
4155

42-
runInIo taskIo
43-
flag evFlag // 是否使用io_uring,目前没有使用
56+
flag evFlag // 是否使用io_uring,目前没有使用
4457

4558
stat // 统计信息
4659
*slog.Logger
@@ -52,7 +65,7 @@ var (
5265
defMaxEventNum = 256
5366
defTaskMin = 50
5467
defTaskMax = 30000
55-
defTaskInitCount = 1000
68+
defTaskInitCount = 8
5669
defNumLoops = runtime.NumCPU()
5770
)
5871

@@ -120,11 +133,7 @@ func NewMultiEventLoop(opts ...EvOption) (e *MultiEventLoop, err error) {
120133
o(m)
121134
}
122135
m.initDefaultSetting()
123-
// m.startOk = make(chan struct{}, 1)
124-
// 设置任务池模式(tps, 或者流量模式)
125-
m.configTask.taskMode = m.taskMode
126136

127-
m.configTask.init()
128137
if *m.parseInParseLoop {
129138
m.parseLoop = newTaskParse()
130139
}

multi_event_loops_option.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ func WithBusinessGoNum(initCount, min, max int) EvOption {
5151
}
5252

5353
// 设置business go程池 对流量压测友好的模式
54-
func WithBusinessGoTrafficMode() EvOption {
55-
return func(e *MultiEventLoop) {
56-
e.taskMode = trafficMode
57-
}
58-
}
54+
// func WithBusinessGoTrafficMode() EvOption {
55+
// return func(e *MultiEventLoop) {
56+
// e.taskMode = trafficMode
57+
// }
58+
// }
5959

6060
// 设置日志级别
6161
func WithLogLevel(level slog.Level) EvOption {

0 commit comments

Comments
 (0)