From 665daeac683d2bfd2142e74610b545b5bce8b346 Mon Sep 17 00:00:00 2001
From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com>
Date: Mon, 17 Jun 2024 09:23:09 +0000
Subject: [PATCH] [0.1] Lock bulk queue while processing indexing request (#45) (#50)

Backports the following commits to 0.1:
 - Lock bulk queue while processing indexing request (#45)

Co-authored-by: Navarone Feekery <13634519+navarone-feekery@users.noreply.github.com>
---
 .rubocop.yml                                  |  3 +
 lib/crawler/api/crawl.rb                      |  9 +--
 lib/crawler/coordinator.rb                    | 46 ++++++++-----
 lib/crawler/http_client.rb                    |  4 +-
 lib/crawler/http_utils/response.rb            |  2 +-
 lib/crawler/output_sink/elasticsearch.rb      | 39 ++++++-----
 lib/errors.rb                                 | 21 ++++++
 lib/utility/bulk_queue.rb                     | 13 +++-
 lib/utility/es_client.rb                      |  2 +-
 spec/lib/crawler/coordinator_spec.rb          | 67 +++++++++++++------
 .../crawler/output_sink/elasticsearch_spec.rb | 55 ++++++++++++---
 spec/lib/utility/bulk_queue_spec.rb           |  3 +-
 12 files changed, 189 insertions(+), 75 deletions(-)
 create mode 100644 lib/errors.rb

diff --git a/.rubocop.yml b/.rubocop.yml
index c8c353a..fa7f677 100644
--- a/.rubocop.yml
+++ b/.rubocop.yml
@@ -24,6 +24,9 @@ Metrics/ClassLength:
 Metrics/ModuleLength:
   Max: 200
 
+Metrics/AbcSize:
+  Max: 20
+
 # Disable for specs
 Metrics/BlockLength:
   Exclude:
diff --git a/lib/crawler/api/crawl.rb b/lib/crawler/api/crawl.rb
index 2eb6e5d..c2e677d 100644
--- a/lib/crawler/api/crawl.rb
+++ b/lib/crawler/api/crawl.rb
@@ -20,7 +20,7 @@ class Crawl
     attr_reader :config, :crawl_queue, :seen_urls, :sink, :outcome, :outcome_message
     attr_accessor :executor
 
-    def initialize(config) # rubocop:disable Metrics/AbcSize
+    def initialize(config)
       raise ArgumentError, 'Invalid config' unless config.is_a?(Config)
       raise ArgumentError, 'Missing domain allowlist' if config.domain_allowlist.empty?
       raise ArgumentError, 'Seed URLs need to be an enumerator' unless config.seed_urls.is_a?(Enumerator)
@@ -77,11 +77,6 @@ def interruptible_sleep(period)
       end
     end
 
-    # No errors should be retried by default (see App Search subclass for a version with retries)
-    def retryable_error?(_error)
-      false
-    end
-
     #---------------------------------------------------------------------------------------------
     def coordinator
       @coordinator ||= Crawler::Coordinator.new(self)
@@ -124,7 +119,7 @@ def start! # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
     # Returns a hash with crawl-specific status information
     # Note: This is used by the `EventGenerator` class for crawl-status events and by the Crawler Status API.
     # Please update OpenAPI specs if you add any new fields here.
-    def status # rubocop:disable Metrics/AbcSize
+    def status
       {
         queue_size: crawl_queue.length,
         pages_visited: stats.fetched_pages_count,
diff --git a/lib/crawler/coordinator.rb b/lib/crawler/coordinator.rb
index 5a2484b..fe84303 100644
--- a/lib/crawler/coordinator.rb
+++ b/lib/crawler/coordinator.rb
@@ -10,6 +10,8 @@
 require 'benchmark'
 require 'concurrent/set'
 
+require_dependency File.join(__dir__, '..', 'errors')
+
 # There are too many lint issues here to individually disable
 # rubocop:disable Metrics/MethodLength, Metrics/AbcSize, Metrics/CyclomaticComplexity
 module Crawler
@@ -17,8 +19,8 @@ module Crawler
   class Coordinator # rubocop:disable Metrics/ClassLength
     SEED_LIST = 'seed-list'
 
-    # How long to wait before retrying ingestion after a retryable error (like a r/o mode write)
-    RETRY_INTERVAL = 10.seconds
+    SINK_LOCK_RETRY_INTERVAL = 1.second
+    SINK_LOCK_MAX_RETRIES = 120
 
     attr_reader :crawl, :seen_urls, :crawl_outcome, :outcome_message, :started_at, :task_executors
 
@@ -447,21 +449,35 @@ def extract_links(crawl_result, crawl_depth:)
 
     #-----------------------------------------------------------------------------------------------
     # Outputs the results of a single URL processing to an output module configured for the crawl
     def output_crawl_result(crawl_result)
-      sink.write(crawl_result).tap do |outcome|
-        # Make sure we have an outcome of the right type (helps troubleshoot sink implementations)
-        unless outcome.is_a?(Hash)
-          error = "Expected to return an outcome object from the sink, returned #{outcome.inspect} instead"
-          raise ArgumentError, error
+      retries = 0
+      begin
+        sink.write(crawl_result).tap do |outcome|
+          # Make sure we have an outcome of the right type (helps troubleshoot sink implementations)
+          unless outcome.is_a?(Hash)
+            error = "Expected to return an outcome object from the sink, returned #{outcome.inspect} instead"
+            raise ArgumentError, error
+          end
+        end
+      rescue Errors::SinkLockedError
+        # Adding a debug log here is incredibly noisy, so instead we should rely on logging from the sink
+        # and only log here if the SINK_LOCK_MAX_RETRIES threshold is reached.
+        retries += 1
+        unless retries >= SINK_LOCK_MAX_RETRIES
+          interruptible_sleep(SINK_LOCK_RETRY_INTERVAL)
+          retry
        end
-      end
-    rescue StandardError => e
-      if crawl.retryable_error?(e) && !shutdown_started?
-        system_logger.warn("Retryable error during content ingestion: #{e}. Going to retry in #{RETRY_INTERVAL}s...")
-        interruptible_sleep(RETRY_INTERVAL)
-        retry
-      end
-      sink.failure("Unexpected exception while sending crawl results to the output sink: #{e}")
+        log = <<~LOG.squish
+          Sink lock couldn't be acquired after #{retries} attempts, so crawl result for URL
+          [#{crawl_result.url}] was dropped.
+        LOG
+        system_logger.warn(log)
+        sink.failure(log)
+      rescue StandardError => e
+        log = "Unexpected exception while sending crawl results to the output sink: #{e}"
+        system_logger.fatal(log)
+        sink.failure(log)
+      end
     end
 
     #-----------------------------------------------------------------------------------------------
diff --git a/lib/crawler/http_client.rb b/lib/crawler/http_client.rb
index 1413b36..c11823f 100644
--- a/lib/crawler/http_client.rb
+++ b/lib/crawler/http_client.rb
@@ -197,7 +197,7 @@ def content_decoders
     end
 
     #-------------------------------------------------------------------------------------------------
-    def new_connection_manager # rubocop:disable Metrics/AbcSize
+    def new_connection_manager
       builder = PoolingHttpClientConnectionManagerBuilder.create
       builder.set_ssl_socket_factory(https_socket_factory)
       builder.set_dns_resolver(dns_resolver)
@@ -315,7 +315,7 @@ def default_request_config
 
     #-------------------------------------------------------------------------------------------------
     # Returns a proxy host object to be used for all connections
-    def proxy_host # rubocop:disable Metrics/AbcSize
+    def proxy_host
       return nil unless config.http_proxy_host
 
       logger.debug(<<~LOG.squish)
diff --git a/lib/crawler/http_utils/response.rb b/lib/crawler/http_utils/response.rb
index e104945..a5100b2 100644
--- a/lib/crawler/http_utils/response.rb
+++ b/lib/crawler/http_utils/response.rb
@@ -72,7 +72,7 @@ def [](key)
       v.is_a?(Array) ? v.first : v
     end
 
-    def headers # rubocop:disable Metrics/AbcSize
+    def headers
       @headers ||= apache_response.headers.each_with_object({}) do |h, o|
         key = h.get_name.downcase
 
diff --git a/lib/crawler/output_sink/elasticsearch.rb b/lib/crawler/output_sink/elasticsearch.rb
index 2f01a6a..cce12d0 100644
--- a/lib/crawler/output_sink/elasticsearch.rb
+++ b/lib/crawler/output_sink/elasticsearch.rb
@@ -9,6 +9,7 @@
 require_dependency File.join(__dir__, 'base')
 require_dependency File.join(__dir__, '..', '..', 'utility', 'es_client')
 require_dependency File.join(__dir__, '..', '..', 'utility', 'bulk_queue')
+require_dependency File.join(__dir__, '..', '..', 'errors')
 
 module Crawler
   module OutputSink
@@ -31,6 +32,7 @@ def initialize(config)
 
       # initialize client now to fail fast if config is bad
       client
+      @queue_lock = Mutex.new
       init_ingestion_stats
       system_logger.info(
         "Elasticsearch sink initialized for index [#{index_name}] with pipeline [#{pipeline}]"
@@ -38,19 +40,26 @@ def initialize(config)
     end
 
     def write(crawl_result)
-      doc = parametrized_doc(crawl_result)
-      index_op = { 'index' => { '_index' => index_name, '_id' => doc['id'] } }
+      # make additions to the operation queue thread-safe
+      raise Errors::SinkLockedError unless @queue_lock.try_lock
 
-      flush unless operation_queue.will_fit?(index_op, doc)
+      begin
+        doc = parametrized_doc(crawl_result)
+        index_op = { 'index' => { '_index' => index_name, '_id' => doc['id'] } }
 
-      operation_queue.add(
-        index_op,
-        doc
-      )
-      system_logger.debug("Added doc #{doc['id']} to bulk queue. Current stats: #{operation_queue.current_stats}")
+        flush unless operation_queue.will_fit?(index_op, doc)
 
-      increment_ingestion_stats(doc)
-      success
+        operation_queue.add(
+          index_op,
+          doc
+        )
+        system_logger.debug("Added doc #{doc['id']} to bulk queue. Current stats: #{operation_queue.current_stats}")
+
+        increment_ingestion_stats(doc)
+        success("Successfully added #{doc['id']} to the bulk queue")
+      ensure
+        @queue_lock.unlock
+      end
     end
 
     def close
@@ -64,18 +73,18 @@ def close
     end
 
     def flush # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
-      payload = operation_queue.pop_all
-      if payload.empty?
+      body = operation_queue.pop_all
+      if body.empty?
         system_logger.debug('Queue was empty when attempting to flush.')
         return
       end
 
       # a single doc needs two items in a bulk request, so halving the count makes logs clearer
-      indexing_docs_count = payload.size / 2
-      system_logger.info("Sending bulk request with #{indexing_docs_count} items and flushing queue...")
+      indexing_docs_count = body.size / 2
+      system_logger.info("Sending bulk request with #{indexing_docs_count} items and resetting queue...")
 
       begin
-        client.bulk(body: payload, pipeline:) # TODO: parse response
+        client.bulk(body:, pipeline:) # TODO: parse response
         system_logger.info("Successfully indexed #{indexing_docs_count} docs.")
         reset_ingestion_stats(true)
       rescue Utility::EsClient::IndexingFailedError => e
diff --git a/lib/errors.rb b/lib/errors.rb
new file mode 100644
index 0000000..fea18bd
--- /dev/null
+++ b/lib/errors.rb
@@ -0,0 +1,21 @@
+#
+# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+# or more contributor license agreements. Licensed under the Elastic License 2.0;
+# you may not use this file except in compliance with the Elastic License 2.0.
+#
+
+# frozen_string_literal: true
+
+class Errors
+  # Raised only if the queue item added somehow overflows the queue threshold.
+  # The queue threshold is checked before an item is added, so this error shouldn't occur.
+  # If this error occurs, something is wrong with the interaction between the Elasticsearch sink and BulkQueue.
+  class BulkQueueOverflowError < StandardError; end
+
+  # Raised when attempting to add a crawl result to the sink while it is locked.
+  # This is specific to the Elasticsearch sink: the sink is single-threaded but
+  # receives crawl results from multi-threaded crawl processes. The error prevents
+  # overloading the queue when Elasticsearch indexing is failing repeatedly and the
+  # sink is performing exponential backoff. This error should be treated as retryable.
+  class SinkLockedError < StandardError; end
+end
diff --git a/lib/utility/bulk_queue.rb b/lib/utility/bulk_queue.rb
index 0ba534d..1cd1aed 100644
--- a/lib/utility/bulk_queue.rb
+++ b/lib/utility/bulk_queue.rb
@@ -8,6 +8,8 @@
 
 require 'elasticsearch/api'
 
+require_dependency File.join(__dir__, '..', 'errors')
+
 module Utility
   class BulkQueue
     # Maximum number of operations in BULK Elasticsearch operation that will ingest the data
@@ -15,8 +17,6 @@ class BulkQueue
     # Maximum size of either whole BULK Elasticsearch operation or one document in it
     DEFAULT_SIZE_THRESHOLD = 1 * 1024 * 1024 # 1 megabyte
 
-    class QueueOverflowError < StandardError; end
-
     def initialize(op_count_threshold, size_threshold, system_logger)
       @op_count_threshold = (op_count_threshold || DEFAULT_OP_COUNT_THRESHOLD).freeze
       @size_threshold = (size_threshold || DEFAULT_SIZE_THRESHOLD).freeze
@@ -41,7 +41,14 @@ def pop_all
     end
 
     def add(operation, payload = nil)
-      raise QueueOverflowError unless will_fit?(operation, payload)
+      unless will_fit?(operation, payload)
+        log = <<~LOG.squish
+          Failed to add operation to the bulk queue. Current operation count is #{@current_op_count}.
+          Operation payload was #{bytesize(payload)} bytes, current buffer size is #{@current_buffer_size} bytes.
+        LOG
+        @system_logger.error(log)
+        raise Errors::BulkQueueOverflowError
+      end
 
       operation_size = bytesize(operation)
       payload_size = bytesize(payload)
diff --git a/lib/utility/es_client.rb b/lib/utility/es_client.rb
index b96025a..80e5871 100644
--- a/lib/utility/es_client.rb
+++ b/lib/utility/es_client.rb
@@ -13,7 +13,7 @@ module Utility
   class EsClient < ::Elasticsearch::Client
     USER_AGENT = 'elastic-web-crawler-'
     MAX_RETRIES = 3
-    REQUEST_TIMEOUT = 30 # seconds
+    REQUEST_TIMEOUT = 10 # seconds
     FAILED_BULKS_DIR = 'output/failed_payloads' # directory that failed bulk payloads are output to
 
     class IndexingFailedError < StandardError
diff --git a/spec/lib/crawler/coordinator_spec.rb b/spec/lib/crawler/coordinator_spec.rb
index 5ba5d89..62bc5dc 100644
--- a/spec/lib/crawler/coordinator_spec.rb
+++ b/spec/lib/crawler/coordinator_spec.rb
@@ -76,7 +76,7 @@
     )
   end
 
-  def process_crawl_result # rubocop:disable Metrics/AbcSize
+  def process_crawl_result
     allow(events).to receive(:url_output)
     allow(events).to receive(:url_discover)
     allow(events).to receive(:url_seed)
@@ -125,7 +125,6 @@ def process_crawl_result # rubocop:disable Metrics/AbcSize
       it 'should capture exceptions coming from the output module and generate a failure url-extracted event' do
         error = RuntimeError.new('BOOM')
         expect(crawl.sink).to receive(:write).and_raise(error)
-        expect(crawl).to receive(:retryable_error?).with(error).and_return(false)
         expect(events).to receive(:url_extracted).with(
           hash_including(
             url: crawl_result.url,
@@ -140,31 +139,55 @@ def process_crawl_result
         coordinator.send(:process_crawl_result, crawl_task, crawl_result)
       end
 
-      it 'should retry exceptions if possible' do
-        error = RuntimeError.new('BOOM')
-        expect(crawl.sink).to receive(:write).twice.and_wrap_original do |method, *args|
-          unless @called_before
-            @called_before = true
-            raise error
+      context 'when the output sink has a lock' do
+        before :each do
+          stub_const('Crawler::Coordinator::SINK_LOCK_MAX_RETRIES', 2)
+        end
+
+        it 'should wait for the lock' do
+          # Sink locked on first call but open on second
+          expect(crawl.sink).to receive(:write).twice.and_wrap_original do |method, *args|
+            unless @called_before
+              @called_before = true
+              raise Errors::SinkLockedError
+            end
+            method.call(*args)
          end
-          method.call(*args)
+
+          expect(crawl).to receive(:interruptible_sleep).once
+          expect(events).to receive(:url_extracted).with(
+            hash_including(
+              url: crawl_result.url,
+              type: :allowed,
+              start_time: kind_of(Time),
+              end_time: kind_of(Time),
+              duration: kind_of(Benchmark::Tms),
+              outcome: :success,
+              message: 'Successfully ingested crawl result'
+            )
+          )
+          coordinator.send(:process_crawl_result, crawl_task, crawl_result)
        end
 
-        expect(crawl).to receive(:retryable_error?).with(error).and_return(true)
-        expect(crawl).to receive(:interruptible_sleep)
+        it 'should fail if the lock acquisition times out' do
+          expect(crawl.sink).to receive(:write).twice.and_wrap_original do |_, *_|
+            raise Errors::SinkLockedError
+          end
 
-        expect(events).to receive(:url_extracted).with(
-          hash_including(
-            url: crawl_result.url,
-            type: :allowed,
-            start_time: kind_of(Time),
-            end_time: kind_of(Time),
-            duration: kind_of(Benchmark::Tms),
-            outcome: :success,
-            message: 'Successfully ingested crawl result'
+          expect(crawl).to receive(:interruptible_sleep).once
+          expect(events).to receive(:url_extracted).with(
+            hash_including(
+              url: crawl_result.url,
+              type: :allowed,
+              start_time: kind_of(Time),
+              end_time: kind_of(Time),
+              duration: kind_of(Benchmark::Tms),
+              outcome: :failure,
+              message: start_with("Sink lock couldn't be acquired after 2 attempts")
+            )
           )
-        )
-        coordinator.send(:process_crawl_result, crawl_task, crawl_result)
+          coordinator.send(:process_crawl_result, crawl_task, crawl_result)
+        end
      end
    end
 
diff --git a/spec/lib/crawler/output_sink/elasticsearch_spec.rb b/spec/lib/crawler/output_sink/elasticsearch_spec.rb
index c6a0c65..5ebab12 100644
--- a/spec/lib/crawler/output_sink/elasticsearch_spec.rb
+++ b/spec/lib/crawler/output_sink/elasticsearch_spec.rb
@@ -15,7 +15,8 @@
       output_sink: 'elasticsearch',
       output_index: index_name,
       elasticsearch: {
-        host: 'http://localhost:1234',
+        host: 'http://localhost',
+        port: 1234,
         api_key: 'key'
       }
     )
@@ -113,7 +114,8 @@
       output_sink: 'elasticsearch',
       output_index: index_name,
       elasticsearch: {
-        host: 'http://localhost:1234',
+        host: 'http://localhost',
+        port: 1234,
         api_key: 'key',
         pipeline: 'my-pipeline'
       }
@@ -137,7 +139,8 @@
       output_sink: 'elasticsearch',
       output_index: index_name,
       elasticsearch: {
-        host: 'http://localhost:1234',
+        host: 'http://localhost',
+        port: 1234,
         api_key: 'key',
         pipeline: 'my-pipeline',
         pipeline_params: {
@@ -171,7 +174,8 @@
       output_sink: 'elasticsearch',
       output_index: index_name,
       elasticsearch: {
-        host: 'http://localhost:1234',
+        host: 'http://localhost',
+        port: 1234,
         api_key: 'key',
         pipeline_enabled: false
       }
@@ -258,19 +262,38 @@
        subject.write(crawl_result_two)
      end
 
-      it 'pops existing documents before adding a new one' do
+      it 'blocks simultaneous threads with locking' do
+        # this will call write 3 times
+        # first call will take the lock and add crawl_result_one's doc
+        # second call will be rejected due to locking
+        # third call (retry of second call) will take the lock and add crawl_result_two's doc
+        # we can test this by using `ordered` on the spies
        expect(bulk_queue).to receive(:add).with(anything, hash_including(doc_one)).ordered
        expect(bulk_queue).to receive(:pop_all).ordered
        expect(bulk_queue).to receive(:add).with(anything, hash_including(doc_two)).ordered
 
-        subject.write(crawl_result_one)
+        # initially send multi-threaded to engage lock
+        threads = [crawl_result_one, crawl_result_two].map do |crawl_result|
+          Thread.new do
+            subject.write(crawl_result)
+          end
+        end
+        # second call will fail, but we can't differentiate that here
+        expect { threads.each(&:join) }.to raise_error(Errors::SinkLockedError)
+
+        # mock reattempting after failed lock acquisition
        subject.write(crawl_result_two)
      end
    end
  end
 
  describe '#flush' do
-    let(:operation) { 'bulk: delete something \n insert something else' }
+    let(:operation) do
+      [
+        { index: { _index: 'my-index', _id: '1234' } },
+        { id: '202d2df297ed4e62b51dff33ee1418330a93a622', title: 'foo' }
+      ]
+    end
 
    before(:each) do
      allow(bulk_queue).to receive(:pop_all).and_return(operation)
    end
 
    it 'sends data from bulk queue to elasticsearch' do
      expect(es_client).to receive(:bulk).with(hash_including(body: operation, pipeline: default_pipeline))
+      expect(system_logger).to receive(:info).with('Successfully indexed 1 docs.')
 
      subject.flush
    end
+
+    context('when an error occurs during indexing') do
+      before(:each) do
+        allow(es_client).to receive(:bulk).and_raise(Utility::EsClient::IndexingFailedError.new('BOOM'))
+      end
+
+      it 'logs error' do
+        expect(es_client).to receive(:bulk).with(hash_including(body: operation, pipeline: default_pipeline))
+        expect(system_logger).to receive(:warn).with('Bulk index failed: BOOM')
+
+        subject.flush
+      end
+    end
  end
 
  describe '#ingestion_stats' do
@@ -352,7 +389,7 @@
      before(:each) do
        allow(bulk_queue).to receive(:bytesize).and_return(serialized_object.bytesize)
 
-        allow(es_client).to receive(:bulk).and_raise(Utility::EsClient::IndexingFailedError)
+        allow(es_client).to receive(:bulk).and_raise(Utility::EsClient::IndexingFailedError.new('BOOM'))
 
        document_count.times.each do |x|
          subject.write(FactoryBot.build(:html_crawl_result, url: "http://real.com/#{x}"))
@@ -366,6 +403,7 @@
 
        expect(stats[:failed][:docs_count]).to eq(document_count)
        expect(stats[:completed][:docs_count]).to eq(0)
+        expect(system_logger).to have_received(:warn).with('Bulk index failed: BOOM')
      end
 
      it 'returns expected docs_volume' do
@@ -373,6 +411,7 @@
 
        expect(stats[:failed][:docs_volume]).to eq(document_count * serialized_object.bytesize)
        expect(stats[:completed][:docs_volume]).to eq(0)
+        expect(system_logger).to have_received(:warn).with('Bulk index failed: BOOM')
      end
    end
  end
diff --git a/spec/lib/utility/bulk_queue_spec.rb b/spec/lib/utility/bulk_queue_spec.rb
index b152dbf..24c3600 100644
--- a/spec/lib/utility/bulk_queue_spec.rb
+++ b/spec/lib/utility/bulk_queue_spec.rb
@@ -14,6 +14,7 @@
 
  before(:each) do
    allow(system_logger).to receive(:debug)
+    allow(system_logger).to receive(:error)
  end
 
  describe '#add' do
@@ -29,7 +30,7 @@
      end
 
      it 'raises an error' do
-        expect { subject.add('some-op') }.to raise_error(Utility::BulkQueue::QueueOverflowError)
+        expect { subject.add('some-op') }.to raise_error(Errors::BulkQueueOverflowError)
      end
    end
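
The patch splits responsibility between the sink, which refuses writes it cannot accept right now (Mutex#try_lock raising Errors::SinkLockedError), and the coordinator, which retries with a short sleep until a retry budget is exhausted. The following self-contained Ruby sketch illustrates that interaction under simplified assumptions; SimpleSink, write_with_retries, and the constants below are illustrative stand-ins, not the crawler's actual classes or API.

# frozen_string_literal: true

class SinkLockedError < StandardError; end

# Stand-in for the Elasticsearch sink: the write path is guarded by a
# non-blocking Mutex#try_lock, so concurrent callers are rejected instead
# of queued, mirroring the guard added to OutputSink::Elasticsearch#write.
class SimpleSink
  attr_reader :docs

  def initialize
    @lock = Mutex.new
    @docs = []
  end

  def write(doc)
    raise SinkLockedError unless @lock.try_lock

    begin
      @docs << doc
      { outcome: :success }
    ensure
      @lock.unlock
    end
  end
end

# Stand-in for the coordinator side: sleep briefly and retry until the lock
# is acquired or the retry budget runs out (SINK_LOCK_RETRY_INTERVAL and
# SINK_LOCK_MAX_RETRIES play this role in the patch).
RETRY_INTERVAL = 0.05 # seconds
MAX_RETRIES = 120

def write_with_retries(sink, doc)
  retries = 0
  begin
    sink.write(doc)
  rescue SinkLockedError
    retries += 1
    if retries < MAX_RETRIES
      sleep(RETRY_INTERVAL)
      retry
    end
    { outcome: :failure, message: "sink lock not acquired after #{retries} attempts" }
  end
end

# Many crawler-like threads funnel documents into the single-threaded sink.
sink = SimpleSink.new
threads = 10.times.map { |i| Thread.new { write_with_retries(sink, "doc-#{i}") } }
threads.each(&:join)
puts sink.docs.size # => 10

Because try_lock never blocks, slow Elasticsearch backoff stays inside the sink instead of piling up blocked crawler threads; each caller stays responsive and decides for itself how long a crawl result is worth waiting for before it is dropped.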