Skip to content

Commit b6622d4

Browse files
committed
Added scheduled sync implementation
1 parent e192148 commit b6622d4

File tree

11 files changed

+531
-242
lines changed

11 files changed

+531
-242
lines changed

cmd/clace/apply_cmd.go

Lines changed: 68 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -77,81 +77,86 @@ Examples:
7777
return err
7878
}
7979

80-
if len(applyResponse.CreateResults) > 0 {
81-
fmt.Fprintf(cCtx.App.Writer, "Created apps:\n")
82-
for i, createResult := range applyResponse.CreateResults {
83-
if i > 0 {
84-
fmt.Fprintf(cCtx.App.Writer, "\n")
85-
}
86-
printCreateResult(cCtx, createResult)
87-
}
80+
printApplyResponse(cCtx, &applyResponse)
81+
if applyResponse.DryRun {
82+
fmt.Print(DRY_RUN_MESSAGE)
8883
}
8984

90-
if len(applyResponse.UpdateResults) > 0 {
91-
fmt.Fprintf(cCtx.App.Writer, "Updated apps: ")
92-
for i, updateResult := range applyResponse.UpdateResults {
93-
if i > 0 {
94-
fmt.Fprintf(cCtx.App.Writer, ", ")
95-
}
96-
fmt.Fprintf(cCtx.App.Writer, "%s", updateResult)
97-
}
98-
fmt.Fprintln(cCtx.App.Writer)
99-
}
85+
return nil
86+
},
87+
}
88+
}
10089

