From 2bf0954d8d32a09f8fa2d64ca6549c0a620b38a2 Mon Sep 17 00:00:00 2001
From: Navarone Feekery <13634519+navarone-feekery@users.noreply.github.com>
Date: Thu, 30 May 2024 13:05:14 +0200
Subject: [PATCH] Add retry to bulk indexer

---
 lib/crawler/output_sink/elasticsearch.rb      | 58 ++++++++++++-------
 lib/utility/es_client.rb                      | 22 ++++++-
 .../crawler/output_sink/elasticsearch_spec.rb | 49 +++++++++++++---
 3 files changed, 98 insertions(+), 31 deletions(-)

diff --git a/lib/crawler/output_sink/elasticsearch.rb b/lib/crawler/output_sink/elasticsearch.rb
index cbed476..0731bef 100644
--- a/lib/crawler/output_sink/elasticsearch.rb
+++ b/lib/crawler/output_sink/elasticsearch.rb
@@ -55,7 +55,12 @@ def write(crawl_result)
 
       def close
         flush
-        system_logger.info(ingestion_stats)
+        msg = <<~LOG.squish
+          All indexing operations completed.
+          Successfully indexed #{@completed[:docs_count]} docs with a volume of #{@completed[:docs_volume]} bytes.
+          Failed to index #{@failed[:docs_count]} docs with a volume of #{@failed[:docs_volume]} bytes.
+        LOG
+        system_logger.info(msg)
       end
 
       def flush # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
@@ -65,25 +70,25 @@
           return
         end
 
-        system_logger.debug("Sending bulk request with #{data.size} items and flushing queue...")
+        # a single doc needs two items in a bulk request, so halving the count makes logs clearer
+        indexing_docs_count = data.size / 2
+        system_logger.info("Sending bulk request with #{indexing_docs_count} docs and flushing queue...")
 
         begin
           client.bulk(body: data, pipeline:) # TODO: parse response
+          system_logger.info("Successfully indexed #{indexing_docs_count} docs.")
+          reset_ingestion_stats_success
         rescue Utility::EsClient::IndexingFailedError => e
           system_logger.warn("Bulk index failed: #{e}")
+          reset_ingestion_stats_failure
         rescue StandardError => e
           system_logger.warn("Bulk index failed for unexpected reason: #{e}")
-          raise e
+          reset_ingestion_stats_failure
         end
-
-        system_logger.debug("Bulk request containing #{data.size} items sent!")
-        reset_ingestion_stats
-
-        nil
       end
 
       def ingestion_stats
-        @completed.dup
+        { completed: @completed.dup, failed: @failed.dup }
       end
 
       def operation_queue
@@ -133,27 +138,38 @@ def parametrized_doc(crawl_result)
 
       def init_ingestion_stats
         @queued = {
-          indexed_document_count: 0,
-          indexed_document_volume: 0
+          docs_count: 0,
+          docs_volume: 0
         }
         @completed = {
-          indexed_document_count: 0,
-          indexed_document_volume: 0
+          docs_count: 0,
+          docs_volume: 0
+        }
+        @failed = {
+          docs_count: 0,
+          docs_volume: 0
         }
       end
 
       def increment_ingestion_stats(doc)
-        @queued[:indexed_document_count] += 1
-        @queued[:indexed_document_volume] += operation_queue.bytesize(doc)
+        @queued[:docs_count] += 1
+        @queued[:docs_volume] += operation_queue.bytesize(doc)
+      end
+
+      def reset_ingestion_stats_success
+        @completed[:docs_count] += @queued[:docs_count]
+        @completed[:docs_volume] += @queued[:docs_volume]
+
+        @queued[:docs_count] = 0
+        @queued[:docs_volume] = 0
       end
 
-      def reset_ingestion_stats
-        # TODO: this count isn't accurate, need to look into it
-        @completed[:indexed_document_count] += @queued[:indexed_document_count]
-        @completed[:indexed_document_volume] += @queued[:indexed_document_volume]
+      def reset_ingestion_stats_failure
+        @failed[:docs_count] += @queued[:docs_count]
+        @failed[:docs_volume] += @queued[:docs_volume]
 
-        @queued[:indexed_document_count] = 0
-        @queued[:indexed_document_volume] = 0
+        @queued[:docs_count] = 0
+        @queued[:docs_volume] = 0
       end
     end
   end
diff --git a/lib/utility/es_client.rb b/lib/utility/es_client.rb
index 540bb54..2aec9fd 100644
--- a/lib/utility/es_client.rb
+++ b/lib/utility/es_client.rb
@@ -11,6 +11,8 @@ module Utility
 
   class EsClient < ::Elasticsearch::Client
     USER_AGENT = 'elastic-web-crawler-'
+    MAX_RETRIES = 3
+    REQUEST_TIMEOUT = 30 # seconds
 
     class IndexingFailedError < StandardError
       def initialize(message, error = nil)
@@ -32,6 +34,9 @@ def connection_config(es_config, crawler_version)
           headers: {
             'user-agent': "#{USER_AGENT}#{crawler_version}",
             'X-elastic-product-origin': 'crawler'
+          },
+          request: {
+            timeout: REQUEST_TIMEOUT
           }
         }
       }
@@ -43,7 +48,20 @@ def connection_config(es_config, crawler_version)
     end
 
     def bulk(arguments = {})
-      raise_if_necessary(super(arguments))
+      retries = 0
+      begin
+        raise_if_necessary(super(arguments))
+      rescue StandardError => e
+        retries += 1
+        if retries <= MAX_RETRIES
+          @system_logger.info("Bulk index attempt #{retries} failed: #{e.message}. Retrying...")
+          sleep(1.second)
+          retry
+        else
+          @system_logger.warn("Bulk index failed after #{retries} attempts: #{e.message}.")
+          raise e
+        end
+      end
     end
 
     private
@@ -102,7 +120,7 @@ def raise_if_necessary(response) # rubocop:disable Metrics/MethodLength, Metrics
         end
       end
 
-      @system_logger.debug("Errors found in bulk response. Full response: #{response}")
+      @system_logger.warn("Errors found in bulk response. Full response: #{response}")
       if first_error
         # TODO: add trace logging
         # TODO: consider logging all errors instead of just first
diff --git a/spec/lib/crawler/output_sink/elasticsearch_spec.rb b/spec/lib/crawler/output_sink/elasticsearch_spec.rb
index 72486db..c6a0c65 100644
--- a/spec/lib/crawler/output_sink/elasticsearch_spec.rb
+++ b/spec/lib/crawler/output_sink/elasticsearch_spec.rb
@@ -52,6 +52,7 @@
 
     allow(system_logger).to receive(:debug)
     allow(system_logger).to receive(:info)
+    allow(system_logger).to receive(:warn)
 
     allow(Elasticsearch::API).to receive(:serializer).and_return(serializer)
     allow(serializer).to receive(:dump).and_return('')
@@ -295,8 +296,8 @@
       it 'returns empty stats' do
         stats = subject.ingestion_stats
 
-        expect(stats[:indexed_document_count]).to eq(0)
-        expect(stats[:indexed_document_volume]).to eq(0)
+        expect(stats[:completed][:docs_count]).to eq(0)
+        expect(stats[:failed][:docs_volume]).to eq(0)
       end
     end
 
@@ -311,8 +312,8 @@
       it 'returns empty stats' do
         stats = subject.ingestion_stats
 
-        expect(stats[:indexed_document_count]).to eq(0)
-        expect(stats[:indexed_document_volume]).to eq(0)
+        expect(stats[:completed][:docs_count]).to eq(0)
+        expect(stats[:failed][:docs_volume]).to eq(0)
       end
     end
 
@@ -330,16 +331,48 @@
         subject.flush
       end
 
-      it 'returns expected indexed_document_count' do
+      it 'returns expected docs_count' do
         stats = subject.ingestion_stats
 
-        expect(stats[:indexed_document_count]).to eq(document_count)
+        expect(stats[:completed][:docs_count]).to eq(document_count)
+        expect(stats[:failed][:docs_count]).to eq(0)
       end
 
-      it 'returns expected indexed_document_volume' do
+      it 'returns expected docs_volume' do
         stats = subject.ingestion_stats
 
-        expect(stats[:indexed_document_volume]).to eq(document_count * serialized_object.bytesize)
+        expect(stats[:completed][:docs_volume]).to eq(document_count * serialized_object.bytesize)
+        expect(stats[:failed][:docs_volume]).to eq(0)
+      end
+    end
+
+    context 'when some documents failed to be ingested' do
+      let(:document_count) { 5 }
+      let(:serialized_object) { 'doesnt matter' }
+
+      before(:each) do
+        allow(bulk_queue).to receive(:bytesize).and_return(serialized_object.bytesize)
+        allow(es_client).to receive(:bulk).and_raise(Utility::EsClient::IndexingFailedError)
+
+        document_count.times.each do |x|
+          subject.write(FactoryBot.build(:html_crawl_result, url: "http://real.com/#{x}"))
+        end
+
+        subject.flush
+      end
+
+      it 'returns expected docs_count' do
+        stats = subject.ingestion_stats
+
+        expect(stats[:failed][:docs_count]).to eq(document_count)
+        expect(stats[:completed][:docs_count]).to eq(0)
+      end
+
+      it 'returns expected docs_volume' do
+        stats = subject.ingestion_stats
+
+        expect(stats[:failed][:docs_volume]).to eq(document_count * serialized_object.bytesize)
+        expect(stats[:completed][:docs_volume]).to eq(0)
       end
     end
   end
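
Note for reviewers (illustrative only, not part of the patch): ingestion_stats now returns a nested hash keyed by outcome rather than a flat hash. A minimal caller-side sketch, where `sink` stands for any configured Crawler::OutputSink::Elasticsearch instance:

    # Hypothetical usage of the new nested stats shape returned by #ingestion_stats.
    stats = sink.ingestion_stats
    # => { completed: { docs_count: ..., docs_volume: ... }, failed: { docs_count: ..., docs_volume: ... } }
    puts "indexed: #{stats[:completed][:docs_count]} docs (#{stats[:completed][:docs_volume]} bytes)"
    puts "failed:  #{stats[:failed][:docs_count]} docs (#{stats[:failed][:docs_volume]} bytes)"

When a bulk request exhausts its retries, EsClient#bulk re-raises the error, the sink's rescue path records the batch via reset_ingestion_stats_failure, and the failed counters above are the ones populated.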