Skip to content

Commit 5a79391

Browse files
authored
Merge pull request #30 from antlabs/v0.2.2
V0.2.2
2 parents 9b115ae + c873b50 commit 5a79391

18 files changed

+320
-589
lines changed

api_epoll.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ func (e *epollState) apiPoll(tv time.Duration) (retVal int, err error) {
178178
ev := &e.events[i]
179179
conn := e.parent.getConn(int(ev.Fd))
180180
if conn == nil {
181+
e.getMultiEventLoop().Logger.Debug("ev.Fd get conn is nil", "fd", ev.Fd)
181182
unix.Close(int(ev.Fd))
182183
continue
183184
}
@@ -224,14 +225,18 @@ func (e *epollState) apiPoll(tv time.Duration) (retVal int, err error) {
224225

225226
func (e *epollState) process(conn *Conn, isRead, isWrite bool) bool {
226227
if isRead {
228+
229+
e.getMultiEventLoop().addReadEvNum()
227230
err := conn.processWebsocketFrame()
228231
if err != nil {
232+
e.getMultiEventLoop().Logger.Info("processWebsocketFrame", "err", err.Error())
229233
conn.closeWithLock(err)
230234
return true
231235
}
232236
}
233237

234238
if isWrite {
239+
e.getMultiEventLoop().addWriteEvNum()
235240
// 刷新下直接写入失败的数据
236241
conn.flushOrClose()
237242
}

client.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func ClientOptionToConf(opts ...ClientOption) *DialOption {
4949
for _, o := range opts {
5050
o(&dial)
5151
}
52+
dial.defaultSettingAfter()
5253
return &dial
5354
}
5455

@@ -87,6 +88,7 @@ func Dial(rawUrl string, opts ...ClientOption) (*Conn, error) {
8788
o(&dial)
8889
}
8990

91+
dial.defaultSettingAfter()
9092
return dial.Dial()
9193
}
9294

@@ -248,7 +250,8 @@ func (d *DialOption) Dial() (wsCon *Conn, err error) {
248250
return nil, err
249251
}
250252
wsCon.pd = pd
251-
d.Callback.OnOpen(wsCon)
253+
wsCon.Callback = d.cb
254+
wsCon.OnOpen(wsCon)
252255
if br.Buffered() > 0 {
253256
b, err := br.Peek(br.Buffered())
254257
if err != nil {

common_options.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
// 0. CallbackFunc
2222
func WithClientCallbackFunc(open OnOpenFunc, m OnMessageFunc, c OnCloseFunc) ClientOption {
2323
return func(o *DialOption) {
24-
o.Callback = &funcToCallback{
24+
o.cb = &funcToCallback{
2525
onOpen: open,
2626
onMessage: m,
2727
onClose: c,
@@ -32,7 +32,7 @@ func WithClientCallbackFunc(open OnOpenFunc, m OnMessageFunc, c OnCloseFunc) Cli
3232
// 配置服务端回调函数
3333
func WithServerCallbackFunc(open OnOpenFunc, m OnMessageFunc, c OnCloseFunc) ServerOption {
3434
return func(o *ConnOption) {
35-
o.Callback = &funcToCallback{
35+
o.cb = &funcToCallback{
3636
onOpen: open,
3737
onMessage: m,
3838
onClose: c,
@@ -44,14 +44,14 @@ func WithServerCallbackFunc(open OnOpenFunc, m OnMessageFunc, c OnCloseFunc) Ser
4444
// 配置客户端callback
4545
func WithClientCallback(cb Callback) ClientOption {
4646
return func(o *DialOption) {
47-
o.Callback = cb
47+
o.cb = cb
4848
}
4949
}
5050

5151
// 配置服务端回调函数
5252
func WithServerCallback(cb Callback) ServerOption {
5353
return func(o *ConnOption) {
54-
o.Callback = cb
54+
o.cb = cb
5555
}
5656
}
5757

@@ -88,14 +88,14 @@ func WithClientEnableUTF8Check() ClientOption {
8888
// 仅仅配置OnMessae函数
8989
func WithServerOnMessageFunc(cb OnMessageFunc) ServerOption {
9090
return func(o *ConnOption) {
91-
o.Callback = OnMessageFunc(cb)
91+
o.cb = OnMessageFunc(cb)
9292
}
9393
}
9494

9595
// 仅仅配置OnMessae函数
9696
func WithClientOnMessageFunc(cb OnMessageFunc) ClientOption {
9797
return func(o *DialOption) {
98-
o.Callback = OnMessageFunc(cb)
98+
o.cb = OnMessageFunc(cb)
9999
}
100100
}
101101

@@ -239,14 +239,14 @@ func WithClientReadTimeout(t time.Duration) ClientOption {
239239
// 17.1 配置服务端OnClose
240240
func WithServerOnCloseFunc(onClose func(c *Conn, err error)) ServerOption {
241241
return func(o *ConnOption) {
242-
o.Callback = OnCloseFunc(onClose)
242+
o.cb = OnCloseFunc(onClose)
243243
}
244244
}
245245

246246
// 17.2 配置客户端OnClose
247247
func WithClientOnCloseFunc(onClose func(c *Conn, err error)) ClientOption {
248248
return func(o *DialOption) {
249-
o.Callback = OnCloseFunc(onClose)
249+
o.cb = OnCloseFunc(onClose)
250250
}
251251
}
252252

config.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,15 @@ import (
2020
"github.com/antlabs/wsutil/deflate"
2121
)
2222

23+
// Config 配置
24+
// 有两种方式可以配置相关值
25+
// 1. NewUpgrade, 这通常在只初始化一次的时候使用
26+
// 2. greatws.Upgrade(), 这通常在每次请求的时候使用,每个语法的配置参数不一样
27+
// 这样可以方便的在两种方式中使用, 不需要担心配置参数会有并发修改的情况
2328
type Config struct {
24-
Callback
29+
cb Callback // 静态配置
2530
deflate.PermessageDeflateConf // 静态配置, 从WithXXX函数中获取
26-
tcpNoDelay bool //TODO: 加下这个功能
31+
tcpNoDelay bool // TODO: 加下这个功能
2732
replyPing bool // 开启自动回复
2833
ignorePong bool // 忽略pong消息
2934
disableBufioClearHack bool // 关闭bufio的clear hack优化
@@ -45,7 +50,7 @@ type Config struct {
4550

4651
// 默认设置
4752
func (c *Config) defaultSetting() {
48-
c.Callback = &DefCallback{}
53+
c.cb = &DefCallback{}
4954
c.maxDelayWriteNum = 10
5055
c.windowsMultipleTimesPayloadSize = 1.0
5156
c.delayWriteInitBufferSize = 8 * 1024
@@ -56,3 +61,11 @@ func (c *Config) defaultSetting() {
5661
c.utf8Check = func(b []byte) bool { return true }
5762
c.runInGoTask = "stream2" //默认使用stream2模块
5863
}
64+
65+
func (c *Config) defaultSettingAfter() {
66+
67+
if c.multiEventLoop == nil {
68+
c.multiEventLoop = getDefaultMultiEventLoop()
69+
}
70+
c.multiEventLoop.Start()
71+
}

conn_core.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,5 +664,9 @@ func (c *Conn) WritePong(data []byte) (err error) {
664664
}
665665

666666
func (c *Conn) Close() {
667+
if c == nil {
668+
return
669+
}
670+
667671
c.closeWithLock(nil)
668672
}

conn_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@ func Test_Conn(t *testing.T) {
3030
// 在未加入tls功能时,Conn的大小为160字节够用了。
3131
fmt.Printf("%d\n", unsafe.Sizeof(Conn{}))
3232
// conn大小改变历史
33-
// 新增上下文接管,从< 160到184
34-
if unsafe.Sizeof(Conn{}) > 184 {
33+
// 新增上下文接管,从 小于160到184
34+
// 把Callback移到Conn, 从184到200
35+
fmt.Printf("conn.size = %d\n", unsafe.Sizeof(conn{}))
36+
if unsafe.Sizeof(Conn{}) > 200 {
3537
t.Errorf("Conn size:%d is too large", unsafe.Sizeof(Conn{}))
3638
}
3739
})

conn_unix.go

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,16 @@ var (
6666
type Conn struct {
6767
conn
6868

69-
pd deflate.PermessageDeflateConf // 上下文接管的控制参数, 由于每个comm的配置都可能不一样,所以需要放在Conn里面
70-
mu sync.Mutex // 锁
71-
*Config // 配置
72-
deCtx *deflate.DeCompressContextTakeover // 解压缩上下文
73-
enCtx *deflate.CompressContextTakeover // 压缩上下文
74-
parent *EventLoop // event loop
75-
task driver.TaskExecutor // 任务,该任务会进协程池里面执行
76-
rtime *time.Timer // 控制读超时
77-
wtime *time.Timer // 控制写超时
69+
Callback // callback移至conn中
70+
pd deflate.PermessageDeflateConf // 上下文接管的控制参数, 由于每个comm的配置都可能不一样,所以需要放在Conn里面
71+
mu sync.Mutex // 锁
72+
*Config // 配置
73+
deCtx *deflate.DeCompressContextTakeover // 解压缩上下文
74+
enCtx *deflate.CompressContextTakeover // 压缩上下文
75+
parent *EventLoop // event loop
76+
task driver.TaskExecutor // 任务,该任务会进协程池里面执行
77+
rtime *time.Timer // 控制读超时
78+
wtime *time.Timer // 控制写超时
7879

7980
// mu2由 onCloseOnce使用, 这里使用新锁只是为了简化维护的难度
8081
// 也可以共用mu,区别 优点:节约内存,缺点:容易出现死锁和需要精心调试代码
@@ -120,45 +121,43 @@ func duplicateSocket(socketFD int) (int, error) {
120121
}
121122

122123
// 没有加锁的版本,有外层已经有锁保护,所以不需要加锁
123-
func (c *Conn) closeInnerWithOnClose(err error, onClose bool) {
124+
func (c *Conn) closeWithoutLockOnClose(err error, onClose bool) {
124125

125126
if c.isClosed() {
126127
return
127128
}
128129

129-
if !c.isClosed() {
130-
131-
if err != nil {
132-
err = io.EOF
133-
}
134-
fd := c.getFd()
135-
c.getLogger().Debug("close conn", slog.Int64("fd", int64(fd)))
136-
c.parent.del(c)
137-
atomic.StoreInt64(&c.fd, -1)
138-
atomic.StoreInt32(&c.closed, 1)
130+
if err != nil {
131+
err = io.EOF
139132
}
133+
fd := c.getFd()
134+
c.getLogger().Debug("close conn", slog.Int64("fd", int64(fd)))
135+
c.parent.del(c)
136+
atomic.StoreInt64(&c.fd, -1)
137+
atomic.StoreInt32(&c.closed, 1)
140138

141139
// 这个必须要放在后面
142140
if onClose {
143141
c.onCloseOnce.Do(&c.mu2, func() {
144142
c.OnClose(c, err)
145143
})
144+
if c.task != nil {
145+
c.task.Close(nil)
146+
}
146147
}
147148

148149
}
149150

150-
func (c *Conn) closeInner(err error) {
151+
func (c *Conn) closeWithoutLock(err error) {
151152

152-
c.closeInnerWithOnClose(err, true)
153+
c.closeWithoutLockOnClose(err, true)
153154
}
154155

155156
func (c *Conn) closeWithLock(err error) {
156157
if c.isClosed() {
157158
return
158159
}
159160

160-
c.task.Close(&c.mu)
161-
162161
c.mu.Lock()
163162
if c.isClosed() {
164163
c.mu.Unlock()
@@ -168,7 +167,7 @@ func (c *Conn) closeWithLock(err error) {
168167
if err == nil {
169168
err = io.EOF
170169
}
171-
c.closeInnerWithOnClose(err, false)
170+
c.closeWithoutLockOnClose(err, false)
172171

173172
c.mu.Unlock()
174173

@@ -263,7 +262,7 @@ func (c *Conn) maybeWriteAll(b []byte) (total int, ws writeState, err error) {
263262
}
264263

265264
c.getLogger().Error("writeOrAddPoll", "err", err.Error(), slog.Int64("fd", c.fd), slog.Int("b.len", len(b)))
266-
c.closeInner(err)
265+
c.closeWithoutLock(err)
267266
return total, writeDefault, err
268267
}
269268

@@ -355,9 +354,10 @@ func (c *Conn) processWebsocketFrame() (err error) {
355354
}
356355

357356
if c.readTimeout > 0 {
358-
if err = c.setReadDeadline(time.Now().Add(c.readTimeout)); err != nil {
359-
return err
360-
}
357+
// if err = c.setReadDeadline(time.Time{}); err != nil {
358+
// return err
359+
// }
360+
c.setReadDeadline(time.Now().Add(c.readTimeout))
361361
}
362362
n := 0
363363
var success bool
@@ -493,9 +493,14 @@ func (c *Conn) setDeadlineInner(t **time.Timer, tm time.Time, err error) error {
493493
return nil
494494
}
495495
c.mu.Lock()
496-
defer c.mu.Unlock()
496+
// c.getLogger().Error("Conn-lock", "addr", uintptr(unsafe.Pointer(c)))
497+
defer func() {
498+
// c.getLogger().Error("Conn-unlock", "addr", uintptr(unsafe.Pointer(c)))
499+
c.mu.Unlock()
500+
}()
497501
if tm.IsZero() {
498502
if *t != nil {
503+
// c.getLogger().Error("conn-reset", "addr", uintptr(unsafe.Pointer(c)))
499504
(*t).Stop()
500505
*t = nil
501506
}

go.mod

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
module github.com/antlabs/greatws
22

3-
go 1.21
3+
go 1.21.1
44

55
require (
6+
github.com/antlabs/cpuproc v0.0.0-20240615150837-aa4bcf33806c
67
github.com/antlabs/wsutil v0.1.10
7-
golang.org/x/sys v0.12.0
8+
golang.org/x/sys v0.20.0
89
)
910

1011
require github.com/klauspost/compress v1.17.8 // indirect

go.sum

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
github.com/antlabs/wsutil v0.1.9 h1:om+dMBbJnfq9aNeCv7A3ip8+McH9UaTjPLEOovyokLI=
2-
github.com/antlabs/wsutil v0.1.9/go.mod h1:Pk7xYOw3o5iEB6ukiOu+2uJMLYeMVVjJLazFD3okI2A=
1+
github.com/antlabs/cpuproc v0.0.0-20240615150837-aa4bcf33806c h1:VHYsrGiynSH8kkauv8Gl+Xd9j883cyNJPUj6m7jEg44=
2+
github.com/antlabs/cpuproc v0.0.0-20240615150837-aa4bcf33806c/go.mod h1:xRJWwB0CjmTPmZ+I92DKy1N6N/7/IqMC9IPNLbE7Je4=
3+
github.com/antlabs/wsutil v0.1.10 h1:86p67dG8/iiQ+yZrHVl73OPHGnXfXopFSU0w84fLOdE=
4+
github.com/antlabs/wsutil v0.1.10/go.mod h1:Pk7xYOw3o5iEB6ukiOu+2uJMLYeMVVjJLazFD3okI2A=
35
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
46
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
5-
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
6-
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
7+
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
8+
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=

0 commit comments

Comments
 (0)