Lock bulk queue while processing indexing request #45

Merged · 7 commits · Jun 17, 2024
Changes from 3 commits
3 changes: 3 additions & 0 deletions .rubocop.yml
@@ -24,6 +24,9 @@ Metrics/ClassLength:
Metrics/ModuleLength:
Max: 200

Metrics/AbcSize:
Max: 20

# Disable for specs
Metrics/BlockLength:
Exclude:
9 changes: 2 additions & 7 deletions lib/crawler/api/crawl.rb
@@ -20,7 +20,7 @@ class Crawl
attr_reader :config, :crawl_queue, :seen_urls, :sink, :outcome, :outcome_message
attr_accessor :executor

def initialize(config) # rubocop:disable Metrics/AbcSize
def initialize(config)
raise ArgumentError, 'Invalid config' unless config.is_a?(Config)
raise ArgumentError, 'Missing domain allowlist' if config.domain_allowlist.empty?
raise ArgumentError, 'Seed URLs need to be an enumerator' unless config.seed_urls.is_a?(Enumerator)
@@ -77,11 +77,6 @@ def interruptible_sleep(period)
end
end

# No errors should be retried by default (see App Search subclass for a version with retries)
def retryable_error?(_error)
false
end

#---------------------------------------------------------------------------------------------
def coordinator
@coordinator ||= Crawler::Coordinator.new(self)
@@ -124,7 +119,7 @@ def start! # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
# Returns a hash with crawl-specific status information
# Note: This is used by the `EventGenerator` class for crawl-status events and by the Crawler Status API.
# Please update OpenAPI specs if you add any new fields here.
def status # rubocop:disable Metrics/AbcSize
def status
{
queue_size: crawl_queue.length,
pages_visited: stats.fetched_pages_count,
16 changes: 10 additions & 6 deletions lib/crawler/coordinator.rb
@@ -10,6 +10,8 @@
require 'benchmark'
require 'concurrent/set'

require_dependency File.join(__dir__, '..', 'errors')

# There are too many lint issues here to individually disable
# rubocop:disable Metrics/MethodLength, Metrics/AbcSize, Metrics/CyclomaticComplexity
module Crawler
@@ -454,13 +456,15 @@ def output_crawl_result(crawl_result)
raise ArgumentError, error
end
end
rescue Errors::BulkQueueLockedError
log = <<~LOG.squish
Crawl result could not be added to queue because an indexing operation is in process.
Retrying in #{RETRY_INTERVAL} seconds...
LOG
system_logger.debug(log)
interruptible_sleep(RETRY_INTERVAL)
retry
rescue StandardError => e
if crawl.retryable_error?(e) && !shutdown_started?
system_logger.warn("Retryable error during content ingestion: #{e}. Going to retry in #{RETRY_INTERVAL}s...")
interruptible_sleep(RETRY_INTERVAL)
retry
end

sink.failure("Unexpected exception while sending crawl results to the output sink: #{e}")
end

4 changes: 2 additions & 2 deletions lib/crawler/http_client.rb
@@ -197,7 +197,7 @@ def content_decoders
end

#-------------------------------------------------------------------------------------------------
def new_connection_manager # rubocop:disable Metrics/AbcSize
def new_connection_manager
builder = PoolingHttpClientConnectionManagerBuilder.create
builder.set_ssl_socket_factory(https_socket_factory)
builder.set_dns_resolver(dns_resolver)
@@ -315,7 +315,7 @@ def default_request_config

#-------------------------------------------------------------------------------------------------
# Returns a proxy host object to be used for all connections
def proxy_host # rubocop:disable Metrics/AbcSize
def proxy_host
return nil unless config.http_proxy_host

logger.debug(<<~LOG.squish)
2 changes: 1 addition & 1 deletion lib/crawler/http_utils/response.rb
@@ -72,7 +72,7 @@ def [](key)
v.is_a?(Array) ? v.first : v
end

def headers # rubocop:disable Metrics/AbcSize
def headers
@headers ||= apache_response.headers.each_with_object({}) do |h, o|
key = h.get_name.downcase

37 changes: 27 additions & 10 deletions lib/crawler/output_sink/elasticsearch.rb
@@ -9,6 +9,7 @@
require_dependency File.join(__dir__, 'base')
require_dependency File.join(__dir__, '..', '..', 'utility', 'es_client')
require_dependency File.join(__dir__, '..', '..', 'utility', 'bulk_queue')
require_dependency File.join(__dir__, '..', '..', 'errors')

module Crawler
module OutputSink
@@ -31,17 +32,21 @@ def initialize(config)
# initialize client now to fail fast if config is bad
client

@queue_locked = false
init_ingestion_stats
system_logger.info(
"Elasticsearch sink initialized for index [#{index_name}] with pipeline [#{pipeline}]"
)
end

def write(crawl_result)
# prevent overloading the bulk queue
raise Errors::BulkQueueLockedError if @queue_locked

doc = parametrized_doc(crawl_result)
index_op = { 'index' => { '_index' => index_name, '_id' => doc['id'] } }

flush unless operation_queue.will_fit?(index_op, doc)
process unless operation_queue.will_fit?(index_op, doc)

operation_queue.add(
index_op,
@@ -50,11 +55,11 @@ def write(crawl_result)
system_logger.debug("Added doc #{doc['id']} to bulk queue. Current stats: #{operation_queue.current_stats}")

increment_ingestion_stats(doc)
success
success("Successfully added #{doc['id']} to the bulk queue")
end

def close
flush
process
msg = <<~LOG.squish
All indexing operations completed.
Successfully indexed #{@completed[:docs_count]} docs with a volume of #{@completed[:docs_volume]} bytes.
@@ -63,19 +68,21 @@ def close
system_logger.info(msg)
end

def flush # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
payload = operation_queue.pop_all
if payload.empty?
system_logger.debug('Queue was empty when attempting to flush.')
def process # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
lock_queue

body = operation_queue.pop_all
if body.empty?
system_logger.debug('Queue was empty when attempting to process.')
return
end

# a single doc needs two items in a bulk request, so halving the count makes logs clearer
indexing_docs_count = payload.size / 2
system_logger.info("Sending bulk request with #{indexing_docs_count} items and flushing queue...")
indexing_docs_count = body.size / 2
system_logger.info("Sending bulk request with #{indexing_docs_count} items and resetting queue...")

begin
client.bulk(body: payload, pipeline:) # TODO: parse response
client.bulk(body:, pipeline:) # TODO: parse response
system_logger.info("Successfully indexed #{indexing_docs_count} docs.")
reset_ingestion_stats(true)
rescue Utility::EsClient::IndexingFailedError => e
@@ -84,6 +91,8 @@ def flush # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
rescue StandardError => e
system_logger.warn("Bulk index failed for unexpected reason: #{e}")
reset_ingestion_stats(false)
ensure
unlock_queue
end
end

@@ -128,6 +137,14 @@ def pipeline_params
@pipeline_params ||= DEFAULT_PIPELINE_PARAMS.merge(es_config[:pipeline_params] || {}).deep_stringify_keys
end

def lock_queue
@queue_locked = true
end

def unlock_queue
@queue_locked = false
end

private

def parametrized_doc(crawl_result)
21 changes: 21 additions & 0 deletions lib/errors.rb
@@ -0,0 +1,21 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#

# frozen_string_literal: true

class Errors
# Raised when attempting to add a crawl result to the bulk queue, but it is currently flushing.
# The bulk queue is single-threaded, but receives crawl results from multi-threaded processes.
# This error is raised to prevent overloading the queue if Elasticsearch indexing is failing repeatedly
# and performing exponential backoff.
# This error should be treated as retryable.
class BulkQueueLockedError < StandardError; end

# Raised only if the queue item added somehow overflows the queue threshold.
# The queue threshold is checked before an item is added so this error shouldn't occur.
# If this error occurs, something is wrong with the interaction between the Elasticsearch sink and BulkQueue.
class BulkQueueOverflowError < StandardError; end
end
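
Taken together, the new error classes, the sink's queue lock, and the coordinator's rescue clause form a small back-pressure loop: the sink refuses writes while a bulk request is in flight, and the coordinator backs off and retries. The sketch below condenses that loop; names mirror the diff, while logging, stats, and the empty-queue check are elided, so treat it as an outline rather than the exact implementation.

# Producer side (Crawler::Coordinator): a locked queue is a retryable condition.
begin
  sink.write(crawl_result)
rescue Errors::BulkQueueLockedError
  interruptible_sleep(RETRY_INTERVAL) # wait out the in-flight bulk request
  retry
end

# Consumer side (Crawler::OutputSink::Elasticsearch): refuse writes while processing.
def write(crawl_result)
  raise Errors::BulkQueueLockedError if @queue_locked
  # ...queue the doc, calling `process` first if it would not fit...
end

def process
  lock_queue
  client.bulk(body: operation_queue.pop_all, pipeline:)
ensure
  unlock_queue # always release the lock, even when the bulk request fails
end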
6 changes: 3 additions & 3 deletions lib/utility/bulk_queue.rb
@@ -8,15 +8,15 @@

require 'elasticsearch/api'

require_dependency File.join(__dir__, '..', 'errors')

module Utility
class BulkQueue
# Maximum number of operations in BULK Elasticsearch operation that will ingest the data
DEFAULT_OP_COUNT_THRESHOLD = 100
# Maximum size of either whole BULK Elasticsearch operation or one document in it
DEFAULT_SIZE_THRESHOLD = 1 * 1024 * 1024 # 1 megabyte

class QueueOverflowError < StandardError; end

def initialize(op_count_threshold, size_threshold, system_logger)
@op_count_threshold = (op_count_threshold || DEFAULT_OP_COUNT_THRESHOLD).freeze
@size_threshold = (size_threshold || DEFAULT_SIZE_THRESHOLD).freeze
@@ -41,7 +41,7 @@ def pop_all
end

def add(operation, payload = nil)
raise QueueOverflowError unless will_fit?(operation, payload)
raise Errors::BulkQueueOverflowError unless will_fit?(operation, payload)

operation_size = bytesize(operation)
payload_size = bytesize(payload)
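
For reference, here is a minimal usage sketch of the BulkQueue contract the sink relies on: callers check will_fit? before add, so Errors::BulkQueueOverflowError should only surface if that check is skipped. The index name, document, and logger below are illustrative, not taken from the repository.

require 'logger'

queue = Utility::BulkQueue.new(nil, nil, Logger.new($stdout)) # nil falls back to the default thresholds

op  = { 'index' => { '_index' => 'my-index', '_id' => 'doc-1' } }
doc = { 'id' => 'doc-1', 'body' => 'hello world' }

queue.add(op, doc) if queue.will_fit?(op, doc) # the safe path used by the Elasticsearch sink
body = queue.pop_all # drains everything queued so far for one bulk request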
7 changes: 2 additions & 5 deletions spec/lib/crawler/coordinator_spec.rb
@@ -76,7 +76,7 @@
)
end

def process_crawl_result # rubocop:disable Metrics/AbcSize
def process_crawl_result
allow(events).to receive(:url_output)
allow(events).to receive(:url_discover)
allow(events).to receive(:url_seed)
@@ -125,7 +125,6 @@ def process_crawl_result # rubocop:disable Metrics/AbcSize
it 'should capture exceptions coming from the output module and generate a failure url-extracted event' do
error = RuntimeError.new('BOOM')
expect(crawl.sink).to receive(:write).and_raise(error)
expect(crawl).to receive(:retryable_error?).with(error).and_return(false)
expect(events).to receive(:url_extracted).with(
hash_including(
url: crawl_result.url,
@@ -141,7 +140,7 @@
end

it 'should retry exceptions if possible' do
error = RuntimeError.new('BOOM')
error = Errors::BulkQueueLockedError.new
expect(crawl.sink).to receive(:write).twice.and_wrap_original do |method, *args|
unless @called_before
@called_before = true
@@ -150,9 +149,7 @@ def process_crawl_result # rubocop:disable Metrics/AbcSize
method.call(*args)
end

expect(crawl).to receive(:retryable_error?).with(error).and_return(true)
expect(crawl).to receive(:interruptible_sleep)

expect(events).to receive(:url_extracted).with(
hash_including(
url: crawl_result.url,