Skip to content

Commit ce38ad5

Browse files
authored
Merge pull request #15 from averak/feature/7-bq-schema-apply
schema applyを実装した
2 parents b360d4c + 0a58c3f commit ce38ad5

12 files changed

+494
-222
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ go.work
2828

2929
### Custom ###
3030
!.gitkeep
31+
mise.toml
3132
.go-version
3233
.vscode
3334
tmp

cmd/protobq/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func newCliApp() *cli.App {
4242
},
4343
},
4444
Action: func(c *cli.Context) error {
45-
err := internal.Apply(context.Background(), c.String("project-id"), c.String("dataset-id"))
45+
err := internal.ApplySchemaFromFile(context.Background(), c.String("project-id"), c.String("dataset-id"), c.String("input"))
4646
if err != nil {
4747
return err
4848
}

go.mod

+5-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ go 1.23
44

55
require (
66
cloud.google.com/go/bigquery v1.65.0
7+
github.com/bufbuild/protocompile v0.14.1
8+
github.com/google/go-cmp v0.6.0
9+
github.com/google/martian/v3 v3.3.3
10+
github.com/huandu/go-sqlbuilder v1.33.1
711
github.com/urfave/cli/v2 v2.27.5
812
google.golang.org/api v0.214.0
913
google.golang.org/protobuf v1.36.0
@@ -27,13 +31,13 @@ require (
2731
github.com/google/uuid v1.6.0 // indirect
2832
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
2933
github.com/googleapis/gax-go/v2 v2.14.1 // indirect
34+
github.com/huandu/xstrings v1.4.0 // indirect
3035
github.com/klauspost/compress v1.17.11 // indirect
3136
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
3237
github.com/pierrec/lz4/v4 v4.1.22 // indirect
3338
github.com/russross/blackfriday/v2 v2.1.0 // indirect
3439
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
3540
github.com/zeebo/xxh3 v1.0.2 // indirect
36-
go.opencensus.io v0.24.0 // indirect
3741
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
3842
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.58.0 // indirect
3943
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 // indirect

go.sum

+16-158
Large diffs are not rendered by default.

internal/apply.go

-17
This file was deleted.

internal/codegen.go

+21-17
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,26 @@ var (
1818
Minute: protogen.GoImportPath("time").Ident("Minute"),
1919
}
2020
protobqIdents = struct {
21-
MaterializedView protogen.GoIdent
22-
MaterializedViewOptions protogen.GoIdent
23-
InsertDTO protogen.GoIdent
24-
internal struct {
25-
NewInsertDTOImpl protogen.GoIdent
26-
BQField protogen.GoIdent
21+
internal struct {
22+
MaterializedView protogen.GoIdent
23+
MaterializedViewOptions protogen.GoIdent
24+
InsertDTO protogen.GoIdent
25+
NewInsertDTOImpl protogen.GoIdent
26+
BQField protogen.GoIdent
2727
}
2828
}{
29-
MaterializedView: protogen.GoImportPath("github.com/averak/protobq").Ident("MaterializedView"),
30-
MaterializedViewOptions: protogen.GoImportPath("github.com/averak/protobq").Ident("MaterializedViewOptions"),
31-
InsertDTO: protogen.GoImportPath("github.com/averak/protobq").Ident("InsertDTO"),
3229
internal: struct {
33-
NewInsertDTOImpl protogen.GoIdent
34-
BQField protogen.GoIdent
30+
MaterializedView protogen.GoIdent
31+
MaterializedViewOptions protogen.GoIdent
32+
InsertDTO protogen.GoIdent
33+
NewInsertDTOImpl protogen.GoIdent
34+
BQField protogen.GoIdent
3535
}{
36-
NewInsertDTOImpl: protogen.GoImportPath("github.com/averak/protobq/internal").Ident("NewInsertDTOImpl"),
37-
BQField: protogen.GoImportPath("github.com/averak/protobq/internal").Ident("BQField"),
36+
MaterializedView: protogen.GoImportPath("github.com/averak/protobq/internal").Ident("MaterializedView"),
37+
MaterializedViewOptions: protogen.GoImportPath("github.com/averak/protobq/internal").Ident("MaterializedViewOptions"),
38+
InsertDTO: protogen.GoImportPath("github.com/averak/protobq/internal").Ident("InsertDTO"),
39+
NewInsertDTOImpl: protogen.GoImportPath("github.com/averak/protobq/internal").Ident("NewInsertDTOImpl"),
40+
BQField: protogen.GoImportPath("github.com/averak/protobq/internal").Ident("BQField"),
3841
},
3942
}
4043
)
@@ -74,23 +77,24 @@ func (g CodeGenerator) Gen() error {
7477
}
7578
ext, _ := proto.GetExtension(msg.Desc.Options(), protobq.E_MaterializedView).(*protobq.MaterializedView)
7679

77-
gf.P("var _ ", protobqIdents.MaterializedView, " = (*", msg.GoIdent.GoName, ")(nil)")
80+
gf.P("var _ ", protobqIdents.internal.MaterializedView, " = (*", msg.GoIdent.GoName, ")(nil)")
7881
gf.P()
7982

8083
gf.P("func (mv *", msg.GoIdent.GoName, ") Name() string {")
8184
gf.P(" return \"", msg.Desc.Name(), "\"")
8285
gf.P("}")
8386
gf.P()
8487

85-
gf.P("func (mv *", msg.GoIdent.GoName, ") Options() ", protobqIdents.MaterializedViewOptions, " {")
86-
gf.P(" return ", protobqIdents.MaterializedViewOptions, "{")
88+
gf.P("func (mv *", msg.GoIdent.GoName, ") Options() ", protobqIdents.internal.MaterializedViewOptions, " {")
89+
gf.P(" return ", protobqIdents.internal.MaterializedViewOptions, "{")
90+
gf.P(" BaseTable: \"", ext.GetBaseTable(), "\",")
8791
gf.P(" EnableRefresh: ", ext.GetEnableRefresh(), ",")
8892
gf.P(" RefreshInterval: ", ext.GetRefreshIntervalMinutes(), " * ", timeIdents.Minute, ",")
8993
gf.P(" }")
9094
gf.P("}")
9195
gf.P()
9296

93-
gf.P("func (mv *", msg.GoIdent.GoName, ") InsertDTO() ", protobqIdents.InsertDTO, " {")
97+
gf.P("func (mv *", msg.GoIdent.GoName, ") InsertDTO() ", protobqIdents.internal.InsertDTO, " {")
9498
gf.P(" res := ", protobqIdents.internal.NewInsertDTOImpl, "(\"", ext.GetBaseTable(), "\", nil)")
9599
for _, field := range msg.Fields {
96100
g.generateAddField(gf, field, nil, "res", "mv")

internal/protobuf/example/example.protobq.go

+5-5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/schema.go

+170
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
package internal
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
"time"
7+
8+
"cloud.google.com/go/bigquery"
9+
"github.com/averak/protobq/internal/protobuf/protobq"
10+
"github.com/huandu/go-sqlbuilder"
11+
"google.golang.org/protobuf/proto"
12+
"google.golang.org/protobuf/reflect/protoreflect"
13+
)
14+
15+
type MaterializedViewOptions struct {
16+
BaseTable string
17+
EnableRefresh bool
18+
RefreshInterval time.Duration
19+
20+
// TODO: クラスタ化列、パーティション列を定義する。
21+
}
22+
23+
type MaterializedView interface {
24+
proto.Message
25+
26+
Name() string
27+
Options() MaterializedViewOptions
28+
InsertDTO() InsertDTO
29+
}
30+
31+
type InsertDTO interface {
32+
TableName() string
33+
Value() map[string]any
34+
}
35+
36+
type BQSchemaConverter struct {
37+
datasetID string
38+
raw MaterializedView
39+
}
40+
41+
func NewBQSchemaConverter(datasetID string, mv MaterializedView) *BQSchemaConverter {
42+
return &BQSchemaConverter{
43+
datasetID: datasetID,
44+
raw: mv,
45+
}
46+
}
47+
48+
func (c BQSchemaConverter) ApplyBaseTableSchema(td *bigquery.TableMetadata) error {
49+
schema := make(bigquery.Schema, 0, c.raw.ProtoReflect().Descriptor().Fields().Len())
50+
for i := range c.raw.ProtoReflect().Descriptor().Fields().Len() {
51+
field := c.raw.ProtoReflect().Descriptor().Fields().Get(i)
52+
fs, err := c.toBQFieldSchema(field)
53+
if err != nil {
54+
return err
55+
}
56+
schema = append(schema, fs)
57+
}
58+
59+
for _, existing := range td.Schema {
60+
if existing.Name == c.raw.Name() {
61+
existing.Schema = schema
62+
return nil
63+
}
64+
}
65+
td.Schema = append(td.Schema, &bigquery.FieldSchema{
66+
Name: c.raw.Name(),
67+
Type: bigquery.RecordFieldType,
68+
Required: false,
69+
Schema: schema,
70+
})
71+
return nil
72+
}
73+
74+
func (c BQSchemaConverter) MaterializedViewSchema() (*bigquery.TableMetadata, error) {
75+
res := &bigquery.TableMetadata{
76+
Name: c.raw.Name(),
77+
MaterializedView: &bigquery.MaterializedViewDefinition{
78+
Query: c.MaterializedViewDDL(),
79+
EnableRefresh: c.raw.Options().EnableRefresh,
80+
RefreshInterval: c.raw.Options().RefreshInterval,
81+
},
82+
// TODO: #7 TimePartitioning, Clustering 定義する
83+
TimePartitioning: nil,
84+
Clustering: nil,
85+
}
86+
return res, nil
87+
}
88+
89+
func (c BQSchemaConverter) MaterializedViewDDL() string {
90+
sb := sqlbuilder.NewSelectBuilder()
91+
92+
// top-level のフィールドのみ DDL に含めれば良い。
93+
cols := make([]string, 0, c.raw.ProtoReflect().Descriptor().Fields().Len())
94+
for i := range c.raw.ProtoReflect().Descriptor().Fields().Len() {
95+
field := c.raw.ProtoReflect().Descriptor().Fields().Get(i)
96+
ext, _ := proto.GetExtension(field.Options(), protobq.E_MaterializedViewField).(*protobq.MaterializedViewField)
97+
if len(ext.GetOriginPath()) > 0 {
98+
cols = append(cols, fmt.Sprintf("%s.%s", c.raw.Name(), strings.Join(ext.GetOriginPath(), ".")))
99+
} else {
100+
cols = append(cols, fmt.Sprintf("%s.%s", c.raw.Name(), field.Name()))
101+
}
102+
}
103+
res, _ := sb.Select(cols...).
104+
From(fmt.Sprintf("%s.%s", c.datasetID, c.raw.Options().BaseTable)).
105+
Where(sb.IsNotNull(c.raw.Name())).
106+
Build()
107+
return res
108+
}
109+
110+
func (c BQSchemaConverter) toBQFieldSchema(field protoreflect.FieldDescriptor) (*bigquery.FieldSchema, error) {
111+
dt, err := c.toBQDataType(field)
112+
if err != nil {
113+
return nil, err
114+
}
115+
116+
res := &bigquery.FieldSchema{
117+
Name: string(field.Name()),
118+
Type: dt,
119+
Required: field.Cardinality() == protoreflect.Required,
120+
Repeated: field.Cardinality() == protoreflect.Repeated,
121+
Schema: nil,
122+
}
123+
if dt == bigquery.RecordFieldType {
124+
for i := range field.Message().Fields().Len() {
125+
nested, err := c.toBQFieldSchema(field.Message().Fields().Get(i))
126+
if err != nil {
127+
return nil, err
128+
}
129+
res.Schema = append(res.Schema, nested)
130+
}
131+
}
132+
return res, nil
133+
}
134+
135+
func (c BQSchemaConverter) toBQDataType(field protoreflect.FieldDescriptor) (bigquery.FieldType, error) {
136+
// https://pkg.go.dev/google.golang.org/protobuf/reflect/protoreflect#Value
137+
var res bigquery.FieldType
138+
switch field.Kind() {
139+
case protoreflect.BoolKind:
140+
res = bigquery.BooleanFieldType
141+
case protoreflect.Int32Kind, protoreflect.Sint32Kind, protoreflect.Sfixed32Kind:
142+
res = bigquery.IntegerFieldType
143+
case protoreflect.Int64Kind, protoreflect.Sint64Kind, protoreflect.Sfixed64Kind:
144+
res = bigquery.IntegerFieldType
145+
case protoreflect.Uint32Kind, protoreflect.Fixed32Kind:
146+
res = bigquery.IntegerFieldType
147+
case protoreflect.Uint64Kind, protoreflect.Fixed64Kind:
148+
res = bigquery.IntegerFieldType
149+
case protoreflect.FloatKind:
150+
res = bigquery.FloatFieldType
151+
case protoreflect.DoubleKind:
152+
res = bigquery.FloatFieldType
153+
case protoreflect.StringKind:
154+
res = bigquery.StringFieldType
155+
case protoreflect.BytesKind:
156+
res = bigquery.BytesFieldType
157+
case protoreflect.EnumKind:
158+
res = bigquery.StringFieldType
159+
case protoreflect.MessageKind, protoreflect.GroupKind:
160+
switch field.Message().FullName() {
161+
case "google.protobuf.Timestamp":
162+
res = bigquery.TimestampFieldType
163+
default:
164+
res = bigquery.RecordFieldType
165+
}
166+
default:
167+
return "", fmt.Errorf("unsupported field type: %s", field.Kind())
168+
}
169+
return res, nil
170+
}

0 commit comments

Comments
 (0)