Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Major refactoring #48

Draft
wants to merge 20 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cli_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"fmt"
"os"

"github.com/crimist/trakx/config"
"github.com/crimist/trakx/controller"
"go.uber.org/zap"
)

func printHelp() {
Expand Down Expand Up @@ -41,7 +43,12 @@ func main() {
return
}

controller := controller.NewController()
conf, err := config.Load()
if err != nil {
zap.L().Fatal("failed to load configuration", zap.Error(err))
}

controller := controller.NewController(conf)

switch os.Args[1] {
case "status":
Expand Down
File renamed without changes.
16 changes: 16 additions & 0 deletions cmd/pprof.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package tracker

import (
"fmt"
"net/http"
_ "net/http/pprof"

"go.uber.org/zap"
)

func servePprof(port int) {
zap.L().Info("Serving pprof", zap.Int("port", port))

// serve on localhost
http.ListenAndServe(fmt.Sprintf("127.0.0.1:%d", port), nil)
}
21 changes: 10 additions & 11 deletions tracker/signalhandler.go → cmd/signalhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"os/signal"
"syscall"

"github.com/crimist/trakx/config"
"github.com/crimist/trakx/storage"
"github.com/crimist/trakx/tracker/http"
"github.com/crimist/trakx/tracker/storage"
"github.com/crimist/trakx/tracker/udp"

"go.uber.org/zap"
Expand All @@ -18,7 +17,7 @@ var SigStop = os.Interrupt

const exitSuccess = 0

func signalHandler(peerdb storage.Database, udptracker *udp.UDPTracker, httptracker *http.HTTPTracker) {
func signalHandler(peerdb storage.Database, udptracker *udp.Tracker, httptracker *http.Tracker) {
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM, syscall.SIGUSR1)

Expand All @@ -27,36 +26,36 @@ func signalHandler(peerdb storage.Database, udptracker *udp.UDPTracker, httptrac

switch sig {
case os.Interrupt, syscall.SIGTERM: // Exit
config.Logger.Info("Received exit signal", zap.Any("signal", sig))
zap.L().Info("Received exit signal", zap.Any("signal", sig))

udptracker.Shutdown()
httptracker.Shutdown()

if err := peerdb.Backup().Save(); err != nil {
config.Logger.Error("Database save failed", zap.Error(err))
zap.L().Error("Database save failed", zap.Error(err))
}

if err := udptracker.WriteConns(); err != nil {
config.Logger.Error("UDP connections save failed", zap.Error(err))
zap.L().Error("UDP connections save failed", zap.Error(err))
}

os.Exit(exitSuccess)

case syscall.SIGUSR1: // Save
config.Logger.Info("Received save signal", zap.Any("signal", sig))
zap.L().Info("Received save signal", zap.Any("signal", sig))

if err := peerdb.Backup().Save(); err != nil {
config.Logger.Error("Database save failed", zap.Error(err))
zap.L().Error("Database save failed", zap.Error(err))
}

if err := udptracker.WriteConns(); err != nil {
config.Logger.Error("UDP connections save failed", zap.Error(err))
zap.L().Error("UDP connections save failed", zap.Error(err))
}

config.Logger.Info("Saves successful")
zap.L().Info("Saves successful")

default:
config.Logger.Info("Received unknown signal, ignoring", zap.Any("signal", sig))
zap.L().Info("Received unknown signal, ignoring", zap.Any("signal", sig))
}
}
}
131 changes: 131 additions & 0 deletions cmd/tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package tracker

import (
"expvar"
"fmt"
gohttp "net/http"
"time"

"github.com/crimist/trakx/bencoding"
"github.com/crimist/trakx/config"
"github.com/crimist/trakx/pools"
"github.com/crimist/trakx/stats"
"github.com/crimist/trakx/storage"
"github.com/crimist/trakx/tracker/http"
"github.com/crimist/trakx/tracker/udp"
"go.uber.org/zap"

// import database types so init is called
_ "github.com/crimist/trakx/storage/inmemory"
)

// Run initializes and runs the tracker with the requested configuration settings.
func Run(conf *config.Configuration) {
var udptracker *udp.Tracker
var httptracker http.Tracker
var err error

zap.L().Info("Loaded configuration, starting trakx...")

warnings := conf.Validate()
if warnings&config.WarningUDPValidation != 0 {
zap.L().Warn("Configuration warning [UDP.ConnDB.Validate]: UDP connection validation is disabled. Do not expose this service to untrusted networks; it could be abused in UDP based amplification attacks.")
}
if warnings&config.WarningPeerExpiry != 0 {
zap.L().Warn("Configuration warning [conf.Announce]: Peer expiry time < announce interval. Peers will expire from the database between announces")
}

peerdb, err := storage.Open()
if err != nil {
zap.L().Fatal("Failed to initialize storage", zap.Error(err))
} else {
zap.L().Info("Initialized storage")
}

pools.Initialize(int(conf.Numwant.Limit))

// TODO: put trackers in a slice of type `Tracker` (iface) and then pass it down to the signal handler which can then call Shutdown() on all of them
go signalHandler(peerdb, udptracker, &httptracker)

if conf.Debug.Pprof != 0 {
go servePprof(conf.Debug.Pprof)
}

if conf.HTTP.Mode == config.TrackerModeEnabled {
zap.L().Info("HTTP tracker enabled", zap.Int("port", conf.HTTP.Port), zap.String("ip", conf.HTTP.IP))

httptracker.Init(peerdb)
go func() {
if err := httptracker.Serve(); err != nil {
zap.L().Fatal("Failed to serve HTTP tracker", zap.Error(err))
}
}()
} else if conf.HTTP.Mode == config.TrackerModeInfo {
// serve basic html server
cache, err := config.GenerateEmbeddedCache()
if err != nil {
zap.L().Fatal("failed to generate embedded cache", zap.Error(err))
}

// create big interval for announce response to reduce load
d := bencoding.NewDictionary()
d.Int64("interval", 86400) // 1 day
announceResponse := d.GetBytes()

expvarHandler := expvar.Handler()

mux := gohttp.NewServeMux()
mux.HandleFunc("/heartbeat", func(w gohttp.ResponseWriter, r *gohttp.Request) {})
mux.HandleFunc("/stats", func(w gohttp.ResponseWriter, r *gohttp.Request) {
expvarHandler.ServeHTTP(w, r)
})
mux.HandleFunc("/scrape", func(w gohttp.ResponseWriter, r *gohttp.Request) {})
mux.HandleFunc("/announce", func(w gohttp.ResponseWriter, r *gohttp.Request) {
w.Write(announceResponse)
})

for filepath, data := range cache {
dataBytes := []byte(data)
mux.HandleFunc(filepath, func(w gohttp.ResponseWriter, r *gohttp.Request) {
w.Write(dataBytes)
})
}

server := gohttp.Server{
Addr: fmt.Sprintf(":%d", conf.HTTP.Port),
Handler: mux,
ReadTimeout: 5 * time.Second,
WriteTimeout: 7 * time.Second,
IdleTimeout: 0,
}
server.SetKeepAlivesEnabled(false)

zap.L().Info("Running HTTP info server", zap.Int("port", conf.HTTP.Port))
go func() {
if err := server.ListenAndServe(); err != nil {
zap.L().Error("Failed to start HTTP server", zap.Error(err))
}
}()
}

// UDP tracker
if conf.UDP.Enabled {
zap.L().Info("UDP tracker enabled", zap.Int("port", conf.UDP.Port), zap.String("ip", conf.UDP.IP))
udptracker = udp.NewTracker(peerdb)

go func() {
if err := udptracker.Serve(); err != nil {
zap.L().Fatal("Failed to serve UDP tracker", zap.Error(err))
}
}()
}

if conf.ExpvarInterval > 0 {
stats.Publish(peerdb, func() int64 {
return int64(udptracker.Connections())
})
} else {
zap.L().Debug("Finished Run() no expvar - blocking forever")
select {}
}
}
114 changes: 114 additions & 0 deletions cmd/tracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package tracker

import (
"fmt"
"syscall"
"testing"
"time"

"github.com/crimist/trakx/config"
)

func TestMain(m *testing.M) {
conf := &config.Configuration{
LogLevel: "debug",
Debug: struct{ Pprof int }{
Pprof: 0,
},
ExpvarInterval: 0,
UDP: struct {
Enabled bool
IP string
Port int
Threads int
ConnDB struct {
Validate bool
Size uint64
Trim time.Duration
Expiry time.Duration
}
}{
ConnDB: struct {
Validate bool
Size uint64
Trim time.Duration
Expiry time.Duration
}{
Validate: true,
Trim: 1 * time.Hour,
Expiry: 1 * time.Hour,
},
Enabled: true,
Port: 1337,
Threads: 1,
},
Announce: struct {
Base time.Duration
Fuzz time.Duration
}{
Base: 0,
Fuzz: 1 * time.Second,
},
HTTP: struct {
Mode string
IP string
Port int
Timeout struct {
Read time.Duration
Write time.Duration
}
Threads int
}{
Mode: "enabled",
Port: 1337,
Timeout: struct {
Read time.Duration
Write time.Duration
}{
Read: 2 * time.Second,
Write: 2 * time.Second,
},
Threads: 1,
},
Numwant: struct {
Default uint
Limit uint
}{
Default: 100,
Limit: 100,
},
DB: struct {
Type string
Backup struct {
Frequency time.Duration
Type string
Path string
}
Trim time.Duration
Expiry time.Duration
}{
Type: "gomap",
Trim: 1 * time.Hour,
Expiry: 1 * time.Hour,
Backup: struct {
Frequency time.Duration
Type string
Path string
}{
Type: "none",
Frequency: 0,
},
},
}

fmt.Println("Starting mock tracker...")
go Run(conf)
time.Sleep(100 * time.Millisecond)
fmt.Println("started!")

m.Run()

fmt.Println("Shutting down mock tracker...")
syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
time.Sleep(100 * time.Millisecond)
}
Loading
Loading