Skip to content

Commit

Permalink
[0.2] Adding ES verification step + explicit best-effort index creati…
Browse files Browse the repository at this point in the history
…on during ES Sink initialization (#192) (#207)

Backports the following commits to 0.2:
- Adding ES verification step + explicit best-effort index creation
during ES Sink initialization (#192)

Co-authored-by: Matt Nowzari <matt.nowzari@elastic.co>
  • Loading branch information
github-actions[bot] and mattnowzari authored Feb 6, 2025
1 parent eb84ad7 commit ff8b56d
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 10 deletions.
29 changes: 24 additions & 5 deletions lib/crawler/output_sink/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ def initialize(config)
# initialize client now to fail fast if config is bad
client

# ping ES by attempting to reach the index specified in config
ping_output_index
# ping ES to verify the provided config is good
verify_es_connection
# ping the output_index provided in the config, and create it if it does not exist
verify_output_index

@queue_lock = Mutex.new
init_ingestion_stats
Expand All @@ -43,9 +45,26 @@ def initialize(config)
)
end

def ping_output_index
raise Errors::IndexDoesNotExistError, system_logger.info("Failed to find index #{config.output_index}") unless
client.indices.exists(index: config.output_index)
# Confirms that Elasticsearch can be reached with the configured connection settings.
#
# Calls client.info and hands back its response. When the transport layer raises,
# the failure is logged and the process is asked to exit.
#
# @return [Object] the response of client.info when the call succeeds
# @raise [Errors::ExitIfESConnectionError] when client.info raises a transport error
def verify_es_connection
  client.info
rescue Elastic::Transport::Transport::Error # client.info raises instead of returning when ES is unreachable
  message = "Failed to reach #{config.elasticsearch[:host]}:#{config.elasticsearch[:port]}"
  system_logger.info(message)
  # A SystemExit built without arguments carries status 0 (success). Pass an explicit
  # non-zero status so the calling shell/supervisor can see that start-up failed.
  raise Errors::ExitIfESConnectionError.new(1, message)
end

# Checks whether the configured output index exists and creates it when it does not.
#
# Only an explicit `false` from indices.exists triggers creation; any other
# response is treated as "index found". Either outcome is logged.
def verify_output_index
  case client.indices.exists(index: config.output_index)
  when false
    # Creation either succeeds or raises, so the success message is only reached on success.
    attempt_index_creation_or_exit
    system_logger.info("Index [#{config.output_index}] did not exist, but was successfully created!")
  else
    system_logger.info("Index [#{config.output_index}] was found!")
  end
end

# Helper for verify_output_index: tries to create the configured output index.
#
# @return [nil] when client.indices.create returns a truthy response
# @raise [Errors::ExitIfUnableToCreateIndex] when the create call returns a falsy response
#
# NOTE(review): client.indices.create may raise a transport error rather than return a
# falsy value on failure; such errors are not handled here — confirm whether they should
# also be converted into this exit.
def attempt_index_creation_or_exit
  return if client.indices.create(index: config.output_index)

  message = "Failed to create #{config.output_index}"
  system_logger.info(message)
  # Raise with the message itself, not the logger's return value, and with an explicit
  # non-zero status: SystemExit interprets a leading `true` (or no argument) as success.
  raise Errors::ExitIfUnableToCreateIndex.new(1, message)
end

def write(crawl_result)
Expand Down
7 changes: 6 additions & 1 deletion lib/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ class BulkQueueOverflowError < StandardError; end
# exponential backoff. This error should be treated as retryable.
class SinkLockedError < StandardError; end

# Raised when there is a connection error to Elasticsearch. Specific to the Elasticsearch sink.
# During initialization of the Elasticsearch sink, it will attempt to make contact with
# the host provided in the configuration. If contact cannot be established, a system exit will occur.
# Because this subclasses SystemExit rather than StandardError, a bare `rescue` will not catch it.
class ExitIfESConnectionError < SystemExit; end

# Raised when the desired output index does not exist. This is specific for Elasticsearch
# sink. During initialization of the Elasticsearch sink, it will call indices.exists()
# against the output_index value, and will continue if the index is found.
# If it is not found, this error will be raised, which causes a system exit to occur.
class IndexDoesNotExistError < SystemExit; end
# Raised when the Elasticsearch sink could not create the configured output index.
# It subclasses SystemExit rather than StandardError, so a bare `rescue` will not catch it
# and an unrescued raise ends the process.
class ExitIfUnableToCreateIndex < SystemExit; end
end
1 change: 1 addition & 0 deletions spec/lib/crawler/api/crawl_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

allow(ES::Client).to receive(:new).and_return(es_client)
allow(es_client).to receive(:indices).and_return(es_client_indices)
allow(es_client).to receive(:info).and_return(true)
end

#-------------------------------------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions spec/lib/crawler/coordinator_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
before do
allow(ES::Client).to receive(:new).and_return(es_client)
allow(es_client).to receive(:bulk)
allow(es_client).to receive(:info).and_return(true)
allow(es_client).to receive(:delete_by_query).and_return({ deleted: 1 }.stringify_keys)
allow(es_client)
.to receive(:paginated_search).and_return([search_result])
Expand Down
43 changes: 39 additions & 4 deletions spec/lib/crawler/output_sink/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
allow(config).to receive(:system_logger).and_return(system_logger)

allow(es_client).to receive(:bulk)
allow(es_client).to receive(:info).and_return(true)
allow(es_client).to receive(:indices).and_return(es_client_indices)
allow(es_client).to receive(:paginated_search)

Expand Down Expand Up @@ -89,15 +90,49 @@
end
end

# Simulates an unreachable cluster: every call to es_client.info raises a transport error.
context 'when connection to Elasticsearch cannot be established' do
before(:each) do
allow(es_client).to receive(:info).and_raise(Elastic::Transport::Transport::Error)
end

it 'should raise an ESConnectionError' do
# NOTE(review): if `subject` is built lazily and its initializer also verifies the connection,
# the exit may be raised while constructing it rather than by the explicit call — confirm.
expect { subject.verify_es_connection }.to raise_error(Errors::ExitIfESConnectionError)
# The failure must also have been logged with the configured host and port.
expect(system_logger).to have_received(:info).with(
"Failed to reach #{config.elasticsearch[:host]}:#{config.elasticsearch[:port]}"
)
end
end

# Happy path: no stubbing is done here, so this presumably relies on the surrounding setup
# making es_client.info return normally — verify against the enclosing `before` block.
context 'when connection to Elasticsearch has been verified' do
it 'should not raise an ESConnectionError' do
expect { subject.verify_es_connection }.not_to raise_error
end
end

# The index lookup reports "missing" and the create call succeeds, so the sink is expected
# to create the index and log that it did.
context 'when output index is provided but index does not exist in ES' do
  before(:each) do
    allow(es_client_indices).to receive(:exists).and_return(false)
    allow(es_client_indices).to receive(:create).and_return({ 'some' => 'response' })
  end

  it 'should create the index' do
    # Exercise the method under test directly instead of stubbing it out.
    subject.verify_output_index

    # at_least(:once): building `subject` may already have run the same verification.
    expect(es_client_indices).to have_received(:create).with(index: index_name).at_least(:once)
    expect(system_logger).to have_received(:info).with(
      "Index [#{index_name}] did not exist, but was successfully created!"
    ).at_least(:once)
  end
end

context 'when output index is provided and index does not exist, but creation fails' do
before(:each) do
allow(es_client_indices).to receive(:exists).and_return(false)
allow(es_client_indices).to receive(:create).and_return(false)
end

it 'raises an IndexDoesNotExistError' do
expect { subject.ping_output_index }.to raise_error(Errors::IndexDoesNotExistError)
it 'raises ExitIfUnableToCreateIndex' do
expect { subject.attempt_index_creation_or_exit }.to raise_error(Errors::ExitIfUnableToCreateIndex)
expect(system_logger).to have_received(:info).with(
"Failed to find index #{index_name}"
"Failed to create #{index_name}"
)
end
end
Expand All @@ -108,7 +143,7 @@
end

it 'does not raise an error' do
expect { subject.ping_output_index }.not_to raise_error
expect { subject.verify_output_index }.not_to raise_error
end
end

Expand Down
1 change: 1 addition & 0 deletions spec/lib/crawler/output_sink_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
before(:each) do
allow(ES::Client).to receive(:new).and_return(es_client)
allow(es_client).to receive(:indices).and_return(es_client_indices)
allow(es_client).to receive(:info).and_return(true)
end

context '.create' do
Expand Down

0 comments on commit ff8b56d

Please sign in to comment.