==Implementing patterns==
All queue operations must be performed by a superuser. To give a non-superuser access to a queue or topic, use the {{{mbus.queue_acl}}} function; see [nonsuperuser Using_views_for_non_superuser_access] for details.
Because hstore is assumed to be installed in the {{{public}}} schema, set the search path first: {{{ SET search_path = public, mbus; }}}
===Definitions===
- Application - client program which uses messaging
- Message - the unified format used to send and receive data through queues. It is described by the table {{{qt_model}}} in schema {{{mbus}}}:
|| column || datatype || description ||
|| id || integer || Internal id only; not to be used by applications ||
|| added || timestamp without time zone || When the message was added ||
|| iid || text || Unique message id. Expected to be unique across all queues and databases participating in communication ||
|| delayed_until || timestamp without time zone || The message is not to be consumed until this timestamp ||
|| expires || timestamp without time zone || The message will not be consumed after this timestamp ||
|| received || {{{integer[]}}} || Array of ids of the consumers that have consumed this message ||
|| headers || hstore || Message headers, intended for internal use only, not for application use ||
|| properties || hstore || Properties set by the application (message headers for the application level) ||
|| data || hstore || Message payload in hstore format ||
Headers may include:
  - seenby
  - source_db
  - destination_queue
  - enqueue_time
- Payload - any hstore
- Channel - or message channel, an abstraction: an application can write data to a particular channel and read data from it. In these terms, a message channel is something like a logical address for message routing. In mbus, a specific queue is a message channel.
- Queue
- Topic
- Consumer
- Selector
- Publisher
- Subscriber
===Queue===
{{{ mbus.create_queue(qname, ncons) }}} Creates a queue.
- {{{qname text}}} - queue name; only {{{[a-z], _, 0-9}}} are allowed
- {{{ncons integer}}} - number of queue parts available for simultaneous scanning (= number of consumers). Values from 2 up to 128-256 are reasonable; larger numbers may cause performance problems.
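As a minimal sketch of the call above, a queue could be created as follows. The queue name {{{orders}}} and the part count are hypothetical values chosen only for illustration; the call must be run by a superuser, as noted earlier.

{{{
-- 'orders' is a hypothetical queue name (lowercase letters, digits and _ only).
-- 4 is the number of queue parts, i.e. how many consumers can scan simultaneously.
SELECT mbus.create_queue('orders', 4);
}}}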
{{{
mbus.post_<qname>(
  data hstore,
  headers hstore DEFAULT NULL::hstore,
  properties hstore DEFAULT NULL::hstore,
  delayed_until timestamp without time zone DEFAULT NULL::timestamp without time zone,
  expires timestamp without time zone DEFAULT NULL::timestamp without time zone)
}}}
Posts a message to the queue. Returns the message id.
- {{{data hstore}}} - message payload in hstore format.
- {{{headers hstore}}} - message headers, intended for internal use only, not for application use.
- {{{properties hstore}}} - properties set by the application (message headers for the application level).
- {{{delayed_until timestamp without time zone}}} - the message is not to be consumed until this timestamp.
- {{{expires timestamp without time zone}}} - the message will not be consumed after this timestamp. To clean up the queue, use {{{clear_queue_<qname>()}}}.
{{{
mbus.post(
  qname text,
  data hstore,
  headers hstore DEFAULT NULL::hstore,
  properties hstore DEFAULT NULL::hstore,
  delayed_until timestamp without time zone DEFAULT NULL::timestamp without time zone,
  expires timestamp without time zone DEFAULT NULL::timestamp without time zone)
}}}
Same as {{{post_<qname>}}}; the only difference is that {{{qname}}} is passed as a parameter.
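The two posting forms can be sketched as follows. This assumes a queue named {{{orders}}} already exists (a hypothetical name, as are the payload and property keys), and that omitted arguments fall back to the {{{NULL}}} defaults listed in the signatures.

{{{
-- Per-queue form: only the payload is passed, everything else defaults to NULL.
-- 'orders', "order_id" and "priority" are hypothetical names for illustration.
SELECT mbus.post_orders('"order_id"=>"42"'::hstore);

-- Generic form: queue name first, then the same arguments.
-- The message becomes consumable in 5 minutes and expires after one day.
SELECT mbus.post(
    'orders',
    '"order_id"=>"43"'::hstore,
    NULL::hstore,                          -- headers: left to mbus
    '"priority"=>"high"'::hstore,          -- application-level properties
    localtimestamp + interval '5 minutes', -- delayed_until
    localtimestamp + interval '1 day'      -- expires
);
}}}

{{{localtimestamp}}} is used because both time parameters are declared as {{{timestamp without time zone}}}.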
{{{ mbus.consume(qname text) }}} Consumes one message from {{{qname}}}. Returns a result set of type {{{mbus.qt_model}}}. See the Message entry in [#Definitions] for a description of {{{qt_model}}}.
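Because the result has the shape of {{{mbus.qt_model}}}, the call can be placed in a {{{FROM}}} clause to get individual columns. The following is a sketch only: the queue name {{{orders}}} and the payload key {{{order_id}}} are hypothetical, and the hstore {{{->}}} operator is assumed to be reachable through the search path.

{{{
-- Take one message from the hypothetical 'orders' queue
-- and read a single key out of its hstore payload.
SELECT iid,
       added,
       properties,
       data -> 'order_id' AS order_id
FROM mbus.consume('orders');
}}}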
==Publish/Subscribe==
In the current version only durable subscribers are available. A queue can be turned into a topic by creating one or more subscribers.
{{{ mbus.create_consumer(cname text, qname text, p_selector text DEFAULT NULL::text, noindex boolean DEFAULT false) }}} Creates a consumer named {{{cname}}} for queue {{{qname}}} using selector {{{p_selector}}}. Keep in mind that a consumer named {{{default}}} is already created by {{{mbus.create_queue(qname, ncons)}}}. The selector can be any static expression, such as {{{$$(properties->'FLAG')='true'$$}}}.
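A selective subscriber could be sketched like this. The consumer name {{{urgent}}}, the queue name {{{orders}}} and the property key {{{priority}}} are hypothetical; the selector follows the same form as the expression quoted above.

{{{
-- Hypothetical consumer 'urgent' on the hypothetical queue 'orders'.
-- The dollar-quoted selector is meant to match only messages whose
-- application-level property "priority" equals 'high'.
SELECT mbus.create_consumer(
    'urgent',
    'orders',
    $$(properties->'priority')='high'$$
);
}}}

Dollar quoting avoids having to double the single quotes inside the selector text.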
{{{ mbus.consume(qname, cname) }}} Consumes one message from {{{qname}}} as consumer {{{cname}}}. Returns a result set of type {{{mbus.qt_model}}}.
{{{ mbus.consume_<qname>_by_<cname>() }}} Same as {{{mbus.consume(qname, cname)}}}.
{{{ mbus.consumen_<qname>_by_<cname>(cnt integer) }}} Consumes {{{cnt}}} messages from {{{qname}}} as consumer {{{cname}}}. Returns a result set of type {{{mbus.qt_model}}}.
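The three consuming forms side by side, as a sketch: it assumes a hypothetical queue {{{orders}}} with a hypothetical consumer {{{urgent}}}, so the generated function names below are illustrative only.

{{{
-- Generic form: one message for consumer 'urgent' from queue 'orders'.
SELECT * FROM mbus.consume('orders', 'urgent');

-- Generated per-queue, per-consumer form; same effect as the call above.
SELECT * FROM mbus.consume_orders_by_urgent();

-- Batch form: consume 10 messages in one call.
SELECT iid, data FROM mbus.consumen_orders_by_urgent(10);
}}}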
Complete pub/sub example:
{{{
mbt=# SELECT create_queue('my_topic', 2);
NOTICE: CREATE TABLE / UNIQUE will create implicit index "qt$my_topic_iid_key" for table "qt$my_topic"
CONTEXT: SQL statement "create table mbus.qt$my_topic( like mbus.qt_model including all)"
PL/pgSQL function "create_queue" line 8 at EXECUTE statement
create_queue

(1 row)

mbt=# SELECT create_consumer('flag_true', 'my_topic',

(1 row)

mbt=# SELECT create_consumer('flag_false', 'my_topic',

(1 row)

mbt=# SELECT post_my_topic('"data"=>"test payload"'::hstore, NULL::hstore, '"FLAG"=>"true"'::hstore, NULL::timestamp, null::timestamp);
post_my_topic
mbt.27.1582.909b4d008e013255d7a180945ab89e99
(1 row)

mbt=# SELECT post_my_topic('"data"=>"test payload"'::hstore, NULL::hstore, '"FLAG"=>"false"'::hstore, NULL::timestamp, null::timestamp);
post_my_topic
mbt.28.1583.909b4d008e013255d7a180945ab89e99
(1 row)

mbt=# SELECT post_my_topic('"data"=>"test payload"'::hstore, NULL::hstore, '"FLAG"=>"true"'::hstore, NULL::timestamp, null::timestamp);
post_my_topic
mbt.29.1585.909b4d008e013255d7a180945ab89e99
(1 row)

mbt=# \x
Expanded display is on.
mbt=# SELECT consume('my_topic','flag_true');
-[ RECORD 1 ]----------------------------
consume | (27,"2012-12-26 18:15:06.961829",mbt.27.1582.909b4d008e013255d7a180945ab89e99,"2012-12-26 17:15:06.961829",,{},"""seenby""=>""{mbt}"", ""source_db""=>""mbt"", ""destination""=>""my_topic"", ""enqueue_time""=>""2012-12-26 18:15:06.961829"", ""destination_queue""=>""my_topic""","""FLAG""=>""true""","""data""=>""test payload""")

mbt=# SELECT consume('my_topic','flag_true');
-[ RECORD 1 ]----------------------------
consume | (29,"2012-12-26 18:16:13.79412",mbt.29.1585.909b4d008e013255d7a180945ab89e99,"2012-12-26 17:16:13.79412",,{},"""seenby""=>""{mbt}"", ""source_db""=>""mbt"", ""destination""=>""my_topic"", ""enqueue_time""=>""2012-12-26 18:16:13.79412"", ""destination_queue""=>""my_topic""","""FLAG""=>""true""","""data""=>""test payload""")

mbt=# SELECT consume('my_topic','flag_true');
(No rows)
mbt=# SELECT consume('my_topic','flag_false');
-[ RECORD 1 ]----------------------------
consume | (28,"2012-12-26 18:15:20.305775",mbt.28.1583.909b4d008e013255d7a180945ab89e99,"2012-12-26 17:15:20.305775",,{},"""seenby""=>""{mbt}"", ""source_db""=>""mbt"", ""destination""=>""my_topic"", ""enqueue_time""=>""2012-12-26 18:15:20.305775"", ""destination_queue""=>""my_topic""","""FLAG""=>""false""","""data""=>""test payload""")

mbt=# SELECT consume('my_topic','flag_false');
(No rows)
}}}