Skip to content

Commit

Permalink
Fix/several fixes (#104)
Browse files Browse the repository at this point in the history
* Fix loghandler

* Add logging to config

* Rework GoRoutineErrorHandler to GoRoutineWrapper

* Fix linter errors

* Extended postgres interface

* Fixed several issues

* Fixed linter errors
  • Loading branch information
xdoubleu authored Jan 19, 2025
1 parent f649a83 commit d12c1f1
Show file tree
Hide file tree
Showing 37 changed files with 767 additions and 332 deletions.
18 changes: 12 additions & 6 deletions examples/simplehttp/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package main

import "github.com/XDoubleU/essentia/pkg/config"
import (
"log/slog"

"github.com/XDoubleU/essentia/pkg/config"
)

type Config struct {
Env string
Expand All @@ -9,13 +13,15 @@ type Config struct {
AllowedOrigins []string
}

func NewConfig() Config {
func NewConfig(logger *slog.Logger) Config {
c := config.New(logger)

var cfg Config

cfg.Env = config.EnvStr("ENV", config.ProdEnv)
cfg.Port = config.EnvInt("PORT", 8000)
cfg.DBDsn = config.EnvStr("DB_DSN", "postgres://postgres@localhost/postgres")
cfg.AllowedOrigins = config.EnvStrArray(
cfg.Env = c.EnvStr("ENV", config.ProdEnv)
cfg.Port = c.EnvInt("PORT", 8000)
cfg.DBDsn = c.EnvStr("DB_DSN", "postgres://postgres@localhost/postgres")
cfg.AllowedOrigins = c.EnvStrArray(
"ALLOWED_ORIGINS",
[]string{"http://localhost"},
)
Expand Down
3 changes: 2 additions & 1 deletion examples/simplehttp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ func NewApp(logger *slog.Logger, config Config, db postgres.DB) application {
}

func main() {
cfg := NewConfig()
cfg := NewConfig(slog.New(slog.NewTextHandler(os.Stdout, nil)))

logger := slog.New(
sentrytools.NewLogHandler(cfg.Env, slog.NewTextHandler(os.Stdout, nil)),
)

db, err := postgres.Connect(
logger,
cfg.DBDsn,
Expand Down
6 changes: 3 additions & 3 deletions examples/simplehttp/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (
)

func TestHealth(t *testing.T) {
cfg := NewConfig()
cfg.Env = config.TestEnv

logger := logging.NewNopLogger()

cfg := NewConfig(logger)
cfg.Env = config.TestEnv

db, err := postgres.Connect(
logger,
cfg.DBDsn,
Expand Down
16 changes: 11 additions & 5 deletions examples/simplewebsocket/config.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
package main

import "github.com/XDoubleU/essentia/pkg/config"
import (
"log/slog"

"github.com/XDoubleU/essentia/pkg/config"
)

type Config struct {
Env string
Port int
AllowedOrigins []string
}

func NewConfig() Config {
func NewConfig(logger *slog.Logger) Config {
c := config.New(logger)

var cfg Config

cfg.Env = config.EnvStr("ENV", config.ProdEnv)
cfg.Port = config.EnvInt("PORT", 8000)
cfg.AllowedOrigins = config.EnvStrArray(
cfg.Env = c.EnvStr("ENV", config.ProdEnv)
cfg.Port = c.EnvInt("PORT", 8000)
cfg.AllowedOrigins = c.EnvStrArray(
"ALLOWED_ORIGINS",
[]string{"http://localhost"},
)
Expand Down
2 changes: 1 addition & 1 deletion examples/simplewebsocket/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func NewApp(logger *slog.Logger, config Config) application {
}

func main() {
cfg := NewConfig()
cfg := NewConfig(slog.New(slog.NewTextHandler(os.Stdout, nil)))

logger := slog.New(
sentrytools.NewLogHandler(cfg.Env, slog.NewTextHandler(os.Stdout, nil)),
Expand Down
3 changes: 2 additions & 1 deletion examples/simplewebsocket/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
"testing"

"github.com/XDoubleU/essentia/pkg/config"
"github.com/XDoubleU/essentia/pkg/logging"
sentrytools "github.com/XDoubleU/essentia/pkg/sentry"
"github.com/XDoubleU/essentia/pkg/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestWebSocket(t *testing.T) {
cfg := NewConfig()
cfg := NewConfig(logging.NewNopLogger())
cfg.Env = config.TestEnv

logger := slog.New(
Expand Down
6 changes: 4 additions & 2 deletions examples/simplewebsocket/websockethandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ type ResponseMessageDto struct {
Message string `json:"message"`
}

func (msg SubscribeMessageDto) Validate() *validate.Validator {
return validate.New()
func (msg SubscribeMessageDto) Validate() (bool, map[string]string) {
v := validate.New()
return v.Valid(), v.Errors()
}

func (msg SubscribeMessageDto) Topic() string {
Expand All @@ -34,6 +35,7 @@ func (app *application) websocketRoutes(mux *http.ServeMux) {
func (app *application) getWebSocketHandler() http.HandlerFunc {

wsHandler := wstools.CreateWebSocketHandler[SubscribeMessageDto](
app.logger,
1,
10,
)
Expand Down
17 changes: 12 additions & 5 deletions internal/shared/any_to_string.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package shared
import (
"errors"
"fmt"
"strconv"
)

func arrayToString[T any](array []T) (string, error) {
Expand All @@ -27,16 +26,24 @@ func AnyToString(value any) (string, error) {
switch value := value.(type) {
case string:
return value, nil
case int:
return strconv.Itoa(value), nil
case int64:
return strconv.FormatInt(value, 10), nil
case bool:
return fmt.Sprintf("%t", value), nil
case int, int64:
return fmt.Sprintf("%d", value), nil
case float32, float64:
return fmt.Sprintf("%.2f", value), nil
case []string:
return arrayToString(value)
case []bool:
return arrayToString(value)
case []int:
return arrayToString(value)
case []int64:
return arrayToString(value)
case []float32:
return arrayToString(value)
case []float64:
return arrayToString(value)
default:
return "", errors.New("undefined type")
}
Expand Down
3 changes: 2 additions & 1 deletion internal/wsinternal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package wsinternal

import (
"context"
"log/slog"
"math"
"sync"
)
Expand Down Expand Up @@ -50,7 +51,7 @@ func (worker *Worker) EnqueueEvent(event any) {
}

// Start makes [Worker] start doing work.
func (worker *Worker) Start(_ context.Context) error {
func (worker *Worker) Start(_ context.Context, _ *slog.Logger) error {
// already active
if worker.Active() {
return nil
Expand Down
12 changes: 10 additions & 2 deletions internal/wsinternal/worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package wsinternal
import (
"context"
"fmt"
"log/slog"
"sync"

"github.com/XDoubleU/essentia/pkg/sentry"
Expand All @@ -19,14 +20,20 @@ const stopEvent = "stop"
// WorkerPool is used to divide [Subscriber]s between [Worker]s.
// This prevents one [Worker] of being very busy.
type WorkerPool struct {
logger *slog.Logger
subscribers []Subscriber
subscribersMu *sync.RWMutex
workers []Worker
}

// NewWorkerPool creates a new [WorkerPool].
func NewWorkerPool(maxWorkers int, channelBufferSize int) *WorkerPool {
func NewWorkerPool(
logger *slog.Logger,
maxWorkers int,
channelBufferSize int,
) *WorkerPool {
pool := &WorkerPool{
logger: logger,
subscribers: []Subscriber{},
subscribersMu: &sync.RWMutex{},
workers: make([]Worker, maxWorkers),
Expand Down Expand Up @@ -80,8 +87,9 @@ func (pool *WorkerPool) RemoveSubscriber(sub Subscriber) {
// Start starts [Worker]s of a [WorkerPool] if they weren't active yet.
func (pool *WorkerPool) Start() {
for i := range pool.workers {
go sentry.GoRoutineErrorHandler(
go sentry.GoRoutineWrapper(
context.Background(),
pool.logger,
fmt.Sprintf("Worker %d", i),
pool.workers[i].Start,
)
Expand Down
17 changes: 13 additions & 4 deletions internal/wsinternal/worker_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/XDoubleU/essentia/internal/wsinternal"
"github.com/XDoubleU/essentia/pkg/logging"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -52,7 +53,9 @@ func (sub *TestSubscriber) Output() string {
const sleep = 100 * time.Millisecond

func TestBasic(t *testing.T) {
wp := wsinternal.NewWorkerPool(1, 10)
logger := logging.NewNopLogger()

wp := wsinternal.NewWorkerPool(logger, 1, 10)

tSub := NewTestSubscriber()
wp.AddSubscriber(tSub)
Expand Down Expand Up @@ -81,7 +84,9 @@ func TestBasic(t *testing.T) {
}

func TestMoreWorkersThanSubs(t *testing.T) {
wp := wsinternal.NewWorkerPool(2, 10)
logger := logging.NewNopLogger()

wp := wsinternal.NewWorkerPool(logger, 2, 10)

tSub := NewTestSubscriber()
wp.AddSubscriber(tSub)
Expand All @@ -100,7 +105,9 @@ func TestMoreWorkersThanSubs(t *testing.T) {
}

func TestAddRemoveSubscriberWhileWorkersActive(t *testing.T) {
wp := wsinternal.NewWorkerPool(2, 10)
logger := logging.NewNopLogger()

wp := wsinternal.NewWorkerPool(logger, 2, 10)

tSub := NewTestSubscriber()
wp.AddSubscriber(tSub)
Expand Down Expand Up @@ -146,7 +153,9 @@ func work(t *testing.T, wp *wsinternal.WorkerPool, nr int) {
}

func TestToggleWork(t *testing.T) {
wp := wsinternal.NewWorkerPool(1, 10)
logger := logging.NewNopLogger()

wp := wsinternal.NewWorkerPool(logger, 1, 10)

work(t, wp, 1)
work(t, wp, 2)
Expand Down
4 changes: 4 additions & 0 deletions pkg/communication/ws/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
wstools "github.com/XDoubleU/essentia/pkg/communication/ws"
"github.com/XDoubleU/essentia/pkg/config"
errortools "github.com/XDoubleU/essentia/pkg/errors"
"github.com/XDoubleU/essentia/pkg/logging"
sentrytools "github.com/XDoubleU/essentia/pkg/sentry"
"github.com/XDoubleU/essentia/pkg/test"
"github.com/stretchr/testify/assert"
Expand All @@ -35,7 +36,10 @@ func testErrorStatusCode(t *testing.T, handler http.HandlerFunc) int {
func setupWS(t *testing.T, allowedOrigin string) http.Handler {
t.Helper()

logger := logging.NewNopLogger()

wsHandler := wstools.CreateWebSocketHandler[TestSubscribeMsg](
logger,
1,
10,
)
Expand Down
3 changes: 3 additions & 0 deletions pkg/communication/ws/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ws

import (
"context"
"log/slog"
"strings"

"github.com/XDoubleU/essentia/internal/wsinternal"
Expand All @@ -23,6 +24,7 @@ type Topic struct {

// NewTopic creates a new [Topic].
func NewTopic(
logger *slog.Logger,
name string,
allowedOrigins []string,
maxWorkers int,
Expand All @@ -39,6 +41,7 @@ func NewTopic(
Name: name,
allowedOrigins: allowedOrigins,
pool: wsinternal.NewWorkerPool(
logger,
maxWorkers,
channelBufferSize,
),
Expand Down
9 changes: 7 additions & 2 deletions pkg/communication/ws/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ws

import (
"fmt"
"log/slog"
"net/http"
"net/url"
"path/filepath"
Expand All @@ -22,17 +23,20 @@ type SubscribeMessageDto interface {
// A WebSocketHandler handles incoming requests to a
// websocket and makes sure subscriptions are made to the right topics.
type WebSocketHandler[T SubscribeMessageDto] struct {
logger *slog.Logger
maxTopicWorkers int
topicChannelBufferSize int
topicMap map[string]*Topic
}

// CreateWebSocketHandler creates a new [WebSocketHandler].
func CreateWebSocketHandler[T SubscribeMessageDto](
logger *slog.Logger,
maxTopicWorkers int,
topicChannelBufferSize int,
) WebSocketHandler[T] {
return WebSocketHandler[T]{
logger: logger,
maxTopicWorkers: maxTopicWorkers,
topicChannelBufferSize: topicChannelBufferSize,
topicMap: make(map[string]*Topic),
Expand All @@ -53,6 +57,7 @@ func (h *WebSocketHandler[T]) AddTopic(
}

topic := NewTopic(
h.logger,
topicName,
allowedOrigins,
h.maxTopicWorkers,
Expand Down Expand Up @@ -119,8 +124,8 @@ func (h WebSocketHandler[T]) Handler() http.HandlerFunc {
return
}

if v := msg.Validate(); !v.Valid() {
FailedValidationResponse(r.Context(), conn, v.Errors)
if valid, errors := msg.Validate(); !valid {
FailedValidationResponse(r.Context(), conn, errors)
return
}

Expand Down
Loading

0 comments on commit d12c1f1

Please sign in to comment.