Add purge crawl feature #65

Merged: 18 commits, Aug 22, 2024
16 changes: 9 additions & 7 deletions lib/crawler/api/config.rb
@@ -44,6 +44,7 @@ class Config # rubocop:disable Metrics/ClassLength
:seed_urls, # An array or an enumerator of initial URLs to crawl
:sitemap_urls, # Array of sitemap URLs to be used for content discovery
:crawl_rules, # Array of allow/deny-listed URL patterns
:extraction_rules, # Contains domains extraction rules

:robots_txt_service, # Service to fetch robots.txt
:output_sink, # The type of output, either :console | :file | :elasticsearch
@@ -52,6 +53,7 @@ class Config # rubocop:disable Metrics/ClassLength
:results_collection, # An Enumerable collection for storing mock crawl results
:user_agent, # The User-Agent used for requests made from the crawler.
:stats_dump_interval, # How often should we output stats in the logs during a crawl
:purge_crawl_enabled, # Whether or not to purge ES docs after a crawl

# Elasticsearch settings
:elasticsearch, # Elasticsearch connection settings
@@ -108,16 +110,15 @@ class Config # rubocop:disable Metrics/ClassLength
:max_headings_count, # HTML heading tags count limit

# Content extraction (from files)
:content_extraction_enabled, # Enable content extraction of non-HTML files found during a crawl
:content_extraction_enabled, # Enable content extraction of non-HTML files found during a crawl
:content_extraction_mime_types, # Extract files with the following MIME types

# Other crawler tuning settings
:default_encoding, # Default encoding used for responses that do not specify a charset
:compression_enabled, # Enable/disable HTTP content compression
:sitemap_discovery_disabled, # Enable/disable crawling of sitemaps defined in robots.txt
:head_requests_enabled, # Fetching HEAD requests before GET requests enabled
:default_encoding, # Default encoding used for responses that do not specify a charset
:compression_enabled, # Enable/disable HTTP content compression
:sitemap_discovery_disabled, # Enable/disable crawling of sitemaps defined in robots.txt
:head_requests_enabled # Fetching HEAD requests before GET requests enabled

:extraction_rules # Contains domains extraction rules
].freeze

EXTRACTION_RULES_FIELDS = %i[url_filters rules].freeze
@@ -177,7 +178,8 @@ class Config # rubocop:disable Metrics/ClassLength
head_requests_enabled: false,

extraction_rules: {},
crawl_rules: {}
crawl_rules: {},
purge_crawl_enabled: true
}.freeze

# Settings we are not allowed to log due to their sensitive nature
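For context, the following standalone sketch shows how the new default interacts with user-supplied settings. The constant name and merge call mirror the pattern used in Config, but the helper itself is an illustration, not code from this change.

# Hypothetical illustration: purge crawls run unless explicitly disabled.
DEFAULTS = {
  extraction_rules: {},
  crawl_rules: {},
  purge_crawl_enabled: true # default added by this change
}.freeze

def effective_config(user_settings)
  DEFAULTS.merge(user_settings)
end

effective_config({})[:purge_crawl_enabled]                         #=> true
effective_config(purge_crawl_enabled: false)[:purge_crawl_enabled] #=> false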
21 changes: 17 additions & 4 deletions lib/crawler/api/crawl.rb
@@ -91,10 +91,7 @@ def start! # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
)
coordinator.run_crawl!

record_outcome(
outcome: coordinator.crawl_outcome,
message: coordinator.outcome_message
)
record_overall_outcome(coordinator.crawl_results)
rescue StandardError => e
log_exception(e, 'Unexpected error while running the crawl')
record_outcome(
@@ -136,6 +133,22 @@ def status

private

def record_overall_outcome(results)
if config.output_sink != 'elasticsearch' || !config.purge_crawl_enabled
# only need primary crawl results in this situation
record_outcome(outcome: results[:primary][:outcome], message: results[:primary][:message])
return
end

outcome = combined_outcome(results)
message = "#{results[:primary][:message]} | #{results[:purge][:outcome]}"
record_outcome(outcome:, message:)
end

def combined_outcome(results)
results[:primary][:outcome] == :success && results[:purge][:outcome] == :success ? :success : :failure
end

def record_outcome(outcome:, message:)
@outcome = outcome
@outcome_message = message
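To make the outcome-combining rule concrete, here is a small standalone illustration: combined_outcome is copied from the diff above, while the results hash is invented sample data in the same shape as crawl_results.

def combined_outcome(results)
  results[:primary][:outcome] == :success && results[:purge][:outcome] == :success ? :success : :failure
end

results = {
  primary: { outcome: :success, message: 'Successfully finished the primary crawl with an empty crawl queue' },
  purge: { outcome: :failure, message: 'Purge crawl failed' }
}

combined_outcome(results) #=> :failure, since success requires both stages to succeed
"#{results[:primary][:message]} | #{results[:purge][:outcome]}"
#=> "Successfully finished the primary crawl with an empty crawl queue | failure"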
132 changes: 101 additions & 31 deletions lib/crawler/coordinator.rb
@@ -19,10 +19,13 @@ module Crawler
class Coordinator # rubocop:disable Metrics/ClassLength
SEED_LIST = 'seed-list'

CRAWL_STAGE_PRIMARY = :primary
CRAWL_STAGE_PURGE = :purge

SINK_LOCK_RETRY_INTERVAL = 1.second
SINK_LOCK_MAX_RETRIES = 120

attr_reader :crawl, :seen_urls, :crawl_outcome, :outcome_message, :started_at, :task_executors
attr_reader :crawl, :crawl_results, :crawl_stage, :seen_urls, :started_at, :task_executors

delegate :events, :system_logger, :config, :executor, :sink, :rule_engine,
:interruptible_sleep, :shutdown_started?, :allow_resume?,
@@ -42,8 +45,12 @@ def initialize(crawl)
)

# Setup crawl internal state
@crawl_outcome = nil
@outcome_message = nil
@crawl_stage = CRAWL_STAGE_PRIMARY
@crawl_results = {
CRAWL_STAGE_PRIMARY => { outcome: nil, message: nil },
CRAWL_STAGE_PURGE => { outcome: nil, message: nil }
}
@purge_backlog = {}
@started_at = Time.now
end

@@ -60,23 +67,8 @@ def active_threads

#-----------------------------------------------------------------------------------------------
def run_crawl!
load_robots_txts
enqueue_seed_urls
enqueue_sitemaps

system_logger.info("Starting the crawl with up to #{task_executors.max_length} parallel thread(s)...")

# Run the crawl until it is time to stop
until crawl_finished?
if executors_available?
run_crawl_loop
else
# Sleep for a bit if there are no available executors to avoid creating a hot loop
system_logger.debug('No executors available, sleeping for a second...')
sleep(1)
end
events.log_crawl_status(crawl)
end
run_primary_crawl!
run_purge_crawl! if purge_crawls_allowed?

# Close the sink to make sure all the in-flight content has been safely stored/indexed/etc
system_logger.info('Closing the output sink before finishing the crawl...')
@@ -87,8 +79,54 @@ def run_crawl!
system_logger.info('Crawl shutdown complete')
end

def run_primary_crawl!
@crawl_stage = CRAWL_STAGE_PRIMARY
system_logger.info("Starting the primary crawl with up to #{task_executors.max_length} parallel thread(s)...")

load_robots_txts
enqueue_seed_urls
enqueue_sitemaps

run_crawl_loop
end

# Fetches the URLs from docs that were indexed in a previous crawl, but were not seen in the current crawl.
# Those URLs are crawled to see if they still exist. Any URLs that can't be found will be deleted from the index.
def run_purge_crawl!
@crawl_stage = CRAWL_STAGE_PURGE
@purge_backlog = sink.fetch_purge_docs(started_at)

if @purge_backlog.empty?
system_logger.info('No documents were found for the purge crawl. Skipping purge crawl.')
return
end

system_logger.info("Starting the purge crawl with up to #{task_executors.max_length} parallel thread(s)...")
enqueue_purge_urls(@purge_backlog.keys)

run_crawl_loop

# Any URLs in the purge backlog that are still accessible have been removed during the purge crawl loop.
# We can safely send all of the remaining IDs to be deleted.
sink.purge(@purge_backlog.values)
end

private

def purge_crawls_allowed?
unless config.output_sink == 'elasticsearch'
system_logger.info("Purge crawls are not supported for sink type #{config.output_sink}. Skipping purge crawl.")
return false
end

unless config.purge_crawl_enabled
system_logger.info('Purge crawls are disabled in the config file. Skipping purge crawl.')
return false
end

true
end

#-----------------------------------------------------------------------------------------------
# Communicates the progress on a given crawl task via the system log and Java thread names
def crawl_task_progress(crawl_task, message)
@@ -181,6 +219,12 @@ def enqueue_sitemaps
)
end

def enqueue_purge_urls(urls)
system_logger.debug("Seeding the crawl with #{urls.size} URLs from the ES index...")
parsed_urls = urls.map { |url| Crawler::Data::URL.parse(url) }
add_urls_to_backlog(urls: parsed_urls, type: :content, source_type: :purge, crawl_depth: 1)
end

def fetch_valid_auto_discovered_sitemap_urls!
config.robots_txt_service.sitemaps.each_with_object([]) do |sitemap, out|
sitemap_url = Crawler::Data::URL.parse(sitemap)
@@ -195,8 +239,8 @@ def fetch_valid_auto_discovered_sitemap_urls!

#-----------------------------------------------------------------------------------------------
def set_outcome(outcome, message)
@crawl_outcome = outcome
@outcome_message = message
@crawl_results[@crawl_stage][:outcome] = outcome
@crawl_results[@crawl_stage][:message] = message
end

#-----------------------------------------------------------------------------------------------
@@ -208,31 +252,33 @@ def executors_available?
#-----------------------------------------------------------------------------------------------
# Checks if we should terminate the crawl loop and sets the outcome value accordingly
def crawl_finished?
return true if crawl_outcome
return true if @crawl_results[@crawl_stage][:outcome]

# Check if there are any active tasks still being processed
return false if task_executors.length.positive?

if crawl_queue.empty? && !shutdown_started?
system_logger.info('Crawl queue is empty, finishing the crawl')
set_outcome(:success, 'Successfully finished the crawl with an empty crawl queue')
system_logger.info("Crawl queue is empty, finishing the #{@crawl_stage} crawl")
set_outcome(:success, "Successfully finished the #{@crawl_stage} crawl with an empty crawl queue")
return true
end

if shutdown_started?
set_outcome(
:shutdown,
"Terminated the crawl with #{crawl_queue.length} unprocessed URLs " \
"Terminated the #{@crawl_stage} crawl with #{crawl_queue.length} unprocessed URLs " \
"due to a crawler shutdown (allow_resume=#{allow_resume?})"
)
system_logger.warn("Shutting down the crawl with #{crawl_queue.length} unprocessed URLs...")
system_logger.warn("Shutting down the #{@crawl_stage} crawl with #{crawl_queue.length} unprocessed URLs...")
return true
end

# This duration is the total of both the primary and purge crawl stages,
# so no need to differentiate between them here.
if crawl_duration > config.max_duration
outcome_message = <<~OUTCOME.squish
The crawl is taking too long (elapsed: #{crawl_duration.to_i} sec, limit: #{config.max_duration} sec).
Shutting down with #{crawl_queue.length} unprocessed URLs.
Shutting down #{@crawl_stage} crawl with #{crawl_queue.length} unprocessed URLs.
If you would like to increase the limit, change the max_duration setting.
OUTCOME
set_outcome(:warning, outcome_message)
@@ -243,9 +289,25 @@ def crawl_finished?
false
end

def run_crawl_loop
until crawl_finished?
if executors_available?
prepare_crawl_task
else
# Sleep for a bit if there are no available executors to avoid creating a hot loop
system_logger.debug('No executors available, sleeping for a second...')
sleep(1)
end
events.log_crawl_status(crawl)
end

sink.flush
log_crawl_end_event
end

#-----------------------------------------------------------------------------------------------
# Performs a single iteration of the crawl loop
def run_crawl_loop
def prepare_crawl_task
return if shutdown_started?

# Get a task to execute
@@ -321,7 +383,6 @@ def process_crawl_result(crawl_task, crawl_result)
)
elsif crawl_result.redirect?
crawl_task_progress(crawl_task, 'skipping ingestion of redirect')
extracted_event[:redirect_location] = crawl_result.location
extracted_event[:message] = "Crawler was redirected to #{crawl_result.location}"
elsif crawl_task.content?
crawl_task_progress(crawl_task, 'ingesting the result')
@@ -336,7 +397,7 @@ def process_crawl_result(crawl_task, crawl_result)
#-----------------------------------------------------------------------------------------------
# Extracts links from a given crawl result and pushes them into the crawl queue for processing
def extract_and_enqueue_links(crawl_task, crawl_result)
return if crawl_result.error?
return if crawl_result.error? || @crawl_stage == CRAWL_STAGE_PURGE

crawl_task_progress(crawl_task, 'extracting links')
return enqueue_redirect_link(crawl_task, crawl_result) if crawl_result.redirect?
@@ -461,6 +522,8 @@ def output_crawl_result(crawl_result)
error = "Expected to return an outcome object from the sink, returned #{outcome.inspect} instead"
raise ArgumentError, error
end

@purge_backlog.delete(crawl_result.url) if @crawl_stage == CRAWL_STAGE_PURGE
end
rescue Errors::SinkLockedError
# Adding a debug log here is incredibly noisy, so instead we should rely on logging from the sink
@@ -650,6 +713,13 @@ def check_discovered_url(url:, type:, source_url:, crawl_depth:) # rubocop:disab

:allow
end

def log_crawl_end_event
events.crawl_stage_end(
outcome: @crawl_results[@crawl_stage][:outcome],
message: @crawl_results[@crawl_stage][:message]
)
end
end
end
# rubocop:enable Metrics/MethodLength, Metrics/AbcSize, Metrics/CyclomaticComplexity
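A simplified sketch of the purge-backlog bookkeeping described above: the backlog starts as a url => document-id map fetched from the sink, entries are dropped as the purge crawl loop re-ingests URLs that are still reachable, and whatever remains is handed to sink.purge. The data below is invented for illustration.

# Backlog as returned by sink.fetch_purge_docs(started_at) in this sketch.
purge_backlog = {
  'https://example.com/still-here' => 'doc-1',
  'https://example.com/gone' => 'doc-2'
}

# URLs that responded successfully during the purge crawl loop are removed,
# mirroring @purge_backlog.delete(crawl_result.url) in output_crawl_result.
['https://example.com/still-here'].each { |url| purge_backlog.delete(url) }

# Equivalent of sink.purge(@purge_backlog.values) at the end of run_purge_crawl!
purge_backlog.values #=> ["doc-2"], the only document deleted from the index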
15 changes: 13 additions & 2 deletions lib/crawler/event_generator.rb
@@ -42,11 +42,12 @@ def log_crawl_status(crawl, force: false)
#-----------------------------------------------------------------------------------------------
def log_error(error, message)
full_message = "#{message}: #{error.class}: #{error.message}"
system_logger.error("Crawl Error: #{full_message}")
backtrace = error.backtrace&.join("\n")
system_logger.error("Crawl Error: #{full_message} #{backtrace}")
log_event(
'event.type' => 'error',
'error.message' => full_message,
'error.stack_trace' => error.backtrace&.join("\n")
'error.stack_trace' => backtrace
)
end

@@ -71,6 +72,16 @@ def crawl_start(url_queue_items:, seen_urls:)
)
end

def crawl_stage_end(outcome:, message:)
system_logger.info("Finished a crawl stage. Result: #{outcome}; #{message}")
log_crawl_event(
'event.type' => 'change',
'event.action' => 'crawl-stage-end',
'event.outcome' => outcome,
'message' => message
)
end

def crawl_end(outcome:, message:, resume_possible:)
system_logger.info("Finished a crawl. Result: #{outcome}; #{message}")
log_crawl_event(
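For reference, this is the structured payload the new crawl_stage_end helper builds; the field names are taken from the diff, and the sample values are invented.

outcome = :success
message = 'Successfully finished the primary crawl with an empty crawl queue'

event = {
  'event.type' => 'change',
  'event.action' => 'crawl-stage-end',
  'event.outcome' => outcome,
  'message' => message
}
# In the event generator this hash is passed to log_crawl_event, as crawl_end does above.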
2 changes: 1 addition & 1 deletion lib/crawler/http_utils/config.rb
@@ -51,7 +51,7 @@ def initialize(config)
end

validate_required_options(config, REQUIRED_OPTIONS)
super(config)
super
end

def http_proxy_host
13 changes: 13 additions & 0 deletions lib/crawler/output_sink/base.rb
@@ -28,6 +28,14 @@ def write(_crawl_result)
raise NotImplementedError
end

def fetch_purge_docs(_crawl_start_time)
raise NotImplementedError
end

def purge(_crawl_start_time)
raise NotImplementedError
end

def to_doc(crawl_result)
doc = { id: crawl_result.url_hash }
doc.merge!(document_mapper.document_fields(crawl_result))
@@ -41,6 +49,11 @@ def close
# Does nothing by default.
end

def flush
# To be implemented by the sink if needed.
# Does nothing by default.
end

#-----------------------------------------------------------------------------------------------
# Returns a hash with the outcome of crawl result ingestion (to be used for logging above)
def outcome(outcome, message)
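To make the new sink contract concrete, here is a hypothetical in-memory sink that satisfies the purge hooks; the document shape and field names are assumptions for illustration, not the Elasticsearch sink shipped with this PR.

# Hypothetical sink: fetch_purge_docs returns url => doc-id pairs for documents
# last crawled before the current crawl started; purge deletes the given ids.
class MemorySink
  def initialize(docs)
    # docs is shaped like { 'doc-1' => { url: 'https://...', last_crawled_at: Time } }
    @docs = docs
  end

  def fetch_purge_docs(crawl_start_time)
    @docs.each_with_object({}) do |(id, doc), out|
      out[doc[:url]] = id if doc[:last_crawled_at] < crawl_start_time
    end
  end

  def purge(doc_ids)
    doc_ids.each { |id| @docs.delete(id) }
  end

  def flush
    # nothing is buffered in this sketch
  end
end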