Skip to content
This repository has been archived by the owner on Jan 5, 2024. It is now read-only.

Commit

Permalink
Add ILM Policy Sync
Browse files Browse the repository at this point in the history
Fixes #10
  • Loading branch information
lksnyder0 committed Jan 20, 2022
1 parent 3434a34 commit 1457725
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 2 deletions.
16 changes: 14 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func main() {
}

func sync(c *cli.Context) error {
var src_es *elasticsearch.Client
var dst_es *elasticsearch.Client
if c.Bool("verbose") && c.Bool("quiet") {
log.Fatal("--verbose and --quiet are mutually exclusive")
} else if c.Bool("verbose") {
Expand All @@ -82,7 +84,7 @@ func sync(c *cli.Context) error {
// Get Source configuration
for i, s := range config.Clusters {
if s.Name == c.String("src") {
_, _ = getElasticClient(s.Config)
src_es, _ = getElasticClient(s.Config)
break
}
if i+1 == len(config.Clusters) {
Expand All @@ -92,14 +94,24 @@ func sync(c *cli.Context) error {
// Get destination configuration
for i, s := range config.Clusters {
if s.Name == c.String("dst") {
_, _ = getElasticClient(s.Config)
dst_es, _ = getElasticClient(s.Config)
break
}
if i+1 == len(config.Clusters) {
log.Fatalf("Unable to load configuration for environoment %s", c.String("dst"))
}

}
// Loop item list
for _, s := range config.Items {
log.Infof("Syncing %s %s", s.Type, s.Name)
switch item_type := s.Type; item_type {
case "ilm_policy":
syncILM(*src_es, *dst_es, s)
default:
log.Error("Invalid or unsupported item type %s", item_type)
}
}
return nil
}

Expand Down
60 changes: 60 additions & 0 deletions sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"bytes"
"context"
"encoding/json"
"io"

"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
log "github.com/sirupsen/logrus"
)

func esResCheck(res esapi.Response) {
if res.StatusCode != 200 {
var res_map map[string]interface{}
json.NewDecoder(res.Body).Decode(&res_map)
status := int(res_map["status"].(float64))
reason := res_map["error"].(map[string]interface{})["reason"]
log.Fatalf("HTTP Status = %d, Error Reason = '%s'", status, reason)
}
}

func syncILM(src elasticsearch.Client, dst elasticsearch.Client, config obj_conf) {
body := getIlmBody(src, config.Name)
putILM(dst, config.Name, body)
}

func getIlmBody(es elasticsearch.Client, id string) io.Reader {
var req_map map[string]interface{}
ilm_policy_body := make(map[string]interface{})
log.Debugf("Getting ILM policy %s", id)
req := esapi.ILMGetLifecycleRequest{
Policy: id,
}
res, err := req.Do(context.Background(), es.Transport)
handleErr(err)
defer res.Body.Close()
json.NewDecoder(res.Body).Decode(&req_map)
// Need to clear out the extra fields
ilm_policy_body["policy"] = req_map[id].(map[string]interface{})["policy"].(map[string]interface{})
// Then turn it backk into a string
body_bytes, err := json.Marshal(ilm_policy_body)
handleErr(err)
return bytes.NewReader(body_bytes)
}

func putILM(es elasticsearch.Client, id string, body io.Reader) {
// var req_bytes []byte
log.Debugf("Putting ILM policy %s", id)
req := esapi.ILMPutLifecycleRequest{
Body: body,
Policy: id,
}
res, err := req.Do(context.Background(), es.Transport)
handleErr(err)
esResCheck(*res)
defer res.Body.Close()
log.Debug("Success")
}

0 comments on commit 1457725

Please sign in to comment.