Kristian Ionescu edited this page Nov 6, 2013 · 44 revisions

Webmachine/Mochiweb

Webmachine is a RESTful API toolkit written in Erlang, built on top of the excellent bit-pushing and HTTP syntax management provided by Mochiweb. Webmachine makes it easier for us to integrate a RESTful API into sensor-cloud.

Resource

An application in Webmachine is called a resource. The resource contains the source code of your application, and together with Webmachine's API you can modify its behavior based on the HTTP method of a request as well as other HTTP properties. Webmachine provides a toolkit of functions that can be used in the resource; more detailed documentation about these functions can be found here.

Dispatcher

A Webmachine application uses URI dispatching to resources. This makes it possible to route different URI paths to different resources in Webmachine. The dispatch configuration is done in priv/dispatch.conf. The dispatch map data structure is a list of tuples, where each tuple is a 3-tuple of the form {pathspec, resource, args}.

  • pathspec specifies the URI path pattern that is matched against the URI of a request.
  • resource is the name of the Webmachine resource that handles HTTP requests matching the corresponding pathspec.
  • args is a list of arguments that is passed to the resource upon execution; in most cases it is not needed.

Example

    {["streams", 'id'], streams, []}.
    {["streams"], streams, []}.
    {["resources"], resource, []}.

In this example we can see how different path specifications map to different (or the same) resources. HTTP requests made to srv:8000/streams will be handled by the streams.erl resource, while requests made to srv:8000/resources will be handled by resource.erl. The first row shows how we can use URI information in our resources: a request could be made to srv:8000/streams/1, and the value of id can be extracted by the streams resource using the Webmachine API function wrq:path_info, which gives us a list of key-value tuples. In this case we would get [{id, "1"}], where the key is an atom and the value is a string.
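
A minimal sketch of such a resource could look like this. init/1 and to_html/2 are standard Webmachine callbacks; the module body and response HTML here are made up for illustration:

```erlang
%% Sketch of a minimal streams resource, assuming the dispatch rule
%% {["streams", 'id'], streams, []} from the example above.
-module(streams).
-export([init/1, to_html/2]).
-include_lib("webmachine/include/webmachine.hrl").

init([]) -> {ok, undefined}.

to_html(ReqData, Context) ->
    %% For a request to srv:8000/streams/1, wrq:path_info(id, ReqData)
    %% returns the string "1".
    Id = wrq:path_info(id, ReqData),
    {"<html><body>Stream " ++ Id ++ "</body></html>", ReqData, Context}.
```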

Configuration

Reference

  • Webmachine GitHub Repository can be found here.

Data store

We are using Elasticsearch as our datastore. Elasticsearch is a document datastore that you connect to through a RESTful API. To create (index) a new document in Elasticsearch, you make a POST request with a JSON object; likewise, GETting a document returns a JSON object. All entities in our system are stored as separate documents. Relations are not handled by Elasticsearch in any way: an entity simply stores the id of its "owner". For example, a stream has the id of the resource that it belongs to.
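
For example, indexing and then fetching a document could look like this from the command line. The index and type names below are illustrative assumptions rather than our actual configuration, and a running Elasticsearch on localhost:9200 is assumed:

```shell
# Index (create) a new stream document; Elasticsearch assigns an id
# and returns it in the JSON response.
curl -XPOST 'http://localhost:9200/sensorcloud/stream' -d '{
  "resource_id": "1",
  "name": "temperature"
}'

# Fetch a document by its id; the response is a JSON object.
curl -XGET 'http://localhost:9200/sensorcloud/stream/<id>'
```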

Elasticsearch

A "flexible and powerful open source, distributed real-time search and analytics engine for the cloud" - http://www.elasticsearch.org/

Elasticsearch is running with the default options, which means you can make HTTP requests to it on port 9200. See Setup for installation instructions.

Configuration of our elasticsearch server can be found here: https://github.com/projectcs13/elasticsearch/blob/configured/config/elasticsearch.yml

Basically, the only thing we have changed is the cluster name.

Entities

These are the entities that exist within our system. Each entity is stored as a separate type of Elasticsearch document.

Users

Resources

Fields currently in a resource:

  • user_id
  • name
  • tags
  • description
  • type
  • manufacturer
  • streams
  • uri
  • polling_freq
  • creation_date

Streams

A stream is a representation of a stream of data. A stream has meta-data associated with it. The actual data is saved as individual datapoints.

Fields currently in a stream:

  • resource_id
  • name
  • tags
  • description
  • private
  • type
  • accuracy
  • min_val
  • max_val
  • quality
  • active
  • user_ranking
  • subscribers
  • last_updated
  • creation_date
  • history_size
  • location

Virtual Streams

Fields currently in a virtual stream:

  • name
  • description
  • tags
  • group
  • private
  • history_size
  • last_updated
  • creation_date
  • function

Datapoints

Fields currently in a datapoint:

  • timestamp
  • value
  • stream_id
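
A datapoint document could then be as small as this (values made up for illustration):

```json
{
  "timestamp": "2013-11-06T12:00:00",
  "value": 23.5,
  "stream_id": "1"
}
```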

Groups

Polling system

Publisher/Subscriber system

  1. Concepts
  2. Overview
  3. Clients
  4. Program Structure
  5. How to publish
  6. How to subscribe
  7. How to use a web-socket
  8. RabbitMQ

Concepts

  • Streams. A stream is a flow of data. A user can subscribe to a stream.
  • Virtual Streams. A virtual stream subscribes to one or several streams/virtual streams and processes incoming data according to a defined function, which could be a prediction or some other aggregation of data. A user can subscribe to a virtual stream.
  • Prediction. Currently, the predictions are calculated using R.
  • Data points. A data point is a data value for a stream/virtual stream. When the system receives a data point from an external resource in JSON format, it parses it and transforms it into a data point.
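
The JSON-to-datapoint step could be sketched like this. The field names follow the datapoint entity above; the returned tuple shape is an assumption:

```erlang
%% Sketch: turn an incoming JSON payload into a datapoint.
%% mochijson2 ships with mochiweb and decodes a JSON object into
%% {struct, Proplist} with binary keys.
parse_datapoint(Json, StreamId) ->
    {struct, Fields} = mochijson2:decode(Json),
    Value = proplists:get_value(<<"value">>, Fields),
    Timestamp = proplists:get_value(<<"timestamp">>, Fields),
    {datapoint, StreamId, Timestamp, Value}.
```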

Overview

When a new data point arrives for a stream, it is published in the pub/sub system and distributed to all clients who have subscribed to the stream. The system also supports dynamic visualization and prediction. RabbitMQ has been utilized to implement the back-end messaging. Node.js and Socket.IO are used to interact with the web pages via web-sockets.

Clients

  • Web pages. The pub/sub system can push data to web pages, enabling dynamic visualization.
  • Sessions. Informs a logged-in user about their subscriptions.
  • Users. Informs logged-off users about their subscriptions.

Program Structure

  • virtualStreamProcess.erl The implementation of the virtual stream. It may contain the prediction functionality and its own local data store in the future; the virtual stream may also do some data processing in the final product.
  • resourceProcessMock.erl A test replacement for the coming pollers and parsers. This process generates some random data and pushes it into the pub/sub system so we can test our code.
  • streamProcess.erl The implementation of a stream process. Each stream process is associated with one stream and handles pushing data both to the DB and to the pub/sub system.
  • datapoints_resource.erl Handles a data point when it is provided via the API.

How to publish

When a data point is published, it is sent to an exchange, which distributes it to the queues bound to it. This is an example of doing a fanout publish from an Erlang module:

    %% Connect
    {ok, Connection} =
            amqp_connection:start(#amqp_params_network{host = ?IP_ADDRESS}),
    %% Open channel
    {ok, ChannelOut} = amqp_connection:open_channel(Connection),
    %% Declare exchange
    amqp_channel:call(ChannelOut, #'exchange.declare'{exchange = ?EXCHANGE, type = <<"fanout">>}),
    %% Publish data point
    amqp_channel:cast(ChannelOut, #'basic.publish'{exchange = ?EXCHANGE}, #amqp_msg{payload = ?DATAPOINT}).

For further details check the documentation here
Check the documentation here for how to publish via Node.js using Rabbit.js.

How to subscribe

In the system, each stream/virtual stream has its own exchange in RabbitMQ, and clients subscribe to these exchanges by hooking up queues to them. To subscribe you need to connect to the exchange; this is an example of how to do it in an Erlang module:

    %% Connect  
    {ok, Connection} =
            amqp_connection:start(#amqp_params_network{host = ?IP_ADDRESS}),
    
    %% Open channel
    {ok, ChannelIn} = amqp_connection:open_channel(Connection),

    %% Declare queue
    amqp_channel:call(ChannelIn, #'exchange.declare'{exchange = ?EXCHANGE, type = ?TYPE}),
    #'queue.declare_ok'{queue = QueueIn} = amqp_channel:call(ChannelIn, #'queue.declare'{exclusive = true}),
    amqp_channel:call(ChannelIn, #'queue.bind'{exchange = ?EXCHANGE, queue = QueueIn}),

    %% Subscribe to INPUT queue
    amqp_channel:subscribe(ChannelIn, #'basic.consume'{queue = QueueIn, no_ack = true}, self()),
    receive
            {#'basic.deliver'{}, #amqp_msg{payload = Body}} ->
                  %% Do something with the data
                  ok
    end.

For further details check the documentation here

How to subscribe via Node.js using Rabbit.js:

    var sub = context.socket('SUB');
    sub.connect(?EXCHANGE);
    sub.on('data', function(data) {
            // Do something with the data
    });

For further details check the documentation here

How to use a web-socket

The system allows subscriptions via web-sockets, which are used to update web pages in real time. To let a web page connect via a web-socket, Socket.IO is used.
Here is an example of how to connect to a web-socket:

    var socket = io.connect(?IP_ADDRESS);
    socket.on(?MSG_TYPE, function(data) {
        // Do something with the data
    });

For further details look in the documentation here
Tip: If you want to serve several kinds of clients through one web-socket, consider using namespaces, a feature provided by Socket.IO.

RabbitMQ

RabbitMQ is a message broker that provides robust messaging for many successful commercial websites, including Twitter. It runs on most operating systems and is easy to use. RabbitMQ offers client libraries for many mainstream programming languages, including Erlang, which we are using in this project. RabbitMQ is based on the AMQP protocol, and the following are essential concepts:

  • Queues. A queue caches incoming messages, and a client can fetch messages from the queue.
  • Exchanges. An exchange is where data points arrive; it distributes them to the connected queues according to some rule.
  • Publisher/Subscriber. A publisher is something that pushes data into the pub/sub system, and a subscriber is something that listens for specific data coming into the pub/sub system.

If you would like to know more about RabbitMQ, please visit its official website.
