From f92d05f17c76e8ce326bad6cd9002f383b8d1415 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Thu, 27 Feb 2025 12:57:32 +0800 Subject: [PATCH] Add Req/Res count/time to candidate stats (#763) These details will provide information for connectivity issue. --- agent.go | 18 ++++++-- agent_stats.go | 16 ++++--- agent_test.go | 17 ++++++-- candidatepair.go | 110 ++++++++++++++++++++++++++++++++++++++++++++++- selection.go | 5 ++- stats.go | 12 ++++++ 6 files changed, 163 insertions(+), 15 deletions(-) diff --git a/agent.go b/agent.go index fd1faaea..7110c326 100644 --- a/agent.go +++ b/agent.go @@ -980,18 +980,23 @@ func (a *Agent) findRemoteCandidate(networkType NetworkType, addr net.Addr) Cand return nil } -func (a *Agent) sendBindingRequest(m *stun.Message, local, remote Candidate) { +func (a *Agent) sendBindingRequest(msg *stun.Message, local, remote Candidate) { a.log.Tracef("Ping STUN from %s to %s", local, remote) a.invalidatePendingBindingRequests(time.Now()) a.pendingBindingRequests = append(a.pendingBindingRequests, bindingRequest{ timestamp: time.Now(), - transactionID: m.TransactionID, + transactionID: msg.TransactionID, destination: remote.addr(), - isUseCandidate: m.Contains(stun.AttrUseCandidate), + isUseCandidate: msg.Contains(stun.AttrUseCandidate), }) - a.sendSTUN(m, local, remote) + if pair := a.findPair(local, remote); pair != nil { + pair.UpdateRequestSent() + } else { + a.log.Warnf("Failed to find pair for add binding request from %s to %s", local, remote) + } + a.sendSTUN(msg, local, remote) } func (a *Agent) sendBindingSuccess(m *stun.Message, local, remote Candidate) { @@ -1014,6 +1019,11 @@ func (a *Agent) sendBindingSuccess(m *stun.Message, local, remote Candidate) { ); err != nil { a.log.Warnf("Failed to handle inbound ICE from: %s to: %s error: %s", local, remote, err) } else { + if pair := a.findPair(local, remote); pair != nil { + pair.UpdateResponseSent() + } else { + a.log.Warnf("Failed to find pair for add binding response from %s to %s", local, remote) + } a.sendSTUN(out, local, remote) } } diff --git a/agent_stats.go b/agent_stats.go index 45a56294..c6b21f57 100644 --- a/agent_stats.go +++ b/agent_stats.go @@ -26,18 +26,22 @@ func (a *Agent) GetCandidatePairsStats() []CandidatePairStats { // BytesReceived uint64 // LastPacketSentTimestamp time.Time // LastPacketReceivedTimestamp time.Time - // FirstRequestTimestamp time.Time - // LastRequestTimestamp time.Time - // LastResponseTimestamp time.Time + FirstRequestTimestamp: cp.FirstRequestSentAt(), + LastRequestTimestamp: cp.LastRequestSentAt(), + FirstResponseTimestamp: cp.FirstReponseReceivedAt(), + LastResponseTimestamp: cp.LastResponseReceivedAt(), + FirstRequestReceivedTimestamp: cp.FirstRequestReceivedAt(), + LastRequestReceivedTimestamp: cp.LastRequestReceivedAt(), + TotalRoundTripTime: cp.TotalRoundTripTime(), CurrentRoundTripTime: cp.CurrentRoundTripTime(), // AvailableOutgoingBitrate float64 // AvailableIncomingBitrate float64 // CircuitBreakerTriggerCount uint32 - // RequestsReceived uint64 - // RequestsSent uint64 + RequestsReceived: cp.RequestsReceived(), + RequestsSent: cp.RequestsSent(), ResponsesReceived: cp.ResponsesReceived(), - // ResponsesSent uint64 + ResponsesSent: cp.ResponsesSent(), // RetransmissionsReceived uint64 // RetransmissionsSent uint64 // ConsentRequestsSent uint64 diff --git a/agent_test.go b/agent_test.go index 42a18465..8fd289a2 100644 --- a/agent_test.go +++ b/agent_test.go @@ -636,7 +636,7 @@ func TestInvalidGather(t *testing.T) { }) } -func TestCandidatePairsStats(t *testing.T) { //nolint:cyclop +func TestCandidatePairsStats(t *testing.T) { //nolint:cyclop,gocyclo defer test.CheckRoutines(t)() // Avoid deadlocks? @@ -715,14 +715,18 @@ func TestCandidatePairsStats(t *testing.T) { //nolint:cyclop p := agent.findPair(hostLocal, remote) if p == nil { - agent.addPair(hostLocal, remote) + p = agent.addPair(hostLocal, remote) } + p.UpdateRequestReceived() + p.UpdateRequestSent() + p.UpdateResponseSent() + p.UpdateRoundTripTime(time.Second) } p := agent.findPair(hostLocal, prflxRemote) p.state = CandidatePairStateFailed - for i := 0; i < 10; i++ { + for i := 1; i < 10; i++ { p.UpdateRoundTripTime(time.Duration(i+1) * time.Second) } @@ -749,6 +753,13 @@ func TestCandidatePairsStats(t *testing.T) { //nolint:cyclop default: t.Fatal("invalid remote candidate ID") } + + if cps.FirstRequestTimestamp.IsZero() || cps.LastRequestTimestamp.IsZero() || + cps.FirstResponseTimestamp.IsZero() || cps.LastResponseTimestamp.IsZero() || + cps.FirstRequestReceivedTimestamp.IsZero() || cps.LastRequestReceivedTimestamp.IsZero() || + cps.RequestsReceived == 0 || cps.RequestsSent == 0 || cps.ResponsesSent == 0 || cps.ResponsesReceived == 0 { + t.Fatal("failed to verify pair stats counter and timestamps", cps) + } } if relayPairStat.RemoteCandidateID != relayRemote.ID() { diff --git a/candidatepair.go b/candidatepair.go index e2b4d5e2..82655aa5 100644 --- a/candidatepair.go +++ b/candidatepair.go @@ -33,7 +33,18 @@ type CandidatePair struct { // stats currentRoundTripTime int64 // in ns totalRoundTripTime int64 // in ns - responsesReceived uint64 + + requestsReceived uint64 + requestsSent uint64 + responsesReceived uint64 + responsesSent uint64 + + firstRequestSentAt atomic.Value // time.Time + lastRequestSentAt atomic.Value // time.Time + firstReponseReceivedAt atomic.Value // time.Time + lastResponseReceivedAt atomic.Value // time.Time + firstRequestReceivedAt atomic.Value // time.Time + lastRequestReceivedAt atomic.Value // time.Time } func (p *CandidatePair) String() string { @@ -127,6 +138,10 @@ func (p *CandidatePair) UpdateRoundTripTime(rtt time.Duration) { atomic.StoreInt64(&p.currentRoundTripTime, rttNs) atomic.AddInt64(&p.totalRoundTripTime, rttNs) atomic.AddUint64(&p.responsesReceived, 1) + + now := time.Now() + p.firstReponseReceivedAt.CompareAndSwap(nil, now) + p.lastResponseReceivedAt.Store(now) } // CurrentRoundTripTime returns the current round trip time in seconds @@ -141,8 +156,101 @@ func (p *CandidatePair) TotalRoundTripTime() float64 { return time.Duration(atomic.LoadInt64(&p.totalRoundTripTime)).Seconds() } +// RequestsReceived returns the total number of connectivity checks received +// https://www.w3.org/TR/webrtc-stats/#dom-rtcicecandidatepairstats-requestsreceived +func (p *CandidatePair) RequestsReceived() uint64 { + return atomic.LoadUint64(&p.requestsReceived) +} + +// RequestsSent returns the total number of connectivity checks sent +// https://www.w3.org/TR/webrtc-stats/#dom-rtcicecandidatepairstats-requestssent +func (p *CandidatePair) RequestsSent() uint64 { + return atomic.LoadUint64(&p.requestsSent) +} + // ResponsesReceived returns the total number of connectivity responses received // https://www.w3.org/TR/webrtc-stats/#dom-rtcicecandidatepairstats-responsesreceived func (p *CandidatePair) ResponsesReceived() uint64 { return atomic.LoadUint64(&p.responsesReceived) } + +// ResponsesSent returns the total number of connectivity responses sent +// https://www.w3.org/TR/webrtc-stats/#dom-rtcicecandidatepairstats-responsessent +func (p *CandidatePair) ResponsesSent() uint64 { + return atomic.LoadUint64(&p.responsesSent) +} + +// FirstRequestSentAt returns the timestamp of the first connectivity check sent. +func (p *CandidatePair) FirstRequestSentAt() time.Time { + if v, ok := p.firstRequestSentAt.Load().(time.Time); ok { + return v + } + + return time.Time{} +} + +// LastRequestSentAt returns the timestamp of the last connectivity check sent. +func (p *CandidatePair) LastRequestSentAt() time.Time { + if v, ok := p.lastRequestSentAt.Load().(time.Time); ok { + return v + } + + return time.Time{} +} + +// FirstReponseReceivedAt returns the timestamp of the first connectivity response received. +func (p *CandidatePair) FirstReponseReceivedAt() time.Time { + if v, ok := p.firstReponseReceivedAt.Load().(time.Time); ok { + return v + } + + return time.Time{} +} + +// LastResponseReceivedAt returns the timestamp of the last connectivity response received. +func (p *CandidatePair) LastResponseReceivedAt() time.Time { + if v, ok := p.lastResponseReceivedAt.Load().(time.Time); ok { + return v + } + + return time.Time{} +} + +// FirstRequestReceivedAt returns the timestamp of the first connectivity check received. +func (p *CandidatePair) FirstRequestReceivedAt() time.Time { + if v, ok := p.firstRequestReceivedAt.Load().(time.Time); ok { + return v + } + + return time.Time{} +} + +// LastRequestReceivedAt returns the timestamp of the last connectivity check received. +func (p *CandidatePair) LastRequestReceivedAt() time.Time { + if v, ok := p.lastRequestReceivedAt.Load().(time.Time); ok { + return v + } + + return time.Time{} +} + +// UpdateRequestSent increments the number of requests sent and updates the timestamp. +func (p *CandidatePair) UpdateRequestSent() { + atomic.AddUint64(&p.requestsSent, 1) + now := time.Now() + p.firstRequestSentAt.CompareAndSwap(nil, now) + p.lastRequestSentAt.Store(now) +} + +// UpdateResponseSent increments the number of responses sent. +func (p *CandidatePair) UpdateResponseSent() { + atomic.AddUint64(&p.responsesSent, 1) +} + +// UpdateRequestReceived increments the number of requests received and updates the timestamp. +func (p *CandidatePair) UpdateRequestReceived() { + atomic.AddUint64(&p.requestsReceived, 1) + now := time.Now() + p.firstRequestReceivedAt.CompareAndSwap(nil, now) + p.lastRequestReceivedAt.Store(now) +} diff --git a/selection.go b/selection.go index c5fce390..b67898dd 100644 --- a/selection.go +++ b/selection.go @@ -100,10 +100,12 @@ func (s *controllingSelector) HandleBindingRequest(message *stun.Message, local, pair := s.agent.findPair(local, remote) if pair == nil { - s.agent.addPair(local, remote) + pair = s.agent.addPair(local, remote) + pair.UpdateRequestReceived() return } + pair.UpdateRequestReceived() if pair.state == CandidatePairStateSucceeded && s.nominatedPair == nil && s.agent.getSelectedPair() == nil { bestPair := s.agent.getBestAvailableCandidatePair() @@ -281,6 +283,7 @@ func (s *controlledSelector) HandleBindingRequest(message *stun.Message, local, if pair == nil { pair = s.agent.addPair(local, remote) } + pair.UpdateRequestReceived() if message.Contains(stun.AttrUseCandidate) { //nolint:nestif // https://tools.ietf.org/html/rfc8445#section-7.3.1.5 diff --git a/stats.go b/stats.go index 8a08f677..30a0c02b 100644 --- a/stats.go +++ b/stats.go @@ -58,10 +58,22 @@ type CandidatePairStats struct { // (LastRequestTimestamp - FirstRequestTimestamp) / RequestsSent. LastRequestTimestamp time.Time + // FirstResponseTimestamp represents the timestamp at which the first STUN response + // was received on this particular candidate pair. + FirstResponseTimestamp time.Time + // LastResponseTimestamp represents the timestamp at which the last STUN response // was received on this particular candidate pair. LastResponseTimestamp time.Time + // FirstRequestReceivedTimestamp represents the timestamp at which the first + // connectivity check request was received. + FirstRequestReceivedTimestamp time.Time + + // LastRequestReceivedTimestamp represents the timestamp at which the last + // connectivity check request was received. + LastRequestReceivedTimestamp time.Time + // TotalRoundTripTime represents the sum of all round trip time measurements // in seconds since the beginning of the session, based on STUN connectivity // check responses (ResponsesReceived), including those that reply to requests