plumqqz edited this page Mar 11, 2015 · 9 revisions

Implementing patterns

All queue operations must be performed by a superuser. To give a non-superuser access to a queue or topic, use the mbus.queue_acl function; see [nonsuperuser Using_views_for_non_superuser_access] for details.

The examples below assume that hstore is installed in the public schema:

SET search_path = public, mbus; 

Definitions

  • Application - a client program that uses messaging
  • Message - the unified format used to send and receive data through queues. It is described by the table qt_model in schema mbus:

column | datatype | description
id | integer | Internal id only; not to be used by the application
added | timestamp without time zone | When the message was added
iid | text | Unique message id. Expected to be unique across all queues and databases participating in communication
delayed_until | timestamp without time zone | The message will not be consumed before this timestamp
expires | timestamp without time zone | The message will not be consumed after this timestamp
received | integer[] | Array of ids of the consumers that have consumed this message
headers | hstore | Message headers; intended for internal use only, not for application use
properties | hstore | Properties set by the application (application-level message headers)
data | hstore | Message payload in hstore format

Possible headers include:

  • seenby
  • source_db
  • destination_queue
  • enqueue_time

  • Payload - any hstore
  • Channel - a message channel is an abstraction: an application can write data to a particular channel and read data from it. In these terms, a message channel is something like a logical address for message routing. In mbus, a specific queue is a message channel.
  • Queue
  • Topic
  • Consumer
  • Selector
  • Publisher
  • Subscriber

Queue

mbus.create_queue(qname, ncons)

Creates a queue.

  • qname text - queue name; only [a-z], _ and 0-9 are allowed
  • ncons integer - number of queue parts available for simultaneous scanning (equal to the number of concurrent consumers). Values from 2 up to 128-256 are reasonable; larger numbers can cause performance problems.
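
As a minimal sketch of the call described above, the statement below creates a queue with four parts. The queue name orders is a hypothetical name chosen for illustration; it only uses characters from the allowed set.

```sql
-- 'orders' is a hypothetical queue name (lowercase letters only).
-- 4 is the number of queue parts, i.e. the expected number of
-- simultaneous consumers.
SELECT mbus.create_queue('orders', 4);
```

Judging by the pub/sub example further down, creating a queue also makes the per-queue functions (here, presumably mbus.post_orders and friends) available.
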
mbus.post_<qname>(
 data hstore, 
 headers hstore DEFAULT NULL::hstore, 
 properties hstore DEFAULT NULL::hstore, 
 delayed_until timestamp without time zone DEFAULT NULL::timestamp without time zone,
 expires timestamp without time zone DEFAULT NULL::timestamp without time zone) 

Posts a message to the queue. Returns the id (iid) of the new message.

  • data hstore - message payload in hstore format.
  • headers hstore - message headers; intended for internal use only, not for application use.
  • properties hstore - properties set by the application (application-level message headers).
  • delayed_until timestamp without time zone - the message will not be consumed before this timestamp.
  • expires timestamp without time zone - the message will not be consumed after this timestamp. To remove expired messages from the queue, use clear_queue_<qname>().
  • iid - id of the new message (optional). An iid can be obtained by calling get_iid().
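
The parameters above can be combined as in the following sketch. It assumes a queue named orders (a hypothetical name) already exists, passes the arguments positionally in the documented order, and assumes that the cleanup function lives in the mbus schema like the other generated functions.

```sql
-- Assumes: SELECT mbus.create_queue('orders', 4); was run earlier.
-- The hstore keys and values are made up for illustration.
SELECT mbus.post_orders(
    'order_id=>42, status=>new'::hstore,        -- data (payload)
    NULL::hstore,                               -- headers (internal use)
    'priority=>high'::hstore,                   -- properties
    (now() + interval '5 minutes')::timestamp,  -- delayed_until
    (now() + interval '1 day')::timestamp       -- expires
);

-- Later, remove messages whose expires timestamp has passed.
-- Schema qualification is an assumption; the text names only
-- clear_queue_<qname>().
SELECT mbus.clear_queue_orders();
```
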
mbus.post(
 qname text,
 data hstore, 
 headers hstore DEFAULT NULL::hstore, 
 properties hstore DEFAULT NULL::hstore, 
 delayed_until timestamp without time zone DEFAULT NULL::timestamp without time zone,
 expires timestamp without time zone DEFAULT NULL::timestamp without time zone) 

Same as post_<qname>, except that the queue name is passed as the qname parameter.
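
A short sketch of the generic form, useful when the queue name is only known at run time. The queue name orders and the payload are hypothetical; the remaining parameters are left at their documented NULL defaults.

```sql
-- Assumes a queue named 'orders' already exists.
-- Only qname and data are supplied; headers, properties,
-- delayed_until and expires fall back to their defaults.
SELECT mbus.post('orders', 'order_id=>43'::hstore);
```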

mbus.consume(qname text)

Consumes one message from qname. Returns a result set of type mbus.qt_model. See the Message entry under Definitions for a description of qt_model.
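
Because the result has the qt_model row type, calling the function in the FROM clause should expose the individual columns. This is a sketch under the assumption that consume is declared as a set-returning function, as the text suggests; the queue name orders and the order_id key are hypothetical.

```sql
-- Assumes a queue named 'orders' exists. Returns zero rows when
-- no message is currently available for consumption.
SELECT iid,
       added,
       properties,
       data -> 'order_id' AS order_id  -- hstore lookup of one payload key
FROM mbus.consume('orders');
```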

mbus.peek_<qname>(msgid text default null)

Checks whether a message with iid equal to the specified msgid exists; if msgid is null, checks for any message. Note: even if this function returns true, a subsequent call to consume is not guaranteed to return a message.
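
The sketch below shows both forms of the call for a hypothetical queue named orders. The iid value is a placeholder, not a real message id. Given the caveat above, the result is best treated as a hint, and the consumer still has to handle an empty result from consume.

```sql
-- Assumes a queue named 'orders' exists.
-- Is there any message at all?
SELECT mbus.peek_orders() AS any_message;

-- Is there a message with this particular iid?
-- 'some-message-iid' is a placeholder value.
SELECT mbus.peek_orders('some-message-iid') AS has_message;
```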

Publish/Subscribe

Currently only durable subscribers are available. A queue can be turned into a topic by creating one or more subscribers.

mbus.create_consumer(cname text, qname text, p_selector text DEFAULT NULL::text, noindex boolean DEFAULT false)

Creates a consumer named cname for queue qname using the selector p_selector. Keep in mind that a consumer named default has already been created by mbus.create_queue(qname, ncons). The selector can be any static expression, such as $$(properties->'FLAG')='true'$$.

mbus.consume(qname, cname) 

Consumes one message from qname by consumer cname. Returns result set mbus.qt_model.

mbus.consume_<qname>_by_<cname>()

Same as mbus.consume(qname, cname)

mbus.consumen_<qname>_by_<cname>(cnt integer)

Consumes cnt messages from qname by consumer cname. Returns result set mbus.qt_model.
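
As an illustration of batch consumption, the following sketch reads several messages in one call. It reuses the my_topic queue and the flag_true consumer from the complete example below, and assumes the function can be used in the FROM clause like any set-returning function.

```sql
-- Assumes queue 'my_topic' and consumer 'flag_true' exist,
-- as created in the complete pub/sub example.
-- Requests 10 messages for this consumer in a single call.
SELECT iid, properties, data
FROM mbus.consumen_my_topic_by_flag_true(10);
```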

Complete pub/sub example:

mbt=# SELECT create_queue('my_topic', 2);
NOTICE:  CREATE TABLE / UNIQUE will create implicit index "qt$my_topic_iid_key" for table "qt$my_topic"
CONTEXT:  SQL statement "create table mbus.qt$my_topic( like mbus.qt_model including all)"
PL/pgSQL function "create_queue" line 8 at EXECUTE statement
 create_queue 
--------------
 
(1 row)

mbt=# SELECT create_consumer('flag_true', 'my_topic', $$(properties->'FLAG')='true'$$);
 create_consumer 
-----------------
 
(1 row)

mbt=# SELECT create_consumer('flag_false', 'my_topic', $$(properties->'FLAG')='false'$$);
 create_consumer 
-----------------
 
(1 row)

mbt=# SELECT post_my_topic('"data"=>"test payload"'::hstore, NULL::hstore, '"FLAG"=>"true"'::hstore, NULL::timestamp, null::timestamp);
                post_my_topic                 
----------------------------------------------
 mbt.27.1582.909b4d008e013255d7a180945ab89e99
(1 row)

mbt=# SELECT post_my_topic('"data"=>"test payload"'::hstore, NULL::hstore, '"FLAG"=>"false"'::hstore, NULL::timestamp, null::timestamp);
                post_my_topic                 
----------------------------------------------
 mbt.28.1583.909b4d008e013255d7a180945ab89e99
(1 row)

mbt=# SELECT post_my_topic('"data"=>"test payload"'::hstore, NULL::hstore, '"FLAG"=>"true"'::hstore, NULL::timestamp, null::timestamp);
                post_my_topic                 
----------------------------------------------
 mbt.29.1585.909b4d008e013255d7a180945ab89e99
(1 row)

mbt=# \x
Expanded display is on.
mbt=# SELECT consume('my_topic','flag_true');
-[ RECORD 1 ]------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
consume | (27,"2012-12-26 18:15:06.961829",mbt.27.1582.909b4d008e013255d7a180945ab89e99,"2012-12-26 17:15:06.961829",,{},"""seenby""=>""{mbt}"", ""source_db""=>""mbt"", ""destination""=>""my_topic"", ""enqueue_time""=>""2012-12-26 18:15:06.961829"", ""destination_queue""=>""my_topic""","""FLAG""=>""true""","""data""=>""test payload""")

mbt=# SELECT consume('my_topic','flag_true');
-[ RECORD 1 ]---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
consume | (29,"2012-12-26 18:16:13.79412",mbt.29.1585.909b4d008e013255d7a180945ab89e99,"2012-12-26 17:16:13.79412",,{},"""seenby""=>""{mbt}"", ""source_db""=>""mbt"", ""destination""=>""my_topic"", ""enqueue_time""=>""2012-12-26 18:16:13.79412"", ""destination_queue""=>""my_topic""","""FLAG""=>""true""","""data""=>""test payload""")

mbt=# SELECT consume('my_topic','flag_true');
(No rows)
mbt=# SELECT consume('my_topic','flag_false');
-[ RECORD 1 ]-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
consume | (28,"2012-12-26 18:15:20.305775",mbt.28.1583.909b4d008e013255d7a180945ab89e99,"2012-12-26 17:15:20.305775",,{},"""seenby""=>""{mbt}"", ""source_db""=>""mbt"", ""destination""=>""my_topic"", ""enqueue_time""=>""2012-12-26 18:15:20.305775"", ""destination_queue""=>""my_topic""","""FLAG""=>""false""","""data""=>""test payload""")

mbt=# SELECT consume('my_topic','flag_false');
(No rows)

Temporary queues

mbus.create_temporary_queue()

Creates a temporary queue and returns its name. It can be used like an ordinary queue, with the same functions (post, consume, etc.).
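
Since the queue name is generated, it has to be captured before it can be passed to post or consume. The sketch below does this in a psql script with the \gset meta-command (psql 9.3 or later); the variable name tmp_q and the payload are hypothetical, and the lifetime of the temporary queue is not specified here, so the script keeps everything in one session.

```sql
-- psql script: store the generated queue name in the psql variable tmp_q.
SELECT mbus.create_temporary_queue() AS tmp_q \gset

-- Use the generic functions, which take the queue name as a parameter.
SELECT mbus.post(:'tmp_q', 'reply=>ok'::hstore);

SELECT iid, data
FROM mbus.consume(:'tmp_q');
```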
