Skip to content

Commit ea2ebde

Browse files
committed
更新
1 parent 080d812 commit ea2ebde

File tree

3 files changed

+41
-39
lines changed

3 files changed

+41
-39
lines changed

api_epoll.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ func (e *epollState) apiPoll(tv time.Duration) (retVal int, err error) {
178178
ev := &e.events[i]
179179
conn := e.parent.getConn(int(ev.Fd))
180180
if conn == nil {
181+
e.getMultiEventLoop().Logger.Debug("ev.Fd is", "fd", ev.Fd)
181182
unix.Close(int(ev.Fd))
182183
continue
183184
}
@@ -226,6 +227,7 @@ func (e *epollState) process(conn *Conn, isRead, isWrite bool) bool {
226227
if isRead {
227228
err := conn.processWebsocketFrame()
228229
if err != nil {
230+
e.getMultiEventLoop().Logger.Info("processWebsocketFrame", "err", err.Error())
229231
conn.closeWithLock(err)
230232
return true
231233
}

conn_unix.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func duplicateSocket(socketFD int) (int, error) {
121121
}
122122

123123
// 没有加锁的版本,有外层已经有锁保护,所以不需要加锁
124-
func (c *Conn) closeInnerWithOnClose(err error, onClose bool) {
124+
func (c *Conn) closeWithoutLockOnClose(err error, onClose bool) {
125125

126126
if c.isClosed() {
127127
return
@@ -148,9 +148,9 @@ func (c *Conn) closeInnerWithOnClose(err error, onClose bool) {
148148

149149
}
150150

151-
func (c *Conn) closeInner(err error) {
151+
func (c *Conn) closeWithoutLock(err error) {
152152

153-
c.closeInnerWithOnClose(err, true)
153+
c.closeWithoutLockOnClose(err, true)
154154
}
155155

156156
func (c *Conn) closeWithLock(err error) {
@@ -167,7 +167,7 @@ func (c *Conn) closeWithLock(err error) {
167167
if err == nil {
168168
err = io.EOF
169169
}
170-
c.closeInnerWithOnClose(err, false)
170+
c.closeWithoutLockOnClose(err, false)
171171

172172
c.mu.Unlock()
173173

@@ -262,7 +262,7 @@ func (c *Conn) maybeWriteAll(b []byte) (total int, ws writeState, err error) {
262262
}
263263

264264
c.getLogger().Error("writeOrAddPoll", "err", err.Error(), slog.Int64("fd", c.fd), slog.Int("b.len", len(b)))
265-
c.closeInner(err)
265+
c.closeWithoutLock(err)
266266
return total, writeDefault, err
267267
}
268268

@@ -354,9 +354,10 @@ func (c *Conn) processWebsocketFrame() (err error) {
354354
}
355355

356356
if c.readTimeout > 0 {
357-
if err = c.setReadDeadline(time.Now().Add(c.readTimeout)); err != nil {
358-
return err
359-
}
357+
// if err = c.setReadDeadline(time.Time{}); err != nil {
358+
// return err
359+
// }
360+
c.setReadDeadline(time.Now().Add(c.readTimeout))
360361
}
361362
n := 0
362363
var success bool
@@ -492,9 +493,14 @@ func (c *Conn) setDeadlineInner(t **time.Timer, tm time.Time, err error) error {
492493
return nil
493494
}
494495
c.mu.Lock()
495-
defer c.mu.Unlock()
496+
// c.getLogger().Error("Conn-lock", "addr", uintptr(unsafe.Pointer(c)))
497+
defer func() {
498+
// c.getLogger().Error("Conn-unlock", "addr", uintptr(unsafe.Pointer(c)))
499+
c.mu.Unlock()
500+
}()
496501
if tm.IsZero() {
497502
if *t != nil {
503+
// c.getLogger().Error("conn-reset", "addr", uintptr(unsafe.Pointer(c)))
498504
(*t).Stop()
499505
*t = nil
500506
}

task/stream2/stream2_executor.go

Lines changed: 24 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -26,26 +26,33 @@ type stream2Executor struct {
2626
parent *stream2
2727
}
2828

29-
func (s *stream2Executor) AddTask(mu *sync.Mutex, f func() bool) error {
30-
29+
func myLock(mu *sync.Mutex) {
3130
if mu != nil {
3231
mu.Lock()
3332
}
33+
}
34+
35+
func myUnlock(mu *sync.Mutex) {
36+
if mu != nil {
37+
mu.Unlock()
38+
}
39+
}
40+
41+
func (s *stream2Executor) AddTask(mu *sync.Mutex, f func() bool) error {
3442

43+
myLock(mu)
3544
if atomic.LoadInt32(&s.parent.closed) == 1 {
45+
myUnlock(mu)
3646
return nil
3747
}
38-
3948
process := len(s.list) == 0
4049
s.list = append(s.list, f)
41-
if mu != nil {
42-
mu.Unlock()
43-
}
50+
myUnlock(mu)
4451

4552
if process {
46-
mu.Lock()
53+
myLock(mu)
4754
listSize := len(s.list)
48-
mu.Unlock()
55+
myUnlock(mu)
4956
s.parent.fn <- func() bool {
5057
s.run(mu)
5158
return false
@@ -67,39 +74,28 @@ func (s *stream2Executor) AddTask(mu *sync.Mutex, f func() bool) error {
6774
func (s *stream2Executor) run(mu *sync.Mutex) bool {
6875
var f func() bool
6976
for i := 0; ; i++ {
70-
if mu != nil {
71-
// 加锁
72-
mu.Lock()
73-
}
77+
myLock(mu)
7478

7579
if len(s.list) == 0 {
76-
if mu != nil {
77-
mu.Unlock()
78-
}
80+
myUnlock(mu)
7981
return false
8082
}
8183

8284
if len(s.list) == i {
8385
s.list = s.list[0:0]
84-
if mu != nil {
85-
mu.Unlock()
86-
}
86+
myUnlock(mu)
8787
return false
8888
}
8989

9090
if i >= len(s.list) {
9191
s.list = s.list[0:0]
92-
if mu != nil {
93-
mu.Unlock()
94-
}
92+
myUnlock(mu)
9593
return false
9694
}
9795

9896
f = s.list[i]
9997
s.list[i] = nil
100-
if mu != nil {
101-
mu.Unlock()
102-
}
98+
myUnlock(mu)
10399

104100
func() {
105101
defer func() {
@@ -117,19 +113,17 @@ func (s *stream2Executor) run(mu *sync.Mutex) bool {
117113
}
118114

119115
func (s *stream2Executor) Close(mu *sync.Mutex) error {
120-
if mu != nil {
121-
mu.Lock()
122-
}
116+
myLock(mu)
123117

124118
if atomic.LoadInt32(&s.parent.closed) == 1 {
119+
myUnlock(mu)
125120
return nil
126121
}
122+
127123
s.parent.subOnMessageCount(-len(s.list))
128124

129125
atomic.StoreInt32(&s.parent.closed, 1)
130126
s.list = nil
131-
if mu != nil {
132-
mu.Unlock()
133-
}
127+
myUnlock(mu)
134128
return nil
135129
}

0 commit comments

Comments
 (0)