Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only collect messages on data that was updated #2970

Merged
merged 1 commit into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 30 additions & 11 deletions augur/tasks/github/messages.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging

from datetime import timedelta, timezone

from augur.tasks.init.celery_app import celery_app as celery
from augur.tasks.init.celery_app import AugurCoreRepoCollectionTask
Expand All @@ -10,12 +10,13 @@
from augur.tasks.github.util.util import get_owner_repo
from augur.application.db.models import PullRequest, Message, Issue, PullRequestMessageRef, IssueMessageRef, Contributor, Repo, CollectionStatus
from augur.application.db import get_engine, get_session
from augur.application.db.lib import get_core_data_last_collected
from sqlalchemy.sql import text

platform_id = 1

@celery.task(base=AugurCoreRepoCollectionTask)
def collect_github_messages(repo_git: str) -> None:
def collect_github_messages(repo_git: str, full_collection: bool) -> None:

logger = logging.getLogger(collect_github_messages.__name__)

Expand All @@ -29,9 +30,15 @@ def collect_github_messages(repo_git: str) -> None:
owner, repo = get_owner_repo(repo_git)
task_name = f"{owner}/{repo}: Message Task"

if full_collection:
core_data_last_collected = None
else:
# subtract 2 days to ensure all data is collected
core_data_last_collected = (get_core_data_last_collected(repo_id) - timedelta(days=2)).replace(tzinfo=timezone.utc)


if is_repo_small(repo_id):
message_data = fast_retrieve_all_pr_and_issue_messages(repo_git, logger, manifest.key_auth, task_name)
message_data = fast_retrieve_all_pr_and_issue_messages(repo_git, logger, manifest.key_auth, task_name, core_data_last_collected)

if message_data:
process_messages(message_data, task_name, repo_id, logger, augur_db)
Expand All @@ -40,7 +47,7 @@ def collect_github_messages(repo_git: str) -> None:
logger.info(f"{owner}/{repo} has no messages")

else:
process_large_issue_and_pr_message_collection(repo_id, repo_git, logger, manifest.key_auth, task_name, augur_db)
process_large_issue_and_pr_message_collection(repo_id, repo_git, logger, manifest.key_auth, task_name, augur_db, core_data_last_collected)


def is_repo_small(repo_id):
Expand All @@ -51,13 +58,16 @@ def is_repo_small(repo_id):

return result != None

def fast_retrieve_all_pr_and_issue_messages(repo_git: str, logger, key_auth, task_name) -> None:
def fast_retrieve_all_pr_and_issue_messages(repo_git: str, logger, key_auth, task_name, since) -> None:

owner, repo = get_owner_repo(repo_git)

# url to get issue and pull request comments
url = f"https://api.github.com/repos/{owner}/{repo}/issues/comments"

if since:
url += f"&since={since.isoformat()}"

# define logger for task
logger.info(f"Collecting github comments for {owner}/{repo}")

Expand All @@ -70,7 +80,7 @@ def fast_retrieve_all_pr_and_issue_messages(repo_git: str, logger, key_auth, tas
return list(github_data_access.paginate_resource(url))


def process_large_issue_and_pr_message_collection(repo_id, repo_git: str, logger, key_auth, task_name, augur_db) -> None:
def process_large_issue_and_pr_message_collection(repo_id, repo_git: str, logger, key_auth, task_name, augur_db, since) -> None:

owner, repo = get_owner_repo(repo_git)

Expand All @@ -81,11 +91,20 @@ def process_large_issue_and_pr_message_collection(repo_id, repo_git: str, logger

with engine.connect() as connection:

query = text(f"""
(select pr_comments_url from pull_requests WHERE repo_id={repo_id} order by pr_created_at desc)
UNION
(select comments_url as comment_url from issues WHERE repo_id={repo_id} order by created_at desc);
""")
if since:
query = text(f"""
(select pr_comments_url from pull_requests WHERE repo_id={repo_id} AND pr_updated_at > {since} order by pr_created_at desc)
UNION
(select comments_url as comment_url from issues WHERE repo_id={repo_id} AND updated_at > {since} order by created_at desc);
""")
else:

query = text(f"""
(select pr_comments_url from pull_requests WHERE repo_id={repo_id} order by pr_created_at desc)
UNION
(select comments_url as comment_url from issues WHERE repo_id={repo_id} order by created_at desc);
""")


result = connection.execute(query).fetchall()
comment_urls = [x[0] for x in result]
Expand Down
2 changes: 1 addition & 1 deletion augur/tasks/start_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
#Define secondary group that can't run until after primary jobs have finished.
secondary_repo_jobs = group(
collect_events.si(repo_git),#*create_grouped_task_load(dataList=first_pass, task=collect_events).tasks,
collect_github_messages.si(repo_git), #*create_grouped_task_load(dataList=first_pass,task=collect_github_messages).tasks,
collect_github_messages.si(repo_git, full_collection), #*create_grouped_task_load(dataList=first_pass,task=collect_github_messages).tasks,
collect_github_repo_clones_data.si(repo_git),
)

Expand Down Expand Up @@ -295,7 +295,7 @@
status = repo.collection_status[0]
raw_count = status.issue_pr_sum

issue_pr_task_update_weight_util([int(raw_count)],repo_git=repo_git,session=session)

Check warning on line 298 in augur/tasks/start_tasks.py

View workflow job for this annotation

GitHub Actions / runner / pylint

[pylint] reported by reviewdog 🐶 E1120: No value for argument 'issue_and_pr_nums' in function call (no-value-for-parameter) Raw Output: augur/tasks/start_tasks.py:298:12: E1120: No value for argument 'issue_and_pr_nums' in function call (no-value-for-parameter)

facade_not_pending = CollectionStatus.facade_status != CollectionState.PENDING.value
facade_not_failed = CollectionStatus.facade_status != CollectionState.FAILED_CLONE.value
Expand Down
Loading