Skip to content

Commit 767d1f6

Browse files
sbylica-splunkVihasMakwanaVihas Splunkmetskem
authored
Release 1.3.1 (#364)
* Refactoring sonarqube issues (#355) * Refactoring various functions to lower complexity * Set "keepalive timeout", closing the splunk connection every 5 second - with configuration (#354) * Keepalive timeout config (#361) * Updated dependencies (#363) * Version bump (#365) * Edge processor configuration * Upgrade to golang 1.20 and upgrade compatible dependencies --------- Co-authored-by: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com> Co-authored-by: Vihas Splunk <vhiteshbhai@splunk.com> Co-authored-by: Harry Metske <harry.metske@gmail.com>
1 parent 0d2ff2b commit 767d1f6

File tree

563 files changed

+18542
-11609
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

563 files changed

+18542
-11609
lines changed

.github/workflows/main.yml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,17 @@ jobs:
183183
- name: Nozzle Log
184184
run: |
185185
cf logs splunk-firehose-nozzle &
186+
187+
- name: Get tile name
188+
run: |
189+
echo "tile_name=$(ls tile/product | grep ".pivotal")" >> "$GITHUB_ENV"
190+
191+
- name: Upload tile
192+
uses: actions/upload-artifact@v2
193+
with:
194+
name: ${{ env.tile_name }}
195+
path: tile/product/${{ env.tile_name }}
196+
186197

187198
# Skip test for now!
188199
execute_tests:
@@ -258,4 +269,4 @@ jobs:
258269
echo "Teardown deployment env"
259270
cf delete splunk-firehose-nozzle -f
260271
cf delete data_gen -f
261-
cf delete-org splunk-ci-org -f
272+
cf delete-org splunk-ci-org -f

README.md

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ This is recommended for dev environments only.
111111
* `STATUS_MONITOR_INTERVAL`: Time interval (in s/m/h. For example, 3600s or 60m or 1h) for Enabling Monitoring (Metric data of insights with in the connectors). Default is 0s (Disabled).
112112
* `SPLUNK_METRIC_INDEX`: Index in which metric data will be ingested when monitoring module is enabled
113113
* `SELECTED_MONITORING_METRICS`: Name of the metrics that you want to monitor and add using comma seprated values. List of the metrics that are supported in the metrics modules are given below
114+
* `REFRESH_SPLUNK_CONNECTION`: If set to true, PCF will periodically refresh connection to Splunk (how often depends on KEEP_ALIVE_TIMER value). If set to false connection will be kept alive and reused. (Default: false)
115+
* `KEEP_ALIVE_TIMER`: Time after which connection to Splunk will be refreshed, if REFRESH_SPLUNK_CONNECTION is set to true (in s/m/h. For example, 3600s or 60m or 1h). (Default: 30s)
114116

115117
__About app cache params:__
116118

@@ -276,7 +278,7 @@ FORMAT = new_index
276278
<p class="note"><strong>Note:</strong>Moving from version 1.2.4 to 1.2.5, timestamp will use nanosecond precision instead of milliseconds.</p>
277279
278280
279-
__Monitoring(Metric data Ingestion):__
281+
## __Monitoring(Metric data Ingestion):__
280282
281283
| Metric Name | Description
282284
|---|---
@@ -301,6 +303,15 @@ __Monitoring(Metric data Ingestion):__
301303
302304
<p class="note"><strong>Note:</strong>Select value Rate(Avg) for Aggregation from Analysis tab on the top right.</p>
303305
306+
### Routing data through edge processor via HEC
307+
Logs can be routed to Splunk via Edge Processor. Assuming that you have a working Edge Processor instance, you can use it with minimal
308+
changes to nozzle configuration.
309+
310+
Configuratino fields that you should change are:
311+
* `SPLUNK_HOST`: Use the host of your Edge Processor instance instead of Splunk. Example: https://x.x.x.x:8088.
312+
* `SPLUNK_TOKEN`: It is a required parameter. A token used to authorize your request, can be found in Edge Processor settings. If your
313+
EP token authentication is turned off, you can enter a placeholder values instead (e.x. "-").
314+
304315
## <a id='walkthrough'></a> Troubleshooting
305316
This topic describes how to troubleshoot Splunk Firehose Nozzle for Cloud Foundry.
306317
@@ -508,7 +519,7 @@ $ chmod +x tools/nozzle.sh
508519
Build project:
509520
510521
```
511-
$ make VERSION=1.3.0
522+
$ make VERSION=1.3.1
512523
```
513524
514525
Run tests with [Ginkgo](http://onsi.github.io/ginkgo/)

cache/cache_easyjson.go

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package cache
44

55
import (
66
json "encoding/json"
7+
78
jlexer "github.com/mailru/easyjson/jlexer"
89
jwriter "github.com/mailru/easyjson/jwriter"
910
)
@@ -43,25 +44,7 @@ func easyjsonA591d1bcDecodeGithubComCloudfoundryCommunitySplunkFirehoseNozzleCac
4344
case "OrgGuid":
4445
out.OrgGuid = string(in.String())
4546
case "CfAppEnv":
46-
if in.IsNull() {
47-
in.Skip()
48-
} else {
49-
in.Delim('{')
50-
if !in.IsDelim('}') {
51-
out.CfAppEnv = make(map[string]interface{})
52-
} else {
53-
out.CfAppEnv = nil
54-
}
55-
for !in.IsDelim('}') {
56-
key := string(in.String())
57-
in.WantColon()
58-
var v1 interface{}
59-
v1 = in.Interface()
60-
(out.CfAppEnv)[key] = v1
61-
in.WantComma()
62-
}
63-
in.Delim('}')
64-
}
47+
parseCfAppEnv(in, out)
6548
case "IgnoredApp":
6649
out.IgnoredApp = bool(in.Bool())
6750
default:
@@ -71,6 +54,28 @@ func easyjsonA591d1bcDecodeGithubComCloudfoundryCommunitySplunkFirehoseNozzleCac
7154
}
7255
in.Delim('}')
7356
}
57+
58+
func parseCfAppEnv(in *jlexer.Lexer, out *App) {
59+
if in.IsNull() {
60+
in.Skip()
61+
} else {
62+
in.Delim('{')
63+
if !in.IsDelim('}') {
64+
out.CfAppEnv = make(map[string]interface{})
65+
} else {
66+
out.CfAppEnv = nil
67+
}
68+
for !in.IsDelim('}') {
69+
key := string(in.String())
70+
in.WantColon()
71+
v1 := in.Interface()
72+
(out.CfAppEnv)[key] = v1
73+
in.WantComma()
74+
}
75+
in.Delim('}')
76+
}
77+
}
78+
7479
func easyjsonA591d1bcEncodeGithubComCloudfoundryCommunitySplunkFirehoseNozzleCache(out *jwriter.Writer, in App) {
7580
out.RawByte('{')
7681
first := true

eventrouter/default.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ func (r *router) Route(msg *events.Envelope) error {
3838
// Ignore this event since we are not interested
3939
return nil
4040
}
41-
4241
_ = r.sink.Write(msg)
4342

4443
return nil

events/events.go

Lines changed: 49 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -195,57 +195,63 @@ func ContainerMetric(msg *events.Envelope) *Event {
195195
}
196196

197197
func (e *Event) AnnotateWithAppData(appCache cache.Cache, config *Config) {
198-
cf_app_id := e.Fields["cf_app_id"]
199-
appGuid := fmt.Sprintf("%s", cf_app_id)
200-
201-
if cf_app_id != nil && appGuid != "<nil>" && cf_app_id != "" {
202-
appInfo, err := appCache.GetApp(appGuid)
203-
if err != nil {
204-
if err == cache.ErrMissingAndIgnored {
205-
logrus.Info(err.Error(), cf_app_id)
206-
} else {
207-
logrus.Error("Failed to fetch application metadata from remote: ", err)
208-
}
209-
return
210-
} else if appInfo == nil {
211-
return
212-
}
213-
cf_app_name := appInfo.Name
214-
cf_space_id := appInfo.SpaceGuid
215-
cf_space_name := appInfo.SpaceName
216-
cf_org_id := appInfo.OrgGuid
217-
cf_org_name := appInfo.OrgName
218-
cf_ignored_app := appInfo.IgnoredApp
219-
app_env := appInfo.CfAppEnv
220-
221-
if cf_app_name != "" && config.AddAppName {
222-
e.Fields["cf_app_name"] = cf_app_name
223-
}
198+
cfAppId := e.Fields["cf_app_id"]
199+
appGuid := fmt.Sprintf("%s", cfAppId)
224200

225-
if cf_space_id != "" && config.AddSpaceGuid {
226-
e.Fields["cf_space_id"] = cf_space_id
227-
}
201+
if cfAppId == nil || cfAppId == "" || appGuid == "<nil>" {
202+
return
203+
}
228204

229-
if cf_space_name != "" && config.AddSpaceName {
230-
e.Fields["cf_space_name"] = cf_space_name
205+
appInfo, err := appCache.GetApp(appGuid)
206+
if err != nil {
207+
if err == cache.ErrMissingAndIgnored {
208+
logrus.Info(err.Error(), cfAppId)
209+
} else {
210+
logrus.Error("Failed to fetch application metadata from remote: ", err)
231211
}
212+
return
213+
} else if appInfo == nil {
214+
return
215+
}
232216

233-
if cf_org_id != "" && config.AddOrgGuid {
234-
e.Fields["cf_org_id"] = cf_org_id
235-
}
217+
e.parseAndAnnotateWithAppInfo(appInfo, config)
218+
}
236219

237-
if cf_org_name != "" && config.AddOrgName {
238-
e.Fields["cf_org_name"] = cf_org_name
239-
}
220+
func (e *Event) parseAndAnnotateWithAppInfo(appInfo *cache.App, config *Config) {
221+
cfAppName := appInfo.Name
222+
cfSpaceId := appInfo.SpaceGuid
223+
cfSpaceName := appInfo.SpaceName
224+
cfOrgId := appInfo.OrgGuid
225+
cfOrgName := appInfo.OrgName
226+
cfIgnoredApp := appInfo.IgnoredApp
227+
appEnv := appInfo.CfAppEnv
228+
229+
if cfAppName != "" && config.AddAppName {
230+
e.Fields["cf_app_name"] = cfAppName
231+
}
240232

241-
if app_env["SPLUNK_INDEX"] != nil {
242-
e.Fields["info_splunk_index"] = app_env["SPLUNK_INDEX"]
243-
}
233+
if cfSpaceId != "" && config.AddSpaceGuid {
234+
e.Fields["cf_space_id"] = cfSpaceId
235+
}
244236

245-
if cf_ignored_app != false {
246-
e.Fields["cf_ignored_app"] = cf_ignored_app
247-
}
237+
if cfSpaceName != "" && config.AddSpaceName {
238+
e.Fields["cf_space_name"] = cfSpaceName
239+
}
240+
241+
if cfOrgId != "" && config.AddOrgGuid {
242+
e.Fields["cf_org_id"] = cfOrgId
243+
}
244+
245+
if cfOrgName != "" && config.AddOrgName {
246+
e.Fields["cf_org_name"] = cfOrgName
247+
}
248+
249+
if appEnv["SPLUNK_INDEX"] != nil {
250+
e.Fields["info_splunk_index"] = appEnv["SPLUNK_INDEX"]
251+
}
248252

253+
if cfIgnoredApp {
254+
e.Fields["cf_ignored_app"] = cfIgnoredApp
249255
}
250256
}
251257

eventsink/splunk.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,20 @@ import (
2222
const SPLUNK_HEC_FIELDS_SUPPORT_VERSION = "6.4"
2323

2424
type SplunkConfig struct {
25-
FlushInterval time.Duration
26-
QueueSize int // consumer queue buffer size
27-
BatchSize int
28-
Retries int // No of retries to post events to HEC before dropping events
29-
Hostname string
30-
SubscriptionID string
31-
ExtraFields map[string]string
32-
TraceLogging bool
33-
UUID string
34-
Logger lager.Logger
35-
StatusMonitorInterval time.Duration
36-
LoggingIndex string
25+
FlushInterval time.Duration
26+
QueueSize int // consumer queue buffer size
27+
BatchSize int
28+
Retries int // No of retries to post events to HEC before dropping events
29+
Hostname string
30+
SubscriptionID string
31+
ExtraFields map[string]string
32+
TraceLogging bool
33+
UUID string
34+
Logger lager.Logger
35+
StatusMonitorInterval time.Duration
36+
LoggingIndex string
37+
RefreshSplunkConnection bool
38+
KeepAliveTimer time.Duration
3739
}
3840

3941
type ParseConfig = fevents.Config

eventwriter/splunk_event.go

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,25 @@ import (
88
"fmt"
99
"io"
1010
"net/http"
11+
"time"
1112

1213
"code.cloudfoundry.org/cfhttp"
1314
"code.cloudfoundry.org/lager"
1415
"github.com/cloudfoundry-community/splunk-firehose-nozzle/utils"
1516
)
1617

18+
var keepAliveTimer = time.Now()
19+
1720
type SplunkConfig struct {
18-
Host string
19-
Token string
20-
Index string
21-
Fields map[string]string
22-
SkipSSL bool
23-
Debug bool
24-
Version string
21+
Host string
22+
Token string
23+
Index string
24+
Fields map[string]string
25+
SkipSSL bool
26+
Debug bool
27+
Version string
28+
RefreshSplunkConnection bool
29+
KeepAliveTimer time.Duration
2530

2631
Logger lager.Logger
2732
}
@@ -52,18 +57,7 @@ func (s *SplunkEvent) Write(events []map[string]interface{}) (error, uint64) {
5257
bodyBuffer := new(bytes.Buffer)
5358
count := uint64(len(events))
5459
for i, event := range events {
55-
56-
if _, ok := event["index"]; !ok {
57-
if event["event"].(map[string]interface{})["info_splunk_index"] != nil {
58-
event["index"] = event["event"].(map[string]interface{})["info_splunk_index"]
59-
} else if s.config.Index != "" {
60-
event["index"] = s.config.Index
61-
}
62-
}
63-
64-
if len(s.config.Fields) > 0 {
65-
event["fields"] = s.config.Fields
66-
}
60+
s.parseEvent(&event)
6761

6862
eventJson, err := json.Marshal(event)
6963
if err == nil {
@@ -90,6 +84,22 @@ func (s *SplunkEvent) Write(events []map[string]interface{}) (error, uint64) {
9084
}
9185
}
9286

87+
func (s *SplunkEvent) parseEvent(event *map[string]interface{}) error {
88+
if _, ok := (*event)["index"]; !ok {
89+
if (*event)["event"].(map[string]interface{})["info_splunk_index"] != nil {
90+
(*event)["index"] = (*event)["event"].(map[string]interface{})["info_splunk_index"]
91+
} else if s.config.Index != "" {
92+
(*event)["index"] = s.config.Index
93+
}
94+
}
95+
96+
if len(s.config.Fields) > 0 {
97+
(*event)["fields"] = s.config.Fields
98+
}
99+
100+
return nil
101+
}
102+
93103
func (s *SplunkEvent) send(postBody *[]byte) error {
94104
endpoint := fmt.Sprintf("%s/services/collector", s.config.Host)
95105
req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(*postBody))
@@ -113,10 +123,15 @@ func (s *SplunkEvent) send(postBody *[]byte) error {
113123
responseBody, _ := io.ReadAll(resp.Body)
114124
return errors.New(fmt.Sprintf("Non-ok response code [%d] from splunk: %s", resp.StatusCode, responseBody))
115125
} else {
116-
//Draining the response buffer, so that the same connection can be reused the next time
117-
_, err := io.Copy(io.Discard, resp.Body)
118-
if err != nil {
119-
s.config.Logger.Error("Error discarding response body", err)
126+
if s.config.RefreshSplunkConnection && time.Now().After(keepAliveTimer) {
127+
if s.config.KeepAliveTimer > 0 {
128+
keepAliveTimer = time.Now().Add(s.config.KeepAliveTimer)
129+
}
130+
} else {
131+
//Draining the response buffer, so that the same connection can be reused the next time
132+
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
133+
s.config.Logger.Error("Error discarding response body", err)
134+
}
120135
}
121136
}
122137
s.BodyBufferSize.Add(uint64(len(*postBody)))

0 commit comments

Comments
 (0)