feat: better worker tracking from the db #499

Open
wants to merge 1 commit into
base: main

Conversation

cevian
Collaborator

@cevian cevian commented Feb 19, 2025

Records more information about the worker status in the database. Allows more visibility from the db layer.

@cevian cevian requested a review from a team as a code owner February 19, 2025 20:36
@cevian cevian force-pushed the mat/worker-tracking branch from fea29fa to 4f1e407 Compare February 19, 2025 20:39
@cevian cevian force-pushed the mat/worker-tracking branch from 4f1e407 to 593edb1 Compare February 19, 2025 21:23
Member

@JamesGuthrie JamesGuthrie left a comment

Initial pass.

- log.error("the pgai extension is not installed")
+ err_msg = "the pgai extension is not installed"
+ log.error(err_msg)
+ await worker_tracking.save_vectorizer_error(None, err_msg)
Member

worker_tracking is not yet initialized here.

db_url, poll_interval, features, __version__
)
await worker_tracking.start()
asyncio.create_task(worker_tracking.heartbeat())
Member

It feels like creating this task should be part of worker_tracking.start(). Is there any reason not to do that?
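
A minimal sketch of what that could look like, assuming a simplified stand-in class (`WorkerTrackingSketch`, `_heartbeat_loop`, `stop`, and the `beats` counter are hypothetical names, not the PR's code). It also keeps a reference to the task, which the asyncio documentation recommends so the task is not garbage-collected while running:

```python
import asyncio
from typing import Optional


class WorkerTrackingSketch:
    """Hypothetical stand-in for WorkerTracking; only task handling is shown."""

    def __init__(self, poll_interval: float) -> None:
        self.poll_interval = poll_interval
        self.beats = 0
        self._heartbeat_task: Optional[asyncio.Task] = None

    async def start(self) -> None:
        # Registering the worker in the database would happen here (omitted).
        # Spawn the heartbeat as part of start() and hold on to the task.
        self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())

    async def stop(self) -> None:
        if self._heartbeat_task is not None:
            self._heartbeat_task.cancel()
            try:
                await self._heartbeat_task
            except asyncio.CancelledError:
                pass
            self._heartbeat_task = None

    async def _heartbeat_loop(self) -> None:
        while True:
            self.beats += 1  # placeholder for the real database heartbeat
            await asyncio.sleep(self.poll_interval)


async def demo() -> int:
    tracking = WorkerTrackingSketch(poll_interval=0.01)
    await tracking.start()  # the caller no longer needs create_task
    await asyncio.sleep(0.05)
    await tracking.stop()
    return tracking.beats


beats = asyncio.run(demo())
```

With this shape the caller only awaits `start()`, and a matching `stop()` gives a single place to cancel the task on shutdown.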

Member

Also: If you don't have any vectorizers then this doesn't actually work (the heartbeat never runs) because the current task doesn't ever yield control back to the event loop. Switching out time.sleep for asyncio.sleep below fixes it.
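
The effect can be reproduced in isolation. The sketch below is not the worker's code; `heartbeat` and `poll_loop` are hypothetical stand-ins that only show how a blocking `time.sleep` starves a background task while an awaited `asyncio.sleep` lets it run:

```python
import asyncio
import time


async def heartbeat(beats: list, interval: float) -> None:
    """Stand-in for the heartbeat task: record a timestamp every `interval` seconds."""
    while True:
        beats.append(time.monotonic())
        await asyncio.sleep(interval)


async def poll_loop(blocking: bool, iterations: int = 3, pause: float = 0.05) -> int:
    """Run a fake polling loop and return how many heartbeats fired meanwhile."""
    beats: list = []
    task = asyncio.create_task(heartbeat(beats, interval=0.01))
    for _ in range(iterations):
        if blocking:
            time.sleep(pause)  # blocks the event loop; the heartbeat cannot run
        else:
            await asyncio.sleep(pause)  # yields, so the heartbeat gets scheduled
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return len(beats)


blocking_beats = asyncio.run(poll_loop(blocking=True))
yielding_beats = asyncio.run(poll_loop(blocking=False))
```

In the blocking variant the loop never reaches an `await` before the task is cancelled, so the heartbeat coroutine never gets to execute its body.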

Comment on lines +56 to +65
num_errors = self.num_errors_since_last_heartbeat
self.num_errors_since_last_heartbeat = 0
error_message = self.error_message
self.error_message = None
num_successes = self.num_successes_since_last_heartbeat
self.num_successes_since_last_heartbeat = 0
await cur.execute(
"select ai._worker_heartbeat(%s, %s, %s, %s)",
(self.worker_id, num_successes, num_errors, error_message),
)
Member

I suspect that you have a race condition here.

You spawn the async task for heartbeat(), which periodically does a heartbeat (calling _heartbeat). Simultaneously you have the "parent" async task running, which calls force_heartbeat in a number of locations.
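
One way to rule out overlapping heartbeats, assuming the concern is that two of them can be in flight at once, is to funnel every caller through a single `asyncio.Lock`. This is only a sketch: `HeartbeatSketch`, `_send`, and the bookkeeping fields are hypothetical, and `_send` simulates the database round trip with a short sleep:

```python
import asyncio


class HeartbeatSketch:
    """Hypothetical tracker; `_send` stands in for the database round trip."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self.num_successes = 0
        self.in_flight = 0
        self.max_in_flight = 0
        self.sent: list = []

    async def _send(self, num_successes: int) -> None:
        self.in_flight += 1
        self.max_in_flight = max(self.max_in_flight, self.in_flight)
        await asyncio.sleep(0.01)  # simulated network latency
        self.sent.append(num_successes)
        self.in_flight -= 1

    async def heartbeat(self) -> None:
        # The periodic task and any forced heartbeat both go through here,
        # so only one snapshot-and-send runs at a time.
        async with self._lock:
            num_successes, self.num_successes = self.num_successes, 0
            await self._send(num_successes)


async def demo() -> HeartbeatSketch:
    tracker = HeartbeatSketch()
    tracker.num_successes = 5
    # Three concurrent callers, as if a forced heartbeat raced the periodic one.
    await asyncio.gather(tracker.heartbeat(), tracker.heartbeat(), tracker.heartbeat())
    return tracker


tracker = asyncio.run(demo())
```

The counters are still reported exactly once in total, and at most one send is ever in flight.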

@@ -89,18 +89,42 @@ def test_process_vectorizer(
f"items={num_items}-batch_size={batch_size}-"
f"custom_base_url={openai_proxy_url is not None}.yaml"
)
- logging.getLogger("vcr").setLevel(logging.DEBUG)
+ # logging.getLogger("vcr").setLevel(logging.DEBUG)
Member

stray comment


with vcr_.use_cassette(cassette):
result = run_vectorizer_worker(cli_db_url, vectorizer_id, concurrency)

assert not result.exception
assert result.exit_code == 0
print(f"result: {result.stdout}")
Member

intentional?

sys.exit(1)

if once:
await worker_tracking.force_heartbeat()
return
log.info(f"sleeping for {poll_interval_str} before polling for new work")
time.sleep(poll_interval)
Member

Suggested change
- time.sleep(poll_interval)
+ await asyncio.sleep(poll_interval)

Comment on lines +326 to +329
worker_tracking = WorkerTracking(
db_url, poll_interval, features, __version__
)
await worker_tracking.start()
Member

This is probably not a big issue, but a failure to create or start the worker_tracking will result in the worker_tracking being silently broken.

INSERT INTO ai.vectorizer_worker_connection (version, expected_heartbeat_interval) VALUES (version, expected_heartbeat_interval) RETURNING id INTO worker_id;
RETURN worker_id;
END;
$$ LANGUAGE plpgsql;
Collaborator

this should be a SQL function rather than plpgsql

CREATE OR REPLACE FUNCTION ai._worker_heartbeat(worker_id uuid, num_successes_since_last_heartbeat int, num_errors_since_last_heartbeat int, error_message text) RETURNS void AS $$
BEGIN
UPDATE ai.vectorizer_worker_connection SET
last_heartbeat = now()
Collaborator

This may need to be clock_timestamp() instead of now(). I think this will be called within the transaction that is doing the processing? If so, now() will return the start time of the transaction every time you call this.

, error_count = error_count + num_errors_since_last_heartbeat
, success_count = success_count + num_successes_since_last_heartbeat
, last_error_message = CASE WHEN error_message IS NOT NULL THEN error_message ELSE last_error_message END
, last_error_at = CASE WHEN error_message IS NOT NULL THEN now() ELSE last_error_at END
Collaborator

clock_timestamp() instead of now()?

Collaborator

all of the now()s in this PR should probably be clock_timestamp()

, last_error_at = CASE WHEN error_message IS NOT NULL THEN now() ELSE last_error_at END
WHERE id = worker_id;
END;
$$ LANGUAGE plpgsql;
Collaborator

this one can be a SQL function instead of plpgsql too

BEGIN
IF NOT EXISTS (SELECT 1 FROM ai.vectorizer_worker_progress WHERE vectorizer_id = worker_vectorizer_id) THEN
--make sure a row exists for this vectorizer
INSERT INTO ai.vectorizer_worker_progress (vectorizer_id) VALUES (worker_vectorizer_id) ON CONFLICT DO NOTHING;
Collaborator

you could do on conflict do update, right? Then, this could be 1 statement and a SQL function instead of a plpgsql function.
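
To illustrate the single-statement shape, here is a sketch that uses Python's built-in `sqlite3` as a stand-in for PostgreSQL, so the syntax is close but not identical to what the extension would need. The table is reduced to two columns, and `success_count` on this table and the `record_progress` helper are assumptions made for the example:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE vectorizer_worker_progress ("
    " vectorizer_id INTEGER PRIMARY KEY,"
    " success_count INTEGER NOT NULL DEFAULT 0)"
)

# One statement covers both cases: insert the row if it is missing,
# otherwise add to the existing counter.
UPSERT = """
INSERT INTO vectorizer_worker_progress (vectorizer_id, success_count)
VALUES (?, ?)
ON CONFLICT (vectorizer_id) DO UPDATE
SET success_count = success_count + excluded.success_count
"""


def record_progress(vectorizer_id: int, successes: int) -> None:
    """Hypothetical helper: apply the upsert for one vectorizer."""
    conn.execute(UPSERT, (vectorizer_id, successes))


record_progress(1, 3)  # no row yet: inserts
record_progress(1, 2)  # row exists: updates in place

count = conn.execute(
    "SELECT success_count FROM vectorizer_worker_progress WHERE vectorizer_id = 1"
).fetchone()[0]
rows = conn.execute("SELECT count(*) FROM vectorizer_worker_progress").fetchone()[0]
```

Because there is no separate existence check, the function body no longer needs procedural control flow.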

Collaborator

or is there something here about performance?

Collaborator

need to set the search_path explicitly and security invoker/definer on these functions

Collaborator

Can we set the application_name on the database connection in a way that will correlate ai.vectorizer_worker_connection with pg_stat_activity? i.e. if we generate the uuid client-side and set the application_name to that uuid, then a dba can find the connection pid associated with a given worker and kill it if necessary or see exactly what it's up to right now.
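
A rough sketch of the client-side half, assuming the worker connects with a URL-style connection string (libpq accepts `application_name` as a URI query parameter). The helper name `with_application_name` and the example URL are hypothetical:

```python
import uuid
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_application_name(db_url: str, worker_id: uuid.UUID) -> str:
    """Return db_url with application_name set to the worker id."""
    parts = urlsplit(db_url)
    # Drop any existing application_name so the worker id always wins.
    params = [(k, v) for k, v in parse_qsl(parts.query) if k != "application_name"]
    params.append(("application_name", str(worker_id)))
    return urlunsplit(parts._replace(query=urlencode(params)))


worker_id = uuid.uuid4()  # generated client-side, before connecting
url = with_application_name(
    "postgres://user@localhost:5432/db?sslmode=disable", worker_id
)

# With the id in application_name, a DBA could look up the backend like this:
FIND_BACKEND_SQL = (
    "select pid, state, query from pg_stat_activity where application_name = %s"
)
```

The same uuid would then have to be stored as the id in ai.vectorizer_worker_connection rather than generated by the insert.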

Collaborator

should we alter the ai.vectorizer_status view to include more information now that we have it?

3 participants