Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[0.2] Adding ES verification step + explicit best-effort index creation during ES Sink initialization (#192) #207

Merged
merged 2 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions lib/crawler/output_sink/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ def initialize(config)
# initialize client now to fail fast if config is bad
client

# ping ES by attempting to reach the index specified in config
ping_output_index
# ping ES to verify the provided config is good
verify_es_connection
# ping the output_index provided in the config, and create it if it does not exist
verify_output_index

@queue_lock = Mutex.new
init_ingestion_stats
Expand All @@ -43,9 +45,26 @@ def initialize(config)
)
end

def ping_output_index
raise Errors::IndexDoesNotExistError, system_logger.info("Failed to find index #{config.output_index}") unless
client.indices.exists(index: config.output_index)
# Confirms the configured Elasticsearch host is reachable by calling +client.info+.
#
# Transport errors are rescued here so that a failed request produces a log entry
# and a deliberate exit rather than an unhandled exception.
#
# @return the response of +client.info+ when the request succeeds
# @raise [Errors::ExitIfESConnectionError] when the request raises a transport error
def verify_es_connection
  client.info
rescue Elastic::Transport::Transport::Error
  message = "Failed to reach #{config.elasticsearch[:host]}:#{config.elasticsearch[:port]}"
  system_logger.info(message)
  # A SystemExit built without an explicit status reports success (0) to the caller's
  # shell, so pass a failing status (and the message) explicitly.
  raise Errors::ExitIfESConnectionError.new(1, message)
end

# Looks up the configured output index and creates it when Elasticsearch reports
# that it does not exist, logging which of the two outcomes occurred.
#
# @return the result of the final +system_logger.info+ call
def verify_output_index
  # Only an explicit +false+ counts as "missing"; any other response (including nil)
  # is treated as the index having been found.
  index_missing = (client.indices.exists(index: config.output_index) == false)
  return system_logger.info("Index [#{config.output_index}] was found!") unless index_missing

  # The helper raises if creation fails, so reaching the log line means it succeeded.
  attempt_index_creation_or_exit
  system_logger.info("Index [#{config.output_index}] did not exist, but was successfully created!")
end

# Helper for verify_output_index: asks Elasticsearch to create the configured output index.
#
# @return [nil] when the create call returns a truthy response
# @raise [Errors::ExitIfUnableToCreateIndex] when the create call returns a falsy value
def attempt_index_creation_or_exit
  return if client.indices.create(index: config.output_index)

  message = "Failed to create #{config.output_index}"
  system_logger.info(message)
  # Raise with an explicit failing status and the real message. Handing the logger's
  # return value to the exception (e.g. +true+) would make the SystemExit report
  # success (status 0) and drop the message.
  raise Errors::ExitIfUnableToCreateIndex.new(1, message)
end

def write(crawl_result)
Expand Down
7 changes: 6 additions & 1 deletion lib/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ class BulkQueueOverflowError < StandardError; end
# exponential backoff. This error should be treated as retryable.
class SinkLockedError < StandardError; end

# Raised when a connection to Elasticsearch cannot be established. Specific to the Elasticsearch sink.
# During initialization, the Elasticsearch sink attempts to contact the host provided in the
# configuration. If contact cannot be established, a system exit will occur.
class ExitIfESConnectionError < SystemExit; end

# Raised when the desired output index does not exist. This is specific for Elasticsearch
# sink. During initialization of the Elasticsearch sink, it will call indices.exists()
# against the output_index value, and will continue if the index is found.
# If it is not found, this error will be raised, which causes a system exit to occur.
class IndexDoesNotExistError < SystemExit; end
# Raised when the output index was not found and the attempt to create it failed.
# Specific to the Elasticsearch sink. Subclasses SystemExit, so raising it causes a
# system exit unless it is rescued.
class ExitIfUnableToCreateIndex < SystemExit; end
end
1 change: 1 addition & 0 deletions spec/lib/crawler/api/crawl_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

