only collect messages on data that was updated
ABrain7710 committed Jan 15, 2025
1 parent e3197c9 commit fcf833f
Showing 2 changed files with 31 additions and 12 deletions.
41 changes: 30 additions & 11 deletions augur/tasks/github/messages.py
@@ -1,5 +1,5 @@
import logging

from datetime import timedelta, timezone

from augur.tasks.init.celery_app import celery_app as celery
from augur.tasks.init.celery_app import AugurCoreRepoCollectionTask
@@ -10,12 +10,13 @@
from augur.tasks.github.util.util import get_owner_repo
from augur.application.db.models import PullRequest, Message, Issue, PullRequestMessageRef, IssueMessageRef, Contributor, Repo, CollectionStatus
from augur.application.db import get_engine, get_session
from augur.application.db.lib import get_core_data_last_collected
from sqlalchemy.sql import text

platform_id = 1

@celery.task(base=AugurCoreRepoCollectionTask)
def collect_github_messages(repo_git: str) -> None:
def collect_github_messages(repo_git: str, full_collection: bool) -> None:

logger = logging.getLogger(collect_github_messages.__name__)

@@ -29,9 +30,15 @@ def collect_github_messages(repo_git: str) -> None:
owner, repo = get_owner_repo(repo_git)
task_name = f"{owner}/{repo}: Message Task"

if full_collection:
core_data_last_collected = None
else:
# subtract 2 days to ensure all data is collected
core_data_last_collected = (get_core_data_last_collected(repo_id) - timedelta(days=2)).replace(tzinfo=timezone.utc)


if is_repo_small(repo_id):
message_data = fast_retrieve_all_pr_and_issue_messages(repo_git, logger, manifest.key_auth, task_name)
message_data = fast_retrieve_all_pr_and_issue_messages(repo_git, logger, manifest.key_auth, task_name, core_data_last_collected)

if message_data:
process_messages(message_data, task_name, repo_id, logger, augur_db)
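Editor's note: a rough sketch of what the new cutoff lines above compute (the helper name is invented for illustration, and it assumes, as the .replace(tzinfo=timezone.utc) call suggests, that get_core_data_last_collected returns a naive UTC datetime):

from datetime import datetime, timedelta, timezone

def incremental_cutoff(last_collected):
    # A full collection (or a repo never collected before) has no cutoff at all.
    if last_collected is None:
        return None
    # Back the cutoff up two days so records updated around the last run are not missed.
    return (last_collected - timedelta(days=2)).replace(tzinfo=timezone.utc)

# Example: a core collection last run on 2025-01-10 gives a cutoff of 2025-01-08 00:00:00+00:00.
print(incremental_cutoff(datetime(2025, 1, 10)))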
@@ -40,7 +47,7 @@ def collect_github_messages(repo_git: str) -> None:
logger.info(f"{owner}/{repo} has no messages")

else:
process_large_issue_and_pr_message_collection(repo_id, repo_git, logger, manifest.key_auth, task_name, augur_db)
process_large_issue_and_pr_message_collection(repo_id, repo_git, logger, manifest.key_auth, task_name, augur_db, core_data_last_collected)


def is_repo_small(repo_id):
@@ -51,13 +58,16 @@ def is_repo_small(repo_id):

return result != None

def fast_retrieve_all_pr_and_issue_messages(repo_git: str, logger, key_auth, task_name) -> None:
def fast_retrieve_all_pr_and_issue_messages(repo_git: str, logger, key_auth, task_name, since) -> None:

owner, repo = get_owner_repo(repo_git)

# url to get issue and pull request comments
url = f"https://api.github.com/repos/{owner}/{repo}/issues/comments"

if since:
url += f"&since={since.isoformat()}"

# define logger for task
logger.info(f"Collecting github comments for {owner}/{repo}")

@@ -70,7 +80,7 @@ def fast_retrieve_all_pr_and_issue_messages(repo_git: str, logger, key_auth, task_name) -> None:
return list(github_data_access.paginate_resource(url))
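Editor's note: a minimal sketch of the URL the since branch builds (placeholder owner/repo values; GitHub's issue-comments endpoint accepts an ISO 8601 since parameter, and because the base URL has no query string yet, the first parameter is introduced with ?):

from datetime import datetime, timezone

owner, repo = "chaoss", "augur"  # placeholder values
since = datetime(2025, 1, 8, tzinfo=timezone.utc)

url = f"https://api.github.com/repos/{owner}/{repo}/issues/comments"
if since:
    url += f"?since={since.isoformat()}"

# url -> https://api.github.com/repos/chaoss/augur/issues/comments?since=2025-01-08T00:00:00+00:00
# (in practice the timestamp should be URL-encoded, or formatted as 2025-01-08T00:00:00Z)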


def process_large_issue_and_pr_message_collection(repo_id, repo_git: str, logger, key_auth, task_name, augur_db) -> None:
def process_large_issue_and_pr_message_collection(repo_id, repo_git: str, logger, key_auth, task_name, augur_db, since) -> None:

owner, repo = get_owner_repo(repo_git)

@@ -81,11 +91,20 @@ def process_large_issue_and_pr_message_collection(repo_id, repo_git: str, logger, key_auth, task_name, augur_db) -> None:

with engine.connect() as connection:

query = text(f"""
(select pr_comments_url from pull_requests WHERE repo_id={repo_id} order by pr_created_at desc)
UNION
(select comments_url as comment_url from issues WHERE repo_id={repo_id} order by created_at desc);
""")
if since:
query = text(f"""
(select pr_comments_url from pull_requests WHERE repo_id={repo_id} AND pr_updated_at > '{since}' order by pr_created_at desc)
UNION
(select comments_url as comment_url from issues WHERE repo_id={repo_id} AND updated_at > '{since}' order by created_at desc);
""")
else:
query = text(f"""
(select pr_comments_url from pull_requests WHERE repo_id={repo_id} order by pr_created_at desc)
UNION
(select comments_url as comment_url from issues WHERE repo_id={repo_id} order by created_at desc);
""")


result = connection.execute(query).fetchall()
comment_urls = [x[0] for x in result]
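Editor's note: a hedged alternative for the incremental branch above. Passing since as a bound parameter avoids hand-building timestamp literals into the SQL string (a sketch only, assuming the same pull_requests/issues columns; the helper name is invented):

from sqlalchemy.sql import text

def incremental_comment_urls(engine, repo_id, since):
    # Same UNION as above, but with bound parameters instead of f-string interpolation.
    query = text("""
        (SELECT pr_comments_url FROM pull_requests
         WHERE repo_id = :repo_id AND pr_updated_at > :since
         ORDER BY pr_created_at DESC)
        UNION
        (SELECT comments_url AS comment_url FROM issues
         WHERE repo_id = :repo_id AND updated_at > :since
         ORDER BY created_at DESC);
    """)
    with engine.connect() as connection:
        rows = connection.execute(query, {"repo_id": repo_id, "since": since}).fetchall()
    return [row[0] for row in rows]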
2 changes: 1 addition & 1 deletion augur/tasks/start_tasks.py
@@ -74,7 +74,7 @@ def primary_repo_collect_phase(repo_git, full_collection):
#Define secondary group that can't run until after primary jobs have finished.
secondary_repo_jobs = group(
collect_events.si(repo_git),#*create_grouped_task_load(dataList=first_pass, task=collect_events).tasks,
collect_github_messages.si(repo_git), #*create_grouped_task_load(dataList=first_pass,task=collect_github_messages).tasks,
collect_github_messages.si(repo_git, full_collection), #*create_grouped_task_load(dataList=first_pass,task=collect_github_messages).tasks,
collect_github_repo_clones_data.si(repo_git),
)
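Editor's note: because .si() builds an immutable Celery signature, the new flag has to be bound when the group is assembled; a quick usage sketch with placeholder values:

from augur.tasks.github.messages import collect_github_messages

repo_git = "https://github.com/chaoss/augur"  # placeholder repo URL

full_run = collect_github_messages.si(repo_git, True)          # re-collect every message
incremental_run = collect_github_messages.si(repo_git, False)  # only messages on PRs/issues updated since the last core collection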

