Home
All queue operations must be performed by a superuser. To give a non-superuser access to a queue or topic, use the mbus.queue_acl function; see [nonsuperuser Using_views_for_non_superuser_access] for details.
Since hstore is assumed to be installed in the public schema, set the search path accordingly:
SET search_path = public, mbus;
- Application - a client program that uses messaging
- Message - a unified format for sending and receiving data through queues. It is described by the table qt_model in schema mbus:

| column | datatype | description |
|---|---|---|
| id | integer | Internal id only; not to be used by the application |
| added | timestamp without time zone | When the message was added |
| iid | text | Unique message id. Expected to be unique among all queues and databases participating in communication |
| delayed_until | timestamp without time zone | The message is not to be consumed until this timestamp |
| expires | timestamp without time zone | The message will not be consumed after this timestamp |
| received | integer[] | Array of ids of the consumers that have consumed this message |
| headers | hstore | Message headers, intended for internal use only, not for application use |
| properties | hstore | Various properties set by the application (message headers for the application level) |
| data | hstore | Message payload in hstore format |

Headers could be:
- seenby
- source_db
- destination_queue
- enqueue_time
- *Payload* - any hstore
- *Channel* - or *Message channel*, is a term for an abstraction: an application can write data to a particular channel and read data from it. In these terms, a message channel is something like a logical address for message routing. In mbus, a specific queue is a message channel.
- Queue
- Topic
- Consumer
- Selector
- Publisher
- Subscriber
mbus.create_queue(qname, ncons)
Creates a queue.
- qname text - queue name; only [a-z], _, and 0-9 are allowed
- ncons integer - number of queue parts available for simultaneous scanning (= number of consumers). Can range from 2 up to about 128-256; larger numbers may cause performance problems.
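As a minimal sketch of the call described above, assuming mbus is installed in schema mbus, hstore lives in public, and the session runs as a superuser (the queue name `orders` is hypothetical):

```sql
-- Make hstore (public) and the mbus functions visible in this session.
SET search_path = public, mbus;

-- Create a queue named 'orders' (hypothetical name, lowercase letters only)
-- split into 4 parts, so up to four consumers can scan it simultaneously.
SELECT mbus.create_queue('orders', 4);
```

A small ncons keeps the number of queue parts low; the value 4 here is only illustrative.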
mbus.post_<qname>( data hstore, headers hstore DEFAULT NULL::hstore, properties hstore DEFAULT NULL::hstore, delayed_until timestamp without time zone DEFAULT NULL::timestamp without time zone, expires timestamp without time zone DEFAULT NULL::timestamp without time zone)
Posts a message to the queue. Returns the message id.
- data hstore - message payload in hstore format
- headers hstore - message headers, intended for internal use only, not for application use.
- properties hstore - various properties set by the application (message headers for the application level).
- delayed_until timestamp without time zone - the message is not to be consumed until this timestamp.
- expires timestamp without time zone - the message will not be consumed after this timestamp. To clean up the queue, use clear_queue_<qname>()
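The parameters above might be combined as in the following sketch. It assumes that creating a queue named `jobs` (a hypothetical name) generates `post_jobs` and `clear_queue_jobs`, following the `<qname>` naming pattern described here; the payload and property keys are made up for illustration.

```sql
SET search_path = public, mbus;

-- Hypothetical queue; post_jobs() is assumed to be generated for it.
SELECT mbus.create_queue('jobs', 2);

-- Post a message that becomes consumable in 5 minutes and expires after 1 hour.
-- Arguments are passed positionally: data, headers, properties,
-- delayed_until, expires.
SELECT mbus.post_jobs(
    '"task"=>"resize", "file"=>"a.png"'::hstore,  -- payload
    NULL::hstore,                                  -- headers: left to mbus
    '"PRIORITY"=>"high"'::hstore,                  -- application-level properties
    (now() + interval '5 minutes')::timestamp,     -- delayed_until
    (now() + interval '1 hour')::timestamp         -- expires
);

-- Clean up the queue, as suggested for expired messages.
SELECT mbus.clear_queue_jobs();
```

The casts to `timestamp` matter because `now()` returns a timestamp with time zone, while the documented parameters are without time zone.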
mbus.post( qname text, data hstore, headers hstore DEFAULT NULL::hstore, properties hstore DEFAULT NULL::hstore, delayed_until timestamp without time zone DEFAULT NULL::timestamp without time zone, expires timestamp without time zone DEFAULT NULL::timestamp without time zone)
Same as post_<qname>. The only difference is that qname is passed as a parameter.
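A short sketch of the generic form, relying on the documented defaults for the optional parameters (the queue name `events` and the payload are hypothetical):

```sql
SET search_path = public, mbus;

-- Hypothetical queue used only for this illustration.
SELECT mbus.create_queue('events', 2);

-- Only qname and data are supplied; headers, properties, delayed_until
-- and expires fall back to their NULL defaults.
SELECT mbus.post('events', '"kind"=>"signup"'::hstore);
```

Passing the queue name as a value can be convenient when the target queue is chosen at run time rather than fixed in the calling code.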
mbus.consume(qname text)
Consumes one message from qname. Returns a result set of type mbus.qt_model. See the Message entry in [#Definitions] for a description of qt_model.
In the current version only durable subscribers are available. A queue can be turned into a topic by creating one or more subscribers.
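Because the result is described as a set of mbus.qt_model rows, the function can presumably be called in the FROM clause so that individual columns are addressable. The sketch below assumes that behaviour; the queue name `inbox` and the payload key are hypothetical.

```sql
SET search_path = public, mbus;

-- Hypothetical queue with one message in it.
SELECT mbus.create_queue('inbox', 2);
SELECT mbus.post('inbox', '"body"=>"hello"'::hstore);

-- Consume one message and pick out individual qt_model columns.
-- data -> 'body' extracts a single value from the hstore payload.
SELECT iid, added, data -> 'body' AS body
FROM mbus.consume('inbox');
```

If the queue holds no consumable message, the query would be expected to return no rows.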
mbus.create_consumer(cname text, qname text, p_selector text DEFAULT NULL::text, noindex boolean DEFAULT false)
Creates a consumer named cname for queue qname using selector p_selector. Keep in mind that a consumer named default is already created by mbus.create_queue(qname, ncons). The selector can be any static expression, such as (properties->'FLAG')='true' used in the complete pub/sub example.
mbus.consume(qname, cname)
Consumes one message from qname by consumer cname. Returns result set mbus.qt_model.
mbus.consume_<qname>_by_<cname>()
Same as mbus.consume(qname, cname)
mbus.consumen_<qname>_by_<cname>(cnt integer)
Consumes cnt messages from qname by consumer cname. Returns result set mbus.qt_model.
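The generated per-consumer functions might be used as in this sketch. It assumes that creating consumer `critical` on queue `alerts` (both hypothetical names) generates `consume_alerts_by_critical` and `consumen_alerts_by_critical` following the naming pattern above, and that both can be called in the FROM clause.

```sql
SET search_path = public, mbus;

-- Hypothetical topic with one subscriber that selects on a property.
SELECT mbus.create_queue('alerts', 2);
SELECT mbus.create_consumer('critical', 'alerts',
                            $$(properties->'LEVEL')='critical'$$);

-- Two messages match the selector, one does not.
SELECT mbus.post_alerts('"msg"=>"disk full"'::hstore, NULL::hstore,
                        '"LEVEL"=>"critical"'::hstore, NULL::timestamp, NULL::timestamp);
SELECT mbus.post_alerts('"msg"=>"cpu hot"'::hstore, NULL::hstore,
                        '"LEVEL"=>"critical"'::hstore, NULL::timestamp, NULL::timestamp);
SELECT mbus.post_alerts('"msg"=>"user login"'::hstore, NULL::hstore,
                        '"LEVEL"=>"info"'::hstore, NULL::timestamp, NULL::timestamp);

-- Consume a single message for this consumer.
SELECT iid, data -> 'msg' AS msg
FROM mbus.consume_alerts_by_critical();

-- Consume up to 10 of the remaining messages in one call.
SELECT iid, data -> 'msg' AS msg
FROM mbus.consumen_alerts_by_critical(10);
```

Fetching messages in batches with the consumen variant reduces the number of round trips when a consumer has a backlog to work through.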
*Complete pub/sub example:*
mbt=# SELECT create_queue('my_topic', 2);
NOTICE:  CREATE TABLE / UNIQUE will create implicit index "qt$my_topic_iid_key" for table "qt$my_topic"
CONTEXT:  SQL statement "create table mbus.qt$my_topic( like mbus.qt_model including all)"
PL/pgSQL function "create_queue" line 8 at EXECUTE statement
 create_queue
--------------

(1 row)

mbt=# SELECT create_consumer('flag_true', 'my_topic', $$(properties->'FLAG')='true'$$);
 create_consumer
-----------------

(1 row)

mbt=# SELECT create_consumer('flag_false', 'my_topic', $$(properties->'FLAG')='false'$$);
 create_consumer
-----------------

(1 row)

mbt=# SELECT post_my_topic('"data"=>"test payload"'::hstore, NULL::hstore, '"FLAG"=>"true"'::hstore, NULL::timestamp, null::timestamp);
                post_my_topic
----------------------------------------------
 mbt.27.1582.909b4d008e013255d7a180945ab89e99
(1 row)

mbt=# SELECT post_my_topic('"data"=>"test payload"'::hstore, NULL::hstore, '"FLAG"=>"false"'::hstore, NULL::timestamp, null::timestamp);
                post_my_topic
----------------------------------------------
 mbt.28.1583.909b4d008e013255d7a180945ab89e99
(1 row)

mbt=# SELECT post_my_topic('"data"=>"test payload"'::hstore, NULL::hstore, '"FLAG"=>"true"'::hstore, NULL::timestamp, null::timestamp);
                post_my_topic
----------------------------------------------
 mbt.29.1585.909b4d008e013255d7a180945ab89e99
(1 row)

mbt=# \x
Expanded display is on.
mbt=# SELECT consume('my_topic','flag_true');
-[ RECORD 1 ]----------------------------------------
consume | (27,"2012-12-26 18:15:06.961829",mbt.27.1582.909b4d008e013255d7a180945ab89e99,"2012-12-26 17:15:06.961829",,{},"""seenby""=>""{mbt}"", ""source_db""=>""mbt"", ""destination""=>""my_topic"", ""enqueue_time""=>""2012-12-26 18:15:06.961829"", ""destination_queue""=>""my_topic""","""FLAG""=>""true""","""data""=>""test payload""")

mbt=# SELECT consume('my_topic','flag_true');
-[ RECORD 1 ]----------------------------------------
consume | (29,"2012-12-26 18:16:13.79412",mbt.29.1585.909b4d008e013255d7a180945ab89e99,"2012-12-26 17:16:13.79412",,{},"""seenby""=>""{mbt}"", ""source_db""=>""mbt"", ""destination""=>""my_topic"", ""enqueue_time""=>""2012-12-26 18:16:13.79412"", ""destination_queue""=>""my_topic""","""FLAG""=>""true""","""data""=>""test payload""")

mbt=# SELECT consume('my_topic','flag_true');
(No rows)
mbt=# SELECT consume('my_topic','flag_false');
-[ RECORD 1 ]----------------------------------------
consume | (28,"2012-12-26 18:15:20.305775",mbt.28.1583.909b4d008e013255d7a180945ab89e99,"2012-12-26 17:15:20.305775",,{},"""seenby""=>""{mbt}"", ""source_db""=>""mbt"", ""destination""=>""my_topic"", ""enqueue_time""=>""2012-12-26 18:15:20.305775"", ""destination_queue""=>""my_topic""","""FLAG""=>""false""","""data""=>""test payload""")

mbt=# SELECT consume('my_topic','flag_false');
(No rows)