-
-
Notifications
You must be signed in to change notification settings - Fork 122
/
Copy pathfactory.go
97 lines (90 loc) · 2.82 KB
/
factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package server
import (
"context"
"fmt"
"net/netip"
"time"
"github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy"
"github.com/bitmagnet-io/bitmagnet/internal/concurrency"
"github.com/bitmagnet-io/bitmagnet/internal/protocol/dht"
"github.com/bitmagnet-io/bitmagnet/internal/protocol/dht/responder"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/fx"
"go.uber.org/zap"
"golang.org/x/time/rate"
)
type Params struct {
fx.In
Config Config
Responder responder.Responder
Logger *zap.SugaredLogger
}
type Result struct {
fx.Out
Server lazy.Lazy[Server]
LastResponses *concurrency.AtomicValue[LastResponses] `name:"dht_server_last_responses"`
AppHook fx.Hook `group:"app_hooks"`
QueryDuration prometheus.Collector `group:"prometheus_collectors"`
QuerySuccessTotal prometheus.Collector `group:"prometheus_collectors"`
QueryErrorTotal prometheus.Collector `group:"prometheus_collectors"`
QueryConcurrency prometheus.Collector `group:"prometheus_collectors"`
}
const namespace = "bitmagnet"
const subsystem = "dht_server"
func New(p Params) Result {
lastResponses := &concurrency.AtomicValue[LastResponses]{}
collector := newPrometheusCollector()
addr, err := netip.ParseAddr(p.Config.Addr)
socket_ip_type := 4
if err != nil {
addr = netip.IPv4Unspecified()
}
if addr.Is4() {
socket_ip_type = 4
}
if addr.Is6() || addr.Is4In6() {
socket_ip_type = 6
}
ls := lazy.New(func() (Server, error) {
s := queryLimiter{
server: prometheusServerWrapper{
prometheusCollector: collector,
server: healthCollector{
baseServer: &server{
stopped: make(chan struct{}),
localAddr: netip.AddrPortFrom(addr, p.Config.Port),
socket: NewSocket(socket_ip_type),
queries: make(map[string]chan dht.RecvMsg),
queryTimeout: p.Config.QueryTimeout,
responder: p.Responder,
responderTimeout: time.Second * 5,
idIssuer: &variantIdIssuer{},
logger: p.Logger.Named(subsystem),
},
lastResponses: lastResponses,
},
},
queryLimiter: concurrency.NewKeyedLimiter(rate.Every(time.Second), 4, 1000, time.Second*20),
}
if err := s.start(); err != nil {
return nil, fmt.Errorf("could not start server: %w", err)
}
return s, nil
})
return Result{
Server: ls,
AppHook: fx.Hook{
OnStop: func(context.Context) error {
return ls.IfInitialized(func(s Server) error {
s.stop()
return nil
})
},
},
LastResponses: lastResponses,
QueryDuration: collector.queryDuration,
QuerySuccessTotal: collector.querySuccessTotal,
QueryErrorTotal: collector.queryErrorTotal,
QueryConcurrency: collector.queryConcurrency,
}
}