Skip to content

Commit 69aaefd

Browse files
author
Dmitry Kropachev
committed
feat(cli): add statement-log-file option
1 parent f60664b commit 69aaefd

File tree

11 files changed

+385
-155
lines changed

11 files changed

+385
-155
lines changed

cmd/gemini/root.go

+13-12
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ var (
105105
requestTimeout time.Duration
106106
connectTimeout time.Duration
107107
profilingPort int
108+
testStatementLogFile string
109+
oracleStatementLogFile string
108110
)
109111

110112
func interactive() bool {
@@ -133,14 +135,6 @@ func readSchema(confFile string, schemaConfig typedef.SchemaConfig) (*typedef.Sc
133135
return schemaBuilder.Build(), nil
134136
}
135137

136-
type createBuilder struct {
137-
stmt string
138-
}
139-
140-
func (cb createBuilder) ToCql() (stmt string, names []string) {
141-
return cb.stmt, nil
142-
}
143-
144138
func run(_ *cobra.Command, _ []string) error {
145139
logger := createLogger(level)
146140
globalStatus := status.NewGlobalStatus(1000)
@@ -219,6 +213,8 @@ func run(_ *cobra.Command, _ []string) error {
219213
MaxRetriesMutate: maxRetriesMutate,
220214
MaxRetriesMutateSleep: maxRetriesMutateSleep,
221215
UseServerSideTimestamps: useServerSideTimestamps,
216+
TestLogStatementsFile: testStatementLogFile,
217+
OracleLogStatementsFile: oracleStatementLogFile,
222218
}
223219
var tracingFile *os.File
224220
if tracingOutFile != "" {
@@ -243,22 +239,25 @@ func run(_ *cobra.Command, _ []string) error {
243239
defer utils.IgnoreError(st.Close)
244240

245241
if dropSchema && mode != jobs.ReadMode {
246-
for _, stmt := range generators.GetDropSchema(schema) {
242+
for _, stmt := range generators.GetDropKeyspace(schema) {
247243
logger.Debug(stmt)
248-
if err = st.Mutate(context.Background(), createBuilder{stmt: stmt}); err != nil {
244+
if err = st.Mutate(context.Background(), typedef.SimpleStmt(stmt, typedef.DropKeyspaceStatementType)); err != nil {
249245
return errors.Wrap(err, "unable to drop schema")
250246
}
251247
}
252248
}
253249

254250
testKeyspace, oracleKeyspace := generators.GetCreateKeyspaces(schema)
255-
if err = st.Create(context.Background(), createBuilder{stmt: testKeyspace}, createBuilder{stmt: oracleKeyspace}); err != nil {
251+
if err = st.Create(
252+
context.Background(),
253+
typedef.SimpleStmt(testKeyspace, typedef.CreateKeyspaceStatementType),
254+
typedef.SimpleStmt(oracleKeyspace, typedef.CreateKeyspaceStatementType)); err != nil {
256255
return errors.Wrap(err, "unable to create keyspace")
257256
}
258257

259258
for _, stmt := range generators.GetCreateSchema(schema) {
260259
logger.Debug(stmt)
261-
if err = st.Mutate(context.Background(), createBuilder{stmt: stmt}); err != nil {
260+
if err = st.Mutate(context.Background(), typedef.SimpleStmt(stmt, typedef.CreateSchemaStatementType)); err != nil {
262261
return errors.Wrap(err, "unable to create schema")
263262
}
264263
}
@@ -531,6 +530,8 @@ func init() {
531530
rootCmd.Flags().DurationVarP(&connectTimeout, "connect-timeout", "", 30*time.Second, "Duration of waiting connection established")
532531
rootCmd.Flags().IntVarP(&profilingPort, "profiling-port", "", 0, "If non-zero starts pprof profiler on given port at 'http://0.0.0.0:<port>/profile'")
533532
rootCmd.Flags().IntVarP(&maxErrorsToStore, "max-errors-to-store", "", 1000, "Maximum number of errors to store and output at the end")
533+
rootCmd.Flags().StringVarP(&testStatementLogFile, "test-statement-log-file", "", "", "File to write statements flow to")
534+
rootCmd.Flags().StringVarP(&oracleStatementLogFile, "oracle-statement-log-file", "", "", "File to write statements flow to")
534535
}
535536

536537
func printSetup(seed, schemaSeed uint64) {

pkg/generators/statement_generator.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func GetCreateSchema(s *typedef.Schema) []string {
140140
return stmts
141141
}
142142

143-
func GetDropSchema(s *typedef.Schema) []string {
143+
func GetDropKeyspace(s *typedef.Schema) []string {
144144
return []string{
145145
fmt.Sprintf("DROP KEYSPACE IF EXISTS %s", s.Keyspace.Name),
146146
}

pkg/jobs/jobs.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ func ddl(
329329
if w := logger.Check(zap.DebugLevel, "ddl statement"); w != nil {
330330
w.Write(zap.String("pretty_cql", ddlStmt.PrettyCQL()))
331331
}
332-
if err = s.Mutate(ctx, ddlStmt.Query); err != nil {
332+
if err = s.Mutate(ctx, ddlStmt); err != nil {
333333
if errors.Is(err, context.Canceled) {
334334
return nil
335335
}
@@ -376,13 +376,11 @@ func mutation(
376376
}
377377
return err
378378
}
379-
mutateQuery := mutateStmt.Query
380-
mutateValues := mutateStmt.Values
381379

382380
if w := logger.Check(zap.DebugLevel, "mutation statement"); w != nil {
383381
w.Write(zap.String("pretty_cql", mutateStmt.PrettyCQL()))
384382
}
385-
if err = s.Mutate(ctx, mutateQuery, mutateValues...); err != nil {
383+
if err = s.Mutate(ctx, mutateStmt); err != nil {
386384
if errors.Is(err, context.Canceled) {
387385
return nil
388386
}
@@ -425,7 +423,7 @@ func validation(
425423
attempt := 1
426424
for {
427425
lastErr = err
428-
err = s.Check(ctx, table, stmt.Query, attempt == maxAttempts, stmt.Values...)
426+
err = s.Check(ctx, table, stmt, attempt == maxAttempts)
429427

430428
if err == nil {
431429
if attempt > 1 {

pkg/stmtlogger/filelogger.go

+135
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
// Copyright 2019 ScyllaDB
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package stmtlogger
16+
17+
import (
18+
"log"
19+
"os"
20+
"strconv"
21+
"sync/atomic"
22+
"time"
23+
24+
"github.com/pkg/errors"
25+
26+
"github.com/scylladb/gemini/pkg/typedef"
27+
)
28+
29+
const (
30+
defaultChanSize = 1000
31+
errorsOnFileLimit = 5
32+
)
33+
34+
type FileLogger struct {
35+
fd *os.File
36+
activeChannel atomic.Pointer[loggerChan]
37+
channel loggerChan
38+
filename string
39+
isFileNonOperational bool
40+
}
41+
42+
type loggerChan chan logRec
43+
44+
type logRec struct {
45+
stmt *typedef.Stmt
46+
ts time.Time
47+
}
48+
49+
func (fl *FileLogger) LogStmt(stmt *typedef.Stmt) {
50+
ch := fl.activeChannel.Load()
51+
if ch != nil {
52+
*ch <- logRec{
53+
stmt: stmt,
54+
}
55+
}
56+
}
57+
58+
func (fl *FileLogger) LogStmtWithTimeStamp(stmt *typedef.Stmt, ts time.Time) {
59+
ch := fl.activeChannel.Load()
60+
if ch != nil {
61+
*ch <- logRec{
62+
stmt: stmt,
63+
ts: ts,
64+
}
65+
}
66+
}
67+
68+
func (fl *FileLogger) Close() error {
69+
return fl.fd.Close()
70+
}
71+
72+
func (fl *FileLogger) committer() {
73+
var err2 error
74+
75+
defer func() {
76+
fl.activeChannel.Swap(nil)
77+
close(fl.channel)
78+
}()
79+
80+
errsAtRow := 0
81+
82+
for rec := range fl.channel {
83+
if fl.isFileNonOperational {
84+
continue
85+
}
86+
87+
_, err1 := fl.fd.Write([]byte(rec.stmt.PrettyCQL()))
88+
opType := rec.stmt.QueryType.OpType()
89+
if rec.ts.IsZero() || !(opType == typedef.OpInsert || opType == typedef.OpUpdate || opType == typedef.OpDelete) {
90+
_, err2 = fl.fd.Write([]byte(";\n"))
91+
} else {
92+
_, err2 = fl.fd.Write([]byte(" USING TIMESTAMP " + strconv.FormatInt(rec.ts.UnixNano()/1000, 10) + ";\n"))
93+
}
94+
if err2 == nil && err1 == nil {
95+
errsAtRow = 0
96+
continue
97+
}
98+
99+
if errors.Is(err2, os.ErrClosed) || errors.Is(err1, os.ErrClosed) {
100+
fl.isFileNonOperational = true
101+
return
102+
}
103+
104+
errsAtRow++
105+
if errsAtRow > errorsOnFileLimit {
106+
fl.isFileNonOperational = true
107+
}
108+
109+
if err2 != nil {
110+
err1 = err2
111+
}
112+
log.Printf("failed to write to file %q: %s", fl.filename, err1)
113+
return
114+
}
115+
}
116+
117+
func NewFileLogger(filename string) (*FileLogger, error) {
118+
if filename == "" {
119+
return nil, nil
120+
}
121+
fd, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
122+
if err != nil {
123+
return nil, err
124+
}
125+
126+
out := &FileLogger{
127+
filename: filename,
128+
fd: fd,
129+
channel: make(loggerChan, defaultChanSize),
130+
}
131+
out.activeChannel.Store(&out.channel)
132+
133+
go out.committer()
134+
return out, nil
135+
}

pkg/store/cqlstore.go

+17-14
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
"github.com/scylladb/gemini/pkg/typedef"
3131
)
3232

33-
type cqlStore struct {
33+
type cqlStore struct { //nolint:govet
3434
session *gocql.Session
3535
schema *typedef.Schema
3636
ops *prometheus.CounterVec
@@ -39,20 +39,21 @@ type cqlStore struct {
3939
maxRetriesMutate int
4040
maxRetriesMutateSleep time.Duration
4141
useServerSideTimestamps bool
42+
stmtLogger stmtLogger
4243
}
4344

4445
func (cs *cqlStore) name() string {
4546
return cs.system
4647
}
4748

48-
func (cs *cqlStore) mutate(ctx context.Context, builder qb.Builder, values ...interface{}) (err error) {
49+
func (cs *cqlStore) mutate(ctx context.Context, stmt *typedef.Stmt) (err error) {
4950
var i int
5051
for i = 0; i < cs.maxRetriesMutate; i++ {
5152
// retry with new timestamp as list modification with the same ts
5253
// will produce duplicated values, see https://github.com/scylladb/scylladb/issues/7937
53-
err = cs.doMutate(ctx, builder, time.Now(), values...)
54+
err = cs.doMutate(ctx, stmt, time.Now())
5455
if err == nil {
55-
cs.ops.WithLabelValues(cs.system, opType(builder)).Inc()
56+
cs.ops.WithLabelValues(cs.system, opType(stmt)).Inc()
5657
return nil
5758
}
5859
select {
@@ -67,14 +68,15 @@ func (cs *cqlStore) mutate(ctx context.Context, builder qb.Builder, values ...in
6768
return err
6869
}
6970

70-
func (cs *cqlStore) doMutate(ctx context.Context, builder qb.Builder, ts time.Time, values ...interface{}) error {
71-
queryBody, _ := builder.ToCql()
72-
73-
query := cs.session.Query(queryBody, values...).WithContext(ctx)
71+
func (cs *cqlStore) doMutate(ctx context.Context, stmt *typedef.Stmt, ts time.Time) error {
72+
queryBody, _ := stmt.Query.ToCql()
73+
query := cs.session.Query(queryBody, stmt.Values...).WithContext(ctx)
7474
if cs.useServerSideTimestamps {
7575
query = query.DefaultTimestamp(false)
76+
cs.stmtLogger.LogStmt(stmt)
7677
} else {
7778
query = query.WithTimestamp(ts.UnixNano() / 1000)
79+
cs.stmtLogger.LogStmtWithTimeStamp(stmt, ts)
7880
}
7981

8082
if err := query.Exec(); err != nil {
@@ -90,10 +92,11 @@ func (cs *cqlStore) doMutate(ctx context.Context, builder qb.Builder, ts time.Ti
9092
return nil
9193
}
9294

93-
func (cs *cqlStore) load(ctx context.Context, builder qb.Builder, values []interface{}) (result []map[string]interface{}, err error) {
94-
query, _ := builder.ToCql()
95-
iter := cs.session.Query(query, values...).WithContext(ctx).Iter()
96-
cs.ops.WithLabelValues(cs.system, opType(builder)).Inc()
95+
func (cs *cqlStore) load(ctx context.Context, stmt *typedef.Stmt) (result []map[string]interface{}, err error) {
96+
query, _ := stmt.Query.ToCql()
97+
cs.stmtLogger.LogStmt(stmt)
98+
iter := cs.session.Query(query, stmt.Values...).WithContext(ctx).Iter()
99+
cs.ops.WithLabelValues(cs.system, opType(stmt)).Inc()
97100
return loadSet(iter), iter.Close()
98101
}
99102

@@ -126,8 +129,8 @@ func ignore(err error) bool {
126129
}
127130
}
128131

129-
func opType(builder qb.Builder) string {
130-
switch builder.(type) {
132+
func opType(stmt *typedef.Stmt) string {
133+
switch stmt.Query.(type) {
131134
case *qb.InsertBuilder:
132135
return "insert"
133136
case *qb.DeleteBuilder:

0 commit comments

Comments
 (0)