@@ -53,17 +53,18 @@ var plog = logrus.WithField("component", "ebpf.PacketFetcher")
// and to flows that are forwarded by the kernel via ringbuffer because could not be aggregated
// in the map
type FlowFetcher struct {
-	objects            *BpfObjects
-	qdiscs             map[ifaces.Interface]*netlink.GenericQdisc
-	egressFilters      map[ifaces.Interface]*netlink.BpfFilter
-	ingressFilters     map[ifaces.Interface]*netlink.BpfFilter
-	ringbufReader      *ringbuf.Reader
-	cacheMaxSize       int
-	enableIngress      bool
-	enableEgress       bool
-	pktDropsTracePoint link.Link
-	rttFentryLink      link.Link
-	rttKprobeLink      link.Link
+	objects                  *BpfObjects
+	qdiscs                   map[ifaces.Interface]*netlink.GenericQdisc
+	egressFilters            map[ifaces.Interface]*netlink.BpfFilter
+	ingressFilters           map[ifaces.Interface]*netlink.BpfFilter
+	ringbufReader            *ringbuf.Reader
+	cacheMaxSize             int
+	enableIngress            bool
+	enableEgress             bool
+	pktDropsTracePoint       link.Link
+	rttFentryLink            link.Link
+	rttKprobeLink            link.Link
+	lookupAndDeleteSupported bool
}

type FlowFetcherConfig struct {
@@ -119,7 +120,10 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
		return nil, fmt.Errorf("rewriting BPF constants definition: %w", err)
	}

-	oldKernel := utils.IskernelOlderthan514()
+	oldKernel := utils.IsKernelOlderThan("5.14.0")
+	if oldKernel {
+		log.Infof("kernel older than 5.14.0 detected: not all hooks are supported")
+	}
	objects, err := kernelSpecificLoadAndAssign(oldKernel, spec)
	if err != nil {
		return nil, err
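
The body of utils.IsKernelOlderThan is not part of this diff. A minimal sketch of what such a parameterized check could look like, assuming it parses the release string from uname(2); the helper below is hypothetical, not the PR's implementation:

package utils

import (
	"fmt"
	"strings"

	"golang.org/x/sys/unix"
)

// isKernelOlderThan is a hypothetical stand-in for utils.IsKernelOlderThan.
// It compares the running kernel release against a "major.minor.patch"
// threshold, returning false on any parse error so callers keep the
// full-featured path.
func isKernelOlderThan(version string) bool {
	var uname unix.Utsname
	if err := unix.Uname(&uname); err != nil {
		return false
	}
	// Strip distro suffixes such as "-284.el9.x86_64" before parsing.
	release := strings.SplitN(unix.ByteSliceToString(uname.Release[:]), "-", 2)[0]
	var curMaj, curMin, curPatch, maj, min, patch int
	if _, err := fmt.Sscanf(release, "%d.%d.%d", &curMaj, &curMin, &curPatch); err != nil {
		return false
	}
	if _, err := fmt.Sscanf(version, "%d.%d.%d", &maj, &min, &patch); err != nil {
		return false
	}
	if curMaj != maj {
		return curMaj < maj
	}
	if curMin != min {
		return curMin < min
	}
	return curPatch < patch
}
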
@@ -165,17 +169,18 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
	}

	return &FlowFetcher{
-		objects:            &objects,
-		ringbufReader:      flows,
-		egressFilters:      map[ifaces.Interface]*netlink.BpfFilter{},
-		ingressFilters:     map[ifaces.Interface]*netlink.BpfFilter{},
-		qdiscs:             map[ifaces.Interface]*netlink.GenericQdisc{},
-		cacheMaxSize:       cfg.CacheMaxSize,
-		enableIngress:      cfg.EnableIngress,
-		enableEgress:       cfg.EnableEgress,
-		pktDropsTracePoint: pktDropsLink,
-		rttFentryLink:      rttFentryLink,
-		rttKprobeLink:      rttKprobeLink,
+		objects:                  &objects,
+		ringbufReader:            flows,
+		egressFilters:            map[ifaces.Interface]*netlink.BpfFilter{},
+		ingressFilters:           map[ifaces.Interface]*netlink.BpfFilter{},
+		qdiscs:                   map[ifaces.Interface]*netlink.GenericQdisc{},
+		cacheMaxSize:             cfg.CacheMaxSize,
+		enableIngress:            cfg.EnableIngress,
+		enableEgress:             cfg.EnableEgress,
+		pktDropsTracePoint:       pktDropsLink,
+		rttFentryLink:            rttFentryLink,
+		rttKprobeLink:            rttKprobeLink,
+		lookupAndDeleteSupported: true, // this will be turned off later if found to be not supported
	}, nil
}
@@ -404,35 +409,41 @@ func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) {
}

// LookupAndDeleteMap reads all the entries from the eBPF map and removes them from it.
-// It returns a map where the key
-// For synchronization purposes, we get/delete a whole snapshot of the flows map.
-// This way we avoid missing packets that could be updated on the
-// ebpf side while we process/aggregate them here
-// Changing this method invocation by BatchLookupAndDelete could improve performance
// TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively
// Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md
-// Race conditions here causes that some flows are lost in high-load scenarios
func (m *FlowFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[BpfFlowId][]BpfFlowMetrics {
+	if !m.lookupAndDeleteSupported {
+		return m.legacyLookupAndDeleteMap(met)
+	}
+
	flowMap := m.objects.AggregatedFlows

	iterator := flowMap.Iterate()
	var flows = make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize)
+	var ids []BpfFlowId
	var id BpfFlowId
	var metrics []BpfFlowMetrics

-	count := 0
-	// Changing Iterate+Delete by LookupAndDelete would prevent some possible race conditions
-	// TODO: detect whether LookupAndDelete is supported (Kernel>=4.20) and use it selectively
+	// First, get all ids and don't care about metrics (we need lookup+delete to be atomic)
	for iterator.Next(&id, &metrics) {
+		ids = append(ids, id)
+	}
+
+	count := 0
+	// Run the atomic Lookup+Delete; if new ids have been inserted in the meantime, they'll be fetched next time
+	for i, id := range ids {
		count++
-		if err := flowMap.Delete(id); err != nil {
+		if err := flowMap.LookupAndDelete(&id, &metrics); err != nil {
+			if i == 0 && errors.Is(err, ebpf.ErrNotSupported) {
+				log.WithError(err).Warnf("switching to legacy mode")
+				m.lookupAndDeleteSupported = false
+				return m.legacyLookupAndDeleteMap(met)
+			}
			log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry")
			met.Errors.WithErrorName("flow-fetcher", "CannotDeleteFlows").Inc()
+			continue
		}
-		// We observed that eBFP PerCPU map might insert multiple times the same key in the map
-		// (probably due to race conditions) so we need to re-join metrics again at userspace
-		// TODO: instrument how many times the keys are is repeated in the same eviction
-		flows[id] = append(flows[id], metrics...)
+		flows[id] = metrics
	}
	met.BufferSizeGauge.WithBufferName("hashmap-total").Set(float64(count))
	met.BufferSizeGauge.WithBufferName("hashmap-unique").Set(float64(len(flows)))
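
legacyLookupAndDeleteMap is called above but its body is not part of this hunk. A minimal sketch, assuming the legacy path simply keeps the pre-change Iterate+Delete behavior shown in the removed lines (the metric names here are illustrative, not taken from the PR):

func (m *FlowFetcher) legacyLookupAndDeleteMap(met *metrics.Metrics) map[BpfFlowId][]BpfFlowMetrics {
	flowMap := m.objects.AggregatedFlows
	iterator := flowMap.Iterate()
	flows := make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize)

	var id BpfFlowId
	var metrics []BpfFlowMetrics
	// Iterate+Delete is not atomic: entries updated between the lookup and the
	// delete can be lost, which is why the non-legacy path prefers LookupAndDelete.
	for iterator.Next(&id, &metrics) {
		if err := flowMap.Delete(id); err != nil {
			log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry")
			met.Errors.WithErrorName("flow-fetcher", "CannotDeleteFlows").Inc()
		}
		// Per-CPU hash maps may surface the same key more than once, so metrics
		// are re-joined in userspace, as the removed comment above notes.
		flows[id] = append(flows[id], metrics...)
	}
	return flows
}
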
@@ -451,16 +462,21 @@ func (m *FlowFetcher) lookupAndDeleteDNSMap(timeOut time.Duration) {
	monotonicTimeNow := monotime.Now()
	dnsMap := m.objects.DnsFlows
	var dnsKey BpfDnsFlowId
+	var keysToDelete []BpfDnsFlowId
	var dnsVal uint64

	if dnsMap != nil {
+		// Ideally the Lookup + Delete should be atomic, however we cannot use LookupAndDelete since the deletion is conditional
+		// Do not delete while iterating, as it causes severe performance degradation
		iterator := dnsMap.Iterate()
		for iterator.Next(&dnsKey, &dnsVal) {
			if time.Duration(uint64(monotonicTimeNow)-dnsVal) >= timeOut {
-				if err := dnsMap.Delete(dnsKey); err != nil {
-					log.WithError(err).WithField("dnsKey", dnsKey).
-						Warnf("couldn't delete DNS record entry")
-				}
+				keysToDelete = append(keysToDelete, dnsKey)
+			}
+		}
+		for _, dnsKey = range keysToDelete {
+			if err := dnsMap.Delete(dnsKey); err != nil {
+				log.WithError(err).WithField("dnsKey", dnsKey).Warnf("couldn't delete DNS record entry")
			}
		}
	}
@@ -529,14 +545,15 @@ func kernelSpecificLoadAndAssign(oldKernel bool, spec *ebpf.CollectionSpec) (Bpf

// It provides access to packets from the kernel space (via PerfCPU hashmap)
type PacketFetcher struct {
-	objects        *BpfObjects
-	qdiscs         map[ifaces.Interface]*netlink.GenericQdisc
-	egressFilters  map[ifaces.Interface]*netlink.BpfFilter
-	ingressFilters map[ifaces.Interface]*netlink.BpfFilter
-	perfReader     *perf.Reader
-	cacheMaxSize   int
-	enableIngress  bool
-	enableEgress   bool
+	objects                  *BpfObjects
+	qdiscs                   map[ifaces.Interface]*netlink.GenericQdisc
+	egressFilters            map[ifaces.Interface]*netlink.BpfFilter
+	ingressFilters           map[ifaces.Interface]*netlink.BpfFilter
+	perfReader               *perf.Reader
+	cacheMaxSize             int
+	enableIngress            bool
+	enableEgress             bool
+	lookupAndDeleteSupported bool
}

func NewPacketFetcher(
@@ -605,14 +622,15 @@ func NewPacketFetcher(
	}

	return &PacketFetcher{
-		objects:        &objects,
-		perfReader:     packets,
-		egressFilters:  map[ifaces.Interface]*netlink.BpfFilter{},
-		ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
-		qdiscs:         map[ifaces.Interface]*netlink.GenericQdisc{},
-		cacheMaxSize:   cacheMaxSize,
-		enableIngress:  ingress,
-		enableEgress:   egress,
+		objects:                  &objects,
+		perfReader:               packets,
+		egressFilters:            map[ifaces.Interface]*netlink.BpfFilter{},
+		ingressFilters:           map[ifaces.Interface]*netlink.BpfFilter{},
+		qdiscs:                   map[ifaces.Interface]*netlink.GenericQdisc{},
+		cacheMaxSize:             cacheMaxSize,
+		enableIngress:            ingress,
+		enableEgress:             egress,
+		lookupAndDeleteSupported: true, // this will be turned off later if found to be not supported
	}, nil
}
@@ -797,19 +815,35 @@ func (p *PacketFetcher) ReadPerf() (perf.Record, error) {
}

func (p *PacketFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[int][]*byte {
+	if !p.lookupAndDeleteSupported {
+		return p.legacyLookupAndDeleteMap(met)
+	}
+
	packetMap := p.objects.PacketRecord
	iterator := packetMap.Iterate()
	packets := make(map[int][]*byte, p.cacheMaxSize)
-
	var id int
+	var ids []int
	var packet []*byte
+
+	// First, get all ids and ignore content (we need lookup+delete to be atomic)
	for iterator.Next(&id, &packet) {
-		if err := packetMap.Delete(id); err != nil {
-			log.WithError(err).WithField("packetID ", id).
-				Warnf("couldn't delete entry")
-			met.Errors.WithErrorName("pkt-fetcher", "CannotDeleteFlows").Inc()
+		ids = append(ids, id)
+	}
+
+	// Run the atomic Lookup+Delete; if new ids have been inserted in the meantime, they'll be fetched next time
+	for i, id := range ids {
+		if err := packetMap.LookupAndDelete(&id, &packet); err != nil {
+			if i == 0 && errors.Is(err, ebpf.ErrNotSupported) {
+				log.WithError(err).Warnf("switching to legacy mode")
+				p.lookupAndDeleteSupported = false
+				return p.legacyLookupAndDeleteMap(met)
+			}
+			log.WithError(err).WithField("packetID", id).Warnf("couldn't delete entry")
+			met.Errors.WithErrorName("pkt-fetcher", "CannotDeleteEntry").Inc()
		}
-		packets[id] = append(packets[id], packet...)
+		packets[id] = packet
	}
+
	return packets
}
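
Both fetchers detect missing LookupAndDelete support lazily, on the first error of an eviction cycle. An alternative would be to probe once at startup against a scratch map; a hypothetical sketch, assuming cilium/ebpf surfaces the lack of kernel support as ebpf.ErrNotSupported:

import (
	"errors"

	"github.com/cilium/ebpf"
)

// probeLookupAndDelete reports whether BPF_MAP_LOOKUP_AND_DELETE_ELEM works on
// hash maps (kernel >= 5.14). This helper is a hypothetical startup-time
// alternative to the lazy detection done in LookupAndDeleteMap above.
func probeLookupAndDelete() bool {
	m, err := ebpf.NewMap(&ebpf.MapSpec{
		Type:       ebpf.Hash,
		KeySize:    4,
		ValueSize:  4,
		MaxEntries: 1,
	})
	if err != nil {
		return false
	}
	defer m.Close()
	var key, val uint32
	// On supporting kernels a missing key yields ErrKeyNotExist; on older
	// kernels the syscall itself is rejected with ErrNotSupported.
	err = m.LookupAndDelete(&key, &val)
	return !errors.Is(err, ebpf.ErrNotSupported)
}

The lazy approach taken in this PR avoids allocating a probe map and exercises the exact map the fetcher actually uses.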