@@ -713,9 +713,6 @@ async def _register_subscription(
713
713
# `_sids_by_group` without any locks.
714
714
self ._assert_thread ()
715
715
716
- # The subject we will trigger on the `broadcast` message.
717
- trigger = rx .subjects .Subject ()
718
-
719
716
# The subscription notification queue.
720
717
queue_size = notification_queue_limit
721
718
if not queue_size or queue_size < 0 :
@@ -728,56 +725,41 @@ async def _register_subscription(
728
725
729
726
# Start an endless task which listens the `notification_queue`
730
727
# and invokes subscription "resolver" on new notifications.
731
- async def notifier ():
728
+ async def notifier (observer : rx . Observer ):
732
729
"""Watch the notification queue and notify clients."""
733
730
734
731
# Assert we run in a proper thread.
735
732
self ._assert_thread ()
736
-
737
- # Dirty hack to partially workaround the race between:
738
- # 1) call to `result.subscribe` in `_on_gql_start`; and
739
- # 2) call to `trigger.on_next` below in this function.
740
- # The first call must be earlier. Otherwise, first one or more notifications
741
- # may be lost.
742
- await asyncio .sleep (1 )
743
-
744
733
while True :
745
734
serialized_payload = await notification_queue .get ()
746
735
747
736
# Run a subscription's `publish` method (invoked by the
748
- # `trigger .on_next` function) within the threadpool used
737
+ # `observer .on_next` function) within the threadpool used
749
738
# for processing other GraphQL resolver functions.
750
739
# NOTE: it is important to run the deserialization
751
740
# in the worker thread as well.
752
741
def workload ():
753
742
try :
754
743
payload = Serializer .deserialize (serialized_payload )
755
744
except Exception as ex : # pylint: disable=broad-except
756
- trigger .on_error (f"Cannot deserialize payload. { ex } " )
745
+ observer .on_error (f"Cannot deserialize payload. { ex } " )
757
746
else :
758
- trigger .on_next (payload )
747
+ observer .on_next (payload )
759
748
760
749
await self ._run_in_worker (workload )
761
750
762
751
# Message processed. This allows `Queue.join` to work.
763
752
notification_queue .task_done ()
764
753
765
- # Enqueue the `publish` method execution. But do not notify
766
- # clients when `publish` returns `SKIP`.
767
- stream = trigger .map (publish_callback ).filter ( # pylint: disable=no-member
768
- lambda publish_returned : publish_returned is not self .SKIP
769
- )
770
-
754
+ def push_payloads (observer : rx .Observer ):
771
755
# Start listening for broadcasts (subscribe to the Channels
772
756
# groups), spawn the notification processing task and put
773
757
# subscription information into the registry.
774
758
# NOTE: Update of `_sids_by_group` & `_subscriptions` must be
775
759
# atomic i.e. without `awaits` in between.
776
- waitlist = []
777
760
for group in groups :
778
761
self ._sids_by_group .setdefault (group , []).append (operation_id )
779
- waitlist .append (self ._channel_layer .group_add (group , self .channel_name ))
780
- notifier_task = self ._spawn_background_task (notifier ())
762
+ notifier_task = self ._spawn_background_task (notifier (observer ))
781
763
self ._subscriptions [operation_id ] = self ._SubInf (
782
764
groups = groups ,
783
765
sid = operation_id ,
@@ -786,9 +768,20 @@ def workload():
786
768
notifier_task = notifier_task ,
787
769
)
788
770
789
- await asyncio .wait (waitlist )
771
+ await asyncio .wait (
772
+ [
773
+ self ._channel_layer .group_add (group , self .channel_name )
774
+ for group in groups
775
+ ]
776
+ )
790
777
791
- return stream
778
+ # Enqueue the `publish` method execution. But do not notify
779
+ # clients when `publish` returns `SKIP`.
780
+ return (
781
+ rx .Observable .create (push_payloads ) # pylint: disable=no-member
782
+ .map (publish_callback )
783
+ .filter (lambda publish_returned : publish_returned is not self .SKIP )
784
+ )
792
785
793
786
async def _on_gql_stop (self , operation_id ):
794
787
"""Process the STOP message.
0 commit comments