Skip to content

Release 1.3.1 (#364) #366

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
13 changes: 12 additions & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,17 @@ jobs:
- name: Nozzle Log
run: |
cf logs splunk-firehose-nozzle &

- name: Get tile name
run: |
echo "tile_name=$(ls tile/product | grep ".pivotal")" >> "$GITHUB_ENV"

- name: Upload tile
uses: actions/upload-artifact@v2
with:
name: ${{ env.tile_name }}
path: tile/product/${{ env.tile_name }}


# Skip test for now!
execute_tests:
Expand Down Expand Up @@ -258,4 +269,4 @@ jobs:
echo "Teardown deployment env"
cf delete splunk-firehose-nozzle -f
cf delete data_gen -f
cf delete-org splunk-ci-org -f
cf delete-org splunk-ci-org -f
15 changes: 13 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ This is recommended for dev environments only.
* `STATUS_MONITOR_INTERVAL`: Time interval (in s/m/h. For example, 3600s or 60m or 1h) for Enabling Monitoring (Metric data of insights with in the connectors). Default is 0s (Disabled).
* `SPLUNK_METRIC_INDEX`: Index in which metric data will be ingested when monitoring module is enabled
* `SELECTED_MONITORING_METRICS`: Name of the metrics that you want to monitor and add using comma seprated values. List of the metrics that are supported in the metrics modules are given below
* `REFRESH_SPLUNK_CONNECTION`: If set to true, PCF will periodically refresh connection to Splunk (how often depends on KEEP_ALIVE_TIMER value). If set to false connection will be kept alive and reused. (Default: false)
* `KEEP_ALIVE_TIMER`: Time after which connection to Splunk will be refreshed, if REFRESH_SPLUNK_CONNECTION is set to true (in s/m/h. For example, 3600s or 60m or 1h). (Default: 30s)

__About app cache params:__

Expand Down Expand Up @@ -276,7 +278,7 @@ FORMAT = new_index
<p class="note"><strong>Note:</strong>Moving from version 1.2.4 to 1.2.5, timestamp will use nanosecond precision instead of milliseconds.</p>


__Monitoring(Metric data Ingestion):__
## __Monitoring(Metric data Ingestion):__

| Metric Name | Description
|---|---
Expand All @@ -301,6 +303,15 @@ __Monitoring(Metric data Ingestion):__

<p class="note"><strong>Note:</strong>Select value Rate(Avg) for Aggregation from Analysis tab on the top right.</p>

### Routing data through edge processor via HEC
Logs can be routed to Splunk via Edge Processor. Assuming that you have a working Edge Processor instance, you can use it with minimal
changes to nozzle configuration.

Configuratino fields that you should change are:
* `SPLUNK_HOST`: Use the host of your Edge Processor instance instead of Splunk. Example: https://x.x.x.x:8088.
* `SPLUNK_TOKEN`: It is a required parameter. A token used to authorize your request, can be found in Edge Processor settings. If your
EP token authentication is turned off, you can enter a placeholder values instead (e.x. "-").

## <a id='walkthrough'></a> Troubleshooting
This topic describes how to troubleshoot Splunk Firehose Nozzle for Cloud Foundry.

Expand Down Expand Up @@ -508,7 +519,7 @@ $ chmod +x tools/nozzle.sh
Build project:

```
$ make VERSION=1.3.0
$ make VERSION=1.3.1
```

Run tests with [Ginkgo](http://onsi.github.io/ginkgo/)
Expand Down
43 changes: 24 additions & 19 deletions cache/cache_easyjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package cache

import (
json "encoding/json"

jlexer "github.com/mailru/easyjson/jlexer"
jwriter "github.com/mailru/easyjson/jwriter"
)
Expand Down Expand Up @@ -43,25 +44,7 @@ func easyjsonA591d1bcDecodeGithubComCloudfoundryCommunitySplunkFirehoseNozzleCac
case "OrgGuid":
out.OrgGuid = string(in.String())
case "CfAppEnv":
if in.IsNull() {
in.Skip()
} else {
in.Delim('{')
if !in.IsDelim('}') {
out.CfAppEnv = make(map[string]interface{})
} else {
out.CfAppEnv = nil
}
for !in.IsDelim('}') {
key := string(in.String())
in.WantColon()
var v1 interface{}
v1 = in.Interface()
(out.CfAppEnv)[key] = v1
in.WantComma()
}
in.Delim('}')
}
parseCfAppEnv(in, out)
case "IgnoredApp":
out.IgnoredApp = bool(in.Bool())
default:
Expand All @@ -71,6 +54,28 @@ func easyjsonA591d1bcDecodeGithubComCloudfoundryCommunitySplunkFirehoseNozzleCac
}
in.Delim('}')
}

