Skip to content

Commit 6027135

Browse files
[POA-2609] Add k8s and CRI sub-modules and function interfaces (#66)
Child PRs: #67, #69, #71, #72, #73, #74, #75, #76, #77, #78, #79, #80, #82
1 parent 662b4e3 commit 6027135

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+2204
-147
lines changed

apidump/apidump.go

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package apidump
22

33
import (
44
"context"
5+
"net/http"
56
"os"
67
"os/exec"
78
"os/signal"
@@ -70,6 +71,14 @@ const (
7071
notMatchedFilter filterState = "UNMATCHED"
7172
)
7273

74+
// DaemonsetArgs holds the arguments used when apidump runs as a daemonset in Kubernetes.
75+
type DaemonsetArgs struct {
76+
TargetNetworkNamespaceOpt string // Network namespace handed to interface discovery and to pcap collection.
77+
StopChan <-chan error `json:"-"` // A receive on this channel makes Run leave its signal-wait loop; excluded from JSON.
78+
APIKey string // Passed to rest.ApiDumpDaemonsetAuthHandler when the REST clients are built.
79+
Environment string // Passed to rest.ApiDumpDaemonsetAuthHandler together with APIKey.
80+
}
81+
7382
type Args struct {
7483
// Required args
7584
ClientID akid.ClientID
@@ -138,6 +147,8 @@ type Args struct {
138147

139148
// Whether to enable repro mode and include request/response payloads when uploading witnesses.
140149
ReproMode bool
150+
151+
DaemonsetArgs optionals.Optional[DaemonsetArgs]
141152
}
142153

143154
// TODO: either remove write-to-local-HAR-file completely,
@@ -177,7 +188,14 @@ func (a *apidump) LookupService() error {
177188
if !a.TargetIsRemote() {
178189
return nil
179190
}
180-
frontClient := rest.NewFrontClient(a.Domain, a.ClientID)
191+
192+
// Set auth handler for processes starting via daemonset
193+
var authHandler func(*http.Request) error
194+
if daemonsetArgs, exists := a.DaemonsetArgs.Get(); exists {
195+
authHandler = rest.ApiDumpDaemonsetAuthHandler(daemonsetArgs.APIKey, daemonsetArgs.Environment)
196+
}
197+
198+
frontClient := rest.NewFrontClient(a.Domain, a.ClientID, authHandler)
181199

182200
if a.PostmanCollectionID != "" {
183201
backendSvc, err := util.GetOrCreateServiceIDByPostmanCollectionID(frontClient, a.PostmanCollectionID)
@@ -197,7 +215,7 @@ func (a *apidump) LookupService() error {
197215
a.backendSvcName = serviceName
198216
}
199217

200-
a.learnClient = rest.NewLearnClient(a.Domain, a.ClientID, a.backendSvc)
218+
a.learnClient = rest.NewLearnClient(a.Domain, a.ClientID, a.backendSvc, authHandler)
201219
return nil
202220
}
203221

@@ -398,7 +416,7 @@ func (a *apidump) RotateLearnSession(done <-chan struct{}, collectors []trace.Le
398416

399417
case <-t.C:
400418
traceName := util.RandomLearnSessionName()
401-
backendLrn, err := util.NewLearnSession(args.Domain, args.ClientID, a.backendSvc, traceName, traceTags, nil)
419+
backendLrn, err := util.NewLearnSession(a.learnClient, traceName, traceTags, nil)
402420
if err != nil {
403421
telemetry.Error("new learn session", err)
404422
printer.Errorf("Failed to create trace %s: %v\n", traceName, err)
@@ -494,8 +512,17 @@ func (a *apidump) Run() error {
494512
printer.Debugln("Capturing filtered traffic for debugging.")
495513
}
496514

515+
var (
516+
targetNetworkNamespace optionals.Optional[string]
517+
daemonSetProcessStopChan <-chan error
518+
)
519+
if daemonsetArgs, exists := a.DaemonsetArgs.Get(); exists {
520+
targetNetworkNamespace = optionals.Some(daemonsetArgs.TargetNetworkNamespaceOpt)
521+
daemonSetProcessStopChan = daemonsetArgs.StopChan
522+
}
523+
497524
// Get the interfaces to listen on.
498-
interfaces, err := getEligibleInterfaces(args.Interfaces)
525+
interfaces, err := getEligibleInterfaces(args.Interfaces, targetNetworkNamespace)
499526
if err != nil {
500527
a.SendErrorTelemetry(GetErrorTypeWithDefault(err, api_schema.ApidumpError_PCAPInterfaceOther), err)
501528
return errors.Wrap(err, "No network interfaces could be used")
@@ -578,7 +605,7 @@ func (a *apidump) Run() error {
578605
var backendLrn akid.LearnSessionID
579606
if a.TargetIsRemote() {
580607
uri := a.Out.AkitaURI
581-
backendLrn, err = util.NewLearnSession(args.Domain, args.ClientID, a.backendSvc, uri.ObjectName, traceTags, nil)
608+
backendLrn, err = util.NewLearnSession(a.learnClient, uri.ObjectName, traceTags, nil)
582609
if err == nil {
583610
printer.Infof("Created new trace on Postman Cloud: %s\n", uri)
584611
} else {
@@ -767,7 +794,7 @@ func (a *apidump) Run() error {
767794
go func(interfaceName, filter string) {
768795
defer doneWG.Done()
769796
// Collect trace. This blocks until stop is closed or an error occurs.
770-
if err := pcap.Collect(stop, interfaceName, filter, bufferShare, args.ParseTLSHandshakes, collector, summary, pool); err != nil {
797+
if err := pcap.Collect(stop, interfaceName, filter, targetNetworkNamespace, bufferShare, args.ParseTLSHandshakes, collector, summary, pool); err != nil {
771798
errChan <- interfaceError{
772799
interfaceName: interfaceName,
773800
err: errors.Wrapf(err, "failed to collect trace on interface %s", interfaceName),
@@ -884,6 +911,9 @@ func (a *apidump) Run() error {
884911
printer.Stderr.Errorf("Encountered an error on interface %s. Error: %s\n", interfaceErr.interfaceName, interfaceErr.err.Error())
885912
break DoneWaitingForSignal
886913
}
914+
case externalStopError := <-daemonSetProcessStopChan:
915+
printer.Stderr.Infof("Received external stop signal, error: %v\n", externalStopError)
916+
break DoneWaitingForSignal
887917
}
888918
}
889919
}

apidump/net.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ import (
1010
"time"
1111

1212
"github.com/akitasoftware/akita-libs/api_schema"
13-
"github.com/google/gopacket/pcap"
13+
"github.com/akitasoftware/go-utils/optionals"
1414
"github.com/pkg/errors"
1515
"github.com/postmanlabs/postman-insights-agent/architecture"
1616
"github.com/postmanlabs/postman-insights-agent/consts"
1717
"github.com/postmanlabs/postman-insights-agent/env"
18+
"github.com/postmanlabs/postman-insights-agent/pcap"
1819
"github.com/postmanlabs/postman-insights-agent/printer"
1920
"github.com/postmanlabs/postman-insights-agent/telemetry"
2021
)
@@ -96,7 +97,7 @@ func showPermissionErrors(sampleError error) error {
9697
// Get the list of interface names that we should listen on. By default, this is
9798
// all interfaces on the machine that are up. User may override this with
9899
// --interface flag.
99-
func getEligibleInterfaces(userSpecified []string) (map[string]interfaceInfo, error) {
100+
func getEligibleInterfaces(userSpecified []string, targetNetworkNamespaceOpt optionals.Optional[string]) (map[string]interfaceInfo, error) {
100101
if len(userSpecified) > 0 {
101102
results := make(map[string]interfaceInfo, len(userSpecified))
102103
for _, n := range userSpecified {
@@ -107,7 +108,7 @@ func getEligibleInterfaces(userSpecified []string) (map[string]interfaceInfo, er
107108
results[n] = iface
108109
}
109110

110-
ifaceErrs := checkPcapPermissions(results)
111+
ifaceErrs := checkPcapPermissions(results, targetNetworkNamespaceOpt)
111112
for i, err := range ifaceErrs {
112113
// Return error if we're not able to listen on a user-specified interface.
113114
printer.Errorf("Error on interface %q: %v\n", i, err)
@@ -140,7 +141,7 @@ func getEligibleInterfaces(userSpecified []string) (map[string]interfaceInfo, er
140141
// Don't return error if we're unable to listen to one of the available
141142
// interfaces, and just listen to the interfaces we have the permissions
142143
// for.
143-
ifaceErrs := checkPcapPermissions(results)
144+
ifaceErrs := checkPcapPermissions(results, targetNetworkNamespaceOpt)
144145
var sampleError error
145146
for ifaceName, err := range ifaceErrs {
146147
printer.Warningf("Skipping interface %s for collecting packets because of error: %v\n", ifaceName, err)
@@ -184,7 +185,7 @@ func (pe pcapPermErr) Error() string {
184185

185186
// Check if we have permission to capture packets on the given set of
186187
// interfaces.
187-
func checkPcapPermissions(interfaces map[string]interfaceInfo) map[string]error {
188+
func checkPcapPermissions(interfaces map[string]interfaceInfo, targetNetworkNamespaceOpt optionals.Optional[string]) map[string]error {
188189
printer.Debugf("Checking pcap permissions...\n")
189190
start := time.Now()
190191

@@ -194,7 +195,7 @@ func checkPcapPermissions(interfaces map[string]interfaceInfo) map[string]error
194195
for iface := range interfaces {
195196
go func(iface string) {
196197
defer wg.Done()
197-
h, err := pcap.OpenLive(iface, 1600, true, pcap.BlockForever)
198+
h, err := pcap.GetPcapHandle(iface, 1600, true, pcap.BlockForever, targetNetworkNamespaceOpt)
198199
if err != nil {
199200
telemetry.Error("pcap permissions", err)
200201
errChan <- &pcapPermErr{iface: iface, err: err}

apispec/defaults.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,7 @@ const (
3535

3636
// How often to rotate traces in the back end.
3737
DefaultTraceRotateInterval = time.Hour
38+
39+
// Process all possible witness data
40+
DefaultSampleRate = 1.0
3841
)

cmd/internal/apidump/apidump.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ func init() {
256256
Cmd.Flags().Float64Var(
257257
&sampleRateFlag,
258258
"sample-rate",
259-
1.0,
259+
apispec.DefaultSampleRate,
260260
"A number between [0.0, 1.0] to control sampling.",
261261
)
262262
Cmd.Flags().MarkDeprecated("sample-rate", "use --rate-limit instead.")

cmd/internal/cmderr/checks.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func CheckAPIKeyAndInsightsProjectID(projectID string) error {
4242
return errors.New("project ID is missing, it must be specified")
4343
}
4444

45-
frontClient := rest.NewFrontClient(rest.Domain, telemetry.GetClientID())
45+
frontClient := rest.NewFrontClient(rest.Domain, telemetry.GetClientID(), nil)
4646
var serviceID akid.ServiceID
4747
err = akid.ParseIDAs(projectID, &serviceID)
4848
if err != nil {
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package daemonset
2+
3+
import (
4+
"runtime/debug"
5+
6+
"github.com/akitasoftware/go-utils/optionals"
7+
"github.com/pkg/errors"
8+
"github.com/postmanlabs/postman-insights-agent/apidump"
9+
"github.com/postmanlabs/postman-insights-agent/apispec"
10+
"github.com/postmanlabs/postman-insights-agent/printer"
11+
"github.com/postmanlabs/postman-insights-agent/rest"
12+
"github.com/postmanlabs/postman-insights-agent/telemetry"
13+
"k8s.io/apimachinery/pkg/types"
14+
)
15+
16+
// StartApiDumpProcess initiates the API dump process for a given pod identified by its UID.
17+
// It retrieves the pod arguments, changes the pod's traffic monitoring state, and starts the API dump process in a separate goroutine.
18+
// The goroutine handles errors and state changes, and ensures the process is stopped properly.
19+
func (d *Daemonset) StartApiDumpProcess(podUID types.UID) error {
20+
podArgs, err := d.getPodArgsFromMap(podUID)
21+
if err != nil {
22+
return err
23+
}
24+
25+
err = podArgs.changePodTrafficMonitorState(TrafficMonitoringRunning, PodRunning)
26+
if err != nil {
27+
return errors.Wrapf(err, "failed to change pod state, pod name: %s, from: %s to: %s",
28+
podArgs.PodName, podArgs.PodTrafficMonitorState, TrafficMonitoringRunning)
29+
}
30+
31+
// Increment the wait group counter
32+
d.ApidumpProcessesWG.Add(1)
33+
34+
go func() (funcErr error) {
35+
// defer function handle the error (if any) in the apidump process and change the pod state accordingly
36+
defer func() {
37+
// Decrement the wait group counter
38+
d.ApidumpProcessesWG.Done()
39+
40+
nextState := TrafficMonitoringEnded
41+
42+
if err := recover(); err != nil {
43+
printer.Errorf("Panic occurred in apidump process for pod %s, err: %v\n%v\n",
44+
podArgs.PodName, err, string(debug.Stack()))
45+
nextState = TrafficMonitoringFailed
46+
} else if funcErr != nil {
47+
printer.Errorf("Error occurred in apidump process for pod %s, err: %v\n", podArgs.PodName, funcErr)
48+
nextState = TrafficMonitoringFailed
49+
} else {
50+
printer.Infof("Apidump process ended for pod %s\n", podArgs.PodName)
51+
}
52+
53+
// Move monitoring state to final apidump processing state
54+
err = podArgs.changePodTrafficMonitorState(nextState,
55+
TrafficMonitoringRunning, PodSucceeded, PodFailed, PodTerminated, DaemonSetShutdown)
56+
if err != nil {
57+
printer.Errorf("Failed to change pod state, pod name: %s, from: %s to: %s, error: %v\n",
58+
podArgs.PodName, podArgs.PodTrafficMonitorState, nextState, err)
59+
return
60+
}
61+
}()
62+
63+
networkNamespace, err := d.CRIClient.GetNetworkNamespace(podArgs.ContainerUUID)
64+
if err != nil {
65+
funcErr = errors.Errorf("Failed to get network namespace for pod/containerUUID: %s/%s, err: %v",
66+
podArgs.PodName, podArgs.ContainerUUID, err)
67+
return
68+
}
69+
// Prepend '/host' to network namespace, since '/proc' folder is mounted to '/host/proc'
70+
networkNamespace = "/host" + networkNamespace
71+
72+
apidumpArgs := apidump.Args{
73+
ClientID: telemetry.GetClientID(),
74+
Domain: rest.Domain,
75+
ServiceID: podArgs.InsightsProjectID,
76+
SampleRate: apispec.DefaultSampleRate,
77+
WitnessesPerMinute: apispec.DefaultRateLimit,
78+
LearnSessionLifetime: apispec.DefaultTraceRotateInterval,
79+
TelemetryInterval: apispec.DefaultTelemetryInterval_seconds,
80+
ProcFSPollingInterval: apispec.DefaultProcFSPollingInterval_seconds,
81+
CollectTCPAndTLSReports: apispec.DefaultCollectTCPAndTLSReports,
82+
ParseTLSHandshakes: apispec.DefaultParseTLSHandshakes,
83+
MaxWitnessSize_bytes: apispec.DefaultMaxWitnessSize_bytes,
84+
ReproMode: d.InsightsReproModeEnabled,
85+
DaemonsetArgs: optionals.Some(apidump.DaemonsetArgs{
86+
TargetNetworkNamespaceOpt: networkNamespace,
87+
StopChan: podArgs.StopChan,
88+
APIKey: podArgs.PodCreds.InsightsAPIKey,
89+
Environment: podArgs.PodCreds.InsightsEnvironment,
90+
}),
91+
}
92+
93+
if err := apidump.Run(apidumpArgs); err != nil {
94+
funcErr = errors.Wrapf(err, "failed to run apidump process for pod %s", podArgs.PodName)
95+
}
96+
return
97+
}()
98+
99+
return nil
100+
}
101+
102+
// StopApiDumpProcess signals the API dump process to stop for a given pod
103+
// identified by its UID. It retrieves the process's stop channel object from a map
104+
// and sends a stop signal to trigger apidump shutdown.
105+
func (d *Daemonset) StopApiDumpProcess(podUID types.UID, stopErr error) error {
106+
podArgs, err := d.getPodArgsFromMap(podUID)
107+
if err != nil {
108+
return err
109+
}
110+
111+
printer.Infof("Stopping API dump process for pod %s\n", podArgs.PodName)
112+
podArgs.StopChan <- stopErr
113+
close(podArgs.StopChan)
114+
115+
return nil
116+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package daemonset
2+
3+
import "time"
4+
5+
const (
6+
// Names of the pod environment variables.
7+
POSTMAN_INSIGHTS_PROJECT_ID = "POSTMAN_INSIGHTS_PROJECT_ID"
8+
POSTMAN_INSIGHTS_API_KEY = "POSTMAN_INSIGHTS_API_KEY"
9+
10+
// Names of the daemonset environment variables.
11+
POSTMAN_INSIGHTS_ENV = "POSTMAN_ENV" // This is the same as the root POSTMAN_ENV.
12+
POSTMAN_INSIGHTS_VERIFICATION_TOKEN = "POSTMAN_INSIGHTS_VERIFICATION_TOKEN"
13+
POSTMAN_INSIGHTS_CLUSTER_NAME = "POSTMAN_INSIGHTS_CLUSTER_NAME"
14+
POSTMAN_INSIGHTS_REPRO_MODE_ENABLED = "POSTMAN_INSIGHTS_REPRO_MODE_ENABLED"
15+
16+
// Default intervals for the daemonset's workers.
17+
DefaultTelemetryInterval = 5 * time.Minute
18+
DefaultPodHealthCheckInterval = 5 * time.Minute
19+
)

0 commit comments

Comments
 (0)