Skip to content

Commit 30860b8

Browse files
authored
Merge pull request nyaruka#54 from tybritten/master
Allow configurable Shards & Replicas on Contacts Index
2 parents 73acfde + 021e17d commit 30860b8

File tree

6 files changed

+57
-30
lines changed

6 files changed

+57
-30
lines changed

cmd/rp-indexer/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func main() {
6262
// if rebuilding, just do a complete index and quit. In future when we support multiple indexers,
6363
// the rebuild argument can be become the name of the index to rebuild, e.g. --rebuild=contacts
6464
idxr := idxrs[0]
65-
if _, err := idxr.Index(db, true, cfg.Cleanup); err != nil {
65+
if _, err := idxr.Index(db, true, cfg.Cleanup, cfg.ContactsShards, cfg.ContactsReplicas); err != nil {
6666
log.WithField("indexer", idxr.Name()).WithError(err).Fatal("error during rebuilding")
6767
}
6868
} else {

config.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@ package indexer
33
import "os"
44

55
type Config struct {
6-
ElasticURL string `help:"the url for our elastic search instance"`
7-
DB string `help:"the connection string for our database"`
8-
Index string `help:"the alias for our contact index"`
9-
Poll int `help:"the number of seconds to wait between checking for updated contacts"`
10-
Rebuild bool `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"`
11-
Cleanup bool `help:"whether to remove old indexes after a rebuild"`
12-
LogLevel string `help:"the log level, one of error, warn, info, debug"`
13-
SentryDSN string `help:"the sentry configuration to log errors to, if any"`
6+
ElasticURL string `help:"the url for our elastic search instance"`
7+
DB string `help:"the connection string for our database"`
8+
Index string `help:"the alias for our contact index"`
9+
Poll int `help:"the number of seconds to wait between checking for updated contacts"`
10+
Rebuild bool `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"`
11+
Cleanup bool `help:"whether to remove old indexes after a rebuild"`
12+
LogLevel string `help:"the log level, one of error, warn, info, debug"`
13+
SentryDSN string `help:"the sentry configuration to log errors to, if any"`
14+
ContactsShards int `help:"The number of shards to use for the contacts index"`
15+
ContactsReplicas int `help:"The number of replicas to use for the contacts index"`
1416

1517
LibratoUsername string `help:"the username that will be used to authenticate to Librato"`
1618
LibratoToken string `help:"the token that will be used to authenticate to Librato"`
@@ -21,13 +23,15 @@ func NewDefaultConfig() *Config {
2123
hostname, _ := os.Hostname()
2224

2325
return &Config{
24-
ElasticURL: "http://localhost:9200",
25-
DB: "postgres://localhost/temba?sslmode=disable",
26-
Index: "contacts",
27-
Poll: 5,
28-
Rebuild: false,
29-
Cleanup: false,
30-
LogLevel: "info",
31-
InstanceName: hostname,
26+
ElasticURL: "http://localhost:9200",
27+
DB: "postgres://localhost/temba?sslmode=disable",
28+
Index: "contacts",
29+
Poll: 5,
30+
Rebuild: false,
31+
Cleanup: false,
32+
LogLevel: "info",
33+
InstanceName: hostname,
34+
ContactsShards: 2,
35+
ContactsReplicas: 1,
3236
}
3337
}

daemon.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (d *Daemon) startIndexer(indexer indexers.Indexer) {
6666
case <-d.quit:
6767
return
6868
case <-time.After(d.poll):
69-
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup)
69+
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup, d.cfg.ContactsShards, d.cfg.ContactsReplicas)
7070
if err != nil {
7171
log.WithError(err).Error("error during indexing")
7272
}

indexers/base.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,22 @@ type Stats struct {
2929
// Indexer is base interface for indexers
3030
type Indexer interface {
3131
Name() string
32-
Index(db *sql.DB, rebuild, cleanup bool) (string, error)
32+
Index(db *sql.DB, rebuild, cleanup bool, shards int, replicas int) (string, error)
3333
Stats() Stats
3434
}
3535

36+
type ElasticSettings struct {
37+
Settings struct {
38+
Index struct {
39+
NumberOfShards int `json:"number_of_shards"`
40+
NumberOfReplicas int `json:"number_of_replicas"`
41+
RoutingPartitionSize int `json:"routing_partition_size"`
42+
} `json:"index"`
43+
Analysis json.RawMessage `json:"analysis"`
44+
} `json:"settings"`
45+
Mappings json.RawMessage `json:"mappings"`
46+
}
47+
3648
type baseIndexer struct {
3749
elasticURL string
3850
name string // e.g. contacts, used as the alias
@@ -99,7 +111,7 @@ func (i *baseIndexer) FindIndexes() []string {
99111
// that index to `contacts`.
100112
//
101113
// If the day-specific name already exists, we append a .1 or .2 to the name.
102-
func (i *baseIndexer) createNewIndex(settings json.RawMessage) (string, error) {
114+
func (i *baseIndexer) createNewIndex(indexSettings ElasticSettings) (string, error) {
103115
// create our day-specific name
104116
index := fmt.Sprintf("%s_%s", i.name, time.Now().Format("2006_01_02"))
105117
idx := 0
@@ -121,7 +133,11 @@ func (i *baseIndexer) createNewIndex(settings json.RawMessage) (string, error) {
121133
}
122134

123135
// create the new index
124-
_, err := utils.MakeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s?include_type_name=true", i.elasticURL, index), settings, nil)
136+
settings, err := json.Marshal(indexSettings)
137+
if err != nil {
138+
return "", err
139+
}
140+
_, err = utils.MakeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s?include_type_name=true", i.elasticURL, index), settings, nil)
125141
if err != nil {
126142
return "", err
127143
}

indexers/contacts.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@ import (
66
_ "embed"
77
"encoding/json"
88
"fmt"
9-
"time"
10-
119
"github.com/pkg/errors"
1210
"github.com/sirupsen/logrus"
11+
"time"
1312
)
1413

1514
//go:embed contacts.settings.json
16-
var contactsSettings json.RawMessage
15+
var contactsSettingsFile []byte
16+
17+
var contactsSettings ElasticSettings
1718

1819
// ContactIndexer is an indexer for contacts
1920
type ContactIndexer struct {
@@ -31,7 +32,7 @@ func NewContactIndexer(elasticURL, name string, batchSize int) *ContactIndexer {
3132
}
3233

3334
// Index indexes modified contacts and returns the name of the concrete index
34-
func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error) {
35+
func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool, shards int, replicas int) (string, error) {
3536
var err error
3637

3738
// find our physical index
@@ -47,6 +48,12 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error
4748

4849
// doesn't exist or we are rebuilding, create it
4950
if physicalIndex == "" || rebuild {
51+
err = json.Unmarshal(contactsSettingsFile, &contactsSettings)
52+
if err != nil {
53+
return "", errors.Wrap(err, "error unmarshalling embeded contacts.settings.json file")
54+
}
55+
contactsSettings.Settings.Index.NumberOfShards = shards
56+
contactsSettings.Settings.Index.NumberOfReplicas = replicas
5057
physicalIndex, err = i.createNewIndex(contactsSettings)
5158
if err != nil {
5259
return "", errors.Wrap(err, "error creating new index")

indexers/contacts_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func TestContacts(t *testing.T) {
191191

192192
expectedIndexName := fmt.Sprintf("indexer_test_%s", time.Now().Format("2006_01_02"))
193193

194-
indexName, err := ix1.Index(db, false, false)
194+
indexName, err := ix1.Index(db, false, false, 2, 1)
195195
assert.NoError(t, err)
196196
assert.Equal(t, expectedIndexName, indexName)
197197

@@ -217,7 +217,7 @@ func TestContacts(t *testing.T) {
217217
require.NoError(t, err)
218218

219219
// and index again...
220-
indexName, err = ix1.Index(db, false, false)
220+
indexName, err = ix1.Index(db, false, false, 2, 1)
221221
assert.NoError(t, err)
222222
assert.Equal(t, expectedIndexName, indexName) // same index used
223223
assertIndexerStats(t, ix1, 10, 1)
@@ -240,7 +240,7 @@ func TestContacts(t *testing.T) {
240240
// and simulate another indexer doing a parallel rebuild
241241
ix2 := indexers.NewContactIndexer(elasticURL, aliasName, 4)
242242

243-
indexName2, err := ix2.Index(db, true, false)
243+
indexName2, err := ix2.Index(db, true, false, 2, 1)
244244
assert.NoError(t, err)
245245
assert.Equal(t, expectedIndexName+"_1", indexName2) // new index used
246246
assertIndexerStats(t, ix2, 8, 0)
@@ -255,7 +255,7 @@ func TestContacts(t *testing.T) {
255255

256256
// simulate another indexer doing a parallel rebuild with cleanup
257257
ix3 := indexers.NewContactIndexer(elasticURL, aliasName, 4)
258-
indexName3, err := ix3.Index(db, true, true)
258+
indexName3, err := ix3.Index(db, true, true, 2, 1)
259259
assert.NoError(t, err)
260260
assert.Equal(t, expectedIndexName+"_2", indexName3) // new index used
261261
assertIndexerStats(t, ix3, 8, 0)
@@ -264,7 +264,7 @@ func TestContacts(t *testing.T) {
264264
assertIndexesWithPrefix(t, es, aliasName, []string{expectedIndexName + "_2"})
265265

266266
// check that the original indexer now indexes against the new index
267-
indexName, err = ix1.Index(db, false, false)
267+
indexName, err = ix1.Index(db, false, false, 2, 1)
268268
assert.NoError(t, err)
269269
assert.Equal(t, expectedIndexName+"_2", indexName)
270270
}

0 commit comments

Comments
 (0)