func parseCfAppEnv(in *jlexer.Lexer, out *App) {
if in.IsNull() {
in.Skip()
} else {
in.Delim('{')
if !in.IsDelim('}') {
out.CfAppEnv = make(map[string]interface{})
} else {
out.CfAppEnv = nil
}
for !in.IsDelim('}') {
key := string(in.String())
in.WantColon()
v1 := in.Interface()
(out.CfAppEnv)[key] = v1
in.WantComma()
}
in.Delim('}')
}
}

func easyjsonA591d1bcEncodeGithubComCloudfoundryCommunitySplunkFirehoseNozzleCache(out *jwriter.Writer, in App) {
out.RawByte('{')
first := true
Expand Down
1 change: 0 additions & 1 deletion eventrouter/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func (r *router) Route(msg *events.Envelope) error {
// Ignore this event since we are not interested
return nil
}

_ = r.sink.Write(msg)

return nil
Expand Down
92 changes: 49 additions & 43 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,57 +195,63 @@ func ContainerMetric(msg *events.Envelope) *Event {
}

func (e *Event) AnnotateWithAppData(appCache cache.Cache, config *Config) {
cf_app_id := e.Fields["cf_app_id"]
appGuid := fmt.Sprintf("%s", cf_app_id)

if cf_app_id != nil && appGuid != "<nil>" && cf_app_id != "" {
appInfo, err := appCache.GetApp(appGuid)
if err != nil {
if err == cache.ErrMissingAndIgnored {
logrus.Info(err.Error(), cf_app_id)
} else {
logrus.Error("Failed to fetch application metadata from remote: ", err)
}
return
} else if appInfo == nil {
return
}
cf_app_name := appInfo.Name
cf_space_id := appInfo.SpaceGuid
cf_space_name := appInfo.SpaceName
cf_org_id := appInfo.OrgGuid
cf_org_name := appInfo.OrgName
cf_ignored_app := appInfo.IgnoredApp
app_env := appInfo.CfAppEnv

if cf_app_name != "" && config.AddAppName {
e.Fields["cf_app_name"] = cf_app_name
}
cfAppId := e.Fields["cf_app_id"]
appGuid := fmt.Sprintf("%s", cfAppId)

if cf_space_id != "" && config.AddSpaceGuid {
e.Fields["cf_space_id"] = cf_space_id
}
if cfAppId == nil || cfAppId == "" || appGuid == "<nil>" {
return
}

if cf_space_name != "" && config.AddSpaceName {
e.Fields["cf_space_name"] = cf_space_name
appInfo, err := appCache.GetApp(appGuid)
if err != nil {
if err == cache.ErrMissingAndIgnored {
logrus.Info(err.Error(), cfAppId)
} else {
logrus.Error("Failed to fetch application metadata from remote: ", err)
}
return
} else if appInfo == nil {
return
}

if cf_org_id != "" && config.AddOrgGuid {
e.Fields["cf_org_id"] = cf_org_id
}
e.parseAndAnnotateWithAppInfo(appInfo, config)
}

if cf_org_name != "" && config.AddOrgName {
e.Fields["cf_org_name"] = cf_org_name
}
func (e *Event) parseAndAnnotateWithAppInfo(appInfo *cache.App, config *Config) {
cfAppName := appInfo.Name
cfSpaceId := appInfo.SpaceGuid
cfSpaceName := appInfo.SpaceName
cfOrgId := appInfo.OrgGuid
cfOrgName := appInfo.OrgName
cfIgnoredApp := appInfo.IgnoredApp
appEnv := appInfo.CfAppEnv

if cfAppName != "" && config.AddAppName {
e.Fields["cf_app_name"] = cfAppName
}

if app_env["SPLUNK_INDEX"] != nil {
e.Fields["info_splunk_index"] = app_env["SPLUNK_INDEX"]
}
if cfSpaceId != "" && config.AddSpaceGuid {
e.Fields["cf_space_id"] = cfSpaceId
}

if cf_ignored_app != false {
e.Fields["cf_ignored_app"] = cf_ignored_app
}
if cfSpaceName != "" && config.AddSpaceName {
e.Fields["cf_space_name"] = cfSpaceName
}

if cfOrgId != "" && config.AddOrgGuid {
e.Fields["cf_org_id"] = cfOrgId
}

if cfOrgName != "" && config.AddOrgName {
e.Fields["cf_org_name"] = cfOrgName
}

if appEnv["SPLUNK_INDEX"] != nil {
e.Fields["info_splunk_index"] = appEnv["SPLUNK_INDEX"]
}

if cfIgnoredApp {
e.Fields["cf_ignored_app"] = cfIgnoredApp
}
}

Expand Down
26 changes: 14 additions & 12 deletions eventsink/splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,20 @@ import (
const SPLUNK_HEC_FIELDS_SUPPORT_VERSION = "6.4"

type SplunkConfig struct {
FlushInterval time.Duration
QueueSize int // consumer queue buffer size
BatchSize int
Retries int // No of retries to post events to HEC before dropping events
Hostname string
SubscriptionID string
ExtraFields map[string]string
TraceLogging bool
UUID string
Logger lager.Logger
StatusMonitorInterval time.Duration
LoggingIndex string
FlushInterval time.Duration
QueueSize int // consumer queue buffer size
BatchSize int
Retries int // No of retries to post events to HEC before dropping events
Hostname string
SubscriptionID string
ExtraFields map[string]string
TraceLogging bool
UUID string
Logger lager.Logger
StatusMonitorInterval time.Duration
LoggingIndex string
RefreshSplunkConnection bool
KeepAliveTimer time.Duration
}

type ParseConfig = fevents.Config
Expand Down
61 changes: 38 additions & 23 deletions eventwriter/splunk_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,25 @@ import (
"fmt"
"io"
"net/http"
"time"

"code.cloudfoundry.org/cfhttp"
"code.cloudfoundry.org/lager"
"github.com/cloudfoundry-community/splunk-firehose-nozzle/utils"
)

var keepAliveTimer = time.Now()

type SplunkConfig struct {
Host string
Token string
Index string
Fields map[string]string
SkipSSL bool
Debug bool
Version string
Host string
Token string
Index string
Fields map[string]string
SkipSSL bool
Debug bool
Version string
RefreshSplunkConnection bool
KeepAliveTimer time.Duration

Logger lager.Logger
}
Expand Down Expand Up @@ -52,18 +57,7 @@ func (s *SplunkEvent) Write(events []map[string]interface{}) (error, uint64) {
bodyBuffer := new(bytes.Buffer)
count := uint64(len(events))
for i, event := range events {

if _, ok := event["index"]; !ok {
if event["event"].(map[string]interface{})["info_splunk_index"] != nil {
event["index"] = event["event"].(map[string]interface{})["info_splunk_index"]
} else if s.config.Index != "" {
event["index"] = s.config.Index
}
}

if len(s.config.Fields) > 0 {
event["fields"] = s.config.Fields
}
s.parseEvent(&event)

eventJson, err := json.Marshal(event)
if err == nil {
Expand All @@ -90,6 +84,22 @@ func (s *SplunkEvent) Write(events []map[string]interface{}) (error, uint64) {
}
}

func (s *SplunkEvent) parseEvent(event *map[string]interface{}) error {
if _, ok := (*event)["index"]; !ok {
if (*event)["event"].(map[string]interface{})["info_splunk_index"] != nil {
(*event)["index"] = (*event)["event"].(map[string]interface{})["info_splunk_index"]
} else if s.config.Index != "" {
(*event)["index"] = s.config.Index
}
}

if len(s.config.Fields) > 0 {
(*event)["fields"] = s.config.Fields
}

return nil
}

func (s *SplunkEvent) send(postBody *[]byte) error {
endpoint := fmt.Sprintf("%s/services/collector", s.config.Host)
req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(*postBody))
Expand All @@ -113,10 +123,15 @@ func (s *SplunkEvent) send(postBody *[]byte) error {
responseBody, _ := io.ReadAll(resp.Body)
return errors.New(fmt.Sprintf("Non-ok response code [%d] from splunk: %s", resp.StatusCode, responseBody))
} else {
//Draining the response buffer, so that the same connection can be reused the next time
_, err := io.Copy(io.Discard, resp.Body)
if err != nil {
s.config.Logger.Error("Error discarding response body", err)
if s.config.RefreshSplunkConnection && time.Now().After(keepAliveTimer) {
if s.config.KeepAliveTimer > 0 {
keepAliveTimer = time.Now().Add(s.config.KeepAliveTimer)
}
} else {
//Draining the response buffer, so that the same connection can be reused the next time
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
s.config.Logger.Error("Error discarding response body", err)
}
}
}
s.BodyBufferSize.Add(uint64(len(*postBody)))
Expand Down
Loading