From ff8b56da4cf8df729dc468659981c5e2cd33f2e5 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 6 Feb 2025 10:54:32 +0000 Subject: [PATCH] [0.2] Adding ES verification step + explicit best-effort index creation during ES Sink initialization (#192) (#207) Backports the following commits to 0.2: - Adding ES verification step + explicit best-effort index creation during ES Sink initialization (#192) Co-authored-by: Matt Nowzari --- lib/crawler/output_sink/elasticsearch.rb | 29 ++++++++++--- lib/errors.rb | 7 ++- spec/lib/crawler/api/crawl_spec.rb | 1 + spec/lib/crawler/coordinator_spec.rb | 1 + .../crawler/output_sink/elasticsearch_spec.rb | 43 +++++++++++++++++-- spec/lib/crawler/output_sink_spec.rb | 1 + 6 files changed, 72 insertions(+), 10 deletions(-) diff --git a/lib/crawler/output_sink/elasticsearch.rb b/lib/crawler/output_sink/elasticsearch.rb index ddc3477..434b44d 100644 --- a/lib/crawler/output_sink/elasticsearch.rb +++ b/lib/crawler/output_sink/elasticsearch.rb @@ -33,8 +33,10 @@ def initialize(config) # initialize client now to fail fast if config is bad client - # ping ES by attempting to reach the index specified in config - ping_output_index + # ping ES to verify the provided config is good + verify_es_connection + # ping the output_index provided in the config, and create it if it does not exist + verify_output_index @queue_lock = Mutex.new init_ingestion_stats @@ -43,9 +45,26 @@ def initialize(config) ) end - def ping_output_index - raise Errors::IndexDoesNotExistError, system_logger.info("Failed to find index #{config.output_index}") unless - client.indices.exists(index: config.output_index) + def verify_es_connection + client.info + rescue Elastic::Transport::Transport::Error # rescue bc client.info crashes ungracefully when ES is unreachable + system_logger.info("Failed to reach #{config.elasticsearch[:host]}:#{config.elasticsearch[:port]}") + raise Errors::ExitIfESConnectionError + end + + def verify_output_index + if client.indices.exists(index: config.output_index) == false + attempt_index_creation_or_exit + system_logger.info("Index [#{config.output_index}] did not exist, but was successfully created!") + else + system_logger.info("Index [#{config.output_index}] was found!") + end + end + + def attempt_index_creation_or_exit + # helper method for verify_output_index + raise Errors::ExitIfUnableToCreateIndex, system_logger.info("Failed to create #{config.output_index}") unless + client.indices.create(index: config.output_index) end def write(crawl_result) diff --git a/lib/errors.rb b/lib/errors.rb index 450b32b..c1020b4 100644 --- a/lib/errors.rb +++ b/lib/errors.rb @@ -19,9 +19,14 @@ class BulkQueueOverflowError < StandardError; end # exponential backoff. This error should be treated as retryable. class SinkLockedError < StandardError; end + # Raised when there is a connection error to Elasticsearch. Specific for Elasticsearch sink. + # During initialization of the Elasticsearch sink, it will attempt to make contact to + # the host provided in the configuration. If contact cannot be established, a system exit will occur. + class ExitIfESConnectionError < SystemExit; end + # Raised when the desired output index does not exist. This is specific for Elasticsearch # sink. During initialization of the Elasticsearch sink, it will call indices.exists() # against the output_index value, and will continue if the index is found. # If it is not found, this error will be raised, which causes a system exit to occur. - class IndexDoesNotExistError < SystemExit; end + class ExitIfUnableToCreateIndex < SystemExit; end end diff --git a/spec/lib/crawler/api/crawl_spec.rb b/spec/lib/crawler/api/crawl_spec.rb index 1155fff..25d9848 100644 --- a/spec/lib/crawler/api/crawl_spec.rb +++ b/spec/lib/crawler/api/crawl_spec.rb @@ -49,6 +49,7 @@ allow(ES::Client).to receive(:new).and_return(es_client) allow(es_client).to receive(:indices).and_return(es_client_indices) + allow(es_client).to receive(:info).and_return(true) end #------------------------------------------------------------------------------------------------- diff --git a/spec/lib/crawler/coordinator_spec.rb b/spec/lib/crawler/coordinator_spec.rb index 347de29..ac2d9d4 100644 --- a/spec/lib/crawler/coordinator_spec.rb +++ b/spec/lib/crawler/coordinator_spec.rb @@ -84,6 +84,7 @@ before do allow(ES::Client).to receive(:new).and_return(es_client) allow(es_client).to receive(:bulk) + allow(es_client).to receive(:info).and_return(true) allow(es_client).to receive(:delete_by_query).and_return({ deleted: 1 }.stringify_keys) allow(es_client) .to receive(:paginated_search).and_return([search_result]) diff --git a/spec/lib/crawler/output_sink/elasticsearch_spec.rb b/spec/lib/crawler/output_sink/elasticsearch_spec.rb index 14ee3be..5b113c9 100644 --- a/spec/lib/crawler/output_sink/elasticsearch_spec.rb +++ b/spec/lib/crawler/output_sink/elasticsearch_spec.rb @@ -43,6 +43,7 @@ allow(config).to receive(:system_logger).and_return(system_logger) allow(es_client).to receive(:bulk) + allow(es_client).to receive(:info).and_return(true) allow(es_client).to receive(:indices).and_return(es_client_indices) allow(es_client).to receive(:paginated_search) @@ -89,15 +90,49 @@ end end + context 'when connection to Elasticsearch cannot be established' do + before(:each) do + allow(es_client).to receive(:info).and_raise(Elastic::Transport::Transport::Error) + end + + it 'should raise an ESConnectionError' do + expect { subject.verify_es_connection }.to raise_error(Errors::ExitIfESConnectionError) + expect(system_logger).to have_received(:info).with( + "Failed to reach #{config.elasticsearch[:host]}:#{config.elasticsearch[:port]}" + ) + end + end + + context 'when connection to Elasticsearch has been verified' do + it 'should not raise an ESConnectionError' do + expect { subject.verify_es_connection }.not_to raise_error + end + end + context 'when output index is provided but index does not exist in ES' do before(:each) do allow(es_client_indices).to receive(:exists).and_return(false) + allow(es_client_indices).to receive(:create).and_return({ 'some' => 'response' }) + allow(subject).to receive(:verify_output_index) + end + + it 'should create the index' do + expect(system_logger).to have_received(:info).with( + "Index [#{index_name}] did not exist, but was successfully created!" + ) + end + end + + context 'when output index is provided and index does not exist, but creation fails' do + before(:each) do + allow(es_client_indices).to receive(:exists).and_return(false) + allow(es_client_indices).to receive(:create).and_return(false) end - it 'raises an IndexDoesNotExistError' do - expect { subject.ping_output_index }.to raise_error(Errors::IndexDoesNotExistError) + it 'raises ExitIfUnableToCreateIndex' do + expect { subject.attempt_index_creation_or_exit }.to raise_error(Errors::ExitIfUnableToCreateIndex) expect(system_logger).to have_received(:info).with( - "Failed to find index #{index_name}" + "Failed to create #{index_name}" ) end end @@ -108,7 +143,7 @@ end it 'does not raise an error' do - expect { subject.ping_output_index }.not_to raise_error + expect { subject.verify_output_index }.not_to raise_error end end diff --git a/spec/lib/crawler/output_sink_spec.rb b/spec/lib/crawler/output_sink_spec.rb index 9d61ffd..b197295 100644 --- a/spec/lib/crawler/output_sink_spec.rb +++ b/spec/lib/crawler/output_sink_spec.rb @@ -15,6 +15,7 @@ before(:each) do allow(ES::Client).to receive(:new).and_return(es_client) allow(es_client).to receive(:indices).and_return(es_client_indices) + allow(es_client).to receive(:info).and_return(true) end context '.create' do