Skip to content

Commit 83c8ee6

Browse files
authored
Merge pull request nyaruka#55 from nyaruka/cleanup
Change index setting to contacts_index and rename some stuff for clarity
2 parents ca472f4 + 8914509 commit 83c8ee6

File tree

8 files changed

+64
-60
lines changed

8 files changed

+64
-60
lines changed

cmd/rp-indexer/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,14 @@ func main() {
5555
}
5656

5757
idxrs := []indexers.Indexer{
58-
indexers.NewContactIndexer(cfg.ElasticURL, cfg.Index, 500),
58+
indexers.NewContactIndexer(cfg.ElasticURL, cfg.ContactsIndex, cfg.ContactsShards, cfg.ContactsReplicas, 500),
5959
}
6060

6161
if cfg.Rebuild {
6262
// if rebuilding, just do a complete index and quit. In future when we support multiple indexers,
6363
// the rebuild argument can be become the name of the index to rebuild, e.g. --rebuild=contacts
6464
idxr := idxrs[0]
65-
if _, err := idxr.Index(db, true, cfg.Cleanup, cfg.ContactsShards, cfg.ContactsReplicas); err != nil {
65+
if _, err := idxr.Index(db, true, cfg.Cleanup); err != nil {
6666
log.WithField("indexer", idxr.Name()).WithError(err).Fatal("error during rebuilding")
6767
}
6868
} else {

config.go

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,34 +3,35 @@ package indexer
33
import "os"
44

55
type Config struct {
6-
ElasticURL string `help:"the url for our elastic search instance"`
7-
DB string `help:"the connection string for our database"`
8-
Index string `help:"the alias for our contact index"`
9-
Poll int `help:"the number of seconds to wait between checking for updated contacts"`
10-
Rebuild bool `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"`
11-
Cleanup bool `help:"whether to remove old indexes after a rebuild"`
12-
LogLevel string `help:"the log level, one of error, warn, info, debug"`
13-
SentryDSN string `help:"the sentry configuration to log errors to, if any"`
14-
ContactsShards int `help:"The number of shards to use for the contacts index"`
15-
ContactsReplicas int `help:"The number of replicas to use for the contacts index"`
16-
6+
ElasticURL string `help:"the url for our elastic search instance"`
7+
DB string `help:"the connection string for our database"`
8+
Poll int `help:"the number of seconds to wait between checking for database updates"`
9+
Rebuild bool `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"`
10+
Cleanup bool `help:"whether to remove old indexes after a rebuild"`
11+
LogLevel string `help:"the log level, one of error, warn, info, debug"`
12+
SentryDSN string `help:"the sentry configuration to log errors to, if any"`
1713
LibratoUsername string `help:"the username that will be used to authenticate to Librato"`
1814
LibratoToken string `help:"the token that will be used to authenticate to Librato"`
1915
InstanceName string `help:"the unique name of this instance used for analytics"`
16+
17+
ContactsIndex string `help:"the alias to use for the contact index"`
18+
ContactsShards int `help:"the number of shards to use for the contacts index"`
19+
ContactsReplicas int `help:"the number of replicas to use for the contacts index"`
2020
}
2121

2222
func NewDefaultConfig() *Config {
2323
hostname, _ := os.Hostname()
2424

2525
return &Config{
26-
ElasticURL: "http://localhost:9200",
27-
DB: "postgres://localhost/temba?sslmode=disable",
28-
Index: "contacts",
29-
Poll: 5,
30-
Rebuild: false,
31-
Cleanup: false,
32-
LogLevel: "info",
33-
InstanceName: hostname,
26+
ElasticURL: "http://localhost:9200",
27+
DB: "postgres://localhost/temba?sslmode=disable",
28+
Poll: 5,
29+
Rebuild: false,
30+
Cleanup: false,
31+
LogLevel: "info",
32+
InstanceName: hostname,
33+
34+
ContactsIndex: "contacts",
3435
ContactsShards: 2,
3536
ContactsReplicas: 1,
3637
}

daemon.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (d *Daemon) startIndexer(indexer indexers.Indexer) {
6666
case <-d.quit:
6767
return
6868
case <-time.After(d.poll):
69-
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup, d.cfg.ContactsShards, d.cfg.ContactsReplicas)
69+
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup)
7070
if err != nil {
7171
log.WithError(err).Error("error during indexing")
7272
}

indexers/base.go

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@ type Stats struct {
2929
// Indexer is base interface for indexers
3030
type Indexer interface {
3131
Name() string
32-
Index(db *sql.DB, rebuild, cleanup bool, shards int, replicas int) (string, error)
32+
Index(db *sql.DB, rebuild, cleanup bool) (string, error)
3333
Stats() Stats
3434
}
3535

36-
type ElasticSettings struct {
36+
// IndexDefinition is what we pass to elastic to create an index,
37+
// see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html
38+
type IndexDefinition struct {
3739
Settings struct {
3840
Index struct {
3941
NumberOfShards int `json:"number_of_shards"`
@@ -45,15 +47,25 @@ type ElasticSettings struct {
4547
Mappings json.RawMessage `json:"mappings"`
4648
}
4749

50+
func newIndexDefinition(base []byte, shards, replicas int) *IndexDefinition {
51+
d := &IndexDefinition{}
52+
jsonx.MustUnmarshal(contactsIndexDef, d)
53+
54+
d.Settings.Index.NumberOfShards = shards
55+
d.Settings.Index.NumberOfReplicas = replicas
56+
return d
57+
}
58+
4859
type baseIndexer struct {
4960
elasticURL string
5061
name string // e.g. contacts, used as the alias
62+
definition *IndexDefinition
5163

5264
stats Stats
5365
}
5466

55-
func newBaseIndexer(elasticURL, name string) baseIndexer {
56-
return baseIndexer{elasticURL: elasticURL, name: name}
67+
func newBaseIndexer(elasticURL, name string, def *IndexDefinition) baseIndexer {
68+
return baseIndexer{elasticURL: elasticURL, name: name, definition: def}
5769
}
5870

5971
func (i *baseIndexer) Name() string {
@@ -111,7 +123,7 @@ func (i *baseIndexer) FindIndexes() []string {
111123
// that index to `contacts`.
112124
//
113125
// If the day-specific name already exists, we append a .1 or .2 to the name.
114-
func (i *baseIndexer) createNewIndex(indexSettings ElasticSettings) (string, error) {
126+
func (i *baseIndexer) createNewIndex(def *IndexDefinition) (string, error) {
115127
// create our day-specific name
116128
index := fmt.Sprintf("%s_%s", i.name, time.Now().Format("2006_01_02"))
117129
idx := 0
@@ -133,11 +145,9 @@ func (i *baseIndexer) createNewIndex(indexSettings ElasticSettings) (string, err
133145
}
134146

135147
// create the new index
136-
settings, err := json.Marshal(indexSettings)
137-
if err != nil {
138-
return "", err
139-
}
140-
_, err = utils.MakeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s?include_type_name=true", i.elasticURL, index), settings, nil)
148+
settings := jsonx.MustMarshal(def)
149+
150+
_, err := utils.MakeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s?include_type_name=true", i.elasticURL, index), settings, nil)
141151
if err != nil {
142152
return "", err
143153
}

indexers/base_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package indexers_test
33
import (
44
"context"
55
"database/sql"
6-
"io/ioutil"
76
"log"
87
"os"
98
"sort"
@@ -22,7 +21,7 @@ const elasticURL = "http://localhost:9200"
2221
const aliasName = "indexer_test"
2322

2423
func setup(t *testing.T) (*sql.DB, *elastic.Client) {
25-
testDB, err := ioutil.ReadFile("../testdb.sql")
24+
testDB, err := os.ReadFile("../testdb.sql")
2625
require.NoError(t, err)
2726

2827
db, err := sql.Open("postgres", "postgres://nyaruka:nyaruka@localhost:5432/elastic_test?sslmode=disable")

indexers/contacts.go

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,15 @@ import (
44
"bytes"
55
"database/sql"
66
_ "embed"
7-
"encoding/json"
87
"fmt"
8+
"time"
9+
910
"github.com/pkg/errors"
1011
"github.com/sirupsen/logrus"
11-
"time"
1212
)
1313

14-
//go:embed contacts.settings.json
15-
var contactsSettingsFile []byte
16-
17-
var contactsSettings ElasticSettings
14+
//go:embed contacts.index.json
15+
var contactsIndexDef []byte
1816

1917
// ContactIndexer is an indexer for contacts
2018
type ContactIndexer struct {
@@ -24,15 +22,17 @@ type ContactIndexer struct {
2422
}
2523

2624
// NewContactIndexer creates a new contact indexer
27-
func NewContactIndexer(elasticURL, name string, batchSize int) *ContactIndexer {
25+
func NewContactIndexer(elasticURL, name string, shards, replicas, batchSize int) *ContactIndexer {
26+
def := newIndexDefinition(contactsIndexDef, shards, replicas)
27+
2828
return &ContactIndexer{
29-
baseIndexer: newBaseIndexer(elasticURL, name),
29+
baseIndexer: newBaseIndexer(elasticURL, name, def),
3030
batchSize: batchSize,
3131
}
3232
}
3333

3434
// Index indexes modified contacts and returns the name of the concrete index
35-
func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool, shards int, replicas int) (string, error) {
35+
func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error) {
3636
var err error
3737

3838
// find our physical index
@@ -48,13 +48,7 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool, shards int, re
4848

4949
// doesn't exist or we are rebuilding, create it
5050
if physicalIndex == "" || rebuild {
51-
err = json.Unmarshal(contactsSettingsFile, &contactsSettings)
52-
if err != nil {
53-
return "", errors.Wrap(err, "error unmarshalling embeded contacts.settings.json file")
54-
}
55-
contactsSettings.Settings.Index.NumberOfShards = shards
56-
contactsSettings.Settings.Index.NumberOfReplicas = replicas
57-
physicalIndex, err = i.createNewIndex(contactsSettings)
51+
physicalIndex, err = i.createNewIndex(i.definition)
5852
if err != nil {
5953
return "", errors.Wrap(err, "error creating new index")
6054
}

indexers/contacts.settings.json renamed to indexers/contacts.index.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
{
22
"settings": {
33
"index": {
4-
"number_of_shards": 2,
5-
"number_of_replicas": 1,
4+
"number_of_shards": -1,
5+
"number_of_replicas": -1,
66
"routing_partition_size": 1
77
},
88
"analysis": {

indexers/contacts_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -186,12 +186,12 @@ var contactQueryTests = []struct {
186186
func TestContacts(t *testing.T) {
187187
db, es := setup(t)
188188

189-
ix1 := indexers.NewContactIndexer(elasticURL, aliasName, 4)
189+
ix1 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4)
190190
assert.Equal(t, "indexer_test", ix1.Name())
191191

192192
expectedIndexName := fmt.Sprintf("indexer_test_%s", time.Now().Format("2006_01_02"))
193193

194-
indexName, err := ix1.Index(db, false, false, 2, 1)
194+
indexName, err := ix1.Index(db, false, false)
195195
assert.NoError(t, err)
196196
assert.Equal(t, expectedIndexName, indexName)
197197

@@ -217,7 +217,7 @@ func TestContacts(t *testing.T) {
217217
require.NoError(t, err)
218218

219219
// and index again...
220-
indexName, err = ix1.Index(db, false, false, 2, 1)
220+
indexName, err = ix1.Index(db, false, false)
221221
assert.NoError(t, err)
222222
assert.Equal(t, expectedIndexName, indexName) // same index used
223223
assertIndexerStats(t, ix1, 10, 1)
@@ -238,9 +238,9 @@ func TestContacts(t *testing.T) {
238238
require.NoError(t, err)
239239

240240
// and simulate another indexer doing a parallel rebuild
241-
ix2 := indexers.NewContactIndexer(elasticURL, aliasName, 4)
241+
ix2 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4)
242242

243-
indexName2, err := ix2.Index(db, true, false, 2, 1)
243+
indexName2, err := ix2.Index(db, true, false)
244244
assert.NoError(t, err)
245245
assert.Equal(t, expectedIndexName+"_1", indexName2) // new index used
246246
assertIndexerStats(t, ix2, 8, 0)
@@ -254,8 +254,8 @@ func TestContacts(t *testing.T) {
254254
assertQuery(t, es, elastic.NewMatchQuery("name", "eric"), []int64{2})
255255

256256
// simulate another indexer doing a parallel rebuild with cleanup
257-
ix3 := indexers.NewContactIndexer(elasticURL, aliasName, 4)
258-
indexName3, err := ix3.Index(db, true, true, 2, 1)
257+
ix3 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4)
258+
indexName3, err := ix3.Index(db, true, true)
259259
assert.NoError(t, err)
260260
assert.Equal(t, expectedIndexName+"_2", indexName3) // new index used
261261
assertIndexerStats(t, ix3, 8, 0)
@@ -264,7 +264,7 @@ func TestContacts(t *testing.T) {
264264
assertIndexesWithPrefix(t, es, aliasName, []string{expectedIndexName + "_2"})
265265

266266
// check that the original indexer now indexes against the new index
267-
indexName, err = ix1.Index(db, false, false, 2, 1)
267+
indexName, err = ix1.Index(db, false, false)
268268
assert.NoError(t, err)
269269
assert.Equal(t, expectedIndexName+"_2", indexName)
270270
}

0 commit comments

Comments
 (0)