From 6bbabfb8800ef21da1368e09a5687a848a7a3a13 Mon Sep 17 00:00:00 2001 From: Vasily Tsybenko Date: Fri, 14 Feb 2025 17:18:46 +0200 Subject: [PATCH] Add dbrutil/README.md --- .github/workflows/test.yml | 3 + dbrutil/README.md | 250 ++++++++++++++++++ dbrutil/dbrutil_test.go | 8 +- .../examples/dbr-instrumentation-1/main.go | 92 +++++++ .../examples/dbr-instrumentation-2/main.go | 110 ++++++++ dbrutil/{metric.go => metrics.go} | 15 +- dbrutil/slow.go | 2 +- distrlock/README.md | 8 +- distrlock/example_test.go | 6 + go.mod | 6 +- metrics.go | 66 ++--- mssql/mssql_test.go | 10 + mysql/mysql_test.go | 2 +- retryable.go | 2 +- sqlite/sqlite_test.go | 9 + 15 files changed, 536 insertions(+), 53 deletions(-) create mode 100644 dbrutil/README.md create mode 100644 dbrutil/examples/dbr-instrumentation-1/main.go create mode 100644 dbrutil/examples/dbr-instrumentation-2/main.go rename dbrutil/{metric.go => metrics.go} (71%) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2aad586..dbe7ff8 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -37,6 +37,9 @@ jobs: - name: Run tests with coverage run: go test -race -cover -coverprofile="coverage.out" -covermode=atomic -v --coverpkg=./... ./... + - name: Exclude examples from coverage + run: grep -Ev '/examples/' ./coverage.out > ./coverage_filtered.out && mv -f ./coverage_filtered.out ./coverage.out + - name: Check coverage run: | real_coverage=$(go tool cover -func=coverage.out | grep total | awk '{print substr($3, 1, length($3)-1)}') diff --git a/dbrutil/README.md b/dbrutil/README.md new file mode 100644 index 0000000..b82ef1e --- /dev/null +++ b/dbrutil/README.md @@ -0,0 +1,250 @@ +# dbrutil + +[![GoDoc Widget]][GoDoc] + +## Overview + +`dbrutil` is a Go package that provides utilities and helpers for working with the [dbr query builder](https://github.com/gocraft/dbr). +It simplifies database operations by offering: +- **Database Connection Management**: Open a database connection with instrumentation for collecting metrics and logging slow queries. +- **Transaction Management**: Run functions within transactions using a unified `TxRunner` interface that automatically commits or rolls back. +- **Retryable Transactions**: Execute transactions with configurable retry policies. +- **Prometheus Metrics Collection**: Collect and observe SQL query durations via SQL comment annotations. +- **Slow Query Logging**: Log SQL queries that exceed a configurable duration threshold. + +## Usage + +The following basic example demonstrates how to use `dbrutil` to create a database connection and run a transaction: + +```go +package main + +import ( + "context" + "database/sql" + "errors" + "fmt" + stdlog "log" + "net/http" + "os" + "time" + + "github.com/acronis/go-appkit/log" + "github.com/gocraft/dbr/v2" + + "github.com/acronis/go-dbkit" + "github.com/acronis/go-dbkit/dbrutil" +) + +func main() { + logger, loggerClose := log.NewLogger(&log.Config{Output: log.OutputStderr, Level: log.LevelInfo}) + defer loggerClose() + + // Create a Prometheus metrics collector. + promMetrics := dbkit.NewPrometheusMetrics() + promMetrics.MustRegister() + defer promMetrics.Unregister() + + // Open the database connection with instrumentation. + // Instrumentation includes collecting metrics about SQL queries and logging slow queries. + eventReceiver := dbrutil.NewCompositeReceiver([]dbr.EventReceiver{ + dbrutil.NewQueryMetricsEventReceiver(promMetrics, queryAnnotationPrefix), + dbrutil.NewSlowQueryLogEventReceiver(logger, 100*time.Millisecond, queryAnnotationPrefix), + }) + conn, err := openDB(eventReceiver) + if err != nil { + stdlog.Fatal(err) + } + defer conn.Close() + + txRunner := dbrutil.NewTxRunner(conn, &sql.TxOptions{Isolation: sql.LevelReadCommitted}, nil) + + // Execute function in a transaction. + // The transaction will be automatically committed if the function returns nil, otherwise it will be rolled back. + if dbErr := txRunner.DoInTx(context.Background(), func(tx dbr.SessionRunner) error { + var result int + return tx.Select("SLEEP(1)"). + Comment(annotateQuery("long_operation")). // Annotate the query for Prometheus metrics and slow query log. + LoadOne(&result) + }); dbErr != nil { + stdlog.Fatal(dbErr) + } + + // The following log message will be printed: + // {"level":"warn","time":"2025-02-14T16:29:55.429257+02:00","msg":"slow SQL query","pid":14030,"annotation":"query:long_operation","duration_ms":1007} + + // Prometheus metrics will be collected: + // db_query_duration_seconds_bucket{query="query:long_operation",le="2.5"} 1 + // db_query_duration_seconds_sum{query="query:long_operation"} 1.004573875 + // db_query_duration_seconds_count{query="query:long_operation"} 1 +} + +const queryAnnotationPrefix = "query:" + +func annotateQuery(queryName string) string { + return queryAnnotationPrefix + queryName +} + +func openDB(eventReceiver dbr.EventReceiver) (*dbr.Connection, error) { + cfg := &dbkit.Config{ + Dialect: dbkit.DialectMySQL, + MySQL: dbkit.MySQLConfig{ + Host: os.Getenv("MYSQL_HOST"), + Port: 3306, + User: os.Getenv("MYSQL_USER"), + Password: os.Getenv("MYSQL_PASSWORD"), + Database: os.Getenv("MYSQL_DATABASE"), + }, + } + + // Open database with instrumentation based on the provided event receiver (see github.com/gocraft/dbr doc for details). + // Opening includes configuring the max open/idle connections and their lifetime and pinging the database. + conn, err := dbrutil.Open(cfg, true, eventReceiver) + if err != nil { + return nil, fmt.Errorf("open database: %w", err) + } + return conn, nil +} +``` + +The next example demonstrates how to use `dbrutil` to create a middleware that injects a transaction runner into the request context. +This transaction runner will use request-scoped logger for logging slow queries. + +```go +package main + +import ( + "database/sql" + "errors" + "fmt" + stdlog "log" + "net/http" + "os" + "time" + + "github.com/acronis/go-appkit/httpserver/middleware" + "github.com/acronis/go-appkit/log" + "github.com/gocraft/dbr/v2" + "github.com/prometheus/client_golang/prometheus/promhttp" + + "github.com/acronis/go-dbkit" + "github.com/acronis/go-dbkit/dbrutil" +) + +func main() { + logger, loggerClose := log.NewLogger(&log.Config{Output: log.OutputStdout, Level: log.LevelInfo}) + defer loggerClose() + + // Create a Prometheus metrics collector. + promMetrics := dbkit.NewPrometheusMetrics() + promMetrics.MustRegister() + defer promMetrics.Unregister() + + // Open the database connection with instrumentation. + // Instrumentation includes collecting metrics about SQL queries. + conn, err := openDB(dbrutil.NewQueryMetricsEventReceiver(promMetrics, queryAnnotationPrefix)) + if err != nil { + stdlog.Fatal(err) + } + defer conn.Close() + + // Construct the middleware that will put the transaction runner into the request context. + var txRunnerOpts dbrutil.TxRunnerMiddlewareOpts + txRunnerOpts.SlowQueryLog.MinTime = 100 * time.Millisecond // Log queries that take more than 100ms. + txRunnerOpts.SlowQueryLog.AnnotationPrefix = queryAnnotationPrefix + txRunnerMiddleware := dbrutil.TxRunnerMiddlewareWithOpts(conn, sql.LevelReadCommitted, txRunnerOpts) + + // Create a handler that will execute a long operation in a transaction. + handler := http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + txRunner := dbrutil.GetTxRunnerFromContext(r.Context()) // Get the request-scoped transaction runner. + + // Execute function in a transaction. + // The transaction will be automatically committed if the function returns nil, otherwise it will be rolled back. + if dbErr := txRunner.DoInTx(r.Context(), func(tx dbr.SessionRunner) error { + var result int + return tx.Select("SLEEP(1)"). + Comment(annotateQuery("long_operation")). // Annotate the query for Prometheus metrics and slow query log. + LoadOne(&result) + }); dbErr != nil { + http.Error(rw, "Internal Server Error", http.StatusInternalServerError) + return + } + _, _ = rw.Write([]byte("OK")) + }) + + // Construct the middleware chain and start the server. + middlewares := []func(http.Handler) http.Handler{ + middleware.RequestID(), + middleware.Logging(logger), + txRunnerMiddleware, + } + var h http.Handler = handler + for i := len(middlewares) - 1; i >= 0; i-- { + h = middlewares[i](h) + } + mux := http.NewServeMux() + mux.Handle("/long-operation", h) + mux.Handle("/metrics", promhttp.Handler()) + if srvErr := http.ListenAndServe(":8080", mux); srvErr != nil && errors.Is(srvErr, http.ErrServerClosed) { + stdlog.Fatalf("failed to start server: %v", srvErr) + } +} + +const queryAnnotationPrefix = "query:" + +func annotateQuery(queryName string) string { + return queryAnnotationPrefix + queryName +} + +func openDB(eventReceiver dbr.EventReceiver) (*dbr.Connection, error) { + cfg := &dbkit.Config{ + Dialect: dbkit.DialectMySQL, + MySQL: dbkit.MySQLConfig{ + Host: os.Getenv("MYSQL_HOST"), + Port: 3306, + User: os.Getenv("MYSQL_USER"), + Password: os.Getenv("MYSQL_PASSWORD"), + Database: os.Getenv("MYSQL_DATABASE"), + }, + } + + // Open database with instrumentation based on the provided event receiver (see github.com/gocraft/dbr doc for details). + // Opening includes configuring the max open/idle connections and their lifetime and pinging the database. + conn, err := dbrutil.Open(cfg, true, eventReceiver) + if err != nil { + return nil, fmt.Errorf("open database: %w", err) + } + return conn, nil +} +``` + +If you run the server and send a request to the `/long-operation` endpoint, you will see the following log message: + +```shell +{"level":"warn","time":"2025-02-16T19:06:39.029356+02:00","msg":"slow SQL query","pid":14117,"request_id":"cup1m7gc65t3e995a1i0","int_request_id":"cup1m7gc65t3e995a1ig","trace_id":"","annotation":"query:long_operation","duration_ms":1002} +{"level":"info","time":"2025-02-16T19:06:39.03459+02:00","msg":"response completed in 1.012s","pid":14117,"request_id":"cup1m7gc65t3e995a1i0","int_request_id":"cup1m7gc65t3e995a1ig","trace_id":"","method":"GET","uri":"/long-operation","remote_addr":"[::1]:54849","content_length":0,"user_agent":"curl/8.7.1","remote_addr_ip":"::1","remote_addr_port":54849,"duration_ms":1011,"duration":1011996,"status":200,"bytes_sent":2,"time_slots":{"writing_response_ms":0}} +``` + +As you can see, the slow query log message contains the annotation `query:long_operation`, and this log message is associated with the request ID `cup1m7gc65t3e995a1i0`. +Using the request ID, you can correlate the slow query log message with the request log message. + +Additionally, the Prometheus metrics collector will collect the following metrics: + +```shell +db_query_duration_seconds_bucket{query="query:long_operation",le="0.001"} 0 +... +db_query_duration_seconds_bucket{query="query:long_operation",le="2.5"} 1 +... +db_query_duration_seconds_bucket{query="query:long_operation",le="+Inf"} 1 +db_query_duration_seconds_sum{query="query:long_operation"} 1.00294175 +db_query_duration_seconds_count{query="query:long_operation"} 1 +``` + +## License + +Copyright © 2024 Acronis International GmbH. + +Licensed under [MIT License](./../LICENSE). + +[GoDoc]: https://pkg.go.dev/github.com/acronis/go-dbkit/dbrutil +[GoDoc Widget]: https://godoc.org/github.com/acronis/go-dbkit/dbrutil?status.svg \ No newline at end of file diff --git a/dbrutil/dbrutil_test.go b/dbrutil/dbrutil_test.go index ed763a5..ef400ab 100644 --- a/dbrutil/dbrutil_test.go +++ b/dbrutil/dbrutil_test.go @@ -146,25 +146,25 @@ func TestDbrQueryMetricsEventReceiver_TimingKv(t *testing.T) { }() t.Run("metrics for query with wrong annotation are not collected", func(t *testing.T) { - mc := dbkit.NewMetricsCollector() + mc := dbkit.NewPrometheusMetrics() metricsEventReceiver := NewQueryMetricsEventReceiver(mc, "query_") dbSess := dbConn.NewSession(metricsEventReceiver) countUsersByName(t, dbSess, "count_users_by_name", "Sam", 2) - labels := prometheus.Labels{dbkit.MetricsLabelQuery: "count_users_by_name"} + labels := prometheus.Labels{dbkit.PrometheusMetricsLabelQuery: "count_users_by_name"} hist := mc.QueryDurations.With(labels).(prometheus.Histogram) testutil.RequireSamplesCountInHistogram(t, hist, 0) }) t.Run("metrics for query are collected", func(t *testing.T) { - mc := dbkit.NewMetricsCollector() + mc := dbkit.NewPrometheusMetrics() metricsEventReceiver := NewQueryMetricsEventReceiver(mc, "query_") dbSess := dbConn.NewSession(metricsEventReceiver) countUsersByName(t, dbSess, "query_count_users_by_name", "Sam", 2) - labels := prometheus.Labels{dbkit.MetricsLabelQuery: "query_count_users_by_name"} + labels := prometheus.Labels{dbkit.PrometheusMetricsLabelQuery: "query_count_users_by_name"} hist := mc.QueryDurations.With(labels).(prometheus.Histogram) testutil.RequireSamplesCountInHistogram(t, hist, 1) }) diff --git a/dbrutil/examples/dbr-instrumentation-1/main.go b/dbrutil/examples/dbr-instrumentation-1/main.go new file mode 100644 index 0000000..fd02500 --- /dev/null +++ b/dbrutil/examples/dbr-instrumentation-1/main.go @@ -0,0 +1,92 @@ +/* +Copyright © 2025 Acronis International GmbH. + +Released under MIT license. +*/ + +package main + +import ( + "context" + "database/sql" + "fmt" + stdlog "log" + "os" + "time" + + "github.com/acronis/go-appkit/log" + "github.com/gocraft/dbr/v2" + + "github.com/acronis/go-dbkit" + "github.com/acronis/go-dbkit/dbrutil" +) + +func main() { + logger, loggerClose := log.NewLogger(&log.Config{Output: log.OutputStderr, Level: log.LevelInfo}) + defer loggerClose() + + // Create a Prometheus metrics collector. + promMetrics := dbkit.NewPrometheusMetrics() + promMetrics.MustRegister() + defer promMetrics.Unregister() + + // Open the database connection with instrumentation. + // Instrumentation includes collecting metrics about SQL queries and logging slow queries. + eventReceiver := dbrutil.NewCompositeReceiver([]dbr.EventReceiver{ + dbrutil.NewQueryMetricsEventReceiver(promMetrics, queryAnnotationPrefix), + dbrutil.NewSlowQueryLogEventReceiver(logger, 100*time.Millisecond, queryAnnotationPrefix), + }) + conn, err := openDB(eventReceiver) + if err != nil { + stdlog.Fatal(err) + } + defer conn.Close() + + txRunner := dbrutil.NewTxRunner(conn, &sql.TxOptions{Isolation: sql.LevelReadCommitted}, nil) + + // Execute function in a transaction. + // The transaction will be automatically committed if the function returns nil, otherwise it will be rolled back. + if dbErr := txRunner.DoInTx(context.Background(), func(tx dbr.SessionRunner) error { + var result int + return tx.Select("SLEEP(1)"). + Comment(annotateQuery("long_operation")). // Annotate the query for Prometheus metrics and slow query log. + LoadOne(&result) + }); dbErr != nil { + stdlog.Fatal(dbErr) + } + + // The following log message will be printed: + // {"level":"warn","time":"2025-02-14T16:29:55.429257+02:00","msg":"slow SQL query","pid":14030,"annotation":"query:long_operation","duration_ms":1007} + + // Prometheus metrics will be collected: + // db_query_duration_seconds_bucket{query="query:long_operation",le="2.5"} 1 + // db_query_duration_seconds_sum{query="query:long_operation"} 1.004573875 + // db_query_duration_seconds_count{query="query:long_operation"} 1 +} + +const queryAnnotationPrefix = "query:" + +func annotateQuery(queryName string) string { + return queryAnnotationPrefix + queryName +} + +func openDB(eventReceiver dbr.EventReceiver) (*dbr.Connection, error) { + cfg := &dbkit.Config{ + Dialect: dbkit.DialectMySQL, + MySQL: dbkit.MySQLConfig{ + Host: os.Getenv("MYSQL_HOST"), + Port: 3306, + User: os.Getenv("MYSQL_USER"), + Password: os.Getenv("MYSQL_PASSWORD"), + Database: os.Getenv("MYSQL_DATABASE"), + }, + } + + // Open database with instrumentation based on the provided event receiver (see github.com/gocraft/dbr doc for details). + // Opening includes configuring the max open/idle connections and their lifetime and pinging the database. + conn, err := dbrutil.Open(cfg, true, eventReceiver) + if err != nil { + return nil, fmt.Errorf("open database: %w", err) + } + return conn, nil +} diff --git a/dbrutil/examples/dbr-instrumentation-2/main.go b/dbrutil/examples/dbr-instrumentation-2/main.go new file mode 100644 index 0000000..31aa636 --- /dev/null +++ b/dbrutil/examples/dbr-instrumentation-2/main.go @@ -0,0 +1,110 @@ +/* +Copyright © 2025 Acronis International GmbH. + +Released under MIT license. +*/ +package main + +import ( + "database/sql" + "errors" + "fmt" + stdlog "log" + "net/http" + "os" + "time" + + "github.com/acronis/go-appkit/httpserver/middleware" + "github.com/acronis/go-appkit/log" + "github.com/gocraft/dbr/v2" + "github.com/prometheus/client_golang/prometheus/promhttp" + + "github.com/acronis/go-dbkit" + "github.com/acronis/go-dbkit/dbrutil" +) + +func main() { + logger, loggerClose := log.NewLogger(&log.Config{Output: log.OutputStdout, Level: log.LevelInfo}) + defer loggerClose() + + // Create a Prometheus metrics collector. + promMetrics := dbkit.NewPrometheusMetrics() + promMetrics.MustRegister() + defer promMetrics.Unregister() + + // Open the database connection with instrumentation. + // Instrumentation includes collecting metrics about SQL queries. + conn, err := openDB(dbrutil.NewQueryMetricsEventReceiver(promMetrics, queryAnnotationPrefix)) + if err != nil { + stdlog.Fatal(err) + } + defer conn.Close() + + // Construct the middleware that will put the transaction runner into the request context. + var txRunnerOpts dbrutil.TxRunnerMiddlewareOpts + txRunnerOpts.SlowQueryLog.MinTime = 100 * time.Millisecond // Log queries that take more than 100ms. + txRunnerOpts.SlowQueryLog.AnnotationPrefix = queryAnnotationPrefix + txRunnerMiddleware := dbrutil.TxRunnerMiddlewareWithOpts(conn, sql.LevelReadCommitted, txRunnerOpts) + + // Create a handler that will execute a long operation in a transaction. + handler := http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + txRunner := dbrutil.GetTxRunnerFromContext(r.Context()) // Get the request-scoped transaction runner. + + // Execute function in a transaction. + // The transaction will be automatically committed if the function returns nil, otherwise it will be rolled back. + if dbErr := txRunner.DoInTx(r.Context(), func(tx dbr.SessionRunner) error { + var result int + return tx.Select("SLEEP(1)"). + Comment(annotateQuery("long_operation")). // Annotate the query for Prometheus metrics and slow query log. + LoadOne(&result) + }); dbErr != nil { + http.Error(rw, "Internal Server Error", http.StatusInternalServerError) + return + } + _, _ = rw.Write([]byte("OK")) + }) + + // Construct the middleware chain and start the server. + middlewares := []func(http.Handler) http.Handler{ + middleware.RequestID(), + middleware.Logging(logger), + txRunnerMiddleware, + } + var h http.Handler = handler + for i := len(middlewares) - 1; i >= 0; i-- { + h = middlewares[i](h) + } + mux := http.NewServeMux() + mux.Handle("/long-operation", h) + mux.Handle("/metrics", promhttp.Handler()) + if srvErr := http.ListenAndServe(":8080", mux); srvErr != nil && errors.Is(srvErr, http.ErrServerClosed) { + stdlog.Fatalf("failed to start server: %v", srvErr) + } +} + +const queryAnnotationPrefix = "query:" + +func annotateQuery(queryName string) string { + return queryAnnotationPrefix + queryName +} + +func openDB(eventReceiver dbr.EventReceiver) (*dbr.Connection, error) { + cfg := &dbkit.Config{ + Dialect: dbkit.DialectMySQL, + MySQL: dbkit.MySQLConfig{ + Host: os.Getenv("MYSQL_HOST"), + Port: 3306, + User: os.Getenv("MYSQL_USER"), + Password: os.Getenv("MYSQL_PASSWORD"), + Database: os.Getenv("MYSQL_DATABASE"), + }, + } + + // Open database with instrumentation based on the provided event receiver (see github.com/gocraft/dbr doc for details). + // Opening includes configuring the max open/idle connections and their lifetime and pinging the database. + conn, err := dbrutil.Open(cfg, true, eventReceiver) + if err != nil { + return nil, fmt.Errorf("open database: %w", err) + } + return conn, nil +} diff --git a/dbrutil/metric.go b/dbrutil/metrics.go similarity index 71% rename from dbrutil/metric.go rename to dbrutil/metrics.go index 669d6a3..5527ff9 100644 --- a/dbrutil/metric.go +++ b/dbrutil/metrics.go @@ -9,10 +9,8 @@ package dbrutil import ( "time" - "github.com/gocraft/dbr/v2" - "github.com/prometheus/client_golang/prometheus" - "github.com/acronis/go-dbkit" + "github.com/gocraft/dbr/v2" ) // QueryMetricsEventReceiverOpts consists options for QueryMetricsEventReceiver. @@ -22,16 +20,16 @@ type QueryMetricsEventReceiverOpts struct { } // QueryMetricsEventReceiver implements the dbr.EventReceiver interface and collects metrics about SQL queries. -// To be collected SQL query should be annotated (comment starting with specified prefix). +// To be collected, SQL query should be annotated (comment starting with specified prefix). type QueryMetricsEventReceiver struct { *dbr.NullEventReceiver - metricsCollector *dbkit.MetricsCollector + metricsCollector dbkit.MetricsCollector annotationPrefix string annotationModifier func(string) string } // NewQueryMetricsEventReceiverWithOpts creates a new QueryMetricsEventReceiver with additinal options. -func NewQueryMetricsEventReceiverWithOpts(mc *dbkit.MetricsCollector, options QueryMetricsEventReceiverOpts) *QueryMetricsEventReceiver { +func NewQueryMetricsEventReceiverWithOpts(mc *dbkit.PrometheusMetrics, options QueryMetricsEventReceiverOpts) *QueryMetricsEventReceiver { return &QueryMetricsEventReceiver{ metricsCollector: mc, annotationPrefix: options.AnnotationPrefix, @@ -40,7 +38,7 @@ func NewQueryMetricsEventReceiverWithOpts(mc *dbkit.MetricsCollector, options Qu } // NewQueryMetricsEventReceiver creates a new QueryMetricsEventReceiver. -func NewQueryMetricsEventReceiver(mc *dbkit.MetricsCollector, annotationPrefix string) *QueryMetricsEventReceiver { +func NewQueryMetricsEventReceiver(mc *dbkit.PrometheusMetrics, annotationPrefix string) *QueryMetricsEventReceiver { options := QueryMetricsEventReceiverOpts{ AnnotationPrefix: annotationPrefix, } @@ -54,6 +52,5 @@ func (er *QueryMetricsEventReceiver) TimingKv(eventName string, nanoseconds int6 if annotation == "" { return } - labels := prometheus.Labels{dbkit.MetricsLabelQuery: annotation} - er.metricsCollector.QueryDurations.With(labels).Observe(time.Duration(nanoseconds).Seconds()) + er.metricsCollector.ObserveQueryDuration(annotation, time.Duration(nanoseconds)) } diff --git a/dbrutil/slow.go b/dbrutil/slow.go index 8239505..9d0b8fb 100644 --- a/dbrutil/slow.go +++ b/dbrutil/slow.go @@ -20,7 +20,7 @@ type SlowQueryLogEventReceiverOpts struct { } // SlowQueryLogEventReceiver implements the dbr.EventReceiver interface and logs long SQL queries. -// To be logged SQL query should be annotated (comment starting with specified prefix). +// To be logged, SQL query should be annotated (comment starting with specified prefix). type SlowQueryLogEventReceiver struct { *dbr.NullEventReceiver logger log.FieldLogger diff --git a/distrlock/README.md b/distrlock/README.md index 02491ed..e6b9024 100644 --- a/distrlock/README.md +++ b/distrlock/README.md @@ -25,7 +25,7 @@ This approach ensures reliable concurrency control without requiring an external The following basic example demonstrates how to use `distrlock` to ensure exclusive execution of a critical section of code: ```go -package distrlock_test +package main import ( "context" @@ -38,7 +38,7 @@ import ( "github.com/acronis/go-dbkit/distrlock" ) -func ExampleDoExclusively() { +func main() { // Setup database connection db, err := sql.Open("mysql", os.Getenv("MYSQL_DSN")) if err != nil { @@ -73,7 +73,7 @@ func ExampleDoExclusively() { If you need more customization or/and control over the lock lifecycle, you can use `DBManager` and `DBLock` objects directly: ```go -package distrlock_test +package main import ( "context" @@ -86,7 +86,7 @@ import ( "github.com/acronis/go-dbkit/distrlock" ) -func ExampleNewDBManager() { +func main() { // Setup database connection db, err := sql.Open("mysql", os.Getenv("MYSQL_DSN")) if err != nil { diff --git a/distrlock/example_test.go b/distrlock/example_test.go index e95713a..74d996c 100644 --- a/distrlock/example_test.go +++ b/distrlock/example_test.go @@ -1,3 +1,9 @@ +/* +Copyright © 2025 Acronis International GmbH. + +Released under MIT license. +*/ + package distrlock_test import ( diff --git a/go.mod b/go.mod index 0a532dd..06c9e2b 100644 --- a/go.mod +++ b/go.mod @@ -14,12 +14,15 @@ require ( github.com/jackc/pgx/v5 v5.6.0 github.com/lib/pq v1.10.9 github.com/mattn/go-sqlite3 v1.14.24 + github.com/mitchellh/mapstructure v1.5.0 github.com/prometheus/client_golang v1.20.5 github.com/rubenv/sql-migrate v1.0.0 + github.com/spf13/viper v1.19.0 github.com/stretchr/testify v1.10.0 github.com/testcontainers/testcontainers-go v0.33.0 github.com/testcontainers/testcontainers-go/modules/mariadb v0.33.0 github.com/testcontainers/testcontainers-go/modules/postgres v0.33.0 + gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -57,7 +60,6 @@ require ( github.com/klauspost/compress v1.17.9 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/magiconair/properties v1.8.7 // indirect - github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect github.com/moby/patternmatcher v0.6.0 // indirect github.com/moby/sys/sequential v0.5.0 // indirect @@ -84,7 +86,6 @@ require ( github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.7.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/spf13/viper v1.19.0 // indirect github.com/ssgreg/logf v1.4.2 // indirect github.com/ssgreg/logftext v1.1.1 // indirect github.com/subosito/gotenv v1.6.0 // indirect @@ -107,5 +108,4 @@ require ( gopkg.in/gorp.v1 v1.7.2 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/metrics.go b/metrics.go index dba8c0d..d066fbd 100644 --- a/metrics.go +++ b/metrics.go @@ -6,17 +6,23 @@ Released under MIT license. package dbkit -import "github.com/prometheus/client_golang/prometheus" +import ( + "time" -// Prometheus labels. -const ( - MetricsLabelQuery = "query" + "github.com/prometheus/client_golang/prometheus" ) +type MetricsCollector interface { + ObserveQueryDuration(query string, duration time.Duration) +} + +// PrometheusMetricsLabelQuery is a label name for SQL query in Prometheus metrics. +const PrometheusMetricsLabelQuery = "query" + // DefaultQueryDurationBuckets is default buckets into which observations of executing SQL queries are counted. var DefaultQueryDurationBuckets = []float64{0.001, 0.01, 0.1, 0.25, 0.5, 1, 2.5, 5, 10} -// MetricsCollectorOpts represents an options for MetricsCollector. +// MetricsCollectorOpts represents an options for PrometheusMetrics. type MetricsCollectorOpts struct { // Namespace is a namespace for metrics. It will be prepended to all metric names. Namespace string @@ -28,31 +34,31 @@ type MetricsCollectorOpts struct { ConstLabels prometheus.Labels // CurryingLabelNames is a list of label names that will be curried with the provided labels. - // See MetricsCollector.MustCurryWith method for more details. + // See PrometheusMetrics.MustCurryWith method for more details. // Keep in mind that if this list is not empty, - // MetricsCollector.MustCurryWith method must be called further with the same labels. + // PrometheusMetrics.MustCurryWith method must be called further with the same labels. // Otherwise, the collector will panic. CurriedLabelNames []string } -// MetricsCollector represents collector of metrics. -type MetricsCollector struct { +// PrometheusMetrics represents collector of metrics. +type PrometheusMetrics struct { QueryDurations *prometheus.HistogramVec } -// NewMetricsCollector creates a new metrics collector. -func NewMetricsCollector() *MetricsCollector { - return NewMetricsCollectorWithOpts(MetricsCollectorOpts{}) +// NewPrometheusMetrics creates a new metrics collector. +func NewPrometheusMetrics() *PrometheusMetrics { + return NewPrometheusMetricsWithOpts(MetricsCollectorOpts{}) } -// NewMetricsCollectorWithOpts is a more configurable version of creating MetricsCollector. -func NewMetricsCollectorWithOpts(opts MetricsCollectorOpts) *MetricsCollector { +// NewPrometheusMetricsWithOpts is a more configurable version of creating PrometheusMetrics. +func NewPrometheusMetricsWithOpts(opts MetricsCollectorOpts) *PrometheusMetrics { queryDurationBuckets := opts.QueryDurationBuckets if queryDurationBuckets == nil { queryDurationBuckets = DefaultQueryDurationBuckets } labelNames := append(make([]string, 0, len(opts.CurriedLabelNames)+1), opts.CurriedLabelNames...) - labelNames = append(labelNames, MetricsLabelQuery) + labelNames = append(labelNames, PrometheusMetricsLabelQuery) queryDurations := prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: opts.Namespace, @@ -63,32 +69,32 @@ func NewMetricsCollectorWithOpts(opts MetricsCollectorOpts) *MetricsCollector { }, labelNames, ) - - return &MetricsCollector{ - QueryDurations: queryDurations, - } + return &PrometheusMetrics{QueryDurations: queryDurations} } // MustCurryWith curries the metrics collector with the provided labels. -func (c *MetricsCollector) MustCurryWith(labels prometheus.Labels) *MetricsCollector { - return &MetricsCollector{ - QueryDurations: c.QueryDurations.MustCurryWith(labels).(*prometheus.HistogramVec), +func (pm *PrometheusMetrics) MustCurryWith(labels prometheus.Labels) *PrometheusMetrics { + return &PrometheusMetrics{ + QueryDurations: pm.QueryDurations.MustCurryWith(labels).(*prometheus.HistogramVec), } } // MustRegister does registration of metrics collector in Prometheus and panics if any error occurs. -func (c *MetricsCollector) MustRegister() { - prometheus.MustRegister(c.QueryDurations) +func (pm *PrometheusMetrics) MustRegister() { + prometheus.MustRegister(pm.QueryDurations) } // Unregister cancels registration of metrics collector in Prometheus. -func (c *MetricsCollector) Unregister() { - prometheus.Unregister(c.QueryDurations) +func (pm *PrometheusMetrics) Unregister() { + prometheus.Unregister(pm.QueryDurations) } // AllMetrics returns a list of metrics of this collector. This can be used to register these metrics in push gateway. -func (c *MetricsCollector) AllMetrics() []prometheus.Collector { - return []prometheus.Collector{ - c.QueryDurations, - } +func (pm *PrometheusMetrics) AllMetrics() []prometheus.Collector { + return []prometheus.Collector{pm.QueryDurations} +} + +// ObserveQueryDuration observes the duration of executing SQL query. +func (pm *PrometheusMetrics) ObserveQueryDuration(query string, duration time.Duration) { + pm.QueryDurations.With(prometheus.Labels{PrometheusMetricsLabelQuery: query}).Observe(duration.Seconds()) } diff --git a/mssql/mssql_test.go b/mssql/mssql_test.go index 8836b5c..3c11b57 100644 --- a/mssql/mssql_test.go +++ b/mssql/mssql_test.go @@ -24,3 +24,13 @@ func TestMSSQLIsRetryable(t *testing.T) { require.False(t, isRetryable(driver.ErrBadConn)) require.True(t, isRetryable(fmt.Errorf("wrapped error: %w", mssql.Error{Number: 1205}))) } + +func TestCheckMSSQLError(t *testing.T) { + var err error + err = mssql.Error{Number: 1205} + require.True(t, CheckMSSQLError(err, ErrDeadlock)) + err = mssql.Error{Number: 9999} + require.False(t, CheckMSSQLError(err, ErrDeadlock)) + err = fmt.Errorf("wrapped error: %w", mssql.Error{Number: 1205}) + require.True(t, CheckMSSQLError(err, ErrDeadlock)) +} diff --git a/mysql/mysql_test.go b/mysql/mysql_test.go index 13a9f29..a08539b 100644 --- a/mysql/mysql_test.go +++ b/mysql/mysql_test.go @@ -30,7 +30,7 @@ func TestMakeMySQLDSN(t *testing.T) { require.Equal(t, wantDSN, gotDSN) } -func TestMysqlIsRetryable(t *testing.T) { +func TestMySQLIsRetryable(t *testing.T) { isRetryable := dbkit.GetIsRetryable(&mysql.MySQLDriver{}) require.NotNil(t, isRetryable) require.True(t, isRetryable(&mysql.MySQLError{ diff --git a/retryable.go b/retryable.go index e413ee8..7063a50 100644 --- a/retryable.go +++ b/retryable.go @@ -15,7 +15,7 @@ import ( var retryableErrors = map[reflect.Type]retry.IsRetryable{} -// GetIsRetryable returns a function that can tell for given driver if error is retryable. +// GetIsRetryable returns a function that can tell for a given driver if error is retryable. func GetIsRetryable(d driver.Driver) retry.IsRetryable { t := reflect.TypeOf(d) if r, ok := retryableErrors[t]; ok { diff --git a/sqlite/sqlite_test.go b/sqlite/sqlite_test.go index b28f51a..bf5afe8 100644 --- a/sqlite/sqlite_test.go +++ b/sqlite/sqlite_test.go @@ -140,6 +140,15 @@ func TestSqliteIsRetryable(t *testing.T) { }))) } +func TestCheckSQLiteError(t *testing.T) { + err := sqlite3.Error{ + Code: sqlite3.ErrIoErr, + ExtendedCode: sqlite3.ErrIoErrRead, + } + require.True(t, CheckSQLiteError(err, sqlite3.ErrIoErrRead)) + require.False(t, CheckSQLiteError(err, sqlite3.ErrIoErrWrite)) +} + func execAndSleepInTx(ctx context.Context, dbConn *sql.DB, stmt string, errCh chan error, sleepTime time.Duration) { tx, txErr := dbConn.BeginTx(ctx, nil) if txErr != nil {