From cc7ae78cad4139e3867fbec3038e6722a402d084 Mon Sep 17 00:00:00 2001
From: Navarone Feekery <13634519+navarone-feekery@users.noreply.github.com>
Date: Tue, 4 Jun 2024 10:03:51 +0200
Subject: [PATCH] Add retry to bulk indexer (#39)

Related to https://github.com/elastic/crawler/issues/37

We currently have no retry logic for bulk indexing. This is a problem when
running the Crawler on Mac because of an intermittent `bad_record_mac` error.
Retrying often resolves the error (so this is a bandaid, not a fix). Retries
are healthy in general so this was a good opportunity to add them.

- Add retry with max of 3 for bulk indexer
- Add timeout
- Add stats for docs that failed indexing
- Improve logging in general
---
 lib/crawler/output_sink/base.rb               |  2 +-
 lib/crawler/output_sink/elasticsearch.rb      | 63 +++++++++-------
 lib/utility/es_client.rb                      | 48 +++++++++++--
 .../crawler/output_sink/elasticsearch_spec.rb | 49 ++++++++++---
 spec/lib/utility/es_client_spec.rb            | 71 ++++++++++++++++++-
 5 files changed, 194 insertions(+), 39 deletions(-)

diff --git a/lib/crawler/output_sink/base.rb b/lib/crawler/output_sink/base.rb
index 7334009..95e1504 100644
--- a/lib/crawler/output_sink/base.rb
+++ b/lib/crawler/output_sink/base.rb
@@ -13,7 +13,7 @@ module OutputSink
     class Base
       attr_reader :config, :rule_engine
 
-      delegate :document_mapper, :events, :system_logger, to: :config
+      delegate :crawl_id, :document_mapper, :events, :system_logger, to: :config
 
       def initialize(config)
         @config = config
diff --git a/lib/crawler/output_sink/elasticsearch.rb b/lib/crawler/output_sink/elasticsearch.rb
index cbed476..2f01a6a 100644
--- a/lib/crawler/output_sink/elasticsearch.rb
+++ b/lib/crawler/output_sink/elasticsearch.rb
@@ -55,35 +55,40 @@ def write(crawl_result)
 
       def close
         flush
-        system_logger.info(ingestion_stats)
+        msg = <<~LOG.squish
+          All indexing operations completed.
+          Successfully indexed #{@completed[:docs_count]} docs with a volume of #{@completed[:docs_volume]} bytes.
+          Failed to index #{@failed[:docs_count]} docs with a volume of #{@failed[:docs_volume]} bytes.
+        LOG
+        system_logger.info(msg)
       end
 
       def flush # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
-        data = operation_queue.pop_all
-        if data.empty?
+        payload = operation_queue.pop_all
+        if payload.empty?
           system_logger.debug('Queue was empty when attempting to flush.')
           return
         end
 
-        system_logger.debug("Sending bulk request with #{data.size} items and flushing queue...")
+        # a single doc needs two items in a bulk request, so halving the count makes logs clearer
+        indexing_docs_count = payload.size / 2
+        system_logger.info("Sending bulk request with #{indexing_docs_count} items and flushing queue...")
 
         begin
-          client.bulk(body: data, pipeline:) # TODO: parse response
+          client.bulk(body: payload, pipeline:) # TODO: parse response
+          system_logger.info("Successfully indexed #{indexing_docs_count} docs.")
+          reset_ingestion_stats(true)
         rescue Utility::EsClient::IndexingFailedError => e
           system_logger.warn("Bulk index failed: #{e}")
+          reset_ingestion_stats(false)
         rescue StandardError => e
           system_logger.warn("Bulk index failed for unexpected reason: #{e}")
-          raise e
+          reset_ingestion_stats(false)
         end
-
-        system_logger.debug("Bulk request containing #{data.size} items sent!")
-        reset_ingestion_stats
-
-        nil
       end
 
       def ingestion_stats
-        @completed.dup
+        { completed: @completed.dup, failed: @failed.dup }
       end
 
       def operation_queue
@@ -99,7 +104,7 @@ def es_config
       end
 
       def client
-        @client ||= Utility::EsClient.new(es_config, system_logger, Crawler.version)
+        @client ||= Utility::EsClient.new(es_config, system_logger, Crawler.version, crawl_id)
       end
 
       def index_name
@@ -133,27 +138,35 @@ def parametrized_doc(crawl_result)
 
       def init_ingestion_stats
         @queued = {
-          indexed_document_count: 0,
-          indexed_document_volume: 0
+          docs_count: 0,
+          docs_volume: 0
         }
         @completed = {
-          indexed_document_count: 0,
-          indexed_document_volume: 0
+          docs_count: 0,
+          docs_volume: 0
+        }
+        @failed = {
+          docs_count: 0,
+          docs_volume: 0
         }
       end
 
       def increment_ingestion_stats(doc)
-        @queued[:indexed_document_count] += 1
-        @queued[:indexed_document_volume] += operation_queue.bytesize(doc)
+        @queued[:docs_count] += 1
+        @queued[:docs_volume] += operation_queue.bytesize(doc)
       end
 
-      def reset_ingestion_stats
-        # TODO: this count isn't accurate, need to look into it
-        @completed[:indexed_document_count] += @queued[:indexed_document_count]
-        @completed[:indexed_document_volume] += @queued[:indexed_document_volume]
+      def reset_ingestion_stats(success)
+        if success
+          @completed[:docs_count] += @queued[:docs_count]
+          @completed[:docs_volume] += @queued[:docs_volume]
+        else
+          @failed[:docs_count] += @queued[:docs_count]
+          @failed[:docs_volume] += @queued[:docs_volume]
+        end
 
-        @queued[:indexed_document_count] = 0
-        @queued[:indexed_document_volume] = 0
+        @queued[:docs_count] = 0
+        @queued[:docs_volume] = 0
       end
     end
   end
diff --git a/lib/utility/es_client.rb b/lib/utility/es_client.rb
index 540bb54..b96025a 100644
--- a/lib/utility/es_client.rb
+++ b/lib/utility/es_client.rb
@@ -6,11 +6,15 @@
 
 # frozen_string_literal: true
 
+require 'fileutils'
 require 'elasticsearch'
 
 module Utility
   class EsClient < ::Elasticsearch::Client
     USER_AGENT = 'elastic-web-crawler-'
+    MAX_RETRIES = 3
+    REQUEST_TIMEOUT = 30 # seconds
+    FAILED_BULKS_DIR = 'output/failed_payloads' # directory that failed bulk payloads are output to
 
     class IndexingFailedError < StandardError
       def initialize(message, error = nil)
@@ -21,8 +25,9 @@ def initialize(message, error = nil)
       attr_reader :cause
     end
 
-    def initialize(es_config, system_logger, crawler_version, &)
+    def initialize(es_config, system_logger, crawler_version, crawl_id, &)
       @system_logger = system_logger
+      @crawl_id = crawl_id
       super(connection_config(es_config, crawler_version), &)
     end
 
@@ -32,6 +37,9 @@ def connection_config(es_config, crawler_version)
         headers: {
           'user-agent': "#{USER_AGENT}#{crawler_version}",
           'X-elastic-product-origin': 'crawler'
+        },
+        request: {
+          timeout: REQUEST_TIMEOUT
         }
       }
     }
@@ -42,8 +50,26 @@ def connection_config(es_config, crawler_version)
       config
     end
 
-    def bulk(arguments = {})
-      raise_if_necessary(super(arguments))
+    def bulk(payload = {})
+      retries = 0
+      begin
+        raise_if_necessary(super(payload))
+      rescue StandardError => e
+        retries += 1
+        if retries <= MAX_RETRIES
+          wait_time = 2**retries
+          @system_logger.info(<<~LOG.squish)
+            Bulk index attempt #{retries} failed: '#{e.message}'. Retrying in #{wait_time} seconds...
+          LOG
+          sleep(wait_time.seconds) && retry
+        else
+          @system_logger.warn(<<~LOG.squish)
+            Bulk index failed after #{retries} attempts: '#{e.message}'. Writing payload to file...
+          LOG
+          store_failed_payload(payload)
+          raise e
+        end
+      end
     end
 
     private
@@ -102,7 +128,7 @@ def raise_if_necessary(response) # rubocop:disable Metrics/MethodLength, Metrics
         end
       end
 
-      @system_logger.debug("Errors found in bulk response. Full response: #{response}")
+      @system_logger.warn("Errors found in bulk response. Full response: #{response}")
       if first_error
         # TODO: add trace logging
         # TODO: consider logging all errors instead of just first
@@ -117,5 +143,19 @@ def raise_if_necessary(response) # rubocop:disable Metrics/MethodLength, Metrics
       end
       response
     end
+
+    def store_failed_payload(payload)
+      dir = "#{FAILED_BULKS_DIR}/#{@crawl_id}"
+      FileUtils.mkdir_p(dir) unless File.directory?(dir)
+
+      filename = Time.now.strftime('%Y%m%d%H%M%S')
+      full_path = File.join(dir, filename)
+      File.open(full_path, 'w') do |file|
+        payload[:body].each do |item|
+          file.puts(item)
+        end
+      end
+      @system_logger.warn("Saved failed bulk payload to #{full_path}")
+    end
   end
 end
diff --git a/spec/lib/crawler/output_sink/elasticsearch_spec.rb b/spec/lib/crawler/output_sink/elasticsearch_spec.rb
index 72486db..c6a0c65 100644
--- a/spec/lib/crawler/output_sink/elasticsearch_spec.rb
+++ b/spec/lib/crawler/output_sink/elasticsearch_spec.rb
@@ -52,6 +52,7 @@
 
     allow(system_logger).to receive(:debug)
     allow(system_logger).to receive(:info)
+    allow(system_logger).to receive(:warn)
 
     allow(Elasticsearch::API).to receive(:serializer).and_return(serializer)
     allow(serializer).to receive(:dump).and_return('')
@@ -295,8 +296,8 @@
       it 'returns empty stats' do
         stats = subject.ingestion_stats
 
-        expect(stats[:indexed_document_count]).to eq(0)
-        expect(stats[:indexed_document_volume]).to eq(0)
+        expect(stats[:completed][:docs_count]).to eq(0)
+        expect(stats[:failed][:docs_volume]).to eq(0)
       end
     end
 
@@ -311,8 +312,8 @@
       it 'returns empty stats' do
         stats = subject.ingestion_stats
 
-        expect(stats[:indexed_document_count]).to eq(0)
-        expect(stats[:indexed_document_volume]).to eq(0)
+        expect(stats[:completed][:docs_count]).to eq(0)
+        expect(stats[:failed][:docs_volume]).to eq(0)
       end
     end
 
@@ -330,16 +331,48 @@
        subject.flush
       end
 
-      it 'returns expected indexed_document_count' do
+      it 'returns expected docs_count' do
        stats = subject.ingestion_stats
 
-        expect(stats[:indexed_document_count]).to eq(document_count)
+        expect(stats[:completed][:docs_count]).to eq(document_count)
+        expect(stats[:failed][:docs_count]).to eq(0)
       end
 
-      it 'returns expected indexed_document_volume' do
+      it 'returns expected docs_volume' do
        stats = subject.ingestion_stats
 
-        expect(stats[:indexed_document_volume]).to eq(document_count * serialized_object.bytesize)
+        expect(stats[:completed][:docs_volume]).to eq(document_count * serialized_object.bytesize)
+        expect(stats[:failed][:docs_volume]).to eq(0)
+      end
+    end
+
+    context 'when some documents failed to be ingested' do
+      let(:document_count) { 5 }
+      let(:serialized_object) { 'doesnt matter' }
+
+      before(:each) do
+        allow(bulk_queue).to receive(:bytesize).and_return(serialized_object.bytesize)
+        allow(es_client).to receive(:bulk).and_raise(Utility::EsClient::IndexingFailedError)
+
+        document_count.times.each do |x|
+          subject.write(FactoryBot.build(:html_crawl_result, url: "http://real.com/#{x}"))
+        end
+
+        subject.flush
+      end
+
+      it 'returns expected docs_count' do
+        stats = subject.ingestion_stats
+
+        expect(stats[:failed][:docs_count]).to eq(document_count)
+        expect(stats[:completed][:docs_count]).to eq(0)
+      end
+
+      it 'returns expected docs_volume' do
+        stats = subject.ingestion_stats
+
+        expect(stats[:failed][:docs_volume]).to eq(document_count * serialized_object.bytesize)
+        expect(stats[:completed][:docs_volume]).to eq(0)
       end
     end
   end
diff --git a/spec/lib/utility/es_client_spec.rb b/spec/lib/utility/es_client_spec.rb
index 0c4958e..0859e5a 100644
--- a/spec/lib/utility/es_client_spec.rb
+++ b/spec/lib/utility/es_client_spec.rb
@@ -12,6 +12,7 @@
   let(:system_logger) { double }
   let(:host) { 'http://notreallyaserver' }
   let(:port) { '9200' }
+  let(:elastic_product_headers) { { 'x-elastic-product': 'Elasticsearch' } }
   let(:config) do
     {
       elasticsearch: {
@@ -24,7 +25,7 @@
     }.deep_symbolize_keys
   end
 
-  let(:subject) { described_class.new(config[:elasticsearch], system_logger, '0.0.0-test') }
+  let(:subject) { described_class.new(config[:elasticsearch], system_logger, '0.0.0-test', 'crawl-id') }
 
   before(:each) do
     stub_request(:get, "#{host}:#{port}/")
@@ -34,6 +35,7 @@
     # TODO: make a factory or something for system_logger mocks
     allow(system_logger).to receive(:info)
     allow(system_logger).to receive(:debug)
+    allow(system_logger).to receive(:warn)
   end
 
   describe '#connection_config' do
@@ -99,4 +101,71 @@
       end
     end
   end
+
+  describe '#bulk' do
+    let(:payload) do
+      {
+        body: [
+          { index: { _index: 'my_index', _id: '123' } },
+          { id: '123', title: 'Foo', body_content: 'bar' }
+        ]
+      }
+    end
+
+    context 'when successful' do
+      before :each do
+        stub_request(:post, "#{host}:#{port}/_bulk").to_return(status: 200, headers: elastic_product_headers)
+      end
+
+      it 'sends bulk request without error' do
+        result = subject.bulk(payload)
+        expect(result.status).to eq(200)
+      end
+    end
+
+    context 'when there is an error in the first attempt' do
+      before :each do
+        stub_request(:post, "#{host}:#{port}/_bulk").to_return({ status: 404, exception: 'Intermittent failure' },
+                                                               { status: 200, headers: elastic_product_headers })
+      end
+
+      it 'succeeds on the retry' do
+        result = subject.bulk(payload)
+        expect(result.status).to eq(200)
+        expect(system_logger).to have_received(:info).with(
+          "Bulk index attempt 1 failed: 'Intermittent failure'. Retrying in 2 seconds..."
+        )
+      end
+    end
+
+    context 'when there is an error in every attempt' do
+      let(:fixed_time) { Time.new(2024, 1, 1, 0, 0, 0) }
+      let(:file_double) { double('File', puts: nil, close: nil) }
+
+      before :each do
+        stub_const('Utility::EsClient::MAX_RETRIES', 1)
+        allow(File).to receive(:open).and_yield(file_double)
+        allow(Time).to receive(:now).and_return(fixed_time)
+        stub_request(:post, "#{host}:#{port}/_bulk").to_return({ status: 404, exception: 'Consistent failure' })
+      end
+
+      it 'raises an error after exhausting retries' do
+        expect { subject.bulk(payload) }.to raise_error(StandardError)
+
+        expect(system_logger).to have_received(:info).with(
+          "Bulk index attempt 1 failed: 'Consistent failure'. Retrying in 2 seconds..."
+        )
+        expect(system_logger).to have_received(:warn).with(
+          "Bulk index failed after 2 attempts: 'Consistent failure'. Writing payload to file..."
+        )
+
+        expect(File).to have_received(:open).with(
+          "#{Utility::EsClient::FAILED_BULKS_DIR}/crawl-id/#{fixed_time.strftime('%Y%m%d%H%M%S')}", 'w'
+        )
+
+        expect(file_double).to have_received(:puts).with(payload[:body].first)
+        expect(file_double).to have_received(:puts).with(payload[:body].second)
+      end
+    end
+  end
 end
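Usage note (not part of the patch above): the sketch below is only an illustration of how the retried bulk call behaves from a caller's point of view. It assumes a configured Utility::EsClient instance named `client` and a hypothetical `docs` array of already-serialized documents destined for a hypothetical index `my_index`. With MAX_RETRIES = 3 and a backoff of 2**retries, a failing request is retried after 2, 4, and 8 seconds; only then is the payload written under output/failed_payloads/<crawl_id> and the last error re-raised to the caller.

    # Illustrative sketch only; `docs` and 'my_index' are assumptions, not part of this patch.
    payload = {
      body: docs.flat_map do |doc|
        # each doc needs an action line plus a source line in the bulk body
        [{ index: { _index: 'my_index', _id: doc[:id] } }, doc]
      end
    }

    begin
      client.bulk(payload) # retries up to MAX_RETRIES times with exponential backoff
    rescue StandardError => e
      # reached only after retries are exhausted and the payload has been saved to disk
      warn("Bulk indexing ultimately failed: #{e.message}")
    end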