From 1ad50c06f4e6421eca43a1bd4e4e7f10ba686458 Mon Sep 17 00:00:00 2001 From: Al Lefebvre Date: Wed, 30 Nov 2016 06:40:58 -0500 Subject: [PATCH] Version 0.2.0 (#3) --- CHANGELOG.md | 10 +++++ README.md | 48 +++++++++++++++++++- app1_schema.json | 21 +++++++++ app2_schema.json | 24 ++++++++++ beater/udplogbeat.go | 84 ++++++++++++++++++++++++++++------- build_os_binaries.sh | 8 ++++ config/config.go | 23 +++++++--- udplogbeat.template-es2x.json | 2 +- udplogbeat.template.json | 2 +- udploglib/utils.go | 14 +++--- 10 files changed, 204 insertions(+), 32 deletions(-) create mode 100644 CHANGELOG.md create mode 100644 app1_schema.json create mode 100644 app2_schema.json create mode 100755 build_os_binaries.sh diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..2c795db --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,10 @@ +## Changelog + +0.1.0 +----- +* First initial beta release. + +0.2.0 +----- +* Added configuration option `json_document_type_schema` which gives ability to optionally enable and enforce a JSON schema validation for JSON format messages. +* Added option `enable_syslog_format_only`, which gives the ability to run process in a mode that only accepts syslog type messages. This could be used as a logging replacement for processes that log to local syslog address. diff --git a/README.md b/README.md index a766fef..1b56c78 100644 --- a/README.md +++ b/README.md @@ -12,11 +12,52 @@ Ensure that this folder is at the following location: ### Requirements * [Golang](https://golang.org/dl/) 1.7 +* github.com/pquerna/ffjson/ffjson +* github.com/xeipuuv/gojsonschema + +### Configuration Options + +- `udplogbeat.port` : The UDP port on which the process will listen (Default = 5000) +- `udplogbeat.max_message_size` : The maximum accepted message size (Default = 1024) +- `udplogbeat.enable_syslog_format_only` : Boolean value indicating if only syslog messages should be accepted. (Default = false) +- `udplogbeat.enable_json_validation` : Boolean value indicating if JSON schema validation should be applied for `json` format messages (Default = false) +- `udplogbeat.publish_failed_json_invalid` : Boolean value indicating if JSON objects should be sent serialized in the event of a failed validation. This will add the `_udplogbeat_jspf` tag. (Default = false) +- `udplogbeat.json_document_type_schema` : A hash consisting of the Elasticsearch type as the key, and the absolute local schema file path as the value. + +### Configuration Example + +Sample configuration for a syslog replacement +``` +udplogbeat: + port: 5000 + max_message_size: 4096 + enable_syslog_format_only: false +``` + +Sample configuration that enforces schemas for JSON format events +``` +udplogbeat: + port: 5001 + max_message_size: 2048 + enable_json_validation: true + json_document_type_schema: + email_contact: "/etc/udplogbeat/app1_schema.json" + stock_item: "/etc/udplogbeat/app2_schema.json" +``` + +JSON schemas can be automatically generated from an object here: http://jsonschema.net/. You can also view the included sample schemas `app1_schema.json` and `app2_schema.json` as examples. + +#### Considerations + +If you intend on using this as a drop-in replacement to logging with Rsyslog, this method will not persist your data to a file on disk. +If udplogbeat is down for any given reason, messages sent to the configured UDP port will never be processed or sent to your ELK cluster. +If you need 100% guarantee each message will be delivered at least once, this may not be the best solution for you. +If some potential loss of log events is acceptable for you, than this may be a reasonable solution for you. ### Log Structure -In order for the udplogbeat application to accept events, they must be structured in the following format: +In order for the udplogbeat application to accept events, when not in syslog format only mode (*enable_syslog_format_only: false*), they must be structured in the following format: **[FORMAT]:[ES_TYPE]:[EVENT_DATA]** @@ -71,6 +112,11 @@ in the same directory with the name udplogbeat. make ``` +Or to build the zipped binaries for OSX, Windows and Linux: + +./build_os_binaries.sh "[VERSION_NUMBER]" + +These will be placed in the `bin/` directory. ### Run diff --git a/app1_schema.json b/app1_schema.json new file mode 100644 index 0000000..11402f7 --- /dev/null +++ b/app1_schema.json @@ -0,0 +1,21 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + "properties": { + "email": { + "type": "string" + }, + "name": { + "type": "object", + "properties": { + "first": { + "type": "string" + }, + "last": { + "type": "string" + } + } + } + }, + "additionalProperties": false +} \ No newline at end of file diff --git a/app2_schema.json b/app2_schema.json new file mode 100644 index 0000000..e65870d --- /dev/null +++ b/app2_schema.json @@ -0,0 +1,24 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + "properties": { + "item": { + "type": "object", + "properties": { + "code": { + "type": "integer" + }, + "name": { + "type": "string" + } + } + }, + "price": { + "type": "integer" + }, + "stock_count": { + "type": "integer" + } + }, + "additionalProperties": false +} \ No newline at end of file diff --git a/beater/udplogbeat.go b/beater/udplogbeat.go index 7afc078..e4b6c40 100644 --- a/beater/udplogbeat.go +++ b/beater/udplogbeat.go @@ -5,6 +5,7 @@ import ( "net" "os" "os/signal" + "strings" "time" "github.com/elastic/beats/libbeat/beat" @@ -15,12 +16,14 @@ import ( "github.com/hartfordfive/udplogbeat/config" "github.com/hartfordfive/udplogbeat/udploglib" "github.com/pquerna/ffjson/ffjson" + "github.com/xeipuuv/gojsonschema" ) type Udplogbeat struct { - done chan struct{} - config config.Config - client publisher.Client + done chan struct{} + config config.Config + client publisher.Client + jsonDocumentSchema map[string]gojsonschema.JSONLoader } // Creates beater @@ -35,6 +38,19 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { config: config, } + if bt.config.EnableJsonValidation { + + bt.jsonDocumentSchema = map[string]gojsonschema.JSONLoader{} + + for name, path := range config.JsonDocumentTypeSchema { + logp.Info("Loading JSON schema %s from %s", name, path) + schemaLoader := gojsonschema.NewReferenceLoader("file://" + path) + ds := schemaLoader + bt.jsonDocumentSchema[name] = ds + } + + } + bt.config.Addr = fmt.Sprintf("127.0.0.1:%d", bt.config.Port) go func() { @@ -64,6 +80,8 @@ func (bt *Udplogbeat) Run(b *beat.Beat) error { } udpBuf := make([]byte, bt.config.MaxMessageSize) var event common.MapStr + var now common.Time + var logFormat, logType, logData string for { @@ -73,7 +91,7 @@ func (bt *Udplogbeat) Run(b *beat.Beat) error { default: } - logp.Info("Reading from UDP socket...") + now = common.Time(time.Now()) // Events should be in the format of: [FORMAT]:[ES_TYPE]:[EVENT_DATA] logSize, _, err := l.ReadFrom(udpBuf) @@ -86,35 +104,69 @@ func (bt *Udplogbeat) Run(b *beat.Beat) error { } } - logFormat, logType, logData, err := udploglib.GetLogItem(udpBuf[:logSize]) - if err != nil { - logp.Err("Error parsing log item: %v", err) - continue + if bt.config.EnableSyslogFormatOnly { + logFormat = "plain" + logType = "syslog" + logData = strings.TrimSpace(string(udpBuf[:logSize])) + if logData == "" { + logp.Err("Syslog event is empty") + continue + } + } else { + parts, err := udploglib.GetLogItem(udpBuf[:logSize]) + logFormat = parts[0] + logType = parts[1] + logData = parts[2] + if err != nil { + logp.Err("Error parsing log item: %v", err) + continue + } + logp.Info("Size, Format, ES Type: %d bytes, %s, %s", logSize, logFormat, logType) } - logp.Info("Total log item bytes: %d", logSize) - logp.Info("Format: %s", logFormat) - logp.Info("ES Type: %s", logType) - logp.Info("Data: %s", logData) + //logp.Info("Data: %s...", logData[0:40]) event = common.MapStr{} if logFormat == "json" { + + if bt.config.EnableJsonValidation { + + if _, ok := bt.jsonDocumentSchema[logType]; !ok { + logp.Err("No schema found for this type") + continue + } + + result, err := gojsonschema.Validate(bt.jsonDocumentSchema[logType], gojsonschema.NewStringLoader(logData)) + if err != nil { + logp.Err("Error with JSON object: %s", err.Error()) + continue + } + + if !result.Valid() { + logp.Err("Invalid document type") + event["message"] = logData + event["tags"] = []string{"_udplogbeat_jspf"} + goto SendFailedMsg + } + } + if err := ffjson.Unmarshal([]byte(logData), &event); err != nil { logp.Err("Could not load json formated event: %v", err) - continue + event["message"] = logData + event["tags"] = []string{"_udplogbeat_jspf"} } - logp.Info("Event: %v", event) } else { event["message"] = logData } - event["@timestamp"] = common.Time(time.Now()) + SendFailedMsg: + event["@timestamp"] = now event["type"] = logType event["counter"] = counter bt.client.PublishEvent(event) - logp.Info("Event sent") + //logp.Info("Event sent") counter++ } diff --git a/build_os_binaries.sh b/build_os_binaries.sh new file mode 100755 index 0000000..4663ce2 --- /dev/null +++ b/build_os_binaries.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +GOOS=linux GOARCH=amd64 go build -ldflags "-s -w" -o bin/udplogbeat-$1-linux-amd64 +zip bin/udplogbeat-$1-linux-amd64.zip bin/udplogbeat-$1-linux-amd64 +GOOS=darwin GOARCH=amd64 go build -ldflags "-s -w" -o bin/udplogbeat-$1-darwin-amd64 +zip bin/udplogbeat-$1-darwin-amd64.zip bin/udplogbeat-$1-darwin-amd64 +GOOS=windows GOARCH=amd64 go build -ldflags "-s -w" -o bin/udplogbeat-$1-windows-amd64 +zip bin/udplogbeat-$1-windows-amd64.zip bin/udplogbeat-$1-windows-amd64 diff --git a/config/config.go b/config/config.go index 2943f0f..35dfd54 100644 --- a/config/config.go +++ b/config/config.go @@ -6,14 +6,23 @@ package config import "time" type Config struct { - Period time.Duration `config:"period"` - Port int `config:"port"` - MaxMessageSize int `config:"max_message_size"` - Addr string + Period time.Duration `config:"period"` + Port int `config:"port"` + MaxMessageSize int `config:"max_message_size"` + Addr string + EnableSyslogFormatOnly bool `config:"enable_syslog_format_only"` + EnableJsonValidation bool `config:"enable_json_validation"` + PublishFailedJsonSchemaValidation bool `config:publish_failed_json_schema_validation` + PublishFailedJsonInvalid bool `config:publish_failed_json_invalid` + JsonDocumentTypeSchema map[string]string `config:"json_document_type_schema"` } var DefaultConfig = Config{ - Period: 1 * time.Second, - Port: 5000, - MaxMessageSize: 1024, + Period: 1 * time.Second, + Port: 5000, + MaxMessageSize: 1024, + EnableJsonValidation: false, + PublishFailedJsonSchemaValidation: false, + PublishFailedJsonInvalid: false, + EnableSyslogFormatOnly: false, } diff --git a/udplogbeat.template-es2x.json b/udplogbeat.template-es2x.json index 9456712..75b063f 100644 --- a/udplogbeat.template-es2x.json +++ b/udplogbeat.template-es2x.json @@ -18,7 +18,7 @@ "type": "string" }, "match_mapping_type": "string", - "path_match": "fields.*" + "match": "*" } } ], diff --git a/udplogbeat.template.json b/udplogbeat.template.json index 032ceb5..cfdc77b 100644 --- a/udplogbeat.template.json +++ b/udplogbeat.template.json @@ -15,7 +15,7 @@ "type": "keyword" }, "match_mapping_type": "string", - "path_match": "fields.*" + "match": "*" } } ], diff --git a/udploglib/utils.go b/udploglib/utils.go index eb85bc3..e624bea 100644 --- a/udploglib/utils.go +++ b/udploglib/utils.go @@ -1,26 +1,28 @@ package udploglib import ( + //"errors" "fmt" "strings" + //"github.com/xeipuuv/gojsonschema" ) // GetLogItem returns the log entry format, elasticsearch type, message and error (if any) -func GetLogItem(buf []byte) (string, string, string, error) { +func GetLogItem(buf []byte) ([]string, error) { parts := strings.SplitN(string(buf), ":", 3) if len(parts) != 3 { - return "", "", "", fmt.Errorf("Invalid log item") + return []string{"", "", ""}, fmt.Errorf("Invalid log item") } if parts[0] != "json" && parts[0] != "plain" { - return "", "", "", fmt.Errorf("Log format %s is invalid", parts[0]) + return []string{"", "", ""}, fmt.Errorf("Log format %s is invalid", parts[0]) } if parts[1] == "" { - return "", "", "", fmt.Errorf("A log type must be specified") + return []string{"", "", ""}, fmt.Errorf("A log type must be specified") } if parts[2] == "" { - return "", "", "", fmt.Errorf("Log data is empty") + return []string{"", "", ""}, fmt.Errorf("Log data is empty") } - return strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]), strings.TrimSpace(parts[2]), nil + return []string{strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]), strings.TrimSpace(parts[2])}, nil }