diff --git a/augur/tasks/github/messages.py b/augur/tasks/github/messages.py index 86fbe054f..c36abc24a 100644 --- a/augur/tasks/github/messages.py +++ b/augur/tasks/github/messages.py @@ -1,5 +1,5 @@ import logging - +from datetime import timedelta, timezone from augur.tasks.init.celery_app import celery_app as celery from augur.tasks.init.celery_app import AugurCoreRepoCollectionTask @@ -10,12 +10,13 @@ from augur.tasks.github.util.util import get_owner_repo from augur.application.db.models import PullRequest, Message, Issue, PullRequestMessageRef, IssueMessageRef, Contributor, Repo, CollectionStatus from augur.application.db import get_engine, get_session +from augur.application.db.lib import get_core_data_last_collected from sqlalchemy.sql import text platform_id = 1 @celery.task(base=AugurCoreRepoCollectionTask) -def collect_github_messages(repo_git: str) -> None: +def collect_github_messages(repo_git: str, full_collection: bool) -> None: logger = logging.getLogger(collect_github_messages.__name__) @@ -29,9 +30,15 @@ def collect_github_messages(repo_git: str) -> None: owner, repo = get_owner_repo(repo_git) task_name = f"{owner}/{repo}: Message Task" + if full_collection: + core_data_last_collected = None + else: + # subtract 2 days to ensure all data is collected + core_data_last_collected = (get_core_data_last_collected(repo_id) - timedelta(days=2)).replace(tzinfo=timezone.utc) + if is_repo_small(repo_id): - message_data = fast_retrieve_all_pr_and_issue_messages(repo_git, logger, manifest.key_auth, task_name) + message_data = fast_retrieve_all_pr_and_issue_messages(repo_git, logger, manifest.key_auth, task_name, core_data_last_collected) if message_data: process_messages(message_data, task_name, repo_id, logger, augur_db) @@ -40,7 +47,7 @@ def collect_github_messages(repo_git: str) -> None: logger.info(f"{owner}/{repo} has no messages") else: - process_large_issue_and_pr_message_collection(repo_id, repo_git, logger, manifest.key_auth, task_name, augur_db) + process_large_issue_and_pr_message_collection(repo_id, repo_git, logger, manifest.key_auth, task_name, augur_db, core_data_last_collected) def is_repo_small(repo_id): @@ -51,13 +58,16 @@ def is_repo_small(repo_id): return result != None -def fast_retrieve_all_pr_and_issue_messages(repo_git: str, logger, key_auth, task_name) -> None: +def fast_retrieve_all_pr_and_issue_messages(repo_git: str, logger, key_auth, task_name, since) -> None: owner, repo = get_owner_repo(repo_git) # url to get issue and pull request comments url = f"https://api.github.com/repos/{owner}/{repo}/issues/comments" + if since: + url += f"&since={since.isoformat()}" + # define logger for task logger.info(f"Collecting github comments for {owner}/{repo}") @@ -70,7 +80,7 @@ def fast_retrieve_all_pr_and_issue_messages(repo_git: str, logger, key_auth, tas return list(github_data_access.paginate_resource(url)) -def process_large_issue_and_pr_message_collection(repo_id, repo_git: str, logger, key_auth, task_name, augur_db) -> None: +def process_large_issue_and_pr_message_collection(repo_id, repo_git: str, logger, key_auth, task_name, augur_db, since) -> None: owner, repo = get_owner_repo(repo_git) @@ -81,11 +91,20 @@ def process_large_issue_and_pr_message_collection(repo_id, repo_git: str, logger with engine.connect() as connection: - query = text(f""" - (select pr_comments_url from pull_requests WHERE repo_id={repo_id} order by pr_created_at desc) - UNION - (select comments_url as comment_url from issues WHERE repo_id={repo_id} order by created_at desc); - """) + if since: + query = text(f""" + (select pr_comments_url from pull_requests WHERE repo_id={repo_id} AND pr_updated_at > {since} order by pr_created_at desc) + UNION + (select comments_url as comment_url from issues WHERE repo_id={repo_id} AND updated_at > {since} order by created_at desc); + """) + else: + + query = text(f""" + (select pr_comments_url from pull_requests WHERE repo_id={repo_id} order by pr_created_at desc) + UNION + (select comments_url as comment_url from issues WHERE repo_id={repo_id} order by created_at desc); + """) + result = connection.execute(query).fetchall() comment_urls = [x[0] for x in result] diff --git a/augur/tasks/start_tasks.py b/augur/tasks/start_tasks.py index ab4cf217c..8aa767ece 100644 --- a/augur/tasks/start_tasks.py +++ b/augur/tasks/start_tasks.py @@ -74,7 +74,7 @@ def primary_repo_collect_phase(repo_git, full_collection): #Define secondary group that can't run until after primary jobs have finished. secondary_repo_jobs = group( collect_events.si(repo_git),#*create_grouped_task_load(dataList=first_pass, task=collect_events).tasks, - collect_github_messages.si(repo_git), #*create_grouped_task_load(dataList=first_pass,task=collect_github_messages).tasks, + collect_github_messages.si(repo_git, full_collection), #*create_grouped_task_load(dataList=first_pass,task=collect_github_messages).tasks, collect_github_repo_clones_data.si(repo_git), )