From 12f6caa53cc80174862dac5701c12be300159bd2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc=20Busqu=C3=A9?= Date: Thu, 14 Apr 2022 12:59:30 +0200 Subject: [PATCH] Bind a publication context to subscriptions That improves debugging by yielding a second optional argument to subscription blocks. That argument is an instance of `Omnes::PublicationContext`, referencing the publisher's location and the publication time. ```ruby bus.subscribe(:foo) do |event, publication_context| # debugging abort publication_context.inspect end ``` If they want to support it, adapters need to be aware to dispatch the second argument if present. As such, current adapters have been adapted. `Omnes::PublicationContext` contains a `#serialized` method that can be called before dispatching async adapters. --- README.md | 46 +++++++- lib/omnes/bus.rb | 11 +- lib/omnes/publication.rb | 19 +--- lib/omnes/publication_context.rb | 41 +++++++ lib/omnes/subscriber/adapter/active_job.rb | 12 +- lib/omnes/subscriber/adapter/method.rb | 4 +- lib/omnes/subscriber/adapter/sidekiq.rb | 26 ++++- lib/omnes/subscription.rb | 17 ++- spec/support/shared_examples/bus.rb | 42 ++++++- spec/unit/omnes/publication_context_spec.rb | 20 ++++ .../subscriber/adapter/active_job_spec.rb | 23 ++++ .../omnes/subscriber/adapter/method_spec.rb | 107 +++++++++++++----- .../omnes/subscriber/adapter/sidekiq_spec.rb | 46 ++++++++ spec/unit/omnes/subscription_spec.rb | 32 ++++-- 14 files changed, 366 insertions(+), 80 deletions(-) create mode 100644 lib/omnes/publication_context.rb create mode 100644 spec/unit/omnes/publication_context_spec.rb diff --git a/README.md b/README.md index 1a18389..b708c30 100644 --- a/README.md +++ b/README.md @@ -429,7 +429,8 @@ Omnes.config.subscriber.adapter.active_job.serializer = :serialized_payload.to_p #### Custom adapters Custom adapters can be built. They need to implement a method `#call` taking -the instance of `Omnes::Subscriber` and the event. +the instance of `Omnes::Subscriber`, the event and, optionally, the publication +context (see [debugging subscriptions](#subscription)). Here's a custom adapter executing a subscriber method in a different thread (we add an extra argument for the method name, and we partially apply it @@ -442,7 +443,6 @@ end class OrderCreationEmailSubscriber include Omnes::Subscriber - include Sidekiq::Job handle :order_created, with: THREAD_ADAPTER.curry[:order_created] @@ -453,8 +453,8 @@ end ``` Alternatively, adapters can be curried and only take the instance as an -argument, returning a one-argument callable taking the event. For instance, we -could also have defined the thread adapter like this: +argument, returning a callable taking the event. For instance, we could also +have defined the thread adapter like this: ```ruby class ThreadAdapter @@ -516,10 +516,10 @@ When you publish an event, you get back an attributes that allow observing what happened: - `#event` contains the event instance that has been published. -- `#caller_location` refers to the publication caller. -- `#time` is the time stamp for the publication. - `#executions` contains an array of `Omnes::Execution`(lib/omnes/execution.rb). Read more below. +- `#context` is an instance of + [`Omnes::PublicationContext`](lib/omnes/publication_context.rb). `Omnes::Execution` represents a subscription individual execution. It contains the following attributes: @@ -529,6 +529,40 @@ the following attributes: - `#benchmark` of the operation. - `#time` is the time where the execution started. +`Omnes::PublicationContext` represents the shared context for all triggered +executions. See [Subscription][#subscription] for details. + +### Subscription + +If your subscription block or callable object takes a second argument, it'll +contain an instance of an +[`Omnes::PublicationContext`](lib/omnes/publication_context.rb). It allows you +to inspect what triggered a given execution from within that execution code. It +contains: + +- `#caller_location` refers to the publication caller. +- `#time` is the time stamp for the publication. + +```ruby +class OrderCreationEmailSubscriber + include Omnes::Subscriber + + handle :order_created, with: :send_confirmation_email + + def send_confirmation_email(event, publication_context) + # debugging + abort(publication_context.caller_location.inspect) + + OrderCreationEmail.send(number: event.number, email: event.user_email) + end +end +``` + +In case you're developing your own async adapter, you can call `#serialized` on +an instance of `Omnes::PublicationContext` to get a serialized version of it. +It'll return a `Hash` with `"caller_location"` and `"time"` keys, and the +respective `String` representations as values. + ## Testing Ideally, you wouldn't need big setups to test your event-driven behavior. You diff --git a/lib/omnes/bus.rb b/lib/omnes/bus.rb index 133d111..2233f5c 100644 --- a/lib/omnes/bus.rb +++ b/lib/omnes/bus.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require "omnes/publication" +require "omnes/publication_context" require "omnes/registry" require "omnes/subscription" require "omnes/unstructured_event" @@ -183,13 +184,13 @@ def publish(event, caller_location: caller_locations(cal_loc_start)[0], **payloa publication_time = Time.now.utc event = self.class.EventType(event, **payload) registry.check_event_name(event.omnes_event_name) - executions = execute_subscriptions_for_event(event) + publication_context = PublicationContext.new(caller_location: caller_location, time: publication_time) + executions = execute_subscriptions_for_event(event, publication_context) Publication.new( event: event, executions: executions, - caller_location: caller_location, - time: publication_time + context: publication_context ) end @@ -288,9 +289,9 @@ def subscription(id) private - def execute_subscriptions_for_event(event) + def execute_subscriptions_for_event(event, publication_context) subscriptions_for_event(event).map do |subscription| - subscription.(event) + subscription.(event, publication_context) end end diff --git a/lib/omnes/publication.rb b/lib/omnes/publication.rb index 4056283..ac96b84 100644 --- a/lib/omnes/publication.rb +++ b/lib/omnes/publication.rb @@ -19,25 +19,16 @@ class Publication # @return [Array] attr_reader :executions - # Location for the event caller + # Publication context, shared by all triggered executions # - # It's usually set by {Omnes::Bus#publish}, and it points to the caller of - # that method. - # - # @return [Thread::Backtrace::Location] - attr_reader :caller_location - - # Time of the event publication - # - # @return [Time] - attr_reader :time + # @return [Omnes::PublicationContext] + attr_reader :context # @api private - def initialize(event:, executions:, caller_location:, time:) + def initialize(event:, executions:, context:) @event = event @executions = executions - @caller_location = caller_location - @time = time + @context = context end end end diff --git a/lib/omnes/publication_context.rb b/lib/omnes/publication_context.rb new file mode 100644 index 0000000..0a7c1ec --- /dev/null +++ b/lib/omnes/publication_context.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +module Omnes + # Context for an event publication + # + # An instance of this class is shared between all the executions that are + # triggered by the publication of a given event. It's provided to the + # subscriptions as their second argument when they take it. + # + # This class is useful mainly for debugging and logging purposes. + class PublicationContext + # Location for the event publisher + # + # It's set by {Omnes::Bus#publish}, and it points to the caller of that + # method. + # + # @return [Thread::Backtrace::Location] + attr_reader :caller_location + + # Time of the event publication + # + # @return [Time] + attr_reader :time + + # @api private + def initialize(caller_location:, time:) + @caller_location = caller_location + @time = time + end + + # Serialized version of a publication context + # + # @return Hash + def serialized + { + "caller_location" => caller_location.to_s, + "time" => time.to_s + } + end + end +end diff --git a/lib/omnes/subscriber/adapter/active_job.rb b/lib/omnes/subscriber/adapter/active_job.rb index 7651cd3..b10c2c6 100644 --- a/lib/omnes/subscriber/adapter/active_job.rb +++ b/lib/omnes/subscriber/adapter/active_job.rb @@ -52,8 +52,8 @@ def self.[](serializer: config.serializer) end # @api private - def self.call(instance, event) - self.[].(instance, event) + def self.call(instance, event, publication_context) + self.[].(instance, event, publication_context) end # @api private @@ -64,8 +64,12 @@ def initialize(serializer:) @serializer = serializer end - def call(instance, event) - instance.class.perform_later(serializer.(event)) + def call(instance, event, publication_context) + if Subscription.takes_publication_context?(instance.method(:perform)) + instance.class.perform_later(serializer.(event), publication_context.serialized) + else + instance.class.perform_later(serializer.(event)) + end end end end diff --git a/lib/omnes/subscriber/adapter/method.rb b/lib/omnes/subscriber/adapter/method.rb index cb1e08e..0b9a520 100644 --- a/lib/omnes/subscriber/adapter/method.rb +++ b/lib/omnes/subscriber/adapter/method.rb @@ -7,7 +7,7 @@ module Subscriber module Adapter # Builds a callback from a method of the instance # - # You can use instance of this class as the adapter: + # You can use an instance of this class as the adapter: # # ```ruby # handle :foo, with: Adapter::Method.new(:foo) @@ -29,7 +29,7 @@ def initialize(name) def call(instance) check_method(instance) - ->(event) { instance.method(name).(event) } + instance.method(name) end private diff --git a/lib/omnes/subscriber/adapter/sidekiq.rb b/lib/omnes/subscriber/adapter/sidekiq.rb index 7ecd828..3b50cf7 100644 --- a/lib/omnes/subscriber/adapter/sidekiq.rb +++ b/lib/omnes/subscriber/adapter/sidekiq.rb @@ -59,8 +59,8 @@ def self.[](serializer: config.serializer) end # @api private - def self.call(instance, event) - self.[].(instance, event) + def self.call(instance, event, publication_context) + self.[].(instance, event, publication_context) end # @param seconds [Integer] @@ -76,15 +76,29 @@ def initialize(serializer:) @serializer = serializer end - def call(instance, event) - instance.class.perform_async(serializer.(event)) + def call(instance, event, publication_context) + if takes_publication_context?(instance) + instance.class.perform_async(serializer.(event), publication_context.serialized) + else + instance.class.perform_async(serializer.(event)) + end end def in(seconds) - lambda do |instance, event| - instance.class.perform_in(seconds, serializer.(event)) + lambda do |instance, event, publication_context| + if takes_publication_context?(instance) + instance.class.perform_in(seconds, serializer.(event), publication_context.serialized) + else + instance.class.perform_in(seconds, serializer.(event)) + end end end + + private + + def takes_publication_context?(instance) + Subscription.takes_publication_context?(instance.method(:perform)) + end end end end diff --git a/lib/omnes/subscription.rb b/lib/omnes/subscription.rb index 57508ab..9fc1a9e 100644 --- a/lib/omnes/subscription.rb +++ b/lib/omnes/subscription.rb @@ -24,10 +24,16 @@ class Subscription ALL_EVENTS_MATCHER = ->(_candidate) { true } + # @api private def self.random_id SecureRandom.uuid.to_sym end + # @api private + def self.takes_publication_context?(callable) + callable.parameters.count == 2 + end + # @api private attr_reader :matcher, :callback, :id @@ -41,10 +47,17 @@ def initialize(matcher:, callback:, id:) end # @api private - def call(event) + def call(event, publication_context) result = nil benchmark = Benchmark.measure do - result = @callback.(event) + # work around Ruby not being able to tell remaining arity for a curried + # function (or uncurrying), because we want to be able to create subscriber + # adapters partially applying the subscriber instance + result = begin + @callback.(event, publication_context) + rescue ArgumentError + @callback.(event) + end end Execution.new(subscription: self, result: result, benchmark: benchmark) diff --git a/spec/support/shared_examples/bus.rb b/spec/support/shared_examples/bus.rb index c22e6b7..afe664e 100644 --- a/spec/support/shared_examples/bus.rb +++ b/spec/support/shared_examples/bus.rb @@ -123,24 +123,44 @@ def bar expect(dummy.box).to eq(:bar) end - it "adds the caller location to the publication result object" do + it "yields the publication context as second parameter for the subscription" do bus = subject.new bus.register(:foo) - bus.subscribe(:foo) { :work } + bus.subscribe(:foo) do |_event, publication_context| + expect(publication_context.is_a?(Omnes::PublicationContext)).to be(true) + end - publication = bus.publish :foo + bus.publish(:foo) + end - expect(publication.caller_location.to_s).to include(__FILE__) + it "adds the caller location to the provided publication context" do + bus = subject.new + bus.register(:foo) + bus.subscribe(:foo) do |_event, publication_context| + expect(publication_context.caller_location.to_s).to include(__FILE__) + end + + bus.publish(:foo) end - it "adds the publication time to the publication result object" do + it "adds publication time to the provided publication context" do + bus = subject.new + bus.register(:foo) + bus.subscribe(:foo) do |_event, publication_context| + expect(publication_context.time).not_to be_nil + end + + bus.publish(:foo) + end + + it "returns a publication instance" do bus = subject.new bus.register(:foo) bus.subscribe(:foo) { :work } publication = bus.publish :foo - expect(publication.time).not_to be(nil) + expect(publication.is_a?(Omnes::Publication)).to be(true) end it "adds the published event to the publication result object" do @@ -168,6 +188,16 @@ def bar expect(executions.map(&:result)).to match([1, 2]) end + it "adds the context to the publication result object" do + bus = subject.new + bus.register(:foo) + bus.subscribe(:foo) { :work } + + publication = bus.publish :foo + + expect(publication.context.is_a?(Omnes::PublicationContext)).to be(true) + end + it "raises when the published event hasn't been registered" do bus = subject.new diff --git a/spec/unit/omnes/publication_context_spec.rb b/spec/unit/omnes/publication_context_spec.rb new file mode 100644 index 0000000..df16967 --- /dev/null +++ b/spec/unit/omnes/publication_context_spec.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +require "spec_helper" +require "omnes/publication_context" + +RSpec.describe Omnes::PublicationContext do + describe ".serialized" do + it "serializes caller_location as string" do + context = described_class.new(caller_location: caller_locations(0)[0], time: Time.now) + + expect(context.serialized["caller_location"]).to include(__FILE__) + end + + it "serializes time as string" do + context = described_class.new(caller_location: caller_locations(0)[0], time: Time.new(2022, 10, 10)) + + expect(context.serialized["time"]).to include("2022-10-10") + end + end +end diff --git a/spec/unit/omnes/subscriber/adapter/active_job_spec.rb b/spec/unit/omnes/subscriber/adapter/active_job_spec.rb index c77569c..93690b6 100644 --- a/spec/unit/omnes/subscriber/adapter/active_job_spec.rb +++ b/spec/unit/omnes/subscriber/adapter/active_job_spec.rb @@ -73,4 +73,27 @@ def perform(payload) Object.send(:remove_const, :Subscriber) Object.send(:remove_const, :FOO_TABLE) end + + it "can provide the serialized publication context" do + class Subscriber < ActiveJob::Base + include Omnes::Subscriber + + handle :foo, with: Adapter::ActiveJob + + def perform(_payload, publication_context) + LOG[:publication_context] = publication_context + end + end + LOG = {} + + bus.register(:foo) + Subscriber.new.subscribe_to(bus) + + bus.publish(:foo) + perform_enqueued_jobs + + expect(LOG[:publication_context].is_a?(Hash)).to be(true) + ensure + Object.send(:remove_const, :Subscriber) + end end diff --git a/spec/unit/omnes/subscriber/adapter/method_spec.rb b/spec/unit/omnes/subscriber/adapter/method_spec.rb index 5f43737..af47d2d 100644 --- a/spec/unit/omnes/subscriber/adapter/method_spec.rb +++ b/spec/unit/omnes/subscriber/adapter/method_spec.rb @@ -4,41 +4,92 @@ require "omnes/subscriber/adapter/method" RSpec.describe Omnes::Subscriber::Adapter::Method do - describe "#call" do - it "returns lambda that calls method with given event" do - instance = Class.new do - def foo(event) - event - end - end.new + let(:subscriber_class) do + Class.new do + include Omnes::Subscriber - callback = described_class.new(:foo).(instance) + attr_reader :value - expect(callback.(:bar)).to be(:bar) + def initialize + @value = nil + end end + end + let(:bus) { Omnes::Bus.new } + + it "uses given method as handler" do + subscriber_class.class_eval do + include Omnes::Subscriber + + handle :foo, with: :foo + + def foo(event) + @value = event[:value] + end + end + + bus.register(:foo) + subscriber = subscriber_class.new + subscriber.subscribe_to(bus) + bus.publish(:foo, value: :bar) + + expect(subscriber.value).to be(:bar) + end + + it "provides publication context if the method takes a second parameter" do + subscriber_class.class_eval do + include Omnes::Subscriber + + handle :foo, with: :foo + + def foo(_event, publication_context) + @value = publication_context + end + end + + bus.register(:foo) + subscriber = subscriber_class.new + subscriber.subscribe_to(bus) + bus.publish(:foo, value: :bar) + + expect(subscriber.value.is_a?(Omnes::PublicationContext)).to be(true) + end + + it "raises when method is private" do + subscriber_class.class_eval do + include Omnes::Subscriber + + handle :foo, with: :foo - it "raises when method is private" do - instance = Class.new do - private def foo; end - end.new - - expect { - described_class.new(:foo).(instance) - }.to raise_error( - described_class::PrivateMethodSubscriptionAttemptError, - /"foo" private method/m - ) + private def foo(_event); end end - it "raises when method doesn't exist" do - instance = Class.new.new + bus.register(:foo) + subscriber = subscriber_class.new - expect { - described_class.new(:foo).(instance) - }.to raise_error( - described_class::UnknownMethodSubscriptionAttemptError, - /"foo" method/m - ) + expect { + subscriber.subscribe_to(bus) + }.to raise_error( + described_class::PrivateMethodSubscriptionAttemptError, + /"foo" private method/m + ) + end + + it "raises when method doesn't exist" do + subscriber_class.class_eval do + include Omnes::Subscriber + + handle :foo, with: :foo end + + bus.register(:foo) + subscriber = subscriber_class.new + + expect { + subscriber.subscribe_to(bus) + }.to raise_error( + described_class::UnknownMethodSubscriptionAttemptError, + /"foo" method/m + ) end end diff --git a/spec/unit/omnes/subscriber/adapter/sidekiq_spec.rb b/spec/unit/omnes/subscriber/adapter/sidekiq_spec.rb index 2984c34..d3eb7b3 100644 --- a/spec/unit/omnes/subscriber/adapter/sidekiq_spec.rb +++ b/spec/unit/omnes/subscriber/adapter/sidekiq_spec.rb @@ -71,6 +71,29 @@ def perform(payload) Object.send(:remove_const, :FOO_TABLE) end + it "can provide the serialized publication context" do + class Subscriber + include Omnes::Subscriber + include Sidekiq::Job + + handle :foo, with: Adapter::Sidekiq + + def perform(_payload, publication_context) + LOG[:publication_context] = publication_context + end + end + LOG = {} + + bus.register(:foo) + Subscriber.new.subscribe_to(bus) + + bus.publish(:foo) + + expect(LOG[:publication_context].is_a?(Hash)).to be(true) + ensure + Object.send(:remove_const, :Subscriber) + end + it "performs the job in given interval after the publication passing the event's payload" do class Subscriber include Sidekiq::Job @@ -96,4 +119,27 @@ def perform(payload) Object.send(:remove_const, :FOO_TABLE) Object.send(:remove_const, :Subscriber) end + + it "can provide the serialized publication context when giving an interval" do + class Subscriber + include Omnes::Subscriber + include Sidekiq::Job + + handle :foo, with: Adapter::Sidekiq.in(60) + + def perform(_payload, publication_context) + LOG[:publication_context] = publication_context + end + end + LOG = {} + + bus.register(:foo) + Subscriber.new.subscribe_to(bus) + + bus.publish(:foo) + + expect(LOG[:publication_context].is_a?(Hash)).to be(true) + ensure + Object.send(:remove_const, :Subscriber) + end end diff --git a/spec/unit/omnes/subscription_spec.rb b/spec/unit/omnes/subscription_spec.rb index 5d4ee1f..9558242 100644 --- a/spec/unit/omnes/subscription_spec.rb +++ b/spec/unit/omnes/subscription_spec.rb @@ -42,24 +42,42 @@ end describe "#call" do + it "binds the event as the first subscription parameter" do + callback = ->(event) { expect(event).to be(:event) } + + subscription = described_class.new(matcher: true_matcher, callback: callback, id: :id) + + subscription.(:event, :context) + end + + it "binds the publication context when subscription accepts a second argument" do + callback = ->(_event, context) { expect(context).to be(:context) } + + subscription = described_class.new(matcher: true_matcher, callback: callback, id: :id) + + subscription.(:event, :context) + end + it "returns an execution instance" do subscription = described_class.new(matcher: true_matcher, callback: proc {}, id: :id) - expect(subscription.(:event)).to be_a(Omnes::Execution) + expect(subscription.(:event, :context)).to be_a(Omnes::Execution) end - it "binds the event and sets execution's result" do - subscription = described_class.new(matcher: true_matcher, callback: ->(event) { event[:foo] }, id: :id) + it "sets the execution result" do + callback = ->(event) { event } + + subscription = described_class.new(matcher: true_matcher, callback: callback, id: :id) - execution = subscription.(foo: :bar) + execution = subscription.(:event, :context) - expect(execution.result).to eq(:bar) + expect(execution.result).to be(:event) end it "sets itself as the execution subscription" do subscription = described_class.new(matcher: true_matcher, callback: proc { "foo" }, id: :id) - execution = subscription.(:event) + execution = subscription.(:event, :context) expect(execution.subscription).to be(subscription) end @@ -67,7 +85,7 @@ it "sets the execution's benchmark" do subscription = described_class.new(matcher: true_matcher, callback: proc { "foo" }, id: :id) - execution = subscription.(:event) + execution = subscription.(:event, :context) expect(execution.benchmark).to be_a(Benchmark::Tms) end