Lock bulk queue while processing indexing request #45

Merged
merged 7 commits into from
Jun 17, 2024
Changes from 6 commits
3 changes: 3 additions & 0 deletions .rubocop.yml
Expand Up @@ -24,6 +24,9 @@ Metrics/ClassLength:
Metrics/ModuleLength:
Max: 200

Metrics/AbcSize:
Max: 20

# Disable for specs
Metrics/BlockLength:
Exclude:
Expand Down
9 changes: 2 additions & 7 deletions lib/crawler/api/crawl.rb
Expand Up @@ -20,7 +20,7 @@ class Crawl
attr_reader :config, :crawl_queue, :seen_urls, :sink, :outcome, :outcome_message
attr_accessor :executor

def initialize(config) # rubocop:disable Metrics/AbcSize
def initialize(config)
raise ArgumentError, 'Invalid config' unless config.is_a?(Config)
raise ArgumentError, 'Missing domain allowlist' if config.domain_allowlist.empty?
raise ArgumentError, 'Seed URLs need to be an enumerator' unless config.seed_urls.is_a?(Enumerator)
Expand Down Expand Up @@ -77,11 +77,6 @@ def interruptible_sleep(period)
end
end

# No errors should be retried by default (see App Search subclass for a version with retries)
def retryable_error?(_error)
false
end

#---------------------------------------------------------------------------------------------
def coordinator
@coordinator ||= Crawler::Coordinator.new(self)
Expand Down Expand Up @@ -124,7 +119,7 @@ def start! # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
# Returns a hash with crawl-specific status information
# Note: This is used by the `EventGenerator` class for crawl-status events and by the Crawler Status API.
# Please update OpenAPI specs if you add any new fields here.
def status # rubocop:disable Metrics/AbcSize
def status
{
queue_size: crawl_queue.length,
pages_visited: stats.fetched_pages_count,
Expand Down
32 changes: 17 additions & 15 deletions lib/crawler/coordinator.rb
Expand Up @@ -10,15 +10,15 @@
require 'benchmark'
require 'concurrent/set'

require_dependency File.join(__dir__, '..', 'errors')

# There are too many lint issues here to individually disable
# rubocop:disable Metrics/MethodLength, Metrics/AbcSize, Metrics/CyclomaticComplexity
module Crawler
# The Coordinator is responsible for running an entire crawl from start to finish.
class Coordinator # rubocop:disable Metrics/ClassLength
SEED_LIST = 'seed-list'

# How long to wait before retrying ingestion after a retryable error (like a r/o mode write)
RETRY_INTERVAL = 10.seconds
SINK_LOCK_TIMEOUT = 60 # seconds

attr_reader :crawl, :seen_urls, :crawl_outcome, :outcome_message, :started_at, :task_executors

Expand Down Expand Up @@ -447,21 +447,23 @@ def extract_links(crawl_result, crawl_depth:)
#-----------------------------------------------------------------------------------------------
# Outputs the results of a single URL processing to an output module configured for the crawl
def output_crawl_result(crawl_result)
sink.write(crawl_result).tap do |outcome|
# Make sure we have an outcome of the right type (helps troubleshoot sink implementations)
unless outcome.is_a?(Hash)
error = "Expected to return an outcome object from the sink, returned #{outcome.inspect} instead"
raise ArgumentError, error
Timeout.timeout(SINK_LOCK_TIMEOUT) do
Member

@artem-shelkovnikov artem-shelkovnikov Jun 13, 2024

Also, how does SINK_LOCK_TIMEOUT interact with the sink's retries? Can it time out before the sink finishes retrying?

Collaborator Author

@navarone-feekery navarone-feekery Jun 13, 2024

Timeout is built into Ruby, and it appears it wouldn't be thread-safe here 🤦🏻 Sorry, I never thought something built-in could be so bad; it seems everyone recommends against using it.
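
As a rough illustration of the concern (a minimal sketch, not code from this PR): `Timeout.timeout` raises `Timeout::Error` inside the block at whatever point the deadline hits, so a multi-step update can be abandoned halfway. The `queue` array and the `sleep` standing in for a slow flush are assumptions made up for the example.

```ruby
require 'timeout'

# Hypothetical stand-in for a bulk queue where each doc needs two entries.
queue = []

begin
  Timeout.timeout(0.1) do
    queue << :index_op
    sleep 0.5        # simulates a slow step, e.g. a flush to Elasticsearch
    queue << :doc    # never reached: the timeout fires during the sleep
  end
rescue Timeout::Error
  # The block was interrupted between the two appends.
end

puts queue.inspect # the queue holds :index_op without its matching :doc
```

The half-written state is the sort of thing that makes a flat timeout around a shared sink awkward to reason about.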

> Also, how does SINK_LOCK_TIMEOUT interact with the sink's retries? Can it time out before the sink finishes retrying?

There's no interaction; it's just a flat timeout.
There are two kinds of retries that would happen in the sink.

  • The first is acquiring the lock, which isn't really a timeout: mutex.synchronize just waits for the lock to be released.
  • The second is the flush (ES request) retries. In a worst-case scenario of 4 attempts, each timing out at 10 seconds, it would take around 54 seconds (10 + 12 + 14 + 18) to release the lock. If the flush fails, the entire payload is dropped, so the next executor that acquires the lock won't reattempt this request; it'll add its crawl result to the now-empty queue. This means the executors that waited for the lock for these 54ish seconds should clear up quickly afterwards.
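
The 54-second figure can be reproduced with a short calculation. This is only a sketch: the timeout and retry count mirror REQUEST_TIMEOUT and MAX_RETRIES from this PR, but the backoff of 2**n seconds before retry n is an assumption inferred from the numbers in the comment (10, 12, 14, 18), not something confirmed by the client code.

```ruby
request_timeout = 10 # seconds, REQUEST_TIMEOUT after this PR
max_retries = 3      # MAX_RETRIES, so 4 attempts in total

# Assumption: retry n waits 2**n seconds (2, 4, 8) before the request is sent.
attempt_costs = (0..max_retries).map do |n|
  backoff = n.zero? ? 0 : 2**n
  request_timeout + backoff
end

worst_case = attempt_costs.sum
puts "#{attempt_costs.inspect} => #{worst_case}s"
```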

If we remove Timeout, I think this could be reworked to use a mutex.try_lock > mutex.unlock block instead of mutex.synchronize. If the lock can't be acquired, it sleeps for 1 second, up to a max of maybe 120 retries (so approx. two minutes), before raising an error. How does that sound?
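
A minimal sketch of what that proposal could look like, assuming a helper wrapped around the sink write. The names `with_lock_attempts` and `LockAcquisitionError` are hypothetical and the defaults simply mirror the numbers floated above; the code that was eventually merged may differ.

```ruby
# Hypothetical error class, used only for this illustration.
class LockAcquisitionError < StandardError; end

# Tries to take the lock up to max_attempts times, sleeping between tries.
# The block only runs once the lock is held, and the lock is always released.
def with_lock_attempts(mutex, max_attempts: 120, wait: 1)
  attempts = 0
  until mutex.try_lock
    attempts += 1
    raise LockAcquisitionError, "gave up after #{attempts} attempts" if attempts >= max_attempts

    sleep(wait)
  end

  begin
    yield
  ensure
    mutex.unlock
  end
end

lock = Mutex.new
result = with_lock_attempts(lock, max_attempts: 3, wait: 0.01) { :written }
```

Because the attempt counter stops mattering once the lock is held, a thread that acquires the lock late can still take as long as it needs inside the block.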

Member

Oh it happens :)

I think using a mutex is indeed best. A question came to mind while reading through it: what happens if Elasticsearch is overloaded and starts throttling?

So you have 10 threads that are getting page content and throwing it into a single sink. The sink starts to slow down, but do the threads keep extracting content? What happens if your timeout is 120 seconds, but the sink took 118 seconds to unlock? Will all the threads waiting for it be able to write into it anyway?

Collaborator Author

> So you have 10 threads that are getting page content and throwing it into a single sink. The sink starts to slow down, but do the threads keep extracting content?

No, content extraction should pause during this for all threads. The executors are idling waiting for the lock.

> What happens if your timeout is 120 seconds, but the sink took 118 seconds to unlock? Will all the threads waiting for it be able to write into it anyway?

If the remaining executors can write everything in 2 seconds, yes. The write is usually very fast if there's no flush required, so I think most would go through in this case, but there's always a possibility of some being dropped.
I think that's unavoidable, and as long as it's logged clearly, users can do something about it (e.g. look at why Elasticsearch throttles the Crawler).
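
To make the "executors are idling waiting for the lock" point concrete, here is a small sketch under simplified assumptions: three threads stand in for executors, `written` stands in for the bulk queue, and the main thread plays the executor that holds the lock during a slow flush. None of these names come from the PR.

```ruby
lock = Mutex.new
written = []

lock.lock # an executor is holding the sink lock, e.g. during a slow flush

executors = 3.times.map do |i|
  Thread.new { lock.synchronize { written << i } }
end

sleep 0.1
written_while_locked = written.dup # nothing gets through while the lock is held

lock.unlock # the flush finishes and the lock is released
executors.each(&:join)

puts "during: #{written_while_locked.inspect}, after: #{written.sort.inspect}"
```

Once the lock is released, the waiting writes go through quickly because none of them needs a flush, which matches the expectation that the backlog clears soon after a slow request finishes.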

Collaborator Author

@artem-shelkovnikov I've changed things a bit so Timeout isn't used; I thought it made more sense to go by lock acquisition attempts instead (this also means that if a thread acquires the lock late and takes a long time itself, it won't be unnecessarily terminated early).

Member

I've taken a look and haven't found anything broken with this approach. I'm also not super good with concurrent operations, so some stuff might go south regardless; this is just worth testing at some point (e.g. ingest a large site into a super small Elasticsearch instance and observe). That can of course be done later, when you feel it's the right moment!

sink.write(crawl_result).tap do |outcome|
# Make sure we have an outcome of the right type (helps troubleshoot sink implementations)
unless outcome.is_a?(Hash)
error = "Expected to return an outcome object from the sink, returned #{outcome.inspect} instead"
raise ArgumentError, error
end
end
end
rescue Timeout::Error
log = "Executor waited #{SINK_LOCK_TIMEOUT} seconds for output sink lock but timed out."
system_logger.error(log)
sink.failure(log)
rescue StandardError => e
if crawl.retryable_error?(e) && !shutdown_started?
system_logger.warn("Retryable error during content ingestion: #{e}. Going to retry in #{RETRY_INTERVAL}s...")
interruptible_sleep(RETRY_INTERVAL)
retry
end

sink.failure("Unexpected exception while sending crawl results to the output sink: #{e}")
log = "Unexpected exception while sending crawl results to the output sink: #{e}"
system_logger.fatal(log)
sink.failure(log)
end

#-----------------------------------------------------------------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions lib/crawler/http_client.rb
Expand Up @@ -197,7 +197,7 @@ def content_decoders
end

#-------------------------------------------------------------------------------------------------
def new_connection_manager # rubocop:disable Metrics/AbcSize
def new_connection_manager
builder = PoolingHttpClientConnectionManagerBuilder.create
builder.set_ssl_socket_factory(https_socket_factory)
builder.set_dns_resolver(dns_resolver)
Expand Down Expand Up @@ -315,7 +315,7 @@ def default_request_config

#-------------------------------------------------------------------------------------------------
# Returns a proxy host object to be used for all connections
def proxy_host # rubocop:disable Metrics/AbcSize
def proxy_host
return nil unless config.http_proxy_host

logger.debug(<<~LOG.squish)
Expand Down
2 changes: 1 addition & 1 deletion lib/crawler/http_utils/response.rb
Expand Up @@ -72,7 +72,7 @@ def [](key)
v.is_a?(Array) ? v.first : v
end

def headers # rubocop:disable Metrics/AbcSize
def headers
@headers ||= apache_response.headers.each_with_object({}) do |h, o|
key = h.get_name.downcase

Expand Down
35 changes: 20 additions & 15 deletions lib/crawler/output_sink/elasticsearch.rb
Expand Up @@ -9,6 +9,7 @@
require_dependency File.join(__dir__, 'base')
require_dependency File.join(__dir__, '..', '..', 'utility', 'es_client')
require_dependency File.join(__dir__, '..', '..', 'utility', 'bulk_queue')
require_dependency File.join(__dir__, '..', '..', 'errors')

module Crawler
module OutputSink
Expand All @@ -31,26 +32,30 @@ def initialize(config)
# initialize client now to fail fast if config is bad
client

@queue_lock = Mutex.new
init_ingestion_stats
system_logger.info(
"Elasticsearch sink initialized for index [#{index_name}] with pipeline [#{pipeline}]"
)
end

def write(crawl_result)
doc = parametrized_doc(crawl_result)
index_op = { 'index' => { '_index' => index_name, '_id' => doc['id'] } }
# make additions to the bulk queue thread-safe
@queue_lock.synchronize do
doc = parametrized_doc(crawl_result)
index_op = { 'index' => { '_index' => index_name, '_id' => doc['id'] } }

flush unless operation_queue.will_fit?(index_op, doc)
flush unless operation_queue.will_fit?(index_op, doc)

operation_queue.add(
index_op,
doc
)
system_logger.debug("Added doc #{doc['id']} to bulk queue. Current stats: #{operation_queue.current_stats}")
operation_queue.add(
index_op,
doc
)
system_logger.debug("Added doc #{doc['id']} to bulk queue. Current stats: #{operation_queue.current_stats}")

increment_ingestion_stats(doc)
success
increment_ingestion_stats(doc)
success("Successfully added #{doc['id']} to the bulk queue")
end
end

def close
Expand All @@ -64,18 +69,18 @@ def close
end

def flush # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
payload = operation_queue.pop_all
if payload.empty?
body = operation_queue.pop_all
if body.empty?
system_logger.debug('Queue was empty when attempting to flush.')
return
end

# a single doc needs two items in a bulk request, so halving the count makes logs clearer
indexing_docs_count = payload.size / 2
system_logger.info("Sending bulk request with #{indexing_docs_count} items and flushing queue...")
indexing_docs_count = body.size / 2
system_logger.info("Sending bulk request with #{indexing_docs_count} items and resetting queue...")

begin
client.bulk(body: payload, pipeline:) # TODO: parse response
client.bulk(body:, pipeline:) # TODO: parse response
system_logger.info("Successfully indexed #{indexing_docs_count} docs.")
reset_ingestion_stats(true)
rescue Utility::EsClient::IndexingFailedError => e
Expand Down
14 changes: 14 additions & 0 deletions lib/errors.rb
@@ -0,0 +1,14 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#

# frozen_string_literal: true

class Errors
# Raised only if the queue item added somehow overflows the queue threshold.
# The queue threshold is checked before an item is added so this error shouldn't occur.
# If this error occurs, something is wrong with the interaction between the Elasticsearch sink and BulkQueue.
class BulkQueueOverflowError < StandardError; end
end
13 changes: 10 additions & 3 deletions lib/utility/bulk_queue.rb
Expand Up @@ -8,15 +8,15 @@

require 'elasticsearch/api'

require_dependency File.join(__dir__, '..', 'errors')

module Utility
class BulkQueue
# Maximum number of operations in BULK Elasticsearch operation that will ingest the data
DEFAULT_OP_COUNT_THRESHOLD = 100
# Maximum size of either whole BULK Elasticsearch operation or one document in it
DEFAULT_SIZE_THRESHOLD = 1 * 1024 * 1024 # 1 megabyte

class QueueOverflowError < StandardError; end

def initialize(op_count_threshold, size_threshold, system_logger)
@op_count_threshold = (op_count_threshold || DEFAULT_OP_COUNT_THRESHOLD).freeze
@size_threshold = (size_threshold || DEFAULT_SIZE_THRESHOLD).freeze
Expand All @@ -41,7 +41,14 @@ def pop_all
end

def add(operation, payload = nil)
raise QueueOverflowError unless will_fit?(operation, payload)
unless will_fit?(operation, payload)
log = <<~LOG.squish
Operation failed to add to bulk queue. Current operation count is #{@current_op_count}.
Operation payload was #{bytesize(payload)} bytes, current buffer size is #{@current_buffer_size} bytes.
LOG
@system_logger.error(log)
raise Errors::BulkQueueOverflowError
end

operation_size = bytesize(operation)
payload_size = bytesize(payload)
Expand Down
2 changes: 1 addition & 1 deletion lib/utility/es_client.rb
Expand Up @@ -13,7 +13,7 @@ module Utility
class EsClient < ::Elasticsearch::Client
USER_AGENT = 'elastic-web-crawler-'
MAX_RETRIES = 3
REQUEST_TIMEOUT = 30 # seconds
REQUEST_TIMEOUT = 10 # seconds
FAILED_BULKS_DIR = 'output/failed_payloads' # directory that failed bulk payloads are output to

class IndexingFailedError < StandardError
Expand Down
62 changes: 40 additions & 22 deletions spec/lib/crawler/coordinator_spec.rb
Expand Up @@ -76,7 +76,7 @@
)
end

def process_crawl_result # rubocop:disable Metrics/AbcSize
def process_crawl_result
allow(events).to receive(:url_output)
allow(events).to receive(:url_discover)
allow(events).to receive(:url_seed)
Expand Down Expand Up @@ -125,7 +125,6 @@ def process_crawl_result # rubocop:disable Metrics/AbcSize
it 'should capture exceptions coming from the output module and generate a failure url-extracted event' do
error = RuntimeError.new('BOOM')
expect(crawl.sink).to receive(:write).and_raise(error)
expect(crawl).to receive(:retryable_error?).with(error).and_return(false)
expect(events).to receive(:url_extracted).with(
hash_including(
url: crawl_result.url,
Expand All @@ -140,31 +139,50 @@ def process_crawl_result # rubocop:disable Metrics/AbcSize
coordinator.send(:process_crawl_result, crawl_task, crawl_result)
end

it 'should retry exceptions if possible' do
error = RuntimeError.new('BOOM')
expect(crawl.sink).to receive(:write).twice.and_wrap_original do |method, *args|
unless @called_before
@called_before = true
raise error
context 'when the output sink has a lock' do
before :each do
stub_const('Crawler::Coordinator::SINK_LOCK_TIMEOUT', 3)
end

it 'should wait for the lock' do
expect(crawl.sink).to receive(:write).and_wrap_original do |method, *args|
sleep(0.5.seconds)
method.call(*args)
end
method.call(*args)

expect(events).to receive(:url_extracted).with(
hash_including(
url: crawl_result.url,
type: :allowed,
start_time: kind_of(Time),
end_time: kind_of(Time),
duration: kind_of(Benchmark::Tms),
outcome: :success,
message: 'Successfully ingested crawl result'
)
)
coordinator.send(:process_crawl_result, crawl_task, crawl_result)
end

expect(crawl).to receive(:retryable_error?).with(error).and_return(true)
expect(crawl).to receive(:interruptible_sleep)
it 'should fail if the lock acquisition times out' do
expect(crawl.sink).to receive(:write).and_wrap_original do |method, *args|
sleep(10.seconds)
method.call(*args)
end

expect(events).to receive(:url_extracted).with(
hash_including(
url: crawl_result.url,
type: :allowed,
start_time: kind_of(Time),
end_time: kind_of(Time),
duration: kind_of(Benchmark::Tms),
outcome: :success,
message: 'Successfully ingested crawl result'
expect(events).to receive(:url_extracted).with(
hash_including(
url: crawl_result.url,
type: :allowed,
start_time: kind_of(Time),
end_time: kind_of(Time),
duration: kind_of(Benchmark::Tms),
outcome: :failure,
message: 'Executor waited 3 seconds for output sink lock but timed out.'
)
)
)
coordinator.send(:process_crawl_result, crawl_task, crawl_result)
coordinator.send(:process_crawl_result, crawl_task, crawl_result)
end
end
end

Expand Down