Add retry to bulk indexer #39

Merged: 5 commits merged on Jun 4, 2024
Changes from 1 commit
58 changes: 37 additions & 21 deletions lib/crawler/output_sink/elasticsearch.rb
@@ -55,7 +55,12 @@ def write(crawl_result)

def close
flush
system_logger.info(ingestion_stats)
msg = <<~LOG.squish
All indexing operations completed.
Successfully indexed #{@completed[:docs_count]} docs with a volume of #{@completed[:docs_volume]} bytes.
Failed to index #{@failed[:docs_count]} docs with a volume of #{@failed[:docs_volume]} bytes.
LOG
system_logger.info(msg)
end

def flush # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
@@ -65,25 +70,25 @@ def flush # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
return
end

system_logger.debug("Sending bulk request with #{data.size} items and flushing queue...")
# a single doc needs two items in a bulk request, so halving the count makes logs clearer
indexing_docs_count = data.size / 2
system_logger.info("Sending bulk request with #{indexing_docs_count} items and flushing queue...")

begin
client.bulk(body: data, pipeline:) # TODO: parse response
system_logger.info("Successfully indexed #{indexing_docs_count} docs.")
reset_ingestion_stats_success
rescue Utility::EsClient::IndexingFailedError => e
system_logger.warn("Bulk index failed: #{e}")
reset_ingestion_stats_failure
rescue StandardError => e
system_logger.warn("Bulk index failed for unexpected reason: #{e}")
raise e
reset_ingestion_stats_failure
end

system_logger.debug("Bulk request containing #{data.size} items sent!")
reset_ingestion_stats

nil
end

def ingestion_stats
@completed.dup
{ completed: @completed.dup, failed: @failed.dup }
end

def operation_queue
@@ -133,27 +138,38 @@ def parametrized_doc(crawl_result)

def init_ingestion_stats
@queued = {
indexed_document_count: 0,
indexed_document_volume: 0
docs_count: 0,
docs_volume: 0
}
@completed = {
indexed_document_count: 0,
indexed_document_volume: 0
docs_count: 0,
docs_volume: 0
}
@failed = {
docs_count: 0,
docs_volume: 0
}
end

def increment_ingestion_stats(doc)
@queued[:indexed_document_count] += 1
@queued[:indexed_document_volume] += operation_queue.bytesize(doc)
@queued[:docs_count] += 1
@queued[:docs_volume] += operation_queue.bytesize(doc)
end

def reset_ingestion_stats_success
@completed[:docs_count] += @queued[:docs_count]
@completed[:docs_volume] += @queued[:docs_volume]

@queued[:docs_count] = 0
@queued[:docs_volume] = 0
end

def reset_ingestion_stats
# TODO: this count isn't accurate, need to look into it
@completed[:indexed_document_count] += @queued[:indexed_document_count]
@completed[:indexed_document_volume] += @queued[:indexed_document_volume]
def reset_ingestion_stats_failure
@failed[:docs_count] += @queued[:docs_count]
@failed[:docs_volume] += @queued[:docs_volume]

@queued[:indexed_document_count] = 0
@queued[:indexed_document_volume] = 0
@queued[:docs_count] = 0
@queued[:docs_volume] = 0
end
end
end
21 changes: 19 additions & 2 deletions lib/utility/es_client.rb
@@ -11,6 +11,8 @@
module Utility
class EsClient < ::Elasticsearch::Client
USER_AGENT = 'elastic-web-crawler-'
MAX_RETRIES = 3
REQUEST_TIMEOUT = 30 # seconds

class IndexingFailedError < StandardError
def initialize(message, error = nil)
@@ -32,6 +34,9 @@ def connection_config(es_config, crawler_version)
headers: {
'user-agent': "#{USER_AGENT}#{crawler_version}",
'X-elastic-product-origin': 'crawler'
},
request: {
timeout: REQUEST_TIMEOUT
}
}
}
@@ -43,7 +48,19 @@ def connection_config(es_config, crawler_version)
end

def bulk(arguments = {})
raise_if_necessary(super(arguments))
retries = 0
begin
raise_if_necessary(super(arguments))
rescue StandardError => e
retries += 1
if retries <= MAX_RETRIES
@system_logger.info("Bulk index attempt #{retries} failed: #{e.message}. Retrying...")
sleep(1.second)
Review comment (Member):
IMO fallback should be added here too; exponential is a good candidate for 3 retries.

Reply from navarone-feekery (Collaborator, Author), May 31, 2024:
@artem-shelkovnikov I've added a temporary fallback; if a bulk index fails, it saves the failed payload to a file and outputs that file name to the log. Users can cross-reference the file content with whatever error response they received.
In the future I'd like to implement something else, but for now I think this should suffice.

I also added exponentially increasing wait times for the retries.

Now, in testing this, I realised the bulk queue can get overloaded if it encounters a lot of errors and exponentially backs off for many seconds. The crawler continues to try sending crawl results, and the bulk indexer trips over itself, causing chaos.
I have an idea to fix this, but I think it's out of scope for this PR, so I'll do it in a follow-up PR.

retry
else
@system_logger.warn("Bulk index failed after #{retries} attempts: #{e.message}.")
end
end
end

private
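
For reference, here is a minimal sketch of the exponential backoff suggested in the review thread above. It assumes the existing MAX_RETRIES constant and @system_logger; the 2 second base delay and doubling factor are illustrative assumptions, not the values merged in this PR.

def bulk(arguments = {})
  retries = 0
  begin
    raise_if_necessary(super(arguments))
  rescue StandardError => e
    retries += 1
    if retries <= MAX_RETRIES
      delay = 2**retries # 2s, 4s, 8s across the three retries
      @system_logger.info("Bulk index attempt #{retries} failed: #{e.message}. Retrying in #{delay}s...")
      sleep(delay)
      retry
    else
      @system_logger.warn("Bulk index failed after #{retries} attempts: #{e.message}.")
    end
  end
end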
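
Similarly, a rough sketch of the temporary fallback described in the author's reply, which writes the failed bulk payload to a file and logs the file path. The helper name, directory, and file naming scheme are assumptions for illustration and may not match the merged implementation.

require 'fileutils'
require 'json'

# Hypothetical helper: persist a failed bulk payload so it can be
# cross-referenced with the error response later.
def store_failed_payload(payload)
  dir = 'output/failed_payloads' # assumed location, not taken from the PR
  FileUtils.mkdir_p(dir)
  path = File.join(dir, "bulk-#{Time.now.to_i}.json")
  File.write(path, payload.to_json)
  @system_logger.warn("Bulk index failed; payload saved to #{path} for inspection.")
  path
end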
@@ -102,7 +119,7 @@ def raise_if_necessary(response) # rubocop:disable Metrics/MethodLength, Metrics
end
end

@system_logger.debug("Errors found in bulk response. Full response: #{response}")
@system_logger.warn("Errors found in bulk response. Full response: #{response}")
if first_error
# TODO: add trace logging
# TODO: consider logging all errors instead of just first
49 changes: 41 additions & 8 deletions spec/lib/crawler/output_sink/elasticsearch_spec.rb
@@ -52,6 +52,7 @@

allow(system_logger).to receive(:debug)
allow(system_logger).to receive(:info)
allow(system_logger).to receive(:warn)

allow(Elasticsearch::API).to receive(:serializer).and_return(serializer)
allow(serializer).to receive(:dump).and_return('')
@@ -295,8 +296,8 @@
it 'returns empty stats' do
stats = subject.ingestion_stats

expect(stats[:indexed_document_count]).to eq(0)
expect(stats[:indexed_document_volume]).to eq(0)
expect(stats[:completed][:docs_count]).to eq(0)
expect(stats[:failed][:docs_volume]).to eq(0)
end
end

@@ -311,8 +312,8 @@
it 'returns empty stats' do
stats = subject.ingestion_stats

expect(stats[:indexed_document_count]).to eq(0)
expect(stats[:indexed_document_volume]).to eq(0)
expect(stats[:completed][:docs_count]).to eq(0)
expect(stats[:failed][:docs_volume]).to eq(0)
end
end

@@ -330,16 +331,48 @@
subject.flush
end

it 'returns expected indexed_document_count' do
it 'returns expected docs_count' do
stats = subject.ingestion_stats

expect(stats[:indexed_document_count]).to eq(document_count)
expect(stats[:completed][:docs_count]).to eq(document_count)
expect(stats[:failed][:docs_count]).to eq(0)
end

it 'returns expected indexed_document_volume' do
it 'returns expected docs_volume' do
stats = subject.ingestion_stats

expect(stats[:indexed_document_volume]).to eq(document_count * serialized_object.bytesize)
expect(stats[:completed][:docs_volume]).to eq(document_count * serialized_object.bytesize)
expect(stats[:failed][:docs_volume]).to eq(0)
end
end

context 'when some documents failed to be ingested' do
let(:document_count) { 5 }
let(:serialized_object) { 'doesnt matter' }

before(:each) do
allow(bulk_queue).to receive(:bytesize).and_return(serialized_object.bytesize)
allow(es_client).to receive(:bulk).and_raise(Utility::EsClient::IndexingFailedError)

document_count.times.each do |x|
subject.write(FactoryBot.build(:html_crawl_result, url: "http://real.com/#{x}"))
end

subject.flush
end

it 'returns expected docs_count' do
stats = subject.ingestion_stats

expect(stats[:failed][:docs_count]).to eq(document_count)
expect(stats[:completed][:docs_count]).to eq(0)
end

it 'returns expected docs_volume' do
stats = subject.ingestion_stats

expect(stats[:failed][:docs_volume]).to eq(document_count * serialized_object.bytesize)
expect(stats[:completed][:docs_volume]).to eq(0)
end
end
end