Skip to content

Commit af7aac5

Browse files
committed
Fixed several issues
1 parent 79a7ad5 commit af7aac5

15 files changed

+271
-214
lines changed

examples/simplewebsocket/websockethandler.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ type ResponseMessageDto struct {
1616
Message string `json:"message"`
1717
}
1818

19-
func (msg SubscribeMessageDto) Validate() *validate.Validator {
20-
return validate.New()
19+
func (msg SubscribeMessageDto) Validate() (bool, map[string]string) {
20+
v := validate.New()
21+
return v.Valid(), v.Errors()
2122
}
2223

2324
func (msg SubscribeMessageDto) Topic() string {

pkg/communication/ws/websocket.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,8 @@ func (h WebSocketHandler[T]) Handler() http.HandlerFunc {
124124
return
125125
}
126126

127-
if v := msg.Validate(); !v.Valid() {
128-
FailedValidationResponse(r.Context(), conn, v.Errors)
127+
if valid, errors := msg.Validate(); !valid {
128+
FailedValidationResponse(r.Context(), conn, errors)
129129
return
130130
}
131131

pkg/communication/ws/websocket_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@ type TestSubscribeMsg struct {
2222
TopicName string `json:"topicName"`
2323
}
2424

25-
func (s TestSubscribeMsg) Validate() *validate.Validator {
25+
func (s TestSubscribeMsg) Validate() (bool, map[string]string) {
2626
v := validate.New()
2727

28-
validate.Check(v, s.TopicName, validate.IsNotEmpty, "topicName")
28+
validate.Check(v, "topicName", s.TopicName, validate.IsNotEmpty)
2929

30-
return v
30+
return v.Valid(), v.Errors()
3131
}
3232

3333
func (s TestSubscribeMsg) Topic() string {

pkg/database/postgres/pgx_sync_tx.go

+33-86
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"github.com/XDoubleU/essentia/pkg/database"
77
"github.com/jackc/pgx/v5"
88
"github.com/jackc/pgx/v5/pgconn"
9-
"github.com/jackc/pgx/v5/pgtype"
109
)
1110

1211
// PgxSyncTx uses [database.SyncTx] to make sure
@@ -17,19 +16,14 @@ type PgxSyncTx struct {
1716

1817
// PgxSyncRow is a concurrent wrapper for [pgx.Row].
1918
type PgxSyncRow struct {
20-
rows pgx.Rows
21-
err error
19+
syncTx *database.SyncTx[pgx.Tx]
20+
row pgx.Row
2221
}
2322

2423
// PgxSyncRows is a concurrent wrapper for [pgx.Rows].
2524
type PgxSyncRows struct {
26-
values [][]any
27-
rawValues [][][]byte
28-
err error
29-
fieldDescriptions []pgconn.FieldDescription
30-
commandTag pgconn.CommandTag
31-
conn *pgx.Conn
32-
i int
25+
syncTx *database.SyncTx[pgx.Tx]
26+
rows pgx.Rows
3327
}
3428

3529
// CreatePgxSyncTx returns a [pgx.Tx] which works concurrently.
@@ -62,48 +56,17 @@ func (tx *PgxSyncTx) Query(
6256
sql string,
6357
args ...any,
6458
) (pgx.Rows, error) {
65-
return database.WrapInSyncTx(
66-
ctx,
67-
tx.syncTx,
68-
func(ctx context.Context) (*PgxSyncRows, error) {
69-
rows, err := tx.syncTx.Tx.Query(ctx, sql, args...)
70-
if err != nil {
71-
return nil, err
72-
}
73-
defer rows.Close()
74-
75-
var results [][]any
76-
var rawResults [][][]byte
77-
for rows.Next() {
78-
var values []any
79-
values, err = rows.Values()
80-
if err != nil {
81-
break
82-
}
83-
84-
temp := rows.RawValues()
85-
rawValues := make([][]byte, len(temp))
86-
copy(rawValues, temp)
87-
88-
results = append(results, values)
89-
rawResults = append(rawResults, rawValues)
90-
}
91-
92-
if err == nil {
93-
err = rows.Err()
94-
}
95-
96-
return &PgxSyncRows{
97-
values: results,
98-
rawValues: rawResults,
99-
err: err,
100-
fieldDescriptions: rows.FieldDescriptions(),
101-
commandTag: rows.CommandTag(),
102-
conn: rows.Conn(),
103-
i: -1,
104-
}, nil
105-
},
106-
)
59+
tx.syncTx.Mutex.Lock()
60+
61+
rows, err := tx.syncTx.Tx.Query(ctx, sql, args...)
62+
if err != nil {
63+
return nil, err
64+
}
65+
66+
return &PgxSyncRows{
67+
syncTx: tx.syncTx,
68+
rows: rows,
69+
}, nil
10770
}
10871

10972
// SendBatch is used to wrap [pgx.Tx.QueryRow] in a [database.SyncTx].
@@ -117,84 +80,68 @@ func (tx *PgxSyncTx) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResul
11780
)
11881
}
11982

120-
// Close doesn't do anything for [PgxSyncRows] as these are closed in [Query].
83+
// Close closes the opened [pgx.Rows].
12184
func (rows *PgxSyncRows) Close() {
85+
rows.syncTx.Unlock()
86+
rows.rows.Close()
12287
}
12388

12489
// CommandTag fetches the [pgconn.CommandTag].
12590
func (rows *PgxSyncRows) CommandTag() pgconn.CommandTag {
126-
return rows.commandTag
91+
return rows.rows.CommandTag()
12792
}
12893

12994
// Conn fetches the [pgx.Conn].
13095
func (rows *PgxSyncRows) Conn() *pgx.Conn {
131-
return rows.conn
96+
return rows.rows.Conn()
13297
}
13398

13499
// Err fetches any errors.
135100
func (rows *PgxSyncRows) Err() error {
136-
return rows.err
101+
return rows.rows.Err()
137102
}
138103

139104
// FieldDescriptions fetches [pgconn.FieldDescription]s.
140105
func (rows *PgxSyncRows) FieldDescriptions() []pgconn.FieldDescription {
141-
return rows.fieldDescriptions
106+
return rows.rows.FieldDescriptions()
142107
}
143108

144109
// Next continues to the next row of [PgxSyncRows] if there is one.
145110
func (rows *PgxSyncRows) Next() bool {
146-
rows.i++
147-
return rows.i < len(rows.values)
111+
return rows.rows.Next()
148112
}
149113

150114
// RawValues fetches the raw values of the current row.
151115
func (rows *PgxSyncRows) RawValues() [][]byte {
152-
return rows.rawValues[rows.i]
116+
return rows.rows.RawValues()
153117
}
154118

155119
// Scan scans the data of the current row into dest.
156120
func (rows *PgxSyncRows) Scan(dest ...any) error {
157-
if err := rows.Err(); err != nil {
158-
return err
159-
}
160-
161-
return pgx.ScanRow(
162-
pgtype.NewMap(),
163-
rows.FieldDescriptions(),
164-
rows.RawValues(),
165-
dest...)
121+
return rows.rows.Scan(dest...)
166122
}
167123

168124
// Values fetches the values of the current row.
169125
func (rows *PgxSyncRows) Values() ([]any, error) {
170-
return rows.values[rows.i], nil
126+
return rows.rows.Values()
171127
}
172128

173129
// QueryRow is used to wrap [pgx.Tx.QueryRow] in a [database.SyncTx].
174130
func (tx *PgxSyncTx) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row {
175-
rows, err := tx.Query(ctx, sql, args...)
131+
tx.syncTx.Mutex.Lock()
132+
133+
row := tx.syncTx.Tx.QueryRow(ctx, sql, args...)
176134

177135
return &PgxSyncRow{
178-
rows: rows,
179-
err: err,
136+
syncTx: tx.syncTx,
137+
row: row,
180138
}
181139
}
182140

183141
// Scan scans the data of [PgxSyncRow] into dest.
184142
func (row *PgxSyncRow) Scan(dest ...any) error {
185-
if row.err != nil {
186-
return row.err
187-
}
188-
189-
if err := row.rows.Err(); err != nil {
190-
return err
191-
}
192-
193-
if !row.rows.Next() {
194-
return pgx.ErrNoRows
195-
}
196-
197-
return row.rows.Scan(dest...)
143+
defer row.syncTx.Unlock()
144+
return row.row.Scan(dest...)
198145
}
199146

200147
// Ping is used to wrap [pgx.Tx.Conn.Ping] in a [database.SyncTx].

pkg/database/postgres/pgx_sync_tx_test.go

+51
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package postgres_test
22

33
import (
44
"context"
5+
"sync"
56
"testing"
67
"time"
78

@@ -32,6 +33,55 @@ func setup(t *testing.T) *postgres.PgxSyncTx {
3233
return postgres.CreatePgxSyncTx(context.Background(), db)
3334
}
3435

36+
func TestParallel(t *testing.T) {
37+
tx := setup(t)
38+
defer func() { err := tx.Rollback(context.Background()); assert.Nil(t, err) }()
39+
40+
db := postgres.NewSpanDB(tx)
41+
42+
_, err := db.Exec(
43+
context.Background(),
44+
"CREATE TABLE kv (key VARCHAR(255), value VARCHAR(255));",
45+
)
46+
require.Nil(t, err)
47+
48+
_, err = db.Exec(
49+
context.Background(),
50+
"INSERT INTO kv (key, value) VALUES ('key1', 'value1');",
51+
)
52+
require.Nil(t, err)
53+
54+
mu1 := sync.Mutex{}
55+
mu2 := sync.Mutex{}
56+
57+
mu1.Lock()
58+
mu2.Lock()
59+
60+
go queryRow(t, db, &mu1)
61+
go queryRow(t, db, &mu2)
62+
63+
mu1.Lock()
64+
mu2.Lock()
65+
66+
assert.True(t, true)
67+
}
68+
69+
func queryRow(t *testing.T, db postgres.DB, mu *sync.Mutex) {
70+
for i := 0; i < 100; i++ {
71+
var key string
72+
var value string
73+
err := db.QueryRow(
74+
context.Background(),
75+
"SELECT key, value FROM kv WHERE key = 'key1';",
76+
).Scan(&key, &value)
77+
assert.Nil(t, err)
78+
79+
time.Sleep(10 * time.Millisecond)
80+
}
81+
82+
mu.Unlock()
83+
}
84+
3585
func TestPing(t *testing.T) {
3686
tx := setup(t)
3787
defer func() { err := tx.Rollback(context.Background()); assert.Nil(t, err) }()
@@ -74,6 +124,7 @@ func TestQuery(t *testing.T) {
74124

75125
rows, err := db.Query(context.Background(), "SELECT key, value FROM kv;")
76126
require.Nil(t, err)
127+
defer rows.Close()
77128

78129
results := make([][]string, 2)
79130
results[0] = make([]string, 2)

pkg/parse/main_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func TestArrayQueryParamFailedParseFunc(t *testing.T) {
123123
req,
124124
"queryParam",
125125
[]int{1, 2},
126-
parse.IntFunc(false, true),
126+
parse.Int(false, true),
127127
)
128128

129129
assert.Equal(t, []int{}, result)

pkg/parse/parser_funcs.go

+12-6
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@ import (
1212
// ParserFunc is the expected format used for parsing data using any parsing function.
1313
type ParserFunc[T any] func(paramType string, paramName string, value string) (T, error)
1414

15+
// String is used to parse a parameter as string value.
16+
// As all parameters are string by default this returns the original value.
17+
func String(paramType string, paramName string, value string) (string, error) {
18+
return value, nil
19+
}
20+
1521
// UUID is used to parse a parameter as UUID value.
1622
// Technically this only validates if a string is a UUID.
1723
func UUID(paramType string, paramName string, value string) (string, error) {
@@ -28,15 +34,15 @@ func UUID(paramType string, paramName string, value string) (string, error) {
2834
return uuidVal.String(), nil
2935
}
3036

31-
// IntFunc parses a parameter as [int].
32-
func IntFunc(isPositive bool, isZero bool) ParserFunc[int] {
37+
// Int parses a parameter as [int].
38+
func Int(isPositive bool, isZero bool) ParserFunc[int] {
3339
return func(paramType string, paramName string, value string) (int, error) {
3440
return parseInt[int](isPositive, isZero, paramType, paramName, value, 0)
3541
}
3642
}
3743

38-
// Int64Func parses a parameter as [int64].
39-
func Int64Func(isPositive bool, isZero bool) ParserFunc[int64] {
44+
// Int64 parses a parameter as [int64].
45+
func Int64(isPositive bool, isZero bool) ParserFunc[int64] {
4046
return func(paramType string, paramName string, value string) (int64, error) {
4147
//nolint:mnd // no magic number
4248
return parseInt[int64](isPositive, isZero, paramType, paramName, value, 64)
@@ -83,9 +89,9 @@ func parseInt[T shared.IntType](
8389
return T(result), nil
8490
}
8591

86-
// DateFunc parses a parameter as a date.
92+
// Date parses a parameter as a date.
8793
// The parameter should match the required date layout.
88-
func DateFunc(layout string) ParserFunc[time.Time] {
94+
func Date(layout string) ParserFunc[time.Time] {
8995
return func(paramType string, paramName string, value string) (time.Time, error) {
9096
result, err := time.Parse(layout, value)
9197
if err != nil {

0 commit comments

Comments
 (0)