8
8
import urllib .parse
9
9
from collections .abc import AsyncIterator , Generator , Sequence
10
10
from types import TracebackType
11
- from typing import Any , Callable , Literal
11
+ from typing import Any , Callable , Literal , cast
12
12
13
13
from ..client import ClientProtocol , backoff
14
- from ..datastructures import HeadersLike
15
- from ..exceptions import InvalidMessage , InvalidStatus , ProxyError , SecurityError
14
+ from ..datastructures import Headers , HeadersLike
15
+ from ..exceptions import (
16
+ InvalidMessage ,
17
+ InvalidProxyMessage ,
18
+ InvalidProxyStatus ,
19
+ InvalidStatus ,
20
+ ProxyError ,
21
+ SecurityError ,
22
+ )
16
23
from ..extensions .base import ClientExtensionFactory
17
24
from ..extensions .permessage_deflate import enable_client_permessage_deflate
18
- from ..headers import validate_subprotocols
25
+ from ..headers import build_authorization_basic , build_host , validate_subprotocols
19
26
from ..http11 import USER_AGENT , Response
20
27
from ..protocol import CONNECTING , Event
28
+ from ..streams import StreamReader
21
29
from ..typing import LoggerLike , Origin , Subprotocol
22
30
from ..uri import Proxy , WebSocketURI , get_proxy , parse_proxy , parse_uri
23
31
from .compatibility import TimeoutError , asyncio_timeout
@@ -257,7 +265,7 @@ class connect:
257
265
the TLS handshake.
258
266
259
267
* You can set ``host`` and ``port`` to connect to a different host and port
260
- from those found in ``uri``. This only changes the destination of the TCP
268
+ from those found in ``uri``. This only changes the ws_uri of the TCP
261
269
connection. The host name from ``uri`` is still used in the TLS handshake
262
270
for secure connections and in the ``Host`` header.
263
271
@@ -266,6 +274,23 @@ class connect:
266
274
:meth:`~asyncio.loop.create_connection` method) to create a suitable
267
275
client socket and customize it.
268
276
277
+ When using a proxy, :meth:`~asyncio.loop.create_connection` is called twice:
278
+ first to connect to the proxy, then to connect to the WebSocket server via
279
+ the proxy. In that case:
280
+
281
+ * Prefix keyword arguments with ``proxy_`` for configuring TLS between the
282
+ client and an HTTPS proxy: ``proxy_ssl``, ``proxy_server_hostname``,
283
+ ``proxy_ssl_keylog_callback``, and ``proxy_ssl_version``.
284
+ * Use the standard keyword arguments for configuring TLS between the proxy
285
+ and the WebSocket server.
286
+ * Other keyword arguments are used only for connecting to the proxy. The
287
+ socket connected to the proxy is then passed in the ``sock`` argument when
288
+ connecting to the WebSocket server.
289
+
290
+ To pass different arguments to the two calls, prefix settings for connecting
291
+ to the proxy with ``proxy_``. For example, you can set ``proxy_ssl`` or
292
+ ``proxy_server_hostname`` to configure .
293
+
269
294
Raises:
270
295
InvalidURI: If ``uri`` isn't a valid WebSocket URI.
271
296
InvalidProxy: If ``proxy`` isn't a valid proxy.
@@ -383,12 +408,19 @@ def factory() -> ClientConnection:
383
408
if kwargs .pop ("unix" , False ):
384
409
_ , connection = await loop .create_unix_connection (factory , ** kwargs )
385
410
elif proxy is not None :
386
- kwargs ["sock" ] = await connect_proxy (
387
- parse_proxy (proxy ),
388
- ws_uri ,
389
- local_addr = kwargs .pop ("local_addr" , None ),
390
- )
391
- _ , connection = await loop .create_connection (factory , ** kwargs )
411
+ # Split keyword arguments for connecting to the proxy or the server.
412
+ all_kwargs , proxy_kwargs , kwargs = kwargs , {}, {}
413
+ for key , value in all_kwargs .items ():
414
+ if key .startswith ("ssl" ) or key == "server_hostname" :
415
+ kwargs [key ] = value
416
+ elif key .startswith ("proxy_" ):
417
+ proxy_kwargs [key [6 :]] = value
418
+ else :
419
+ proxy_kwargs [key ] = value
420
+ # Connect to the proxy.
421
+ sock = await connect_proxy (parse_proxy (proxy ), ws_uri , ** proxy_kwargs )
422
+ # Connect to the server via the proxy.
423
+ _ , connection = await loop .create_connection (factory , sock = sock , ** kwargs )
392
424
else :
393
425
if kwargs .get ("sock" ) is None :
394
426
kwargs .setdefault ("host" , ws_uri .host )
@@ -645,6 +677,98 @@ async def connect_socks_proxy(
645
677
raise ImportError ("python-socks is required to use a SOCKS proxy" )
646
678
647
679
680
+ def prepare_connect_request (proxy : Proxy , ws_uri : WebSocketURI ) -> bytes :
681
+ host = build_host (ws_uri .host , ws_uri .port , ws_uri .secure , always_include_port = True )
682
+ headers = Headers ()
683
+ headers ["Host" ] = build_host (ws_uri .host , ws_uri .port , ws_uri .secure )
684
+ if proxy .username is not None :
685
+ assert proxy .password is not None # enforced by parse_proxy()
686
+ headers ["Proxy-Authorization" ] = build_authorization_basic (
687
+ proxy .username , proxy .password
688
+ )
689
+ # We cannot use the Request class because it supports only GET requests.
690
+ return f"CONNECT { host } HTTP/1.1\r \n " .encode () + headers .serialize ()
691
+
692
+
693
+ class HTTPProxyConnection (asyncio .Protocol ):
694
+ def __init__ (self , ws_uri : WebSocketURI , proxy : Proxy ):
695
+ self .ws_uri = ws_uri
696
+ self .proxy = proxy
697
+
698
+ self .reader = StreamReader ()
699
+ self .parser = Response .parse (
700
+ self .reader .read_line ,
701
+ self .reader .read_exact ,
702
+ self .reader .read_to_eof ,
703
+ include_body = False ,
704
+ )
705
+
706
+ loop = asyncio .get_running_loop ()
707
+ self .response : asyncio .Future [Response ] = loop .create_future ()
708
+
709
+ def run_parser (self ) -> None :
710
+ try :
711
+ next (self .parser )
712
+ except StopIteration as exc :
713
+ response = exc .value
714
+ if 200 <= response .status_code < 300 :
715
+ self .response .set_result (response )
716
+ else :
717
+ self .response .set_exception (InvalidProxyStatus (response ))
718
+ except Exception as exc :
719
+ proxy_exc = InvalidProxyMessage (
720
+ "did not receive a valid HTTP response from proxy"
721
+ )
722
+ proxy_exc .__cause__ = exc
723
+ self .response .set_exception (proxy_exc )
724
+
725
+ def connection_made (self , transport : asyncio .BaseTransport ) -> None :
726
+ transport = cast (asyncio .Transport , transport )
727
+ self .transport = transport
728
+ self .transport .write (prepare_connect_request (self .proxy , self .ws_uri ))
729
+
730
+ def data_received (self , data : bytes ) -> None :
731
+ self .reader .feed_data (data )
732
+ self .run_parser ()
733
+
734
+ def eof_received (self ) -> None :
735
+ self .reader .feed_eof ()
736
+ self .run_parser ()
737
+
738
+ def connection_lost (self , exc : Exception | None ) -> None :
739
+ self .reader .feed_eof ()
740
+ if exc is not None :
741
+ self .response .set_exception (exc )
742
+
743
+
744
+ async def connect_http_proxy (
745
+ proxy : Proxy ,
746
+ ws_uri : WebSocketURI ,
747
+ ** kwargs : Any ,
748
+ ) -> socket .socket :
749
+ if proxy .scheme != "https" and kwargs .get ("ssl" ) is not None :
750
+ raise ValueError ("proxy_ssl argument is incompatible with an http:// proxy" )
751
+
752
+ transport , protocol = await asyncio .get_running_loop ().create_connection (
753
+ lambda : HTTPProxyConnection (ws_uri , proxy ),
754
+ proxy .host ,
755
+ proxy .port ,
756
+ ** kwargs ,
757
+ )
758
+
759
+ try :
760
+ # This raises exceptions if the connection to the proxy fails.
761
+ await protocol .response
762
+
763
+ # We need to extract the socket from the transport, or else asyncio raises
764
+ # RuntimeError: File descriptor ... is used by transport ...
765
+ # To achieve this, we duplicate the socket then close the transport.
766
+ sock = transport .get_extra_info ("socket" )
767
+ return socket .fromfd (sock .fileno (), sock .family , sock .type , sock .proto )
768
+ finally :
769
+ transport .close ()
770
+
771
+
648
772
async def connect_proxy (
649
773
proxy : Proxy ,
650
774
ws_uri : WebSocketURI ,
@@ -654,5 +778,7 @@ async def connect_proxy(
654
778
# parse_proxy() validates proxy.scheme.
655
779
if proxy .scheme [:5 ] == "socks" :
656
780
return await connect_socks_proxy (proxy , ws_uri , ** kwargs )
781
+ elif proxy .scheme [:4 ] == "http" :
782
+ return await connect_http_proxy (proxy , ws_uri , ** kwargs )
657
783
else :
658
784
raise AssertionError ("unsupported proxy" )
0 commit comments