Add retry to bulk indexer
navarone-feekery committed May 30, 2024
1 parent 408b118 commit 2bf0954
Showing 3 changed files with 97 additions and 31 deletions.
58 changes: 37 additions & 21 deletions lib/crawler/output_sink/elasticsearch.rb
@@ -55,7 +55,12 @@ def write(crawl_result)

def close
flush
system_logger.info(ingestion_stats)
msg = <<~LOG.squish
All indexing operations completed.
Successfully indexed #{@completed[:docs_count]} docs with a volume of #{@completed[:docs_volume]} bytes.
Failed to index #{@failed[:docs_count]} docs with a volume of #{@failed[:docs_volume]} bytes.
LOG
system_logger.info(msg)
end
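For illustration, with hypothetical totals of 42 successfully indexed docs (1504200 bytes) and 3 failed docs (96000 bytes), the squished heredoc above logs as a single line along the lines of:

    All indexing operations completed. Successfully indexed 42 docs with a volume of 1504200 bytes. Failed to index 3 docs with a volume of 96000 bytes.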

def flush # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
@@ -65,25 +70,25 @@ def flush # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
return
end

system_logger.debug("Sending bulk request with #{data.size} items and flushing queue...")
# a single doc needs two items in a bulk request, so halving the count makes logs clearer
indexing_docs_count = data.size / 2
system_logger.info("Sending bulk request with #{indexing_docs_count} items and flushing queue...")

begin
client.bulk(body: data, pipeline:) # TODO: parse response
system_logger.info("Successfully indexed #{indexing_docs_count} docs.")
reset_ingestion_stats_success
rescue Utility::EsClient::IndexingFailedError => e
system_logger.warn("Bulk index failed: #{e}")
reset_ingestion_stats_failure
rescue StandardError => e
system_logger.warn("Bulk index failed for unexpected reason: #{e}")
raise e
reset_ingestion_stats_failure
end

system_logger.debug("Bulk request containing #{data.size} items sent!")
reset_ingestion_stats

nil
end
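A note on the halved count in flush: the Elasticsearch bulk API takes one action/metadata entry plus one source entry per indexed document, so the operation queue holds two items for every doc. A minimal illustration (the operations array and field values are made up for the example, not taken from the crawler):

```ruby
# Hypothetical bulk payload for two documents. Each doc contributes an action
# entry and a source entry, so the array holds 4 items for 2 docs.
operations = [
  { index: { _index: 'search-crawler', _id: 'doc-1' } },
  { title: 'Page one', body: 'First crawled page' },
  { index: { _index: 'search-crawler', _id: 'doc-2' } },
  { title: 'Page two', body: 'Second crawled page' }
]

indexing_docs_count = operations.size / 2 # => 2
```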

def ingestion_stats
@completed.dup
{ completed: @completed.dup, failed: @failed.dup }
end

def operation_queue
@@ -133,27 +138,38 @@ def parametrized_doc(crawl_result)

def init_ingestion_stats
@queued = {
indexed_document_count: 0,
indexed_document_volume: 0
docs_count: 0,
docs_volume: 0
}
@completed = {
indexed_document_count: 0,
indexed_document_volume: 0
docs_count: 0,
docs_volume: 0
}
@failed = {
docs_count: 0,
docs_volume: 0
}
end

def increment_ingestion_stats(doc)
@queued[:indexed_document_count] += 1
@queued[:indexed_document_volume] += operation_queue.bytesize(doc)
@queued[:docs_count] += 1
@queued[:docs_volume] += operation_queue.bytesize(doc)
end

def reset_ingestion_stats_success
@completed[:docs_count] += @queued[:docs_count]
@completed[:docs_volume] += @queued[:docs_volume]

@queued[:docs_count] = 0
@queued[:docs_volume] = 0
end

def reset_ingestion_stats
  # TODO: this count isn't accurate, need to look into it
  @completed[:indexed_document_count] += @queued[:indexed_document_count]
  @completed[:indexed_document_volume] += @queued[:indexed_document_volume]

  @queued[:indexed_document_count] = 0
  @queued[:indexed_document_volume] = 0
end

def reset_ingestion_stats_failure
  @failed[:docs_count] += @queued[:docs_count]
  @failed[:docs_volume] += @queued[:docs_volume]

  @queued[:docs_count] = 0
  @queued[:docs_volume] = 0
end
end
end
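Taken together, the sink now tracks three buckets: documents accumulate in @queued as they are written, then move into @completed or @failed in one step depending on how the bulk request went. A condensed, standalone sketch of that accounting flow (simplified, not the actual class):

```ruby
# Simplified model of the stats lifecycle used above: write -> queued,
# flush -> completed on success or failed on error, stats exposes both.
class StatsTracker
  def initialize
    @queued    = { docs_count: 0, docs_volume: 0 }
    @completed = { docs_count: 0, docs_volume: 0 }
    @failed    = { docs_count: 0, docs_volume: 0 }
  end

  def queue(bytes)
    @queued[:docs_count] += 1
    @queued[:docs_volume] += bytes
  end

  def flush(success:)
    bucket = success ? @completed : @failed
    bucket[:docs_count] += @queued[:docs_count]
    bucket[:docs_volume] += @queued[:docs_volume]
    @queued = { docs_count: 0, docs_volume: 0 }
  end

  def stats
    { completed: @completed.dup, failed: @failed.dup }
  end
end

tracker = StatsTracker.new
tracker.queue(1024)
tracker.flush(success: false)
tracker.stats # => { completed: {...}, failed: { docs_count: 1, docs_volume: 1024 } }
```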
21 changes: 19 additions & 2 deletions lib/utility/es_client.rb
@@ -11,6 +11,8 @@
module Utility
class EsClient < ::Elasticsearch::Client
USER_AGENT = 'elastic-web-crawler-'
MAX_RETRIES = 3
REQUEST_TIMEOUT = 30 # seconds

class IndexingFailedError < StandardError
def initialize(message, error = nil)
@@ -32,6 +34,9 @@ def connection_config(es_config, crawler_version)
headers: {
'user-agent': "#{USER_AGENT}#{crawler_version}",
'X-elastic-product-origin': 'crawler'
},
request: {
timeout: REQUEST_TIMEOUT
}
}
}
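The new request block sets a per-request timeout on the transport. A rough standalone illustration of the same options on an elasticsearch-ruby client (the URL and user-agent value are placeholders, and the exact option shape is assumed from the client's transport_options passthrough rather than copied from the crawler config):

```ruby
require 'elasticsearch'

# Hypothetical client with a 30-second per-request timeout. Requests that
# exceed it fail with a transport error, which retry logic can then catch.
client = Elasticsearch::Client.new(
  url: 'http://localhost:9200',
  transport_options: {
    headers: { 'user-agent': 'elastic-web-crawler-0.1.0' },
    request: { timeout: 30 } # seconds
  }
)
```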
@@ -43,7 +48,19 @@ def connection_config(es_config, crawler_version)
end

def bulk(arguments = {})
raise_if_necessary(super(arguments))
retries = 0
begin
raise_if_necessary(super(arguments))
rescue StandardError => e
retries += 1
if retries <= MAX_RETRIES
@system_logger.info("Bulk index attempt #{retries} failed: #{e.message}. Retrying...")
sleep(1.second)
retry
else
@system_logger.warn("Bulk index failed after #{retries} attempts: #{e.message}.")
end
end
end
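The retry above waits a fixed second between attempts and stops after MAX_RETRIES, logging a warning instead of re-raising on the final failure. For comparison, a generic sketch of the same bounded-retry pattern with exponential backoff (illustrative only, not part of this commit):

```ruby
# Generic bounded retry with exponential backoff: waits 1s, 2s, 4s between
# attempts. Unlike the method above, it re-raises once attempts are used up.
def with_retries(max_retries: 3, base_delay: 1)
  attempts = 0
  begin
    yield
  rescue StandardError => e
    attempts += 1
    raise if attempts > max_retries

    delay = base_delay * (2**(attempts - 1))
    puts "Attempt #{attempts} failed: #{e.message}. Retrying in #{delay}s..."
    sleep(delay)
    retry
  end
end

# Usage: with_retries { client.bulk(body: operations) }
```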

private
Expand Down Expand Up @@ -102,7 +119,7 @@ def raise_if_necessary(response) # rubocop:disable Metrics/MethodLength, Metrics
end
end

@system_logger.debug("Errors found in bulk response. Full response: #{response}")
@system_logger.warn("Errors found in bulk response. Full response: #{response}")
if first_error
# TODO: add trace logging
# TODO: consider logging all errors instead of just first
49 changes: 41 additions & 8 deletions spec/lib/crawler/output_sink/elasticsearch_spec.rb
@@ -52,6 +52,7 @@

allow(system_logger).to receive(:debug)
allow(system_logger).to receive(:info)
allow(system_logger).to receive(:warn)

allow(Elasticsearch::API).to receive(:serializer).and_return(serializer)
allow(serializer).to receive(:dump).and_return('')
@@ -295,8 +296,8 @@
it 'returns empty stats' do
stats = subject.ingestion_stats

expect(stats[:indexed_document_count]).to eq(0)
expect(stats[:indexed_document_volume]).to eq(0)
expect(stats[:completed][:docs_count]).to eq(0)
expect(stats[:failed][:docs_volume]).to eq(0)
end
end

@@ -311,8 +312,8 @@
it 'returns empty stats' do
stats = subject.ingestion_stats

expect(stats[:indexed_document_count]).to eq(0)
expect(stats[:indexed_document_volume]).to eq(0)
expect(stats[:completed][:docs_count]).to eq(0)
expect(stats[:failed][:docs_volume]).to eq(0)
end
end

@@ -330,16 +331,48 @@
subject.flush
end

it 'returns expected indexed_document_count' do
it 'returns expected docs_count' do
stats = subject.ingestion_stats

expect(stats[:indexed_document_count]).to eq(document_count)
expect(stats[:completed][:docs_count]).to eq(document_count)
expect(stats[:failed][:docs_count]).to eq(0)
end

it 'returns expected indexed_document_volume' do
it 'returns expected docs_volume' do
stats = subject.ingestion_stats

expect(stats[:indexed_document_volume]).to eq(document_count * serialized_object.bytesize)
expect(stats[:completed][:docs_volume]).to eq(document_count * serialized_object.bytesize)
expect(stats[:failed][:docs_volume]).to eq(0)
end
end

context 'when some documents failed to be ingested' do
let(:document_count) { 5 }
let(:serialized_object) { 'doesnt matter' }

before(:each) do
allow(bulk_queue).to receive(:bytesize).and_return(serialized_object.bytesize)
allow(es_client).to receive(:bulk).and_raise(Utility::EsClient::IndexingFailedError)

document_count.times.each do |x|
subject.write(FactoryBot.build(:html_crawl_result, url: "http://real.com/#{x}"))
end

subject.flush
end

it 'returns expected docs_count' do
stats = subject.ingestion_stats

expect(stats[:failed][:docs_count]).to eq(document_count)
expect(stats[:completed][:docs_count]).to eq(0)
end

it 'returns expected docs_volume' do
stats = subject.ingestion_stats

expect(stats[:failed][:docs_volume]).to eq(document_count * serialized_object.bytesize)
expect(stats[:completed][:docs_volume]).to eq(0)
end
end
end
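Given the failure scenario above (5 docs, every bulk call raising IndexingFailedError), the resulting stats hash is shaped like this (the byte count follows from the stubbed bytesize of 'doesnt matter'):

```ruby
stats = subject.ingestion_stats

stats[:completed] # => { docs_count: 0, docs_volume: 0 }
stats[:failed]    # => { docs_count: 5, docs_volume: 5 * 'doesnt matter'.bytesize }
```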
