Lock bulk queue while processing indexing request (#45)
### Closes #42

Add a mutex lock to the `write` method in the `Elasticsearch` sink. When
`write` is called, the lock is acquired, and nothing else can be added to
the bulk queue until the lock is released.
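
For illustration, here is a minimal, self-contained sketch of the pattern this commit introduces (class and variable names here are invented for the example; the real implementation is in the `Elasticsearch` sink and `Coordinator` diffs below):

```ruby
# Hypothetical, simplified illustration: the sink refuses concurrent writes via
# a non-blocking Mutex#try_lock, and the caller retries later instead of piling
# more work onto the bulk queue.
class SinkLockedError < StandardError; end

class TinySink
  def initialize
    @queue_lock = Mutex.new
    @queue = []
  end

  def write(doc)
    # Fail fast if another thread currently holds the lock.
    raise SinkLockedError unless @queue_lock.try_lock

    begin
      @queue << doc
    ensure
      # Always release the lock, even if queueing raised.
      @queue_lock.unlock
    end
  end
end

sink = TinySink.new
begin
  sink.write('example-doc')
rescue SinkLockedError
  sleep 1 # the real Coordinator sleeps SINK_LOCK_RETRY_INTERVAL between retries
  retry
end
```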
navarone-feekery authored and elastic committed Jun 17, 2024
1 parent ae66978 commit fcf84b1
Showing 12 changed files with 189 additions and 75 deletions.
3 changes: 3 additions & 0 deletions .rubocop.yml
@@ -24,6 +24,9 @@ Metrics/ClassLength:
Metrics/ModuleLength:
Max: 200

Metrics/AbcSize:
Max: 20

# Disable for specs
Metrics/BlockLength:
Exclude:
9 changes: 2 additions & 7 deletions lib/crawler/api/crawl.rb
@@ -20,7 +20,7 @@ class Crawl
attr_reader :config, :crawl_queue, :seen_urls, :sink, :outcome, :outcome_message
attr_accessor :executor

def initialize(config) # rubocop:disable Metrics/AbcSize
def initialize(config)
raise ArgumentError, 'Invalid config' unless config.is_a?(Config)
raise ArgumentError, 'Missing domain allowlist' if config.domain_allowlist.empty?
raise ArgumentError, 'Seed URLs need to be an enumerator' unless config.seed_urls.is_a?(Enumerator)
@@ -77,11 +77,6 @@ def interruptible_sleep(period)
end
end

# No errors should be retried by default (see App Search subclass for a version with retries)
def retryable_error?(_error)
false
end

#---------------------------------------------------------------------------------------------
def coordinator
@coordinator ||= Crawler::Coordinator.new(self)
@@ -124,7 +119,7 @@ def start! # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
# Returns a hash with crawl-specific status information
# Note: This is used by the `EventGenerator` class for crawl-status events and by the Crawler Status API.
# Please update OpenAPI specs if you add any new fields here.
def status # rubocop:disable Metrics/AbcSize
def status
{
queue_size: crawl_queue.length,
pages_visited: stats.fetched_pages_count,
46 changes: 31 additions & 15 deletions lib/crawler/coordinator.rb
@@ -10,15 +10,17 @@
require 'benchmark'
require 'concurrent/set'

require_dependency File.join(__dir__, '..', 'errors')

# There are too many lint issues here to individually disable
# rubocop:disable Metrics/MethodLength, Metrics/AbcSize, Metrics/CyclomaticComplexity
module Crawler
# The Coordinator is responsible for running an entire crawl from start to finish.
class Coordinator # rubocop:disable Metrics/ClassLength
SEED_LIST = 'seed-list'

# How long to wait before retrying ingestion after a retryable error (like a r/o mode write)
RETRY_INTERVAL = 10.seconds
SINK_LOCK_RETRY_INTERVAL = 1.second
SINK_LOCK_MAX_RETRIES = 120

attr_reader :crawl, :seen_urls, :crawl_outcome, :outcome_message, :started_at, :task_executors

@@ -447,21 +449,35 @@ def extract_links(crawl_result, crawl_depth:)
#-----------------------------------------------------------------------------------------------
# Outputs the results of a single URL processing to an output module configured for the crawl
def output_crawl_result(crawl_result)
sink.write(crawl_result).tap do |outcome|
# Make sure we have an outcome of the right type (helps troubleshoot sink implementations)
unless outcome.is_a?(Hash)
error = "Expected to return an outcome object from the sink, returned #{outcome.inspect} instead"
raise ArgumentError, error
retries = 0
begin
sink.write(crawl_result).tap do |outcome|
# Make sure we have an outcome of the right type (helps troubleshoot sink implementations)
unless outcome.is_a?(Hash)
error = "Expected to return an outcome object from the sink, returned #{outcome.inspect} instead"
raise ArgumentError, error
end
end
rescue Errors::SinkLockedError
# Adding a debug log here is incredibly noisy, so instead we should rely on logging from the sink
# and only log here if the SINK_LOCK_MAX_RETRIES threshold is reached.
retries += 1
unless retries >= SINK_LOCK_MAX_RETRIES
interruptible_sleep(SINK_LOCK_RETRY_INTERVAL)
retry
end
end
rescue StandardError => e
if crawl.retryable_error?(e) && !shutdown_started?
system_logger.warn("Retryable error during content ingestion: #{e}. Going to retry in #{RETRY_INTERVAL}s...")
interruptible_sleep(RETRY_INTERVAL)
retry
end

sink.failure("Unexpected exception while sending crawl results to the output sink: #{e}")
log = <<~LOG.squish
Sink lock couldn't be acquired after #{retries} attempts, so crawl result for URL
[#{crawl_result.url}] was dropped.
LOG
system_logger.warn(log)
sink.failure(log)
rescue StandardError => e
log = "Unexpected exception while sending crawl results to the output sink: #{e}"
system_logger.fatal(log)
sink.failure(log)
end
end

#-----------------------------------------------------------------------------------------------
4 changes: 2 additions & 2 deletions lib/crawler/http_client.rb
@@ -197,7 +197,7 @@ def content_decoders
end

#-------------------------------------------------------------------------------------------------
def new_connection_manager # rubocop:disable Metrics/AbcSize
def new_connection_manager
builder = PoolingHttpClientConnectionManagerBuilder.create
builder.set_ssl_socket_factory(https_socket_factory)
builder.set_dns_resolver(dns_resolver)
@@ -315,7 +315,7 @@ def default_request_config

#-------------------------------------------------------------------------------------------------
# Returns a proxy host object to be used for all connections
def proxy_host # rubocop:disable Metrics/AbcSize
def proxy_host
return nil unless config.http_proxy_host

logger.debug(<<~LOG.squish)
2 changes: 1 addition & 1 deletion lib/crawler/http_utils/response.rb
@@ -72,7 +72,7 @@ def [](key)
v.is_a?(Array) ? v.first : v
end

def headers # rubocop:disable Metrics/AbcSize
def headers
@headers ||= apache_response.headers.each_with_object({}) do |h, o|
key = h.get_name.downcase

39 changes: 24 additions & 15 deletions lib/crawler/output_sink/elasticsearch.rb
@@ -9,6 +9,7 @@
require_dependency File.join(__dir__, 'base')
require_dependency File.join(__dir__, '..', '..', 'utility', 'es_client')
require_dependency File.join(__dir__, '..', '..', 'utility', 'bulk_queue')
require_dependency File.join(__dir__, '..', '..', 'errors')

module Crawler
module OutputSink
@@ -31,26 +32,34 @@ def initialize(config)
# initialize client now to fail fast if config is bad
client

@queue_lock = Mutex.new
init_ingestion_stats
system_logger.info(
"Elasticsearch sink initialized for index [#{index_name}] with pipeline [#{pipeline}]"
)
end

def write(crawl_result)
doc = parametrized_doc(crawl_result)
index_op = { 'index' => { '_index' => index_name, '_id' => doc['id'] } }
# make additions to the operation queue thread-safe
raise Errors::SinkLockedError unless @queue_lock.try_lock

flush unless operation_queue.will_fit?(index_op, doc)
begin
doc = parametrized_doc(crawl_result)
index_op = { 'index' => { '_index' => index_name, '_id' => doc['id'] } }

operation_queue.add(
index_op,
doc
)
system_logger.debug("Added doc #{doc['id']} to bulk queue. Current stats: #{operation_queue.current_stats}")
flush unless operation_queue.will_fit?(index_op, doc)

increment_ingestion_stats(doc)
success
operation_queue.add(
index_op,
doc
)
system_logger.debug("Added doc #{doc['id']} to bulk queue. Current stats: #{operation_queue.current_stats}")

increment_ingestion_stats(doc)
success("Successfully added #{doc['id']} to the bulk queue")
ensure
@queue_lock.unlock
end
end

def close
@@ -64,18 +73,18 @@ def close
end

def flush # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
payload = operation_queue.pop_all
if payload.empty?
body = operation_queue.pop_all
if body.empty?
system_logger.debug('Queue was empty when attempting to flush.')
return
end

# a single doc needs two items in a bulk request, so halving the count makes logs clearer
indexing_docs_count = payload.size / 2
system_logger.info("Sending bulk request with #{indexing_docs_count} items and flushing queue...")
indexing_docs_count = body.size / 2
system_logger.info("Sending bulk request with #{indexing_docs_count} items and resetting queue...")

begin
client.bulk(body: payload, pipeline:) # TODO: parse response
client.bulk(body:, pipeline:) # TODO: parse response
system_logger.info("Successfully indexed #{indexing_docs_count} docs.")
reset_ingestion_stats(true)
rescue Utility::EsClient::IndexingFailedError => e
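
To see why the flush log halves the queue size, note that each document occupies two entries in the bulk body, an action line followed by the document source (the values below are illustrative):

```ruby
# Illustrative shape of the drained queue: action metadata alternating with
# document source, so two entries per document.
body = [
  { 'index' => { '_index' => 'my-index', '_id' => 'doc-1' } },
  { 'id' => 'doc-1', 'title' => 'Example page' },
  { 'index' => { '_index' => 'my-index', '_id' => 'doc-2' } },
  { 'id' => 'doc-2', 'title' => 'Another page' }
]
indexing_docs_count = body.size / 2 # => 2 documents
```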
21 changes: 21 additions & 0 deletions lib/errors.rb
@@ -0,0 +1,21 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#

# frozen_string_literal: true

class Errors
# Raised only if the queue item added somehow overflows the queue threshold.
# The queue threshold is checked before an item is added so this error shouldn't occur.
# If this error occurs, something is wrong with the interaction between the Elasticsearch sink and BulkQueue.
class BulkQueueOverflowError < StandardError; end

# Raised when attempting to add a crawl result to the sink, but it is currently locked.
# This is specific for Elasticsearch sink. Basically the sink is single-threaded but
# receives crawl results from multi-threaded processes. This error is raised to prevent
# overloading the queue if Elasticsearch indexing is failing repeatedly and performing
# exponential backoff. This error should be treated as retryable.
class SinkLockedError < StandardError; end
end
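
The consuming side of `SinkLockedError` is the Coordinator's bounded retry loop, condensed below from the coordinator diff above. With `SINK_LOCK_MAX_RETRIES = 120` and `SINK_LOCK_RETRY_INTERVAL = 1.second`, a crawl result waits at most roughly two minutes before being dropped and reported as a sink failure:

```ruby
# Condensed from the Coordinator#output_crawl_result diff above.
retries = 0
begin
  sink.write(crawl_result)
rescue Errors::SinkLockedError
  retries += 1
  unless retries >= SINK_LOCK_MAX_RETRIES
    interruptible_sleep(SINK_LOCK_RETRY_INTERVAL) # one second between attempts
    retry
  end
  # After ~120 failed attempts the result is dropped and logged via sink.failure(...)
end
```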
13 changes: 10 additions & 3 deletions lib/utility/bulk_queue.rb
@@ -8,15 +8,15 @@

require 'elasticsearch/api'

require_dependency File.join(__dir__, '..', 'errors')

module Utility
class BulkQueue
# Maximum number of operations in BULK Elasticsearch operation that will ingest the data
DEFAULT_OP_COUNT_THRESHOLD = 100
# Maximum size of either whole BULK Elasticsearch operation or one document in it
DEFAULT_SIZE_THRESHOLD = 1 * 1024 * 1024 # 1 megabyte

class QueueOverflowError < StandardError; end

def initialize(op_count_threshold, size_threshold, system_logger)
@op_count_threshold = (op_count_threshold || DEFAULT_OP_COUNT_THRESHOLD).freeze
@size_threshold = (size_threshold || DEFAULT_SIZE_THRESHOLD).freeze
@@ -41,7 +41,14 @@ def pop_all
end

def add(operation, payload = nil)
raise QueueOverflowError unless will_fit?(operation, payload)
unless will_fit?(operation, payload)
log = <<~LOG.squish
Operation failed to add to bulk queue. Current operation count is #{@current_op_count}.
Operation payload was #{bytesize(payload)} bytes, current buffer size is #{@current_buffer_size} bytes.
LOG
@system_logger.error(log)
raise Errors::BulkQueueOverflowError
end

operation_size = bytesize(operation)
payload_size = bytesize(payload)
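
For context, a sketch of how a caller drives `Utility::BulkQueue`, based only on the methods visible in this diff (passing `nil` thresholds falls back to the defaults of 100 operations / 1 MB; the logger can be any Logger-compatible object):

```ruby
require 'logger'

# Sketch of BulkQueue usage based on the methods shown in this diff.
queue = Utility::BulkQueue.new(nil, nil, Logger.new($stdout))

op  = { 'index' => { '_index' => 'my-index', '_id' => 'doc-1' } }
doc = { 'id' => 'doc-1', 'body' => 'page content' }

# The Elasticsearch sink checks capacity first and flushes when an item would
# not fit, which is why BulkQueueOverflowError is not expected in practice.
queue.add(op, doc) if queue.will_fit?(op, doc)

body = queue.pop_all # drained payload, passed to client.bulk(body:, pipeline:)
```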
2 changes: 1 addition & 1 deletion lib/utility/es_client.rb
@@ -13,7 +13,7 @@ module Utility
class EsClient < ::Elasticsearch::Client
USER_AGENT = 'elastic-web-crawler-'
MAX_RETRIES = 3
REQUEST_TIMEOUT = 30 # seconds
REQUEST_TIMEOUT = 10 # seconds
FAILED_BULKS_DIR = 'output/failed_payloads' # directory that failed bulk payloads are output to

class IndexingFailedError < StandardError
67 changes: 45 additions & 22 deletions spec/lib/crawler/coordinator_spec.rb
@@ -76,7 +76,7 @@
)
end

def process_crawl_result # rubocop:disable Metrics/AbcSize
def process_crawl_result
allow(events).to receive(:url_output)
allow(events).to receive(:url_discover)
allow(events).to receive(:url_seed)
@@ -125,7 +125,6 @@ def process_crawl_result # rubocop:disable Metrics/AbcSize
it 'should capture exceptions coming from the output module and generate a failure url-extracted event' do
error = RuntimeError.new('BOOM')
expect(crawl.sink).to receive(:write).and_raise(error)
expect(crawl).to receive(:retryable_error?).with(error).and_return(false)
expect(events).to receive(:url_extracted).with(
hash_including(
url: crawl_result.url,
@@ -140,31 +139,55 @@ def process_crawl_result # rubocop:disable Metrics/AbcSize
coordinator.send(:process_crawl_result, crawl_task, crawl_result)
end

it 'should retry exceptions if possible' do
error = RuntimeError.new('BOOM')
expect(crawl.sink).to receive(:write).twice.and_wrap_original do |method, *args|
unless @called_before
@called_before = true
raise error
context 'when the output sink has a lock' do
before :each do
stub_const('Crawler::Coordinator::SINK_LOCK_MAX_RETRIES', 2)
end

it 'should wait for the lock' do
# Sink locked on first call but open on second
expect(crawl.sink).to receive(:write).twice.and_wrap_original do |method, *args|
unless @called_before
@called_before = true
raise Errors::SinkLockedError
end
method.call(*args)
end
method.call(*args)

expect(crawl).to receive(:interruptible_sleep).once
expect(events).to receive(:url_extracted).with(
hash_including(
url: crawl_result.url,
type: :allowed,
start_time: kind_of(Time),
end_time: kind_of(Time),
duration: kind_of(Benchmark::Tms),
outcome: :success,
message: 'Successfully ingested crawl result'
)
)
coordinator.send(:process_crawl_result, crawl_task, crawl_result)
end

expect(crawl).to receive(:retryable_error?).with(error).and_return(true)
expect(crawl).to receive(:interruptible_sleep)
it 'should fail if the lock acquisition times out' do
expect(crawl.sink).to receive(:write).twice.and_wrap_original do |_, *_|
raise Errors::SinkLockedError
end

expect(events).to receive(:url_extracted).with(
hash_including(
url: crawl_result.url,
type: :allowed,
start_time: kind_of(Time),
end_time: kind_of(Time),
duration: kind_of(Benchmark::Tms),
outcome: :success,
message: 'Successfully ingested crawl result'
expect(crawl).to receive(:interruptible_sleep).once
expect(events).to receive(:url_extracted).with(
hash_including(
url: crawl_result.url,
type: :allowed,
start_time: kind_of(Time),
end_time: kind_of(Time),
duration: kind_of(Benchmark::Tms),
outcome: :failure,
message: start_with("Sink lock couldn't be acquired after 2 attempts")
)
)
)
coordinator.send(:process_crawl_result, crawl_task, crawl_result)
coordinator.send(:process_crawl_result, crawl_task, crawl_result)
end
end
end

