Skip to content

Commit

Permalink
Version 0.2.0 (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
hartfordfive authored Nov 30, 2016
1 parent 4ebfa6c commit 1ad50c0
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 32 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
## Changelog

0.1.0
-----
* First initial beta release.

0.2.0
-----
* Added configuration option `json_document_type_schema` which gives ability to optionally enable and enforce a JSON schema validation for JSON format messages.
* Added option `enable_syslog_format_only`, which gives the ability to run process in a mode that only accepts syslog type messages. This could be used as a logging replacement for processes that log to local syslog address.
48 changes: 47 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,52 @@ Ensure that this folder is at the following location:
### Requirements

* [Golang](https://golang.org/dl/) 1.7
* github.com/pquerna/ffjson/ffjson
* github.com/xeipuuv/gojsonschema

### Configuration Options

- `udplogbeat.port` : The UDP port on which the process will listen (Default = 5000)
- `udplogbeat.max_message_size` : The maximum accepted message size (Default = 1024)
- `udplogbeat.enable_syslog_format_only` : Boolean value indicating if only syslog messages should be accepted. (Default = false)
- `udplogbeat.enable_json_validation` : Boolean value indicating if JSON schema validation should be applied for `json` format messages (Default = false)
- `udplogbeat.publish_failed_json_invalid` : Boolean value indicating if JSON objects should be sent serialized in the event of a failed validation. This will add the `_udplogbeat_jspf` tag. (Default = false)
- `udplogbeat.json_document_type_schema` : A hash consisting of the Elasticsearch type as the key, and the absolute local schema file path as the value.

### Configuration Example

Sample configuration for a syslog replacement
```
udplogbeat:
port: 5000
max_message_size: 4096
enable_syslog_format_only: false
```

Sample configuration that enforces schemas for JSON format events
```
udplogbeat:
port: 5001
max_message_size: 2048
enable_json_validation: true
json_document_type_schema:
email_contact: "/etc/udplogbeat/app1_schema.json"
stock_item: "/etc/udplogbeat/app2_schema.json"
```

JSON schemas can be automatically generated from an object here: http://jsonschema.net/. You can also view the included sample schemas `app1_schema.json` and `app2_schema.json` as examples.

#### Considerations

If you intend on using this as a drop-in replacement to logging with Rsyslog, this method will not persist your data to a file on disk.
If udplogbeat is down for any given reason, messages sent to the configured UDP port will never be processed or sent to your ELK cluster.
If you need 100% guarantee each message will be delivered at least once, this may not be the best solution for you.
If some potential loss of log events is acceptable for you, than this may be a reasonable solution for you.


### Log Structure

In order for the udplogbeat application to accept events, they must be structured in the following format:
In order for the udplogbeat application to accept events, when not in syslog format only mode (*enable_syslog_format_only: false*), they must be structured in the following format:

**[FORMAT]:[ES_TYPE]:[EVENT_DATA]**

Expand Down Expand Up @@ -71,6 +112,11 @@ in the same directory with the name udplogbeat.
make
```

Or to build the zipped binaries for OSX, Windows and Linux:

./build_os_binaries.sh "[VERSION_NUMBER]"

These will be placed in the `bin/` directory.

### Run

Expand Down
21 changes: 21 additions & 0 deletions app1_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"type": "object",
"properties": {
"email": {
"type": "string"
},
"name": {
"type": "object",
"properties": {
"first": {
"type": "string"
},
"last": {
"type": "string"
}
}
}
},
"additionalProperties": false
}
24 changes: 24 additions & 0 deletions app2_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"type": "object",
"properties": {
"item": {
"type": "object",
"properties": {
"code": {
"type": "integer"
},
"name": {
"type": "string"
}
}
},
"price": {
"type": "integer"
},
"stock_count": {
"type": "integer"
}
},
"additionalProperties": false
}
84 changes: 68 additions & 16 deletions beater/udplogbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net"
"os"
"os/signal"
"strings"
"time"

"github.com/elastic/beats/libbeat/beat"
Expand All @@ -15,12 +16,14 @@ import (
"github.com/hartfordfive/udplogbeat/config"
"github.com/hartfordfive/udplogbeat/udploglib"
"github.com/pquerna/ffjson/ffjson"
"github.com/xeipuuv/gojsonschema"
)

type Udplogbeat struct {
done chan struct{}
config config.Config
client publisher.Client
done chan struct{}
config config.Config
client publisher.Client
jsonDocumentSchema map[string]gojsonschema.JSONLoader
}

// Creates beater
Expand All @@ -35,6 +38,19 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
config: config,
}

if bt.config.EnableJsonValidation {

bt.jsonDocumentSchema = map[string]gojsonschema.JSONLoader{}

for name, path := range config.JsonDocumentTypeSchema {
logp.Info("Loading JSON schema %s from %s", name, path)
schemaLoader := gojsonschema.NewReferenceLoader("file://" + path)
ds := schemaLoader
bt.jsonDocumentSchema[name] = ds
}

}

bt.config.Addr = fmt.Sprintf("127.0.0.1:%d", bt.config.Port)

go func() {
Expand Down Expand Up @@ -64,6 +80,8 @@ func (bt *Udplogbeat) Run(b *beat.Beat) error {
}
udpBuf := make([]byte, bt.config.MaxMessageSize)
var event common.MapStr
var now common.Time
var logFormat, logType, logData string

for {

Expand All @@ -73,7 +91,7 @@ func (bt *Udplogbeat) Run(b *beat.Beat) error {
default:
}

logp.Info("Reading from UDP socket...")
now = common.Time(time.Now())

// Events should be in the format of: [FORMAT]:[ES_TYPE]:[EVENT_DATA]
logSize, _, err := l.ReadFrom(udpBuf)
Expand All @@ -86,35 +104,69 @@ func (bt *Udplogbeat) Run(b *beat.Beat) error {
}
}

logFormat, logType, logData, err := udploglib.GetLogItem(udpBuf[:logSize])
if err != nil {
logp.Err("Error parsing log item: %v", err)
continue
if bt.config.EnableSyslogFormatOnly {
logFormat = "plain"
logType = "syslog"
logData = strings.TrimSpace(string(udpBuf[:logSize]))
if logData == "" {
logp.Err("Syslog event is empty")
continue
}
} else {
parts, err := udploglib.GetLogItem(udpBuf[:logSize])
logFormat = parts[0]
logType = parts[1]
logData = parts[2]
if err != nil {
logp.Err("Error parsing log item: %v", err)
continue
}
logp.Info("Size, Format, ES Type: %d bytes, %s, %s", logSize, logFormat, logType)
}

logp.Info("Total log item bytes: %d", logSize)
logp.Info("Format: %s", logFormat)
logp.Info("ES Type: %s", logType)
logp.Info("Data: %s", logData)
//logp.Info("Data: %s...", logData[0:40])

event = common.MapStr{}

if logFormat == "json" {

if bt.config.EnableJsonValidation {

if _, ok := bt.jsonDocumentSchema[logType]; !ok {
logp.Err("No schema found for this type")
continue
}

result, err := gojsonschema.Validate(bt.jsonDocumentSchema[logType], gojsonschema.NewStringLoader(logData))
if err != nil {
logp.Err("Error with JSON object: %s", err.Error())
continue
}

if !result.Valid() {
logp.Err("Invalid document type")
event["message"] = logData
event["tags"] = []string{"_udplogbeat_jspf"}
goto SendFailedMsg
}
}

if err := ffjson.Unmarshal([]byte(logData), &event); err != nil {
logp.Err("Could not load json formated event: %v", err)
continue
event["message"] = logData
event["tags"] = []string{"_udplogbeat_jspf"}
}
logp.Info("Event: %v", event)
} else {
event["message"] = logData
}

event["@timestamp"] = common.Time(time.Now())
SendFailedMsg:
event["@timestamp"] = now
event["type"] = logType
event["counter"] = counter

bt.client.PublishEvent(event)
logp.Info("Event sent")
//logp.Info("Event sent")
counter++

}
Expand Down
8 changes: 8 additions & 0 deletions build_os_binaries.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/bash

GOOS=linux GOARCH=amd64 go build -ldflags "-s -w" -o bin/udplogbeat-$1-linux-amd64
zip bin/udplogbeat-$1-linux-amd64.zip bin/udplogbeat-$1-linux-amd64
GOOS=darwin GOARCH=amd64 go build -ldflags "-s -w" -o bin/udplogbeat-$1-darwin-amd64
zip bin/udplogbeat-$1-darwin-amd64.zip bin/udplogbeat-$1-darwin-amd64
GOOS=windows GOARCH=amd64 go build -ldflags "-s -w" -o bin/udplogbeat-$1-windows-amd64
zip bin/udplogbeat-$1-windows-amd64.zip bin/udplogbeat-$1-windows-amd64
23 changes: 16 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,23 @@ package config
import "time"

type Config struct {
Period time.Duration `config:"period"`
Port int `config:"port"`
MaxMessageSize int `config:"max_message_size"`
Addr string
Period time.Duration `config:"period"`
Port int `config:"port"`
MaxMessageSize int `config:"max_message_size"`
Addr string
EnableSyslogFormatOnly bool `config:"enable_syslog_format_only"`
EnableJsonValidation bool `config:"enable_json_validation"`
PublishFailedJsonSchemaValidation bool `config:publish_failed_json_schema_validation`
PublishFailedJsonInvalid bool `config:publish_failed_json_invalid`
JsonDocumentTypeSchema map[string]string `config:"json_document_type_schema"`
}

var DefaultConfig = Config{
Period: 1 * time.Second,
Port: 5000,
MaxMessageSize: 1024,
Period: 1 * time.Second,
Port: 5000,
MaxMessageSize: 1024,
EnableJsonValidation: false,
PublishFailedJsonSchemaValidation: false,
PublishFailedJsonInvalid: false,
EnableSyslogFormatOnly: false,
}
2 changes: 1 addition & 1 deletion udplogbeat.template-es2x.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"type": "string"
},
"match_mapping_type": "string",
"path_match": "fields.*"
"match": "*"
}
}
],
Expand Down
2 changes: 1 addition & 1 deletion udplogbeat.template.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"type": "keyword"
},
"match_mapping_type": "string",
"path_match": "fields.*"
"match": "*"
}
}
],
Expand Down
14 changes: 8 additions & 6 deletions udploglib/utils.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
package udploglib

import (
//"errors"
"fmt"
"strings"
//"github.com/xeipuuv/gojsonschema"
)

// GetLogItem returns the log entry format, elasticsearch type, message and error (if any)
func GetLogItem(buf []byte) (string, string, string, error) {
func GetLogItem(buf []byte) ([]string, error) {

parts := strings.SplitN(string(buf), ":", 3)
if len(parts) != 3 {
return "", "", "", fmt.Errorf("Invalid log item")
return []string{"", "", ""}, fmt.Errorf("Invalid log item")
}
if parts[0] != "json" && parts[0] != "plain" {
return "", "", "", fmt.Errorf("Log format %s is invalid", parts[0])
return []string{"", "", ""}, fmt.Errorf("Log format %s is invalid", parts[0])
}
if parts[1] == "" {
return "", "", "", fmt.Errorf("A log type must be specified")
return []string{"", "", ""}, fmt.Errorf("A log type must be specified")
}
if parts[2] == "" {
return "", "", "", fmt.Errorf("Log data is empty")
return []string{"", "", ""}, fmt.Errorf("Log data is empty")
}

return strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]), strings.TrimSpace(parts[2]), nil
return []string{strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]), strings.TrimSpace(parts[2])}, nil
}

0 comments on commit 1ad50c0

Please sign in to comment.