-
Notifications
You must be signed in to change notification settings - Fork 209
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: better worker tracking from the db #499
base: main
Are you sure you want to change the base?
Conversation
fea29fa
to
4f1e407
Compare
Records more information about the worker status in the database. Allows more visibility from the db layer.
4f1e407
to
593edb1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initial pass.
log.error("the pgai extension is not installed") | ||
err_msg = "the pgai extension is not installed" | ||
log.error(err_msg) | ||
await worker_tracking.save_vectorizer_error(None, err_msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
worker_tracking is not yet initialized here.
db_url, poll_interval, features, __version__ | ||
) | ||
await worker_tracking.start() | ||
asyncio.create_task(worker_tracking.heartbeat()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels like creating this task should be part of worker_tracking.start()
. Is there any reason not to do that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also: If you don't have any vectorizers then this doesn't actually work (the heartbeat never runs) because the current task doesn't ever yield control back to the event loop. Switching out time.sleep
for asyncio.sleep
below fixes it.
num_errors = self.num_errors_since_last_heartbeat | ||
self.num_errors_since_last_heartbeat = 0 | ||
error_message = self.error_message | ||
self.error_message = None | ||
num_successes = self.num_successes_since_last_heartbeat | ||
self.num_successes_since_last_heartbeat = 0 | ||
await cur.execute( | ||
"select ai._worker_heartbeat(%s, %s, %s, %s)", | ||
(self.worker_id, num_successes, num_errors, error_message), | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suspect that you have a race condition here.
You spawn the async task for heartbeat()
, which periodically does a heartbeat (calling _heartbeat
). Simultaneously you have the "parent" async task running, which calls force_heartbeat
in a number of locations.
@@ -89,18 +89,42 @@ def test_process_vectorizer( | |||
f"items={num_items}-batch_size={batch_size}-" | |||
f"custom_base_url={openai_proxy_url is not None}.yaml" | |||
) | |||
logging.getLogger("vcr").setLevel(logging.DEBUG) | |||
# logging.getLogger("vcr").setLevel(logging.DEBUG) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stray comment
|
||
with vcr_.use_cassette(cassette): | ||
result = run_vectorizer_worker(cli_db_url, vectorizer_id, concurrency) | ||
|
||
assert not result.exception | ||
assert result.exit_code == 0 | ||
print(f"result: {result.stdout}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
intentional?
sys.exit(1) | ||
|
||
if once: | ||
await worker_tracking.force_heartbeat() | ||
return | ||
log.info(f"sleeping for {poll_interval_str} before polling for new work") | ||
time.sleep(poll_interval) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
time.sleep(poll_interval) | |
asyncio.sleep(poll_interval) |
worker_tracking = WorkerTracking( | ||
db_url, poll_interval, features, __version__ | ||
) | ||
await worker_tracking.start() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is probably not a big issue, but a failure to create or start the worker_tracking will result in the worker_tracking being silently broken.
INSERT INTO ai.vectorizer_worker_connection (version, expected_heartbeat_interval) VALUES (version, expected_heartbeat_interval) RETURNING id INTO worker_id; | ||
RETURN worker_id; | ||
END; | ||
$$ LANGUAGE plpgsql; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be a SQL function rather than plpgsql
CREATE OR REPLACE FUNCTION ai._worker_heartbeat(worker_id uuid, num_successes_since_last_heartbeat int, num_errors_since_last_heartbeat int, error_message text) RETURNS void AS $$ | ||
BEGIN | ||
UPDATE ai.vectorizer_worker_connection SET | ||
last_heartbeat = now() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may need to be clock_timestamp()
instead of now()
. I think this will be called within the transaction that is doing the processing? If so, now()
will return the time of the start of the transaction every you call this.
, error_count = error_count + num_errors_since_last_heartbeat | ||
, success_count = success_count + num_successes_since_last_heartbeat | ||
, last_error_message = CASE WHEN error_message IS NOT NULL THEN error_message ELSE last_error_message END | ||
, last_error_at = CASE WHEN error_message IS NOT NULL THEN now() ELSE last_error_at END |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clock_timestamp()
instead of now()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all of the now()
s in this PR should probably be clock_timestamp()
, last_error_at = CASE WHEN error_message IS NOT NULL THEN now() ELSE last_error_at END | ||
WHERE id = worker_id; | ||
END; | ||
$$ LANGUAGE plpgsql; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this one can be a SQL function instead of plpgsql too
BEGIN | ||
IF NOT EXISTS (SELECT 1 FROM ai.vectorizer_worker_progress WHERE vectorizer_id = worker_vectorizer_id) THEN | ||
--make sure a row exists for this vectorizer | ||
INSERT INTO ai.vectorizer_worker_progress (vectorizer_id) VALUES (worker_vectorizer_id) ON CONFLICT DO NOTHING; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you could do on conflict do update
, right? Then, this could be 1 statement and a SQL function instead of a plpgsql function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or is there something here about performance?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to set the search_path explicitly and security invoker/definer on these functions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we set the application_name
on the database connection in a way that will correlate ai.vectorizer_worker_connection
with pg_stat_activity
? i.e. if we use the generate the uuid client-side and set the application_name to that uuid, then a dba can find the connection pid associated with a given worker and kill it if necessary or see exactly what it's up to right now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we alter the ai.vectorizer_status
view to include more information now that we have it?
Records more information about the worker status in the database. Allows more visibility from the db layer.