Lock bulk queue while processing indexing request #45
Conversation
I'm not a ruby expert but left some comments :)
@jedrazb @artem-shelkovnikov thanks for the review, I switched to using
lib/crawler/coordinator.rb
Outdated
unless outcome.is_a?(Hash)
  error = "Expected to return an outcome object from the sink, returned #{outcome.inspect} instead"
  raise ArgumentError, error

Timeout.timeout(SINK_LOCK_TIMEOUT) do
Is it a library function, or a built-in Ruby one?
Is this one thread-safe?
Asking because of these scary articles:
Also - how does `SINK_LOCK_TIMEOUT` interact with retries for the sink? Can it time out before the sink finishes retrying?
`Timeout` is built-in Ruby and it appears it isn't thread-safe there 🤦🏻 sorry, I never thought something built-in could be so bad; it seems everywhere recommends not to use it.
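For anyone following along, the concern is roughly this (a toy illustration, not code from the PR; `do_flush` is a placeholder):

```ruby
require 'timeout'

# Stand-in for a slow bulk flush; not from the actual codebase.
def do_flush
  sleep 10
end

mutex = Mutex.new

# Timeout.timeout runs a watcher thread and raises Timeout::Error into the
# calling thread asynchronously, so the interrupt can land at any point inside
# the block - including while the mutex is held and shared state is half-updated.
Timeout.timeout(5) do
  mutex.synchronize { do_flush }
end
```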
> Also - how does `SINK_LOCK_TIMEOUT` interact with retries for the sink? Can it time out before the sink finishes retrying?

There's no interaction, it's just a flat timeout.
There are two retries that would happen in the sink:

- First is acquiring the lock, which isn't really a timeout; `mutex.synchronize` just waits for the lock to be lifted.
- Second is the flush (ES request) retries. In a worst-case scenario of 4 attempts each timing out at 10 seconds, it would take around 54 seconds (10 + 12 + 14 + 18) to release the lock. If the flush fails, the entire payload will be dropped, so the next executor that acquires the lock won't reattempt this request; it'll add its crawl result to the now-empty queue. This means the executors waiting for the lock for these 54-ish seconds should clear up quickly afterwards.
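Spelled out, the worst case above is just this arithmetic (illustrative only; the 10-second request cap and the 2/4/8-second backoff are assumptions taken from the numbers in this comment):

```ruby
# Each attempt is capped at 10 seconds; backoff sleeps happen between attempts.
attempt_cost   = 10           # seconds per timed-out bulk request
backoff_sleeps = [0, 2, 4, 8] # no sleep before the first attempt

total = backoff_sleeps.map { |sleep_time| sleep_time + attempt_cost }.sum
puts total # => 54 (10 + 12 + 14 + 18)
```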
If we remove `Timeout`, I think this could be reworked to use a `mutex.try_lock` > `mutex.unlock` block instead of `mutex.synchronize`. If the lock can't be acquired, it sleeps for 1 second, up to a max of maybe 120 retries (so approx. two minutes), before throwing an error. How does that sound?
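In code, the idea would be something like this sketch (not the code in this PR; `MAX_LOCK_RETRIES`, `with_sink_lock`, and the error message are assumed names for illustration):

```ruby
MAX_LOCK_RETRIES = 120 # ~2 minutes at 1 second per attempt

def with_sink_lock(mutex)
  attempts = 0
  # Keep attempting to take the lock; each failed attempt sleeps for a second.
  until mutex.try_lock
    attempts += 1
    raise 'Could not acquire the sink lock in time' if attempts >= MAX_LOCK_RETRIES

    sleep(1)
  end

  begin
    yield
  ensure
    mutex.unlock
  end
end
```

An executor would then wrap its queue write in `with_sink_lock(mutex) { ... }` instead of calling `mutex.synchronize`.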
Oh it happens :)
I think indeed using a mutex is best. A question that came to mind while reading through it - what happens if Elasticsearch is overloaded and starts throttling?

So you have 10 threads that are getting page content and throwing it into a single sink. The sink starts to slow down, but do the threads keep extracting content? What happens if your timeout is 120 seconds but the sink took 118 seconds to unlock - will all the threads waiting for it be able to write into it anyway?
> So you have 10 threads that are getting page content and throwing it into a single sink. The sink starts to slow down, but do the threads keep extracting content?

No, content extraction should pause during this for all threads. The executors idle while waiting for the lock.
> What happens if your timeout is 120 seconds but the sink took 118 seconds to unlock - will all the threads waiting for it be able to write into it anyway?

If the remaining executors can write everything in 2 seconds, yes. The write is usually very fast if there's no flush required, so I think most would go through in this case, but there's always a possibility of some being dropped.
I think that's unavoidable, and as long as it's logged clearly users can do something about it (e.g. look into why Elasticsearch is throttling the Crawler).
@artem-shelkovnikov I've changed things a bit so `Timeout` isn't used. I thought it made more sense to go by lock acquisition attempts instead (this also means that if a thread acquires the lock late and takes a long time itself, it won't be unnecessarily terminated early).
I've taken a look and haven't found anything broken with this approach. I'm also not super good with concurrent operations, so some stuff might go south regardless; this is something that's just worth testing at some point (like ingesting a large site into a super small Elasticsearch instance and observing). That can of course be done later, when you feel it's the right moment!
Closes #42
The coordinator currently doesn't care about the state of the bulk queue, and will add a crawl result to the queue pool whenever it has finished crawling a page.
This is not thread-safe: multiple threads attempting to add to the queue can overwrite each other and potentially lose data.
This has become more noticeable since retries with exponential backoff were implemented for ES indexing.
This change adds a mutex lock to the `write` method in the `Elasticsearch` sink. If `write` is called, the lock is enabled, and nothing can be added to the pool until the lock is lifted.

This effectively pauses crawl requests when ES indexing is overloaded. This should only impact performance if an ES instance is unhealthy or there are network issues. I think this is acceptable, as the user should investigate these things anyway.
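Conceptually, the sink-side locking looks like the sketch below (illustrative only, not the exact code in this change; the class shape and the `@queue`, `flush`, and `threshold_reached?` names are assumptions, and the actual change acquires the lock via retried `try_lock` attempts as discussed above):

```ruby
# Sketch of serializing access to the bulk queue inside the sink's write path.
class ElasticsearchSinkSketch
  def initialize
    @lock  = Mutex.new
    @queue = []
  end

  # Only one executor can add to the bulk queue (and trigger a flush) at a time,
  # so concurrent writes can no longer clobber each other's queue state.
  def write(crawl_result)
    @lock.synchronize do
      @queue << crawl_result
      flush if threshold_reached?
    end
  end

  private

  def threshold_reached?
    @queue.size >= 100 # assumed flush threshold for illustration
  end

  def flush
    # Send @queue to Elasticsearch as a bulk request (with retries), then clear it.
    @queue.clear
  end
end
```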
Pre-Review Checklist
(Checklist items reference `crawler.yml.example`, `elasticsearch.yml.example`, and `v0.1.0`.)