@@ -32,9 +32,9 @@ def __init__(self, channel, consumer_tag, **kwargs):
32
32
33
33
async def _data (self , channel , msg , env , prop ):
34
34
if msg is None :
35
- await self ._q . put (None )
35
+ await self ._chan_send . send (None )
36
36
else :
37
- await self ._q . put ((msg , env , prop ))
37
+ await self ._chan_send . send ((msg , env , prop ))
38
38
39
39
if sys .version_info >= (3 ,5 ,3 ):
40
40
def __aiter__ (self ):
@@ -44,14 +44,15 @@ async def __aiter__(self):
44
44
return self
45
45
46
46
async def __anext__ (self ):
47
- res = await self ._q . get ()
47
+ res = await self ._chan_receive . receive ()
48
48
if res is None :
49
49
raise StopAsyncIteration
50
50
return res
51
51
52
52
async def __aenter__ (self ):
53
53
await self .channel .basic_consume (self ._data , consumer_tag = self .consumer_tag , ** self .kwargs )
54
- self ._q = trio .Queue (30 ) # TODO: 2 + possible prefetch
54
+ # TODO: 2 + possible prefetch
55
+ self ._chan_send , self ._chan_receive = trio .open_memory_channel (30 )
55
56
return self
56
57
57
58
async def __aexit__ (self , * tb ):
@@ -60,7 +61,8 @@ async def __aexit__(self, *tb):
60
61
await self .channel .basic_cancel (self .consumer_tag )
61
62
except AmqpClosedConnection :
62
63
pass
63
- del self ._q
64
+ del self ._chan_send
65
+ del self ._chan_receive
64
66
# these messages are not acknowledged, thus deleting the queue will
65
67
# not lose them
66
68
@@ -75,7 +77,6 @@ def __iter__(self):
75
77
76
78
77
79
class Channel :
78
- _q = None # for returned messages
79
80
80
81
def __init__ (self , protocol , channel_id ):
81
82
self .protocol = protocol
@@ -97,9 +98,13 @@ def __init__(self, protocol, channel_id):
97
98
self ._futures = {}
98
99
self ._ctag_events = {}
99
100
101
+ self ._chan_send = None
102
+ self ._chan_receive = None
103
+
100
104
def __aiter__ (self ):
101
- if self ._q is None :
102
- self ._q = trio .Queue (30 ) # TODO: 2 + possible prefetch
105
+ if self ._chan_send is None :
106
+ # TODO: 2 + possible prefetch
107
+ self ._chan_send , self ._chan_receive = trio .open_memory_channel (30 )
103
108
return self
104
109
105
110
if sys .version_info < (3 ,5 ,3 ):
@@ -108,7 +113,7 @@ async def __aiter__(self):
108
113
return self ._aiter ()
109
114
110
115
async def __anext__ (self ):
111
- res = await self ._q . get ()
116
+ res = await self ._chan_receive . receive ()
112
117
if res is None :
113
118
raise StopAsyncIteration
114
119
return res
@@ -149,8 +154,8 @@ def connection_closed(self, server_code=None, server_reason=None, exception=None
149
154
150
155
self .protocol .release_channel_id (self .channel_id )
151
156
self .close_event .set ()
152
- if self ._q is not None :
153
- self ._q . put_nowait (None )
157
+ if self ._chan_send is not None :
158
+ self ._chan_send . send_nowait (None )
154
159
155
160
async def dispatch_frame (self , frame ):
156
161
methods = {
@@ -271,8 +276,8 @@ async def close(self, reply_code=0, reply_text="Normal Shutdown"):
271
276
if not self .is_open :
272
277
raise exceptions .ChannelClosed ("channel already closed or closing" )
273
278
self .close_event .set ()
274
- if self ._q is not None :
275
- self ._q . put_nowait (None )
279
+ if self ._chan_send is not None :
280
+ self ._chan_send . send_nowait (None )
276
281
frame = amqp_frame .AmqpRequest (amqp_constants .TYPE_METHOD , self .channel_id )
277
282
frame .declare_method (amqp_constants .CLASS_CHANNEL , amqp_constants .CHANNEL_CLOSE )
278
283
request = amqp_frame .AmqpEncoder ()
@@ -946,11 +951,11 @@ async def basic_return(self, frame):
946
951
envelope = ReturnEnvelope (reply_code , reply_text ,
947
952
exchange_name , routing_key )
948
953
properties = content_header_frame .properties
949
- if self ._q is None :
954
+ if self ._chan_send is None :
950
955
# they have set mandatory bit, but havent added a callback
951
956
logger .warning ("You don't iterate the channel for returned messages!" )
952
957
else :
953
- await self ._q . put ((body , envelope , properties ))
958
+ await self ._chan_send . send ((body , envelope , properties ))
954
959
955
960
async def basic_get (self , queue_name = '' , no_ack = False ):
956
961
frame = amqp_frame .AmqpRequest (amqp_constants .TYPE_METHOD , self .channel_id )
0 commit comments