101-
if len(applyResponse.ReloadResults) > 0 {
102-
fmt.Fprintf(cCtx.App.Writer, "Reloaded apps: ")
103-
for i, reloadResult := range applyResponse.ReloadResults {
104-
if i > 0 {
105-
fmt.Fprintf(cCtx.App.Writer, ", ")
106-
}
107-
fmt.Fprintf(cCtx.App.Writer, "%s", reloadResult)
108-
}
109-
fmt.Fprintln(cCtx.App.Writer)
90+
func printApplyResponse(cCtx *cli.Context, applyResponse *types.AppApplyResponse) error {
91+
if len(applyResponse.CreateResults) > 0 {
92+
fmt.Fprintf(cCtx.App.Writer, "Created apps:\n")
93+
for i, createResult := range applyResponse.CreateResults {
94+
if i > 0 {
95+
fmt.Fprintf(cCtx.App.Writer, "\n")
11096
}
97+
printCreateResult(cCtx, createResult)
98+
}
99+
}
111100

112-
if len(applyResponse.SkippedResults) > 0 {
113-
fmt.Fprintf(cCtx.App.Writer, "Skipped apps: ")
114-
for i, skipResult := range applyResponse.SkippedResults {
115-
if i > 0 {
116-
fmt.Fprintf(cCtx.App.Writer, ", ")
117-
}
118-
fmt.Fprintf(cCtx.App.Writer, "%s", skipResult)
119-
}
120-
fmt.Fprintln(cCtx.App.Writer)
101+
if len(applyResponse.UpdateResults) > 0 {
102+
fmt.Fprintf(cCtx.App.Writer, "Updated apps: ")
103+
for i, updateResult := range applyResponse.UpdateResults {
104+
if i > 0 {
105+
fmt.Fprintf(cCtx.App.Writer, ", ")
121106
}
107+
fmt.Fprintf(cCtx.App.Writer, "%s", updateResult)
108+
}
109+
fmt.Fprintln(cCtx.App.Writer)
110+
}
122111

123-
if len(applyResponse.ApproveResults) > 0 {
124-
fmt.Fprintf(cCtx.App.Writer, "Approved apps:\n")
125-
for _, approveResult := range applyResponse.ApproveResults {
126-
if !approveResult.NeedsApproval {
127-
// Server does not return these for reload to reduce the noise
128-
fmt.Printf("No approval required. %s - %s\n", approveResult.AppPathDomain, approveResult.Id)
129-
} else {
130-
fmt.Printf("App permissions have been approved %s - %s\n", approveResult.AppPathDomain, approveResult.Id)
131-
printApproveResult(approveResult)
132-
}
133-
}
112+
if len(applyResponse.ReloadResults) > 0 {
113+
fmt.Fprintf(cCtx.App.Writer, "Reloaded apps: ")
114+
for i, reloadResult := range applyResponse.ReloadResults {
115+
if i > 0 {
116+
fmt.Fprintf(cCtx.App.Writer, ", ")
134117
}
118+
fmt.Fprintf(cCtx.App.Writer, "%s", reloadResult)
119+
}
120+
fmt.Fprintln(cCtx.App.Writer)
121+
}
135122

136-
if len(applyResponse.PromoteResults) > 0 {
137-
fmt.Fprintf(cCtx.App.Writer, "Promoted apps: ")
138-
for i, promoteResult := range applyResponse.PromoteResults {
139-
if i > 0 {
140-
fmt.Fprintf(cCtx.App.Writer, ", ")
141-
}
142-
fmt.Fprintf(cCtx.App.Writer, "%s", promoteResult)
143-
}
144-
fmt.Fprintln(cCtx.App.Writer)
123+
if len(applyResponse.SkippedResults) > 0 {
124+
fmt.Fprintf(cCtx.App.Writer, "Skipped apps: ")
125+
for i, skipResult := range applyResponse.SkippedResults {
126+
if i > 0 {
127+
fmt.Fprintf(cCtx.App.Writer, ", ")
145128
}
129+
fmt.Fprintf(cCtx.App.Writer, "%s", skipResult)
130+
}
131+
fmt.Fprintln(cCtx.App.Writer)
132+
}
146133

147-
fmt.Fprintf(cCtx.App.Writer, "%d app(s) created, %d app(s) updated, %d app(s) reloaded, %d app(s) skipped, %d app(s) approved, %d app(s) promoted.\n",
148-
len(applyResponse.CreateResults), len(applyResponse.UpdateResults), len(applyResponse.ReloadResults), len(applyResponse.SkippedResults), len(applyResponse.ApproveResults), len(applyResponse.PromoteResults))
149-
150-
if applyResponse.DryRun {
151-
fmt.Print(DRY_RUN_MESSAGE)
134+
if len(applyResponse.ApproveResults) > 0 {
135+
fmt.Fprintf(cCtx.App.Writer, "Approved apps:\n")
136+
for _, approveResult := range applyResponse.ApproveResults {
137+
if !approveResult.NeedsApproval {
138+
// Server does not return these for reload to reduce the noise
139+
fmt.Printf("No approval required. %s - %s\n", approveResult.AppPathDomain, approveResult.Id)
140+
} else {
141+
fmt.Printf("App permissions have been approved %s - %s\n", approveResult.AppPathDomain, approveResult.Id)
142+
printApproveResult(approveResult)
152143
}
144+
}
145+
}
153146

154-
return nil
155-
},
147+
if len(applyResponse.PromoteResults) > 0 {
148+
fmt.Fprintf(cCtx.App.Writer, "Promoted apps: ")
149+
for i, promoteResult := range applyResponse.PromoteResults {
150+
if i > 0 {
151+
fmt.Fprintf(cCtx.App.Writer, ", ")
152+
}
153+
fmt.Fprintf(cCtx.App.Writer, "%s", promoteResult)
154+
}
155+
fmt.Fprintln(cCtx.App.Writer)
156156
}
157+
158+
fmt.Fprintf(cCtx.App.Writer, "%d app(s) created, %d app(s) updated, %d app(s) reloaded, %d app(s) skipped, %d app(s) approved, %d app(s) promoted.\n",
159+
len(applyResponse.CreateResults), len(applyResponse.UpdateResults), len(applyResponse.ReloadResults), len(applyResponse.SkippedResults), len(applyResponse.ApproveResults), len(applyResponse.PromoteResults))
160+
161+
return nil
157162
}

cmd/clace/sync_cmds.go

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,29 +21,29 @@ func initSyncCommand(commonFlags []cli.Flag, clientConfig *types.ClientConfig) *
2121
Name: "sync",
2222
Usage: "Manage sync operations, scheduled and webhook",
2323
Subcommands: []*cli.Command{
24-
syncCreateCommand(commonFlags, clientConfig),
24+
syncScheduleCommand(commonFlags, clientConfig),
2525
syncListCommand(commonFlags, clientConfig),
2626
syncDeleteCommand(commonFlags, clientConfig),
2727
},
2828
}
2929
}
3030

31-
func syncCreateCommand(commonFlags []cli.Flag, clientConfig *types.ClientConfig) *cli.Command {
31+
func syncScheduleCommand(commonFlags []cli.Flag, clientConfig *types.ClientConfig) *cli.Command {
3232
flags := make([]cli.Flag, 0, len(commonFlags)+2)
3333
flags = append(flags, commonFlags...)
3434
flags = append(flags, newStringFlag("branch", "b", "The branch to checkout if using git source", "main"))
3535
flags = append(flags, newStringFlag("git-auth", "g", "The name of the git_auth entry in server config to use", ""))
3636
flags = append(flags, newBoolFlag("approve", "a", "Approve the app permissions", false))
3737
flags = append(flags, newStringFlag("reload", "r", "Which apps to reload: none, updated, matched", ""))
3838
flags = append(flags, newBoolFlag("promote", "p", "Promote changes from stage to prod", false))
39-
flags = append(flags, newIntFlag("schedule", "s", "Schedule sync for every N minutes", 0))
39+
flags = append(flags, newIntFlag("minutes", "s", "Schedule sync for every N minutes", 0))
4040
flags = append(flags, newBoolFlag("clobber", "", "Force update app config, overwriting non-declarative changes", false))
41-
flags = append(flags, newBoolFlag("force-reload", "f", "Force reload even if there is no new commit", false))
41+
flags = append(flags, newBoolFlag("force-reload", "f", "Force reload even if there are no new commits", false))
4242
flags = append(flags, dryRunFlag())
4343

4444
return &cli.Command{
45-
Name: "create",
46-
Usage: "Create sync job for updating app config",
45+
Name: "schedule",
46+
Usage: "Create scheduled sync job for updating app config",
4747
Flags: flags,
4848
Before: altsrc.InitInputSourceWithContext(flags, altsrc.NewTomlSourceFromFlagFunc(configFileFlagName)),
4949
ArgsUsage: "<filePath>",
@@ -52,23 +52,22 @@ func syncCreateCommand(commonFlags []cli.Flag, clientConfig *types.ClientConfig)
5252
<filePath> is the path to the apply file containing the app configuration.
5353
5454
Examples:
55-
Create sync entry, reloading all apps: clace sync ./app.ace
56-
Create sync entry, reloading updated apps: clace sync --reload=updated github.com/claceio/apps/apps.ace
57-
Create sync entry: clace sync --promote --approve github.com/claceio/apps/apps.ace
58-
Create sync entry, overwriting changes: clace sync --promote --clobber github.com/claceio/apps/apps.ace
55+
Create scheduled sync, reloading apps with code changes: clace sync schedule ./app.ace
56+
Create scheduled sync, reloading only apps with a config change: clace sync schedule --reload=updated github.com/claceio/apps/apps.ace
57+
Create scheduled sync, promoting changes: clace sync schedule --promote --approve github.com/claceio/apps/apps.ace
58+
Create scheduled sync, overwriting changes: clace sync schedule --promote --clobber github.com/claceio/apps/apps.ace
5959
`,
60-
6160
Action: func(cCtx *cli.Context) error {
6261
if cCtx.NArg() != 1 {
6362
return fmt.Errorf("expected one arg : <filePath>")
6463
}
6564

6665
client := system.NewHttpClient(clientConfig.ServerUri, clientConfig.AdminUser, clientConfig.Client.AdminPassword, clientConfig.Client.SkipCertCheck)
6766
reloadMode := types.AppReloadOption(cmp.Or(cCtx.String("reload"), string(types.AppReloadOptionMatched)))
68-
schedule := cCtx.Int("schedule")
6967
values := url.Values{}
7068
values.Add("path", cCtx.Args().Get(0))
7169
values.Add(DRY_RUN_ARG, strconv.FormatBool(cCtx.Bool(DRY_RUN_FLAG)))
70+
values.Add("scheduled", "true")
7271

7372
sync := types.SyncMetadata{
7473
GitBranch: cCtx.String("branch"),
@@ -78,7 +77,7 @@ Examples:
7877
Reload: string(reloadMode),
7978
Clobber: cCtx.Bool("clobber"),
8079
ForceReload: cCtx.Bool("force-reload"),
81-
ScheduleFrequency: schedule,
80+
ScheduleFrequency: cCtx.Int("minutes"),
8281
}
8382

8483
var syncResponse types.SyncCreateResponse
@@ -87,8 +86,9 @@ Examples:
8786
return err
8887
}
8988

90-
fmt.Printf("Sync job created with Id: %s\n", syncResponse.Id)
89+
printApplyResponse(cCtx, &syncResponse.SyncJobStatus.ApplyResponse)
9190

91+
fmt.Printf("\nSync job created with Id: %s\n", syncResponse.Id)
9292
if syncResponse.DryRun {
9393
fmt.Print(DRY_RUN_MESSAGE)
9494
}
@@ -194,30 +194,30 @@ func printSyncList(cCtx *cli.Context, sync []*types.SyncEntry, format string) {
194194
enc.Encode(s)
195195
}
196196
case FORMAT_BASIC:
197-
formatStr := "%-35s %-10s %-40s %-s\n"
198-
fmt.Fprintf(cCtx.App.Writer, formatStr, "Id", "Trigger", "Path", "Url")
197+
formatStr := "%-35s %-12s %-s\n"
198+
fmt.Fprintf(cCtx.App.Writer, formatStr, "Id", "SyncType", "Path")
199199

200200
for _, s := range sync {
201-
fmt.Fprintf(cCtx.App.Writer, formatStr, s.Id, getTriggerType(s), s.Path, s.Metadata.WebhookUrl)
201+
fmt.Fprintf(cCtx.App.Writer, formatStr, s.Id, getSyncType(s), s.Path)
202202
}
203203
case FORMAT_TABLE:
204-
formatStrHead := "%-35s %-10s %-8s %-8s %-7s %-6s %-10s %-15s %-40s %-s\n"
205-
formatStrData := "%-35s %-10s %-8s %-8t %-7t %-6t %-10s %-15s %-40s %-s\n"
206-
fmt.Fprintf(cCtx.App.Writer, formatStrHead, "Id", "Trigger", "Reload", "Promote", "Approve", "Clobber", "GitAuth", "Branch", "Path", "Url")
204+
formatStrHead := "%-35s %-12s %-8s %-8s %-7s %-7s %-10s %-15s %-s\n"
205+
formatStrData := "%-35s %-12s %-8s %-8t %-7t %-7t %-10s %-15s %-s\n"
206+
fmt.Fprintf(cCtx.App.Writer, formatStrHead, "Id", "SyncType", "Reload", "Promote", "Approve", "Clobber", "GitAuth", "Branch", "Path")
207207

208208
for _, s := range sync {
209-
fmt.Fprintf(cCtx.App.Writer, formatStrData, s.Id, getTriggerType(s), s.Metadata.Reload, s.Metadata.Promote, s.Metadata.Approve, s.Metadata.Clobber, s.Metadata.GitAuth, s.Metadata.GitBranch, s.Path, s.Metadata.WebhookUrl)
209+
fmt.Fprintf(cCtx.App.Writer, formatStrData, s.Id, getSyncType(s), s.Metadata.Reload, s.Metadata.Promote, s.Metadata.Approve, s.Metadata.Clobber, s.Metadata.GitAuth, s.Metadata.GitBranch, s.Path)
210210
}
211211
case FORMAT_CSV:
212212
for _, s := range sync {
213-
fmt.Fprintf(cCtx.App.Writer, "%s,%s,%s,%t,%t,%t,%s,%s,%s,%s\n", s.Id, getTriggerType(s), s.Metadata.Reload, s.Metadata.Promote, s.Metadata.Approve, s.Metadata.Clobber, s.Metadata.GitAuth, s.Metadata.GitBranch, s.Path, s.Metadata.WebhookUrl)
213+
fmt.Fprintf(cCtx.App.Writer, "%s,%s,%s,%t,%t,%t,%s,%s,%s,%s\n", s.Id, getSyncType(s), s.Metadata.Reload, s.Metadata.Promote, s.Metadata.Approve, s.Metadata.Clobber, s.Metadata.GitAuth, s.Metadata.GitBranch, s.Path, s.Metadata.WebhookUrl)
214214
}
215215
default:
216216
panic(fmt.Errorf("unknown format %s", format))
217217
}
218218
}
219219

220-
func getTriggerType(sync *types.SyncEntry) string {
220+
func getSyncType(sync *types.SyncEntry) string {
221221
if sync.Metadata.ScheduleFrequency > 0 {
222222
return fmt.Sprintf("%d (mins)", sync.Metadata.ScheduleFrequency)
223223
}

internal/metadata/metadata.go

Lines changed: 52 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
_ "modernc.org/sqlite"
1818
)
1919

20-
const CURRENT_DB_VERSION = 4
20+
const CURRENT_DB_VERSION = 5
2121

2222
// Metadata is the metadata persistence layer
2323
type Metadata struct {
@@ -129,6 +129,17 @@ func (m *Metadata) VersionUpgrade(config *types.ServerConfig) error {
129129
}
130130
}
131131

132+
if version < 5 {
133+
m.Info().Msg("Upgrading to version 5")
134+
if _, err := tx.ExecContext(ctx, `alter table sync add column status json`); err != nil {
135+
return err
136+
}
137+
138+
if _, err := tx.ExecContext(ctx, `update version set version=5, last_upgraded=datetime('now')`); err != nil {
139+
return err
140+
}
141+
}
142+
132143
if err := tx.Commit(); err != nil {
133144
return err
134145
}
@@ -407,8 +418,13 @@ func (m *Metadata) CreateSync(ctx context.Context, tx types.Transaction, sync *t
407418
return fmt.Errorf("error marshalling metadata: %w", err)
408419
}
409420

410-
_, err = tx.ExecContext(ctx, `INSERT into sync(id, path, is_scheduled, user_id, create_time, metadata) values(?, ?, ?, ?, datetime('now'), ?)`,
411-
sync.Id, sync.Path, sync.IsScheduled, sync.UserID, metadataJson)
421+
statusJson, err := json.Marshal(sync.Status)
422+
if err != nil {
423+
return fmt.Errorf("error marshalling status: %w", err)
424+
}
425+
426+
_, err = tx.ExecContext(ctx, `INSERT into sync(id, path, is_scheduled, user_id, create_time, metadata, status) values(?, ?, ?, ?, datetime('now'), ?, ?)`,
427+
sync.Id, sync.Path, sync.IsScheduled, sync.UserID, metadataJson, statusJson)
412428
if err != nil {
413429
return fmt.Errorf("error inserting sync entry: %w", err)
414430
}
@@ -432,7 +448,7 @@ func (m *Metadata) DeleteSync(ctx context.Context, tx types.Transaction, id stri
432448

433449
// GetSyncEntries gets all the sync entries for the given webhook type
434450
func (m *Metadata) GetSyncEntries(ctx context.Context, tx types.Transaction) ([]*types.SyncEntry, error) {
435-
stmt, err := tx.PrepareContext(ctx, `select id, path, is_scheduled, user_id, create_time, metadata from sync`)
451+
stmt, err := tx.PrepareContext(ctx, `select id, path, is_scheduled, user_id, create_time, metadata, status from sync`)
436452
if err != nil {
437453
return nil, fmt.Errorf("error preparing statement: %w", err)
438454
}
@@ -445,7 +461,8 @@ func (m *Metadata) GetSyncEntries(ctx context.Context, tx types.Transaction) ([]
445461
for rows.Next() {
446462
var sync types.SyncEntry
447463
var metadata sql.NullString
448-
err = rows.Scan(&sync.Id, &sync.Path, &sync.IsScheduled, &sync.UserID, &sync.CreateTime, &metadata)
464+
var status sql.NullString
465+
err = rows.Scan(&sync.Id, &sync.Path, &sync.IsScheduled, &sync.UserID, &sync.CreateTime, &metadata, &status)
449466
if err != nil {
450467
if err == sql.ErrNoRows {
451468
return syncEntries, nil // No entries found, return empty slice
@@ -460,6 +477,13 @@ func (m *Metadata) GetSyncEntries(ctx context.Context, tx types.Transaction) ([]
460477
}
461478
}
462479

480+
if status.Valid && status.String != "" {
481+
err = json.Unmarshal([]byte(status.String), &sync.Status)
482+
if err != nil {
483+
return nil, fmt.Errorf("error unmarshalling status: %w", err)
484+
}
485+
}
486+
463487
syncEntries = append(syncEntries, &sync)
464488
}
465489
if closeErr := rows.Close(); closeErr != nil {
@@ -469,10 +493,10 @@ func (m *Metadata) GetSyncEntries(ctx context.Context, tx types.Transaction) ([]
469493
return syncEntries, nil
470494
}
471495
func (m *Metadata) GetSyncEntry(ctx context.Context, tx types.Transaction, id string) (*types.SyncEntry, error) {
472-
row := m.db.QueryRow("select id, path, is_scheduled, user_id, create_time, metadata from sync where id = ?", id)
496+
row := m.db.QueryRow("select id, path, is_scheduled, user_id, create_time, metadata, status from sync where id = ?", id)
473497
var sync types.SyncEntry
474-
var metadata sql.NullString
475-
err := row.Scan(&sync.Id, &sync.Path, &sync.IsScheduled, &sync.UserID, &sync.CreateTime, &metadata)
498+
var metadata, status sql.NullString
499+
err := row.Scan(&sync.Id, &sync.Path, &sync.IsScheduled, &sync.UserID, &sync.CreateTime, &metadata, &status)
476500
if err != nil {
477501
if err == sql.ErrNoRows {
478502
return nil, errors.New("sync entry not found with id: " + id)
@@ -486,9 +510,29 @@ func (m *Metadata) GetSyncEntry(ctx context.Context, tx types.Transaction, id st
486510
return nil, fmt.Errorf("error unmarshalling metadata: %w", err)
487511
}
488512
}
513+
if status.Valid && status.String != "" {
514+
err = json.Unmarshal([]byte(status.String), &sync.Status)
515+
if err != nil {
516+
return nil, fmt.Errorf("error unmarshalling status: %w", err)
517+
}
518+
}
489519
return &sync, nil
490520
}
491521

522+
func (m *Metadata) UpdateSyncStatus(ctx context.Context, tx types.Transaction, id string, status *types.SyncJobStatus) error {
523+
statusJson, err := json.Marshal(status)
524+
if err != nil {
525+
return fmt.Errorf("error marshalling status: %w", err)
526+
}
527+
528+
_, err = tx.ExecContext(ctx, `UPDATE sync set status = ? where id = ?`, string(statusJson), id)
529+
if err != nil {
530+
return fmt.Errorf("error updating app status: %w", err)
531+
}
532+
533+
return nil
534+
}
535+
492536
// BeginTransaction starts a new Transaction
493537
func (m *Metadata) BeginTransaction(ctx context.Context) (types.Transaction, error) {
494538
tx, err := m.db.BeginTx(ctx, nil)

0 commit comments

Comments
 (0)