Skip to content

Commit

Permalink
Bind a publication context to subscriptions
Browse files Browse the repository at this point in the history
That improves debugging by yielding a second optional argument to
subscription blocks. That argument is an instance of
`Omnes::PublicationContext`, referencing the publisher's location and
the publication time.

```ruby
bus.subscribe(:foo) do |event, publication_context|
  # debugging
  abort publication_context.inspect
end
```

If they want to support it, adapters need to be aware to dispatch the
second argument if present. As such, current adapters have been adapted.
`Omnes::PublicationContext` contains a `#serialized` method that can be
called before dispatching async adapters.
  • Loading branch information
waiting-for-dev committed Apr 15, 2022
1 parent 1ec5225 commit 12f6caa
Show file tree
Hide file tree
Showing 14 changed files with 366 additions and 80 deletions.
46 changes: 40 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,8 @@ Omnes.config.subscriber.adapter.active_job.serializer = :serialized_payload.to_p
#### Custom adapters

Custom adapters can be built. They need to implement a method `#call` taking
the instance of `Omnes::Subscriber` and the event.
the instance of `Omnes::Subscriber`, the event and, optionally, the publication
context (see [debugging subscriptions](#subscription)).

Here's a custom adapter executing a subscriber method in a different
thread (we add an extra argument for the method name, and we partially apply it
Expand All @@ -442,7 +443,6 @@ end

class OrderCreationEmailSubscriber
include Omnes::Subscriber
include Sidekiq::Job

handle :order_created, with: THREAD_ADAPTER.curry[:order_created]

Expand All @@ -453,8 +453,8 @@ end
```

Alternatively, adapters can be curried and only take the instance as an
argument, returning a one-argument callable taking the event. For instance, we
could also have defined the thread adapter like this:
argument, returning a callable taking the event. For instance, we could also
have defined the thread adapter like this:

```ruby
class ThreadAdapter
Expand Down Expand Up @@ -516,10 +516,10 @@ When you publish an event, you get back an
attributes that allow observing what happened:

- `#event` contains the event instance that has been published.
- `#caller_location` refers to the publication caller.
- `#time` is the time stamp for the publication.
- `#executions` contains an array of
`Omnes::Execution`(lib/omnes/execution.rb). Read more below.
- `#context` is an instance of
[`Omnes::PublicationContext`](lib/omnes/publication_context.rb).

`Omnes::Execution` represents a subscription individual execution. It contains
the following attributes:
Expand All @@ -529,6 +529,40 @@ the following attributes:
- `#benchmark` of the operation.
- `#time` is the time where the execution started.

`Omnes::PublicationContext` represents the shared context for all triggered
executions. See [Subscription][#subscription] for details.

### Subscription

If your subscription block or callable object takes a second argument, it'll
contain an instance of an
[`Omnes::PublicationContext`](lib/omnes/publication_context.rb). It allows you
to inspect what triggered a given execution from within that execution code. It
contains:

- `#caller_location` refers to the publication caller.
- `#time` is the time stamp for the publication.

```ruby
class OrderCreationEmailSubscriber
include Omnes::Subscriber

handle :order_created, with: :send_confirmation_email

def send_confirmation_email(event, publication_context)
# debugging
abort(publication_context.caller_location.inspect)

OrderCreationEmail.send(number: event.number, email: event.user_email)
end
end
```

In case you're developing your own async adapter, you can call `#serialized` on
an instance of `Omnes::PublicationContext` to get a serialized version of it.
It'll return a `Hash` with `"caller_location"` and `"time"` keys, and the
respective `String` representations as values.

## Testing

Ideally, you wouldn't need big setups to test your event-driven behavior. You
Expand Down
11 changes: 6 additions & 5 deletions lib/omnes/bus.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require "omnes/publication"
require "omnes/publication_context"
require "omnes/registry"
require "omnes/subscription"
require "omnes/unstructured_event"
Expand Down Expand Up @@ -183,13 +184,13 @@ def publish(event, caller_location: caller_locations(cal_loc_start)[0], **payloa
publication_time = Time.now.utc
event = self.class.EventType(event, **payload)
registry.check_event_name(event.omnes_event_name)
executions = execute_subscriptions_for_event(event)
publication_context = PublicationContext.new(caller_location: caller_location, time: publication_time)
executions = execute_subscriptions_for_event(event, publication_context)

Publication.new(
event: event,
executions: executions,
caller_location: caller_location,
time: publication_time
context: publication_context
)
end

Expand Down Expand Up @@ -288,9 +289,9 @@ def subscription(id)

private

def execute_subscriptions_for_event(event)
def execute_subscriptions_for_event(event, publication_context)
subscriptions_for_event(event).map do |subscription|
subscription.(event)
subscription.(event, publication_context)
end
end

Expand Down
19 changes: 5 additions & 14 deletions lib/omnes/publication.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,16 @@ class Publication
# @return [Array<Omnes::Execution>]
attr_reader :executions

# Location for the event caller
# Publication context, shared by all triggered executions
#
# It's usually set by {Omnes::Bus#publish}, and it points to the caller of
# that method.
#
# @return [Thread::Backtrace::Location]
attr_reader :caller_location

# Time of the event publication
#
# @return [Time]
attr_reader :time
# @return [Omnes::PublicationContext]
attr_reader :context

# @api private
def initialize(event:, executions:, caller_location:, time:)
def initialize(event:, executions:, context:)
@event = event
@executions = executions
@caller_location = caller_location
@time = time
@context = context
end
end
end
41 changes: 41 additions & 0 deletions lib/omnes/publication_context.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# frozen_string_literal: true

module Omnes
# Context for an event publication
#
# An instance of this class is shared between all the executions that are
# triggered by the publication of a given event. It's provided to the
# subscriptions as their second argument when they take it.
#
# This class is useful mainly for debugging and logging purposes.
class PublicationContext
# Location for the event publisher
#
# It's set by {Omnes::Bus#publish}, and it points to the caller of that
# method.
#
# @return [Thread::Backtrace::Location]
attr_reader :caller_location

# Time of the event publication
#
# @return [Time]
attr_reader :time

# @api private
def initialize(caller_location:, time:)
@caller_location = caller_location
@time = time
end

# Serialized version of a publication context
#
# @return Hash<String, String>
def serialized
{
"caller_location" => caller_location.to_s,
"time" => time.to_s
}
end
end
end
12 changes: 8 additions & 4 deletions lib/omnes/subscriber/adapter/active_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ def self.[](serializer: config.serializer)
end

# @api private
def self.call(instance, event)
self.[].(instance, event)
def self.call(instance, event, publication_context)
self.[].(instance, event, publication_context)
end

# @api private
Expand All @@ -64,8 +64,12 @@ def initialize(serializer:)
@serializer = serializer
end

def call(instance, event)
instance.class.perform_later(serializer.(event))
def call(instance, event, publication_context)
if Subscription.takes_publication_context?(instance.method(:perform))
instance.class.perform_later(serializer.(event), publication_context.serialized)
else
instance.class.perform_later(serializer.(event))
end
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/omnes/subscriber/adapter/method.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ module Subscriber
module Adapter
# Builds a callback from a method of the instance
#
# You can use instance of this class as the adapter:
# You can use an instance of this class as the adapter:
#
# ```ruby
# handle :foo, with: Adapter::Method.new(:foo)
Expand All @@ -29,7 +29,7 @@ def initialize(name)
def call(instance)
check_method(instance)

->(event) { instance.method(name).(event) }
instance.method(name)
end

private
Expand Down
26 changes: 20 additions & 6 deletions lib/omnes/subscriber/adapter/sidekiq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ def self.[](serializer: config.serializer)
end

# @api private
def self.call(instance, event)
self.[].(instance, event)
def self.call(instance, event, publication_context)
self.[].(instance, event, publication_context)
end

# @param seconds [Integer]
Expand All @@ -76,15 +76,29 @@ def initialize(serializer:)
@serializer = serializer
end

def call(instance, event)
instance.class.perform_async(serializer.(event))
def call(instance, event, publication_context)
if takes_publication_context?(instance)
instance.class.perform_async(serializer.(event), publication_context.serialized)
else
instance.class.perform_async(serializer.(event))
end
end

def in(seconds)
lambda do |instance, event|
instance.class.perform_in(seconds, serializer.(event))
lambda do |instance, event, publication_context|
if takes_publication_context?(instance)
instance.class.perform_in(seconds, serializer.(event), publication_context.serialized)
else
instance.class.perform_in(seconds, serializer.(event))
end
end
end

private

def takes_publication_context?(instance)
Subscription.takes_publication_context?(instance.method(:perform))
end
end
end
end
Expand Down
17 changes: 15 additions & 2 deletions lib/omnes/subscription.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,16 @@ class Subscription

ALL_EVENTS_MATCHER = ->(_candidate) { true }

# @api private
def self.random_id
SecureRandom.uuid.to_sym
end

# @api private
def self.takes_publication_context?(callable)
callable.parameters.count == 2
end

# @api private
attr_reader :matcher, :callback, :id

Expand All @@ -41,10 +47,17 @@ def initialize(matcher:, callback:, id:)
end

# @api private
def call(event)
def call(event, publication_context)
result = nil
benchmark = Benchmark.measure do
result = @callback.(event)
# work around Ruby not being able to tell remaining arity for a curried
# function (or uncurrying), because we want to be able to create subscriber
# adapters partially applying the subscriber instance
result = begin
@callback.(event, publication_context)
rescue ArgumentError
@callback.(event)
end
end

Execution.new(subscription: self, result: result, benchmark: benchmark)
Expand Down
42 changes: 36 additions & 6 deletions spec/support/shared_examples/bus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,24 +123,44 @@ def bar
expect(dummy.box).to eq(:bar)
end

it "adds the caller location to the publication result object" do
it "yields the publication context as second parameter for the subscription" do
bus = subject.new
bus.register(:foo)
bus.subscribe(:foo) { :work }
bus.subscribe(:foo) do |_event, publication_context|
expect(publication_context.is_a?(Omnes::PublicationContext)).to be(true)
end

publication = bus.publish :foo
bus.publish(:foo)
end

expect(publication.caller_location.to_s).to include(__FILE__)
it "adds the caller location to the provided publication context" do
bus = subject.new
bus.register(:foo)
bus.subscribe(:foo) do |_event, publication_context|
expect(publication_context.caller_location.to_s).to include(__FILE__)
end

bus.publish(:foo)
end

it "adds the publication time to the publication result object" do
it "adds publication time to the provided publication context" do
bus = subject.new
bus.register(:foo)
bus.subscribe(:foo) do |_event, publication_context|
expect(publication_context.time).not_to be_nil
end

bus.publish(:foo)
end

it "returns a publication instance" do
bus = subject.new
bus.register(:foo)
bus.subscribe(:foo) { :work }

publication = bus.publish :foo

expect(publication.time).not_to be(nil)
expect(publication.is_a?(Omnes::Publication)).to be(true)
end

it "adds the published event to the publication result object" do
Expand Down Expand Up @@ -168,6 +188,16 @@ def bar
expect(executions.map(&:result)).to match([1, 2])
end

it "adds the context to the publication result object" do
bus = subject.new
bus.register(:foo)
bus.subscribe(:foo) { :work }

publication = bus.publish :foo

expect(publication.context.is_a?(Omnes::PublicationContext)).to be(true)
end

it "raises when the published event hasn't been registered" do
bus = subject.new

Expand Down
Loading

0 comments on commit 12f6caa

Please sign in to comment.