Skip to content

Commit c873b50

Browse files
committed
更新
1 parent ea2ebde commit c873b50

File tree

3 files changed

+5
-13
lines changed

3 files changed

+5
-13
lines changed

api_epoll.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ func (e *epollState) apiPoll(tv time.Duration) (retVal int, err error) {
178178
ev := &e.events[i]
179179
conn := e.parent.getConn(int(ev.Fd))
180180
if conn == nil {
181-
e.getMultiEventLoop().Logger.Debug("ev.Fd is", "fd", ev.Fd)
181+
e.getMultiEventLoop().Logger.Debug("ev.Fd get conn is nil", "fd", ev.Fd)
182182
unix.Close(int(ev.Fd))
183183
continue
184184
}
@@ -225,6 +225,8 @@ func (e *epollState) apiPoll(tv time.Duration) (retVal int, err error) {
225225

226226
func (e *epollState) process(conn *Conn, isRead, isWrite bool) bool {
227227
if isRead {
228+
229+
e.getMultiEventLoop().addReadEvNum()
228230
err := conn.processWebsocketFrame()
229231
if err != nil {
230232
e.getMultiEventLoop().Logger.Info("processWebsocketFrame", "err", err.Error())
@@ -234,6 +236,7 @@ func (e *epollState) process(conn *Conn, isRead, isWrite bool) bool {
234236
}
235237

236238
if isWrite {
239+
e.getMultiEventLoop().addWriteEvNum()
237240
// 刷新下直接写入失败的数据
238241
conn.flushOrClose()
239242
}

task/stream2/stream2.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ type stream2 struct {
4444
conf *driver.Conf // 初始化传递过来的参数
4545
process func() // 处理单个任务的循环
4646
onMessageCount int64 //需要处理的OnMessage个数
47-
closed int32
4847
}
4948

5049
func (s *stream2) addOnMessageCount(n int) {

task/stream2/stream2_executor.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ package stream2
1717
import (
1818
"runtime"
1919
"sync"
20-
"sync/atomic"
2120
"unsafe"
2221
)
2322

@@ -41,10 +40,7 @@ func myUnlock(mu *sync.Mutex) {
4140
func (s *stream2Executor) AddTask(mu *sync.Mutex, f func() bool) error {
4241

4342
myLock(mu)
44-
if atomic.LoadInt32(&s.parent.closed) == 1 {
45-
myUnlock(mu)
46-
return nil
47-
}
43+
4844
process := len(s.list) == 0
4945
s.list = append(s.list, f)
5046
myUnlock(mu)
@@ -115,14 +111,8 @@ func (s *stream2Executor) run(mu *sync.Mutex) bool {
115111
func (s *stream2Executor) Close(mu *sync.Mutex) error {
116112
myLock(mu)
117113

118-
if atomic.LoadInt32(&s.parent.closed) == 1 {
119-
myUnlock(mu)
120-
return nil
121-
}
122-
123114
s.parent.subOnMessageCount(-len(s.list))
124115

125-
atomic.StoreInt32(&s.parent.closed, 1)
126116
s.list = nil
127117
myUnlock(mu)
128118
return nil

0 commit comments

Comments
 (0)