Skip to content

Commit c1c01c6

Browse files
fix: Fix recv blocking when sync-websocket is closed (#239)
1 parent 94ee19d commit c1c01c6

File tree

3 files changed

+27
-19
lines changed

3 files changed

+27
-19
lines changed

cozepy/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import sys
44
from functools import lru_cache
55

6-
VERSION = "0.15.0"
6+
VERSION = "0.16.0"
77

88

99
def get_os_version() -> str:

cozepy/websockets/ws.py

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ def __init__(
188188
self._receive_thread: Optional[threading.Thread] = None
189189
self._completed_events: Set[WebsocketsEventType] = set()
190190
self._completed_event = threading.Event()
191+
self._join_event = threading.Event()
191192

192193
@contextmanager
193194
def __call__(self):
@@ -236,37 +237,44 @@ def close(self) -> None:
236237
if self._state not in (self.State.CONNECTED, self.State.CONNECTING):
237238
return
238239
self._state = self.State.CLOSING
240+
self._join_event.set()
239241
self._close()
240242
self._state = self.State.CLOSED
241243

242244
def _send_loop(self) -> None:
243245
try:
244-
while True:
245-
event = self._input_queue.get()
246-
self._send_event(event)
247-
self._input_queue.task_done()
246+
while not self._join_event.is_set():
247+
try:
248+
event = self._input_queue.get(timeout=0.5)
249+
self._send_event(event)
250+
self._input_queue.task_done()
251+
except queue.Empty:
252+
pass
248253
except Exception as e:
249254
self._handle_error(e)
250255

251256
def _receive_loop(self) -> None:
252257
try:
253-
while True:
258+
while not self._join_event.is_set():
254259
if not self._ws:
255260
log_debug("[%s] empty websocket conn, close", self._path)
256261
break
257262

258-
data = self._ws.recv()
259-
message = json.loads(data)
260-
event_type = message.get("event_type")
261-
log_debug("[%s] receive event, type=%s, event=%s", self._path, event_type, data)
262-
263-
event = self._load_all_event(message)
264-
if event:
265-
handler = self._on_event.get(event_type)
266-
if handler:
267-
handler(self, event)
268-
self._completed_events.add(event_type)
269-
self._completed_event.set()
263+
try:
264+
data = self._ws.recv(timeout=0.5)
265+
message = json.loads(data)
266+
event_type = message.get("event_type")
267+
log_debug("[%s] receive event, type=%s, event=%s", self._path, event_type, data)
268+
269+
event = self._load_all_event(message)
270+
if event:
271+
handler = self._on_event.get(event_type)
272+
if handler:
273+
handler(self, event)
274+
self._completed_events.add(event_type)
275+
self._completed_event.set()
276+
except TimeoutError:
277+
pass
270278
except Exception as e:
271279
self._handle_error(e)
272280

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "cozepy"
3-
version = "0.15.0"
3+
version = "0.16.0"
44
description = "OpenAPI SDK for Coze(coze.com/coze.cn)"
55
authors = ["chyroc <chyroc@bytedance.com>"]
66
readme = "README.md"

0 commit comments

Comments
 (0)