Skip to content

Commit a1ba01d

Browse files
committed
Rewrite interactive client (again) without threads.
Fix #1592.
1 parent bc3fd29 commit a1ba01d

File tree

1 file changed

+91
-47
lines changed

1 file changed

+91
-47
lines changed

src/websockets/__main__.py

+91-47
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,16 @@
11
from __future__ import annotations
22

33
import argparse
4+
import asyncio
45
import os
5-
import signal
66
import sys
7-
import threading
8-
9-
10-
try:
11-
import readline # noqa: F401
12-
except ImportError: # readline isn't available on all platforms
13-
pass
7+
from typing import Generator
148

9+
from .asyncio.client import ClientConnection, connect
10+
from .asyncio.messages import SimpleQueue
11+
from .exceptions import ConnectionClosed
1512
from .frames import Close
16-
from .sync.client import ClientConnection, connect
13+
from .streams import StreamReader
1714
from .version import version as websockets_version
1815

1916

@@ -49,24 +46,94 @@ def print_over_input(string: str) -> None:
4946
sys.stdout.flush()
5047

5148

52-
def print_incoming_messages(websocket: ClientConnection, stop: threading.Event) -> None:
53-
for message in websocket:
49+
class ReadLines(asyncio.Protocol):
50+
def __init__(self) -> None:
51+
self.reader = StreamReader()
52+
self.messages: SimpleQueue[str] = SimpleQueue()
53+
54+
def parse(self) -> Generator[None, None, None]:
55+
while True:
56+
sys.stdout.write("> ")
57+
sys.stdout.flush()
58+
line = yield from self.reader.read_line(sys.maxsize)
59+
self.messages.put(line.decode().rstrip("\r\n"))
60+
61+
def connection_made(self, transport: asyncio.BaseTransport) -> None:
62+
self.parser = self.parse()
63+
next(self.parser)
64+
65+
def data_received(self, data: bytes) -> None:
66+
self.reader.feed_data(data)
67+
next(self.parser)
68+
69+
def eof_received(self) -> None:
70+
self.reader.feed_eof()
71+
# next(self.parser) isn't useful and would raise EOFError.
72+
73+
def connection_lost(self, exc: Exception | None) -> None:
74+
self.reader.discard()
75+
self.messages.abort()
76+
77+
78+
async def print_incoming_messages(websocket: ClientConnection) -> None:
79+
async for message in websocket:
5480
if isinstance(message, str):
5581
print_during_input("< " + message)
5682
else:
5783
print_during_input("< (binary) " + message.hex())
58-
if not stop.is_set():
59-
# When the server closes the connection, raise KeyboardInterrupt
60-
# in the main thread to exit the program.
61-
if sys.platform == "win32":
62-
ctrl_c = signal.CTRL_C_EVENT
63-
else:
64-
ctrl_c = signal.SIGINT
65-
os.kill(os.getpid(), ctrl_c)
84+
85+
86+
async def send_outgoing_messages(
87+
websocket: ClientConnection,
88+
messages: SimpleQueue[str],
89+
) -> None:
90+
while True:
91+
try:
92+
message = await messages.get()
93+
except EOFError:
94+
break
95+
try:
96+
await websocket.send(message)
97+
except ConnectionClosed:
98+
break
99+
100+
101+
async def interactive_client(uri: str) -> None:
102+
try:
103+
websocket = await connect(uri)
104+
except Exception as exc:
105+
print(f"Failed to connect to {uri}: {exc}.")
106+
sys.exit(1)
107+
else:
108+
print(f"Connected to {uri}.")
109+
110+
loop = asyncio.get_running_loop()
111+
transport, protocol = await loop.connect_read_pipe(ReadLines, sys.stdin)
112+
incoming = asyncio.create_task(
113+
print_incoming_messages(websocket),
114+
)
115+
outgoing = asyncio.create_task(
116+
send_outgoing_messages(websocket, protocol.messages),
117+
)
118+
try:
119+
await asyncio.wait(
120+
[incoming, outgoing],
121+
return_when=asyncio.FIRST_COMPLETED,
122+
)
123+
except (KeyboardInterrupt, EOFError): # ^C, ^D
124+
pass
125+
finally:
126+
incoming.cancel()
127+
outgoing.cancel()
128+
transport.close()
129+
130+
await websocket.close()
131+
assert websocket.close_code is not None and websocket.close_reason is not None
132+
close_status = Close(websocket.close_code, websocket.close_reason)
133+
print_over_input(f"Connection closed: {close_status}.")
66134

67135

68136
def main() -> None:
69-
# Parse command line arguments.
70137
parser = argparse.ArgumentParser(
71138
prog="python -m websockets",
72139
description="Interactive WebSocket client.",
@@ -90,34 +157,11 @@ def main() -> None:
90157
os.system("")
91158

92159
try:
93-
websocket = connect(args.uri)
94-
except Exception as exc:
95-
print(f"Failed to connect to {args.uri}: {exc}.")
96-
sys.exit(1)
97-
else:
98-
print(f"Connected to {args.uri}.")
99-
100-
stop = threading.Event()
101-
102-
# Start the thread that reads messages from the connection.
103-
thread = threading.Thread(target=print_incoming_messages, args=(websocket, stop))
104-
thread.start()
105-
106-
# Read from stdin in the main thread in order to receive signals.
107-
try:
108-
while True:
109-
# Since there's no size limit, put_nowait is identical to put.
110-
message = input("> ")
111-
websocket.send(message)
112-
except (KeyboardInterrupt, EOFError): # ^C, ^D
113-
stop.set()
114-
websocket.close()
115-
116-
assert websocket.close_code is not None and websocket.close_reason is not None
117-
close_status = Close(websocket.close_code, websocket.close_reason)
118-
print_over_input(f"Connection closed: {close_status}.")
160+
import readline # noqa: F401
161+
except ImportError: # readline isn't available on all platforms
162+
pass
119163

120-
thread.join()
164+
asyncio.run(interactive_client(args.uri))
121165

122166

123167
if __name__ == "__main__":

0 commit comments

Comments
 (0)