Add retry to bulk indexer #39

Merged (5 commits, Jun 4, 2024)
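In short, per the diff below: the Elasticsearch bulk client now retries failed _bulk requests with exponential backoff and, once retries are exhausted, writes the failed payload to a per-crawl directory under output/failed_payloads/ before re-raising. The output sink tracks completed and failed document counts and volumes separately and logs a summary of both when it closes, and the client configuration gains a 30-second request timeout.
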
2 changes: 1 addition & 1 deletion lib/crawler/output_sink/base.rb
@@ -13,7 +13,7 @@ module OutputSink
class Base
attr_reader :config, :rule_engine

delegate :document_mapper, :events, :system_logger, to: :config
delegate :crawl_id, :document_mapper, :events, :system_logger, to: :config

def initialize(config)
@config = config
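The new crawl_id delegation exposes the crawl ID to output sinks; the Elasticsearch sink below passes it to the client so that failed bulk payloads can be grouped per crawl.
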
63 changes: 38 additions & 25 deletions lib/crawler/output_sink/elasticsearch.rb
@@ -55,35 +55,40 @@ def write(crawl_result)

def close
flush
system_logger.info(ingestion_stats)
msg = <<~LOG.squish
All indexing operations completed.
Successfully indexed #{@completed[:docs_count]} docs with a volume of #{@completed[:docs_volume]} bytes.
Failed to index #{@failed[:docs_count]} docs with a volume of #{@failed[:docs_volume]} bytes.
LOG
system_logger.info(msg)
end

def flush # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
data = operation_queue.pop_all
if data.empty?
payload = operation_queue.pop_all
if payload.empty?
system_logger.debug('Queue was empty when attempting to flush.')
return
end

system_logger.debug("Sending bulk request with #{data.size} items and flushing queue...")
# a single doc needs two items in a bulk request, so halving the count makes logs clearer
indexing_docs_count = payload.size / 2
system_logger.info("Sending bulk request with #{indexing_docs_count} items and flushing queue...")

begin
client.bulk(body: data, pipeline:) # TODO: parse response
client.bulk(body: payload, pipeline:) # TODO: parse response
system_logger.info("Successfully indexed #{indexing_docs_count} docs.")
reset_ingestion_stats(true)
rescue Utility::EsClient::IndexingFailedError => e
system_logger.warn("Bulk index failed: #{e}")
reset_ingestion_stats(false)
rescue StandardError => e
system_logger.warn("Bulk index failed for unexpected reason: #{e}")
raise e
reset_ingestion_stats(false)
end

system_logger.debug("Bulk request containing #{data.size} items sent!")
reset_ingestion_stats

nil
end

def ingestion_stats
@completed.dup
{ completed: @completed.dup, failed: @failed.dup }
end

def operation_queue
@@ -99,7 +104,7 @@ def es_config
end

def client
@client ||= Utility::EsClient.new(es_config, system_logger, Crawler.version)
@client ||= Utility::EsClient.new(es_config, system_logger, Crawler.version, crawl_id)
end

def index_name
@@ -133,27 +138,35 @@ def parametrized_doc(crawl_result)

def init_ingestion_stats
@queued = {
indexed_document_count: 0,
indexed_document_volume: 0
docs_count: 0,
docs_volume: 0
}
@completed = {
indexed_document_count: 0,
indexed_document_volume: 0
docs_count: 0,
docs_volume: 0
}
@failed = {
docs_count: 0,
docs_volume: 0
}
end

def increment_ingestion_stats(doc)
@queued[:indexed_document_count] += 1
@queued[:indexed_document_volume] += operation_queue.bytesize(doc)
@queued[:docs_count] += 1
@queued[:docs_volume] += operation_queue.bytesize(doc)
end

def reset_ingestion_stats
# TODO: this count isn't accurate, need to look into it
@completed[:indexed_document_count] += @queued[:indexed_document_count]
@completed[:indexed_document_volume] += @queued[:indexed_document_volume]
def reset_ingestion_stats(success)
if success
@completed[:docs_count] += @queued[:docs_count]
@completed[:docs_volume] += @queued[:docs_volume]
else
@failed[:docs_count] += @queued[:docs_count]
@failed[:docs_volume] += @queued[:docs_volume]
end

@queued[:indexed_document_count] = 0
@queued[:indexed_document_volume] = 0
@queued[:docs_count] = 0
@queued[:docs_volume] = 0
end
end
end
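For reference, a minimal sketch of the hash shape the reworked ingestion_stats now returns (the sink variable and the counts are illustrative only, not part of the change):

    stats = sink.ingestion_stats  # sink: an Elasticsearch output sink instance
    # => {
    #   completed: { docs_count: 40, docs_volume: 120_000 },
    #   failed:    { docs_count: 2,  docs_volume: 6_000 }
    # }
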
48 changes: 44 additions & 4 deletions lib/utility/es_client.rb
@@ -6,11 +6,15 @@

# frozen_string_literal: true

require 'fileutils'
require 'elasticsearch'

module Utility
class EsClient < ::Elasticsearch::Client
USER_AGENT = 'elastic-web-crawler-'
MAX_RETRIES = 3
REQUEST_TIMEOUT = 30 # seconds
FAILED_BULKS_DIR = 'output/failed_payloads' # directory that failed bulk payloads are output to

class IndexingFailedError < StandardError
def initialize(message, error = nil)
@@ -21,8 +25,9 @@ def initialize(message, error = nil)
attr_reader :cause
end

def initialize(es_config, system_logger, crawler_version, &)
def initialize(es_config, system_logger, crawler_version, crawl_id, &)
@system_logger = system_logger
@crawl_id = crawl_id
super(connection_config(es_config, crawler_version), &)
end

@@ -32,6 +37,9 @@ def connection_config(es_config, crawler_version)
headers: {
'user-agent': "#{USER_AGENT}#{crawler_version}",
'X-elastic-product-origin': 'crawler'
},
request: {
timeout: REQUEST_TIMEOUT
}
}
}
@@ -42,8 +50,26 @@ def connection_config(es_config, crawler_version)
config
end

def bulk(arguments = {})
raise_if_necessary(super(arguments))
def bulk(payload = {})
retries = 0
begin
raise_if_necessary(super(payload))
rescue StandardError => e
retries += 1
if retries <= MAX_RETRIES
wait_time = 2**retries
@system_logger.info(<<~LOG.squish)
Bulk index attempt #{retries} failed: '#{e.message}'. Retrying in #{wait_time} seconds...
LOG
sleep(wait_time.seconds) && retry
else
@system_logger.warn(<<~LOG.squish)
Bulk index failed after #{retries} attempts: '#{e.message}'. Writing payload to file...
LOG
store_failed_payload(payload)
raise e
end
end
end

private
@@ -102,7 +128,7 @@ def raise_if_necessary(response) # rubocop:disable Metrics/MethodLength, Metrics
end
end

@system_logger.debug("Errors found in bulk response. Full response: #{response}")
@system_logger.warn("Errors found in bulk response. Full response: #{response}")
if first_error
# TODO: add trace logging
# TODO: consider logging all errors instead of just first
@@ -117,5 +143,19 @@ def raise_if_necessary(response) # rubocop:disable Metrics/MethodLength, Metrics
end
response
end

def store_failed_payload(payload)
dir = "#{FAILED_BULKS_DIR}/#{@crawl_id}"
FileUtils.mkdir_p(dir) unless File.directory?(dir)

filename = Time.now.strftime('%Y%m%d%H%M%S')
full_path = File.join(dir, filename)
File.open(full_path, 'w') do |file|
payload[:body].each do |item|
file.puts(item)
end
end
@system_logger.warn("Saved failed bulk payload to #{full_path}")
end
end
end
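
For context, with MAX_RETRIES = 3 and wait_time = 2**retries, the retry loop in bulk behaves roughly as follows (a sketch of the flow, not code from the PR):

    # attempt 1 fails -> sleep 2**1 = 2s, retry
    # attempt 2 fails -> sleep 2**2 = 4s, retry
    # attempt 3 fails -> sleep 2**3 = 8s, retry
    # attempt 4 fails -> payload written to output/failed_payloads/<crawl_id>/<timestamp>, error re-raised

So a payload gets up to four attempts and roughly 14 seconds of backoff before it is persisted; store_failed_payload writes one bulk item (action or document hash) per line to the timestamped file.
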
49 changes: 41 additions & 8 deletions spec/lib/crawler/output_sink/elasticsearch_spec.rb
@@ -52,6 +52,7 @@

allow(system_logger).to receive(:debug)
allow(system_logger).to receive(:info)
allow(system_logger).to receive(:warn)

allow(Elasticsearch::API).to receive(:serializer).and_return(serializer)
allow(serializer).to receive(:dump).and_return('')
@@ -295,8 +296,8 @@
it 'returns empty stats' do
stats = subject.ingestion_stats

expect(stats[:indexed_document_count]).to eq(0)
expect(stats[:indexed_document_volume]).to eq(0)
expect(stats[:completed][:docs_count]).to eq(0)
expect(stats[:failed][:docs_volume]).to eq(0)
end
end

@@ -311,8 +312,8 @@
it 'returns empty stats' do
stats = subject.ingestion_stats

expect(stats[:indexed_document_count]).to eq(0)
expect(stats[:indexed_document_volume]).to eq(0)
expect(stats[:completed][:docs_count]).to eq(0)
expect(stats[:failed][:docs_volume]).to eq(0)
end
end

@@ -330,16 +331,48 @@
subject.flush
end

it 'returns expected indexed_document_count' do
it 'returns expected docs_count' do
stats = subject.ingestion_stats

expect(stats[:indexed_document_count]).to eq(document_count)
expect(stats[:completed][:docs_count]).to eq(document_count)
expect(stats[:failed][:docs_count]).to eq(0)
end

it 'returns expected indexed_document_volume' do
it 'returns expected docs_volume' do
stats = subject.ingestion_stats

expect(stats[:indexed_document_volume]).to eq(document_count * serialized_object.bytesize)
expect(stats[:completed][:docs_volume]).to eq(document_count * serialized_object.bytesize)
expect(stats[:failed][:docs_volume]).to eq(0)
end
end

context 'when some documents failed to be ingested' do
let(:document_count) { 5 }
let(:serialized_object) { 'doesnt matter' }

before(:each) do
allow(bulk_queue).to receive(:bytesize).and_return(serialized_object.bytesize)
allow(es_client).to receive(:bulk).and_raise(Utility::EsClient::IndexingFailedError)

document_count.times.each do |x|
subject.write(FactoryBot.build(:html_crawl_result, url: "http://real.com/#{x}"))
end

subject.flush
end

it 'returns expected docs_count' do
stats = subject.ingestion_stats

expect(stats[:failed][:docs_count]).to eq(document_count)
expect(stats[:completed][:docs_count]).to eq(0)
end

it 'returns expected docs_volume' do
stats = subject.ingestion_stats

expect(stats[:failed][:docs_volume]).to eq(document_count * serialized_object.bytesize)
expect(stats[:completed][:docs_volume]).to eq(0)
end
end
end
71 changes: 70 additions & 1 deletion spec/lib/utility/es_client_spec.rb
@@ -12,6 +12,7 @@
let(:system_logger) { double }
let(:host) { 'http://notreallyaserver' }
let(:port) { '9200' }
let(:elastic_product_headers) { { 'x-elastic-product': 'Elasticsearch' } }
let(:config) do
{
elasticsearch: {
@@ -24,7 +25,7 @@
}.deep_symbolize_keys
end

let(:subject) { described_class.new(config[:elasticsearch], system_logger, '0.0.0-test') }
let(:subject) { described_class.new(config[:elasticsearch], system_logger, '0.0.0-test', 'crawl-id') }

before(:each) do
stub_request(:get, "#{host}:#{port}/")
@@ -34,6 +35,7 @@
# TODO: make a factory or something for system_logger mocks
allow(system_logger).to receive(:info)
allow(system_logger).to receive(:debug)
allow(system_logger).to receive(:warn)
end

describe '#connection_config' do
@@ -99,4 +101,71 @@
end
end
end

describe '#bulk' do
let(:payload) do
{
body: [
{ index: { _index: 'my_index', _id: '123' } },
{ id: '123', title: 'Foo', body_content: 'bar' }
]
}
end

context 'when successful' do
before :each do
stub_request(:post, "#{host}:#{port}/_bulk").to_return(status: 200, headers: elastic_product_headers)
end

it 'sends bulk request without error' do
result = subject.bulk(payload)
expect(result.status).to eq(200)
end
end

context 'when there is an error in the first attempt' do
before :each do
stub_request(:post, "#{host}:#{port}/_bulk").to_return({ status: 404, exception: 'Intermittent failure' },
{ status: 200, headers: elastic_product_headers })
end

it 'succeeds on the retry' do
result = subject.bulk(payload)
expect(result.status).to eq(200)
expect(system_logger).to have_received(:info).with(
"Bulk index attempt 1 failed: 'Intermittent failure'. Retrying in 2 seconds..."
)
end
end

context 'when there is an error in every attempt' do
let(:fixed_time) { Time.new(2024, 1, 1, 0, 0, 0) }
let(:file_double) { double('File', puts: nil, close: nil) }

before :each do
stub_const('Utility::EsClient::MAX_RETRIES', 1)
allow(File).to receive(:open).and_yield(file_double)
allow(Time).to receive(:now).and_return(fixed_time)
stub_request(:post, "#{host}:#{port}/_bulk").to_return({ status: 404, exception: 'Consistent failure' })
end

it 'raises an error after exhausting retries' do
expect { subject.bulk(payload) }.to raise_error(StandardError)

expect(system_logger).to have_received(:info).with(
"Bulk index attempt 1 failed: 'Consistent failure'. Retrying in 2 seconds..."
)
expect(system_logger).to have_received(:warn).with(
"Bulk index failed after 2 attempts: 'Consistent failure'. Writing payload to file..."
)

expect(File).to have_received(:open).with(
"#{Utility::EsClient::FAILED_BULKS_DIR}/crawl-id/#{fixed_time.strftime('%Y%m%d%H%M%S')}", 'w'
)

expect(file_double).to have_received(:puts).with(payload[:body].first)
expect(file_double).to have_received(:puts).with(payload[:body].second)
end
end
end
end