Add retry to bulk indexer (#39)
Related to #37

We currently have no retry logic for bulk indexing. This is a problem
when running the Crawler on Mac because of an intermittent
`bad_record_mac` error. Retrying often resolves the error (so this is a
band-aid, not a fix).
Retries are healthy in general, so this was a good opportunity to add
them.

- Add retry (max of 3 attempts) to the bulk indexer (see the sketch after this list)
- Add a request timeout
- Add stats for docs that failed indexing
- Improve logging in general
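
For context, the core of the change is a bounded retry with exponential backoff around the bulk request, which logs each failed attempt and, once retries are exhausted, dumps the payload to disk and re-raises. A minimal, self-contained sketch of just the retry loop (the `send_bulk` helper and its simulated failure are illustrative, not the Crawler's real API):

```ruby
# Minimal sketch of the retry-with-backoff pattern added in this commit.
# `send_bulk` is a placeholder, not the Crawler's real Elasticsearch call.
MAX_RETRIES = 3

def send_bulk(payload)
  # Simulate an intermittent failure such as `bad_record_mac`.
  raise 'bad_record_mac' if rand < 0.5

  payload.size
end

def bulk_with_retry(payload)
  retries = 0
  begin
    send_bulk(payload)
  rescue StandardError => e
    retries += 1
    raise e if retries > MAX_RETRIES # retries exhausted; surface the error

    wait_time = 2**retries # exponential backoff: 2s, 4s, 8s
    puts "Bulk attempt #{retries} failed: '#{e.message}'. Retrying in #{wait_time} seconds..."
    sleep(wait_time)
    retry
  end
end

bulk_with_retry([{ index: { _index: 'example' } }, { title: 'doc' }])
```

In the actual diff below, the rescue path also logs via `@system_logger` and writes the failed payload under `output/failed_payloads/<crawl_id>/` before re-raising.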
navarone-feekery authored Jun 4, 2024
1 parent 741f353 commit cc7ae78
Showing 5 changed files with 194 additions and 39 deletions.
2 changes: 1 addition & 1 deletion lib/crawler/output_sink/base.rb
@@ -13,7 +13,7 @@ module OutputSink
class Base
attr_reader :config, :rule_engine

delegate :document_mapper, :events, :system_logger, to: :config
delegate :crawl_id, :document_mapper, :events, :system_logger, to: :config

def initialize(config)
@config = config
63 changes: 38 additions & 25 deletions lib/crawler/output_sink/elasticsearch.rb
@@ -55,35 +55,40 @@ def write(crawl_result)

def close
flush
system_logger.info(ingestion_stats)
msg = <<~LOG.squish
All indexing operations completed.
Successfully indexed #{@completed[:docs_count]} docs with a volume of #{@completed[:docs_volume]} bytes.
Failed to index #{@failed[:docs_count]} docs with a volume of #{@failed[:docs_volume]} bytes.
LOG
system_logger.info(msg)
end

def flush # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
data = operation_queue.pop_all
if data.empty?
payload = operation_queue.pop_all
if payload.empty?
system_logger.debug('Queue was empty when attempting to flush.')
return
end

system_logger.debug("Sending bulk request with #{data.size} items and flushing queue...")
# a single doc needs two items in a bulk request, so halving the count makes logs clearer
indexing_docs_count = payload.size / 2
system_logger.info("Sending bulk request with #{indexing_docs_count} items and flushing queue...")

begin
client.bulk(body: data, pipeline:) # TODO: parse response
client.bulk(body: payload, pipeline:) # TODO: parse response
system_logger.info("Successfully indexed #{indexing_docs_count} docs.")
reset_ingestion_stats(true)
rescue Utility::EsClient::IndexingFailedError => e
system_logger.warn("Bulk index failed: #{e}")
reset_ingestion_stats(false)
rescue StandardError => e
system_logger.warn("Bulk index failed for unexpected reason: #{e}")
raise e
reset_ingestion_stats(false)
end

system_logger.debug("Bulk request containing #{data.size} items sent!")
reset_ingestion_stats

nil
end

def ingestion_stats
@completed.dup
{ completed: @completed.dup, failed: @failed.dup }
end

def operation_queue
@@ -99,7 +104,7 @@ def es_config
end

def client
@client ||= Utility::EsClient.new(es_config, system_logger, Crawler.version)
@client ||= Utility::EsClient.new(es_config, system_logger, Crawler.version, crawl_id)
end

def index_name
@@ -133,27 +138,35 @@ def parametrized_doc(crawl_result)

def init_ingestion_stats
@queued = {
indexed_document_count: 0,
indexed_document_volume: 0
docs_count: 0,
docs_volume: 0
}
@completed = {
indexed_document_count: 0,
indexed_document_volume: 0
docs_count: 0,
docs_volume: 0
}
@failed = {
docs_count: 0,
docs_volume: 0
}
end

def increment_ingestion_stats(doc)
@queued[:indexed_document_count] += 1
@queued[:indexed_document_volume] += operation_queue.bytesize(doc)
@queued[:docs_count] += 1
@queued[:docs_volume] += operation_queue.bytesize(doc)
end

def reset_ingestion_stats
# TODO: this count isn't accurate, need to look into it
@completed[:indexed_document_count] += @queued[:indexed_document_count]
@completed[:indexed_document_volume] += @queued[:indexed_document_volume]
def reset_ingestion_stats(success)
if success
@completed[:docs_count] += @queued[:docs_count]
@completed[:docs_volume] += @queued[:docs_volume]
else
@failed[:docs_count] += @queued[:docs_count]
@failed[:docs_volume] += @queued[:docs_volume]
end

@queued[:indexed_document_count] = 0
@queued[:indexed_document_volume] = 0
@queued[:docs_count] = 0
@queued[:docs_volume] = 0
end
end
end
48 changes: 44 additions & 4 deletions lib/utility/es_client.rb
@@ -6,11 +6,15 @@

# frozen_string_literal: true

require 'fileutils'
require 'elasticsearch'

module Utility
class EsClient < ::Elasticsearch::Client
USER_AGENT = 'elastic-web-crawler-'
MAX_RETRIES = 3
REQUEST_TIMEOUT = 30 # seconds
FAILED_BULKS_DIR = 'output/failed_payloads' # directory that failed bulk payloads are output to

class IndexingFailedError < StandardError
def initialize(message, error = nil)
@@ -21,8 +25,9 @@ def initialize(message, error = nil)
attr_reader :cause
end

def initialize(es_config, system_logger, crawler_version, &)
def initialize(es_config, system_logger, crawler_version, crawl_id, &)
@system_logger = system_logger
@crawl_id = crawl_id
super(connection_config(es_config, crawler_version), &)
end

@@ -32,6 +37,9 @@ def connection_config(es_config, crawler_version)
headers: {
'user-agent': "#{USER_AGENT}#{crawler_version}",
'X-elastic-product-origin': 'crawler'
},
request: {
timeout: REQUEST_TIMEOUT
}
}
}
@@ -42,8 +50,26 @@ def connection_config(es_config, crawler_version)
config
end

def bulk(arguments = {})
raise_if_necessary(super(arguments))
def bulk(payload = {})
retries = 0
begin
raise_if_necessary(super(payload))
rescue StandardError => e
retries += 1
if retries <= MAX_RETRIES
wait_time = 2**retries
@system_logger.info(<<~LOG.squish)
Bulk index attempt #{retries} failed: '#{e.message}'. Retrying in #{wait_time} seconds...
LOG
sleep(wait_time.seconds) && retry
else
@system_logger.warn(<<~LOG.squish)
Bulk index failed after #{retries} attempts: '#{e.message}'. Writing payload to file...
LOG
store_failed_payload(payload)
raise e
end
end
end

private
@@ -102,7 +128,7 @@ def raise_if_necessary(response) # rubocop:disable Metrics/MethodLength, Metrics
end
end

@system_logger.debug("Errors found in bulk response. Full response: #{response}")
@system_logger.warn("Errors found in bulk response. Full response: #{response}")
if first_error
# TODO: add trace logging
# TODO: consider logging all errors instead of just first
@@ -117,5 +143,19 @@ end
end
response
end

def store_failed_payload(payload)
dir = "#{FAILED_BULKS_DIR}/#{@crawl_id}"
FileUtils.mkdir_p(dir) unless File.directory?(dir)

filename = Time.now.strftime('%Y%m%d%H%M%S')
full_path = File.join(dir, filename)
File.open(full_path, 'w') do |file|
payload[:body].each do |item|
file.puts(item)
end
end
@system_logger.warn("Saved failed bulk payload to #{full_path}")
end
end
end
49 changes: 41 additions & 8 deletions spec/lib/crawler/output_sink/elasticsearch_spec.rb
@@ -52,6 +52,7 @@

allow(system_logger).to receive(:debug)
allow(system_logger).to receive(:info)
allow(system_logger).to receive(:warn)

allow(Elasticsearch::API).to receive(:serializer).and_return(serializer)
allow(serializer).to receive(:dump).and_return('')
@@ -295,8 +296,8 @@
it 'returns empty stats' do
stats = subject.ingestion_stats

expect(stats[:indexed_document_count]).to eq(0)
expect(stats[:indexed_document_volume]).to eq(0)
expect(stats[:completed][:docs_count]).to eq(0)
expect(stats[:failed][:docs_volume]).to eq(0)
end
end

Expand All @@ -311,8 +312,8 @@
it 'returns empty stats' do
stats = subject.ingestion_stats

expect(stats[:indexed_document_count]).to eq(0)
expect(stats[:indexed_document_volume]).to eq(0)
expect(stats[:completed][:docs_count]).to eq(0)
expect(stats[:failed][:docs_volume]).to eq(0)
end
end

@@ -330,16 +331,48 @@
subject.flush
end

it 'returns expected indexed_document_count' do
it 'returns expected docs_count' do
stats = subject.ingestion_stats

expect(stats[:indexed_document_count]).to eq(document_count)
expect(stats[:completed][:docs_count]).to eq(document_count)
expect(stats[:failed][:docs_count]).to eq(0)
end

it 'returns expected indexed_document_volume' do
it 'returns expected docs_volume' do
stats = subject.ingestion_stats

expect(stats[:indexed_document_volume]).to eq(document_count * serialized_object.bytesize)
expect(stats[:completed][:docs_volume]).to eq(document_count * serialized_object.bytesize)
expect(stats[:failed][:docs_volume]).to eq(0)
end
end

context 'when some documents failed to be ingested' do
let(:document_count) { 5 }
let(:serialized_object) { 'doesnt matter' }

before(:each) do
allow(bulk_queue).to receive(:bytesize).and_return(serialized_object.bytesize)
allow(es_client).to receive(:bulk).and_raise(Utility::EsClient::IndexingFailedError)

document_count.times.each do |x|
subject.write(FactoryBot.build(:html_crawl_result, url: "http://real.com/#{x}"))
end

subject.flush
end

it 'returns expected docs_count' do
stats = subject.ingestion_stats

expect(stats[:failed][:docs_count]).to eq(document_count)
expect(stats[:completed][:docs_count]).to eq(0)
end

it 'returns expected docs_volume' do
stats = subject.ingestion_stats

expect(stats[:failed][:docs_volume]).to eq(document_count * serialized_object.bytesize)
expect(stats[:completed][:docs_volume]).to eq(0)
end
end
end
71 changes: 70 additions & 1 deletion spec/lib/utility/es_client_spec.rb
@@ -12,6 +12,7 @@
let(:system_logger) { double }
let(:host) { 'http://notreallyaserver' }
let(:port) { '9200' }
let(:elastic_product_headers) { { 'x-elastic-product': 'Elasticsearch' } }
let(:config) do
{
elasticsearch: {
@@ -24,7 +25,7 @@
}.deep_symbolize_keys
end

let(:subject) { described_class.new(config[:elasticsearch], system_logger, '0.0.0-test') }
let(:subject) { described_class.new(config[:elasticsearch], system_logger, '0.0.0-test', 'crawl-id') }

before(:each) do
stub_request(:get, "#{host}:#{port}/")
Expand All @@ -34,6 +35,7 @@
# TODO: make a factory or something for system_logger mocks
allow(system_logger).to receive(:info)
allow(system_logger).to receive(:debug)
allow(system_logger).to receive(:warn)
end

describe '#connection_config' do
@@ -99,4 +101,71 @@
end
end
end

describe '#bulk' do
let(:payload) do
{
body: [
{ index: { _index: 'my_index', _id: '123' } },
{ id: '123', title: 'Foo', body_content: 'bar' }
]
}
end

context 'when successful' do
before :each do
stub_request(:post, "#{host}:#{port}/_bulk").to_return(status: 200, headers: elastic_product_headers)
end

it 'sends bulk request without error' do
result = subject.bulk(payload)
expect(result.status).to eq(200)
end
end

context 'when there is an error in the first attempt' do
before :each do
stub_request(:post, "#{host}:#{port}/_bulk").to_return({ status: 404, exception: 'Intermittent failure' },
{ status: 200, headers: elastic_product_headers })
end

it 'succeeds on the retry' do
result = subject.bulk(payload)
expect(result.status).to eq(200)
expect(system_logger).to have_received(:info).with(
"Bulk index attempt 1 failed: 'Intermittent failure'. Retrying in 2 seconds..."
)
end
end

context 'when there is an error in every attempt' do
let(:fixed_time) { Time.new(2024, 1, 1, 0, 0, 0) }
let(:file_double) { double('File', puts: nil, close: nil) }

before :each do
stub_const('Utility::EsClient::MAX_RETRIES', 1)
allow(File).to receive(:open).and_yield(file_double)
allow(Time).to receive(:now).and_return(fixed_time)
stub_request(:post, "#{host}:#{port}/_bulk").to_return({ status: 404, exception: 'Consistent failure' })
end

it 'raises an error after exhausting retries' do
expect { subject.bulk(payload) }.to raise_error(StandardError)

expect(system_logger).to have_received(:info).with(
"Bulk index attempt 1 failed: 'Consistent failure'. Retrying in 2 seconds..."
)
expect(system_logger).to have_received(:warn).with(
"Bulk index failed after 2 attempts: 'Consistent failure'. Writing payload to file..."
)

expect(File).to have_received(:open).with(
"#{Utility::EsClient::FAILED_BULKS_DIR}/crawl-id/#{fixed_time.strftime('%Y%m%d%H%M%S')}", 'w'
)

expect(file_double).to have_received(:puts).with(payload[:body].first)
expect(file_double).to have_received(:puts).with(payload[:body].second)
end
end
end
end