allow(ES::Client).to receive(:new).and_return(es_client)
allow(es_client).to receive(:indices).and_return(es_client_indices)
allow(es_client).to receive(:info).and_return(true)
end

#-------------------------------------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions spec/lib/crawler/coordinator_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
before do
allow(ES::Client).to receive(:new).and_return(es_client)
allow(es_client).to receive(:bulk)
allow(es_client).to receive(:info).and_return(true)
allow(es_client).to receive(:delete_by_query).and_return({ deleted: 1 }.stringify_keys)
allow(es_client)
.to receive(:paginated_search).and_return([search_result])
Expand Down
43 changes: 39 additions & 4 deletions spec/lib/crawler/output_sink/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
allow(config).to receive(:system_logger).and_return(system_logger)

allow(es_client).to receive(:bulk)
allow(es_client).to receive(:info).and_return(true)
allow(es_client).to receive(:indices).and_return(es_client_indices)
allow(es_client).to receive(:paginated_search)

Expand Down Expand Up @@ -89,15 +90,49 @@
end
end

# A transport error from the client's #info call must be logged and converted into
# Errors::ExitIfESConnectionError.
context 'when connection to Elasticsearch cannot be established' do
before(:each) do
# Simulate an unreachable cluster: the stubbed client raises on #info.
allow(es_client).to receive(:info).and_raise(Elastic::Transport::Transport::Error)
end

it 'should raise an ESConnectionError' do
# NOTE(review): if subject is built lazily, the sink's initializer (which also calls
# verify_es_connection) may be what raises inside this block — confirm.
expect { subject.verify_es_connection }.to raise_error(Errors::ExitIfESConnectionError)
expect(system_logger).to have_received(:info).with(
"Failed to reach #{config.elasticsearch[:host]}:#{config.elasticsearch[:port]}"
)
end
end

# Happy path: with no failure stubbed on the client's #info call, the connection check
# must complete without raising.
context 'when connection to Elasticsearch has been verified' do
it 'should not raise an ESConnectionError' do
expect { subject.verify_es_connection }.not_to raise_error
end
end

# When the index lookup reports false and the create call returns a response, the sink
# should log that the index was created.
context 'when output index is provided but index does not exist in ES' do
before(:each) do
allow(es_client_indices).to receive(:exists).and_return(false)
allow(es_client_indices).to receive(:create).and_return({ 'some' => 'response' })
# NOTE(review): this stubs the method under test on subject. Presumably the log message
# asserted below is emitted by the sink's initializer when subject is first referenced
# here — confirm this is intentional.
allow(subject).to receive(:verify_output_index)
end

it 'should create the index' do
expect(system_logger).to have_received(:info).with(
"Index [#{index_name}] did not exist, but was successfully created!"
)
end
end

context 'when output index is provided and index does not exist, but creation fails' do
before(:each) do
allow(es_client_indices).to receive(:exists).and_return(false)
allow(es_client_indices).to receive(:create).and_return(false)
end

it 'raises an IndexDoesNotExistError' do
expect { subject.ping_output_index }.to raise_error(Errors::IndexDoesNotExistError)
it 'raises ExitIfUnableToCreateIndex' do
expect { subject.attempt_index_creation_or_exit }.to raise_error(Errors::ExitIfUnableToCreateIndex)
expect(system_logger).to have_received(:info).with(
"Failed to find index #{index_name}"
"Failed to create #{index_name}"
)
end
end
Expand All @@ -108,7 +143,7 @@
end

it 'does not raise an error' do
expect { subject.ping_output_index }.not_to raise_error
expect { subject.verify_output_index }.not_to raise_error
end
end

Expand Down
1 change: 1 addition & 0 deletions spec/lib/crawler/output_sink_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# Replace the Elasticsearch client with a stub so these examples never build a real client.
before(:each) do
allow(ES::Client).to receive(:new).and_return(es_client)
allow(es_client).to receive(:indices).and_return(es_client_indices)
# Stubbed #info returns a truthy value instead of querying Elasticsearch.
allow(es_client).to receive(:info).and_return(true)
end

context '.create' do
Expand Down