Add purge crawl feature (#65)
Add the Purge Crawl feature, which allows Crawler to delete outdated
docs from the index.
navarone-feekery authored Aug 22, 2024
1 parent 5a04a00 commit 28ed2ec
Showing 12 changed files with 573 additions and 59 deletions.
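
Not part of this commit — a rough sketch of how the new setting might be used, assuming the Config and Crawl classes in lib/crawler/api accept these options as keyword arguments (the exact constructor signatures are not shown in this diff):

config = Crawler::API::Config.new(
  seed_urls: %w[https://example.com],  # hypothetical seed URL
  output_sink: 'elasticsearch',        # purge crawls only run for the elasticsearch sink
  purge_crawl_enabled: true            # the new setting; defaults to true (see DEFAULTS below)
)
Crawler::API::Crawl.new(config).start!
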
16 changes: 9 additions & 7 deletions lib/crawler/api/config.rb
@@ -44,6 +44,7 @@ class Config # rubocop:disable Metrics/ClassLength
:seed_urls, # An array or an enumerator of initial URLs to crawl
:sitemap_urls, # Array of sitemap URLs to be used for content discovery
:crawl_rules, # Array of allow/deny-listed URL patterns
:extraction_rules, # Contains domains extraction rules

:robots_txt_service, # Service to fetch robots.txt
:output_sink, # The type of output, either :console | :file | :elasticsearch
@@ -52,6 +53,7 @@ class Config # rubocop:disable Metrics/ClassLength
:results_collection, # An Enumerable collection for storing mock crawl results
:user_agent, # The User-Agent used for requests made from the crawler.
:stats_dump_interval, # How often should we output stats in the logs during a crawl
:purge_crawl_enabled, # Whether or not to purge ES docs after a crawl, only possible for elasticsearch sinks

# Elasticsearch settings
:elasticsearch, # Elasticsearch connection settings
@@ -108,16 +110,15 @@ class Config # rubocop:disable Metrics/ClassLength
:max_headings_count, # HTML heading tags count limit

# Content extraction (from files)
:content_extraction_enabled, # Enable content extraction of non-HTML files found during a crawl
:content_extraction_enabled, # Enable content extraction of non-HTML files found during a crawl
:content_extraction_mime_types, # Extract files with the following MIME types

# Other crawler tuning settings
:default_encoding, # Default encoding used for responses that do not specify a charset
:compression_enabled, # Enable/disable HTTP content compression
:sitemap_discovery_disabled, # Enable/disable crawling of sitemaps defined in robots.txt
:head_requests_enabled, # Fetching HEAD requests before GET requests enabled
:default_encoding, # Default encoding used for responses that do not specify a charset
:compression_enabled, # Enable/disable HTTP content compression
:sitemap_discovery_disabled, # Enable/disable crawling of sitemaps defined in robots.txt
:head_requests_enabled # Fetching HEAD requests before GET requests enabled

:extraction_rules # Contains domains extraction rules
].freeze

EXTRACTION_RULES_FIELDS = %i[url_filters rules].freeze
@@ -177,7 +178,8 @@ class Config # rubocop:disable Metrics/ClassLength
head_requests_enabled: false,

extraction_rules: {},
crawl_rules: {}
crawl_rules: {},
purge_crawl_enabled: true
}.freeze

# Settings we are not allowed to log due to their sensitive nature
21 changes: 17 additions & 4 deletions lib/crawler/api/crawl.rb
@@ -91,10 +91,7 @@ def start! # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
)
coordinator.run_crawl!

record_outcome(
outcome: coordinator.crawl_outcome,
message: coordinator.outcome_message
)
record_overall_outcome(coordinator.crawl_results)
rescue StandardError => e
log_exception(e, 'Unexpected error while running the crawl')
record_outcome(
Expand Down Expand Up @@ -136,6 +133,22 @@ def status

private

def record_overall_outcome(results)
if config.output_sink != 'elasticsearch' || !config.purge_crawl_enabled
# only need primary crawl results in this situation
record_outcome(outcome: results[:primary][:outcome], message: results[:primary][:message])
return
end

outcome = combined_outcome(results)
message = "#{results[:primary][:message]} | #{results[:purge][:outcome]}"
record_outcome(outcome:, message:)
end

def combined_outcome(results)
results[:primary][:outcome] == :success && results[:purge][:outcome] == :success ? :success : :failure
end

def record_outcome(outcome:, message:)
@outcome = outcome
@outcome_message = message
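
For reference (illustrative values only, not output from this commit), record_overall_outcome above consumes the per-stage crawl_results hash that the coordinator below builds; a failed purge stage downgrades the overall outcome:

results = {
  primary: { outcome: :success, message: 'Successfully finished the primary crawl with an empty crawl queue' },
  purge:   { outcome: :failure, message: 'hypothetical purge-stage failure' }
}
# combined_outcome(results) returns :failure, and the recorded message is
# "#{results[:primary][:message]} | #{results[:purge][:outcome]}".
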
131 changes: 100 additions & 31 deletions lib/crawler/coordinator.rb
@@ -19,10 +19,15 @@ module Crawler
class Coordinator # rubocop:disable Metrics/ClassLength
SEED_LIST = 'seed-list'

CRAWL_STAGE_PRIMARY = :primary
CRAWL_STAGE_PURGE = :purge

SINK_LOCK_RETRY_INTERVAL = 1.second
SINK_LOCK_MAX_RETRIES = 120

attr_reader :crawl, :seen_urls, :crawl_outcome, :outcome_message, :started_at, :task_executors
ELASTICSEARCH_OUTPUT_SINK = 'elasticsearch'

attr_reader :crawl, :crawl_results, :crawl_stage, :seen_urls, :started_at, :task_executors

delegate :events, :system_logger, :config, :executor, :sink, :rule_engine,
:interruptible_sleep, :shutdown_started?, :allow_resume?,
@@ -42,8 +47,11 @@ def initialize(crawl)
)

# Setup crawl internal state
@crawl_outcome = nil
@outcome_message = nil
@crawl_stage = CRAWL_STAGE_PRIMARY
@crawl_results = {
CRAWL_STAGE_PRIMARY => { outcome: nil, message: nil },
CRAWL_STAGE_PURGE => { outcome: nil, message: nil }
}
@started_at = Time.now
end

@@ -60,23 +68,8 @@ def active_threads

#-----------------------------------------------------------------------------------------------
def run_crawl!
load_robots_txts
enqueue_seed_urls
enqueue_sitemaps

system_logger.info("Starting the crawl with up to #{task_executors.max_length} parallel thread(s)...")

# Run the crawl until it is time to stop
until crawl_finished?
if executors_available?
run_crawl_loop
else
# Sleep for a bit if there are no available executors to avoid creating a hot loop
system_logger.debug('No executors available, sleeping for a second...')
sleep(1)
end
events.log_crawl_status(crawl)
end
run_primary_crawl!
run_purge_crawl! if purge_crawls_allowed?

# Close the sink to make sure all the in-flight content has been safely stored/indexed/etc
system_logger.info('Closing the output sink before finishing the crawl...')
@@ -87,8 +80,54 @@ def run_crawl!
system_logger.info('Crawl shutdown complete')
end

def run_primary_crawl!
@crawl_stage = CRAWL_STAGE_PRIMARY
system_logger.info("Starting the primary crawl with up to #{task_executors.max_length} parallel thread(s)...")

load_robots_txts
enqueue_seed_urls
enqueue_sitemaps

run_crawl_loop
end

# Fetches the URLs from docs that were indexed in a previous crawl, but were not seen in the current crawl.
# Those URLs are crawled to see if they still exist. Any URLs that can't be found will be deleted from the index.
def run_purge_crawl!
@crawl_stage = CRAWL_STAGE_PURGE
purge_urls = sink.fetch_purge_docs(started_at)

if purge_urls.empty?
system_logger.info('No documents were found for the purge crawl. Skipping purge crawl.')
return
end

system_logger.info("Starting the purge crawl with up to #{task_executors.max_length} parallel thread(s)...")
enqueue_purge_urls(purge_urls)

run_crawl_loop

# Any docs in the index that have a `last_crawled_at` value
# earlier than the primary crawl's start time are now safe to delete
sink.purge(started_at)
end

private

def purge_crawls_allowed?
unless config.output_sink == ELASTICSEARCH_OUTPUT_SINK
system_logger.warn("Purge crawls are not supported for sink type #{config.output_sink}. Skipping purge crawl.")
return false
end

unless config.purge_crawl_enabled
system_logger.warn('Purge crawls are disabled in the config file. Skipping purge crawl.')
return false
end

true
end

#-----------------------------------------------------------------------------------------------
# Communicates the progress on a given crawl task via the system log and Java thread names
def crawl_task_progress(crawl_task, message)
@@ -181,6 +220,12 @@ def enqueue_sitemaps
)
end

def enqueue_purge_urls(urls)
system_logger.debug("Seeding the crawl with #{urls.size} URLs from the ES index...")
parsed_urls = urls.map { |url| Crawler::Data::URL.parse(url) }
add_urls_to_backlog(urls: parsed_urls, type: :content, source_type: :purge, crawl_depth: 1)
end

def fetch_valid_auto_discovered_sitemap_urls!
config.robots_txt_service.sitemaps.each_with_object([]) do |sitemap, out|
sitemap_url = Crawler::Data::URL.parse(sitemap)
@@ -195,8 +240,8 @@ def fetch_valid_auto_discovered_sitemap_urls!

#-----------------------------------------------------------------------------------------------
def set_outcome(outcome, message)
@crawl_outcome = outcome
@outcome_message = message
@crawl_results[@crawl_stage][:outcome] = outcome
@crawl_results[@crawl_stage][:message] = message
end

#-----------------------------------------------------------------------------------------------
@@ -208,31 +253,33 @@ def executors_available?
#-----------------------------------------------------------------------------------------------
# Checks if we should terminate the crawl loop and sets the outcome value accordingly
def crawl_finished?
return true if crawl_outcome
return true if @crawl_results[@crawl_stage][:outcome]

# Check if there are any active tasks still being processed
return false if task_executors.length.positive?

if crawl_queue.empty? && !shutdown_started?
system_logger.info('Crawl queue is empty, finishing the crawl')
set_outcome(:success, 'Successfully finished the crawl with an empty crawl queue')
system_logger.info("Crawl queue is empty, finishing the #{@crawl_stage} crawl")
set_outcome(:success, "Successfully finished the #{@crawl_stage} crawl with an empty crawl queue")
return true
end

if shutdown_started?
set_outcome(
:shutdown,
"Terminated the crawl with #{crawl_queue.length} unprocessed URLs " \
"Terminated the #{@crawl_stage} crawl with #{crawl_queue.length} unprocessed URLs " \
"due to a crawler shutdown (allow_resume=#{allow_resume?})"
)
system_logger.warn("Shutting down the crawl with #{crawl_queue.length} unprocessed URLs...")
system_logger.warn("Shutting down the #{@crawl_stage} crawl with #{crawl_queue.length} unprocessed URLs...")
return true
end

# This duration is the total of both the primary and purge crawl stages,
# so no need to differentiate between them here.
if crawl_duration > config.max_duration
outcome_message = <<~OUTCOME.squish
The crawl is taking too long (elapsed: #{crawl_duration.to_i} sec, limit: #{config.max_duration} sec).
Shutting down with #{crawl_queue.length} unprocessed URLs.
Shutting down #{@crawl_stage} crawl with #{crawl_queue.length} unprocessed URLs.
If you would like to increase the limit, change the max_duration setting.
OUTCOME
set_outcome(:warning, outcome_message)
@@ -243,9 +290,25 @@ def crawl_finished?
false
end

def run_crawl_loop
until crawl_finished?
if executors_available?
prepare_crawl_task
else
# Sleep for a bit if there are no available executors to avoid creating a hot loop
system_logger.debug('No executors available, sleeping for a second...')
sleep(1)
end
events.log_crawl_status(crawl)
end

sink.flush
log_crawl_end_event
end

#-----------------------------------------------------------------------------------------------
# Performs a single iteration of the crawl loop
def run_crawl_loop
def prepare_crawl_task
return if shutdown_started?

# Get a task to execute
@@ -321,7 +384,6 @@ def process_crawl_result(crawl_task, crawl_result)
)
elsif crawl_result.redirect?
crawl_task_progress(crawl_task, 'skipping ingestion of redirect')
extracted_event[:redirect_location] = crawl_result.location
extracted_event[:message] = "Crawler was redirected to #{crawl_result.location}"
elsif crawl_task.content?
crawl_task_progress(crawl_task, 'ingesting the result')
@@ -336,7 +398,7 @@ def process_crawl_result(crawl_task, crawl_result)
#-----------------------------------------------------------------------------------------------
# Extracts links from a given crawl result and pushes them into the crawl queue for processing
def extract_and_enqueue_links(crawl_task, crawl_result)
return if crawl_result.error?
return if crawl_result.error? || @crawl_stage == CRAWL_STAGE_PURGE

crawl_task_progress(crawl_task, 'extracting links')
return enqueue_redirect_link(crawl_task, crawl_result) if crawl_result.redirect?
@@ -650,6 +712,13 @@ def check_discovered_url(url:, type:, source_url:, crawl_depth:) # rubocop:disab

:allow
end

def log_crawl_end_event
events.crawl_stage_end(
outcome: @crawl_results[@crawl_stage][:outcome],
message: @crawl_results[@crawl_stage][:message]
)
end
end
end
# rubocop:enable Metrics/MethodLength, Metrics/AbcSize, Metrics/CyclomaticComplexity
15 changes: 13 additions & 2 deletions lib/crawler/event_generator.rb
@@ -42,11 +42,12 @@ def log_crawl_status(crawl, force: false)
#-----------------------------------------------------------------------------------------------
def log_error(error, message)
full_message = "#{message}: #{error.class}: #{error.message}"
system_logger.error("Crawl Error: #{full_message}")
backtrace = error.backtrace&.join("\n")
system_logger.error("Crawl Error: #{full_message} #{backtrace}")
log_event(
'event.type' => 'error',
'error.message' => full_message,
'error.stack_trace' => error.backtrace&.join("\n")
'error.stack_trace' => backtrace
)
end

@@ -71,6 +72,16 @@ def crawl_start(url_queue_items:, seen_urls:)
)
end

def crawl_stage_end(outcome:, message:)
system_logger.info("Finished a crawl stage. Result: #{outcome}; #{message}")
log_crawl_event(
'event.type' => 'change',
'event.action' => 'crawl-stage-end',
'event.outcome' => outcome,
'message' => message
)
end

def crawl_end(outcome:, message:, resume_possible:)
system_logger.info("Finished a crawl. Result: #{outcome}; #{message}")
log_crawl_event(
2 changes: 1 addition & 1 deletion lib/crawler/http_utils/config.rb
@@ -51,7 +51,7 @@ def initialize(config)
end

validate_required_options(config, REQUIRED_OPTIONS)
super(config)
super
end

def http_proxy_host
13 changes: 13 additions & 0 deletions lib/crawler/output_sink/base.rb
@@ -28,6 +28,14 @@ def write(_crawl_result)
raise NotImplementedError
end

def fetch_purge_docs(_crawl_start_time)
raise NotImplementedError
end

def purge(_crawl_start_time)
raise NotImplementedError
end

def to_doc(crawl_result)
document_mapper.create_doc(crawl_result)
end
@@ -37,6 +45,11 @@ def close
# Does nothing by default.
end

def flush
# To be implemented by the sink if needed.
# Does nothing by default.
end

#-----------------------------------------------------------------------------------------------
# Returns a hash with the outcome of crawl result ingestion (to be used for logging above)
def outcome(outcome, message)
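
The Elasticsearch sink's concrete fetch_purge_docs and purge implementations are in changed files not rendered on this page. Purely as a hedged illustration of the contract defined above (not the repository's actual code), a sink backed by the elasticsearch-ruby client might look roughly like this, assuming docs are indexed with url and last_crawled_at fields:

require 'time' # for Time#iso8601

class PurgeCapableSinkSketch
  def initialize(client:, index:)
    @client = client # an Elasticsearch::Client from the elasticsearch-ruby gem
    @index  = index
  end

  # fetch_purge_docs hook: return the URLs of docs whose last_crawled_at predates
  # the primary crawl's start time, i.e. docs the current crawl never touched.
  def fetch_purge_docs(crawl_start_time)
    response = @client.search(
      index: @index,
      body: {
        _source: %w[url],
        size: 10_000,
        query: { range: { last_crawled_at: { lt: crawl_start_time.iso8601 } } }
      }
    )
    response['hits']['hits'].map { |hit| hit['_source']['url'] }
  end

  # purge hook: delete every doc the purge crawl could not re-fetch
  # (its last_crawled_at is still older than the primary crawl's start time).
  def purge(crawl_start_time)
    @client.delete_by_query(
      index: @index,
      body: { query: { range: { last_crawled_at: { lt: crawl_start_time.iso8601 } } } }
    )
  end
end
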
