diff --git a/augur/application/db/data_parse.py b/augur/application/db/data_parse.py
index 1291276f5..219c09c0e 100644
--- a/augur/application/db/data_parse.py
+++ b/augur/application/db/data_parse.py
@@ -283,7 +283,7 @@ def extract_pr_review_message_ref_data(comment: dict, augur_pr_review_id, github
 
     return pr_review_comment_message_ref
 
-def extract_pr_event_data(event: dict, pr_id: int, platform_id: int, repo_id: int, tool_source: str, tool_version: str, data_source: str) -> dict:
+def extract_pr_event_data(event: dict, pr_id: int, gh_src_id: int, platform_id: int, repo_id: int, tool_source: str, tool_version: str, data_source: str) -> dict:
 
     pr_event = {
         'pull_request_id': pr_id,
@@ -291,13 +291,13 @@ def extract_pr_event_data(event: dict, pr_id: int, platform_id: int, repo_id: in
         'action': event['event'],
         'action_commit_hash': None,
         'created_at': event['created_at'],
-        'issue_event_src_id': int(event['issue']["id"]),
+        'issue_event_src_id': gh_src_id,
         'node_id': event['node_id'],
         'node_url': event['url'],
         'tool_source': tool_source,
         'tool_version': tool_version,
         'data_source': data_source,
-        'pr_platform_event_id': int(event['issue']["id"]),
+        'pr_platform_event_id': gh_src_id,
         'platform_id': platform_id,
         'repo_id': repo_id
     }
diff --git a/augur/tasks/github/events.py b/augur/tasks/github/events.py
index 7a5fa6236..00789a342 100644
--- a/augur/tasks/github/events.py
+++ b/augur/tasks/github/events.py
@@ -2,11 +2,12 @@
 import traceback
 import sqlalchemy as s
 from sqlalchemy.sql import text
+from abc import ABC, abstractmethod
 
 from augur.tasks.init.celery_app import celery_app as celery
 from augur.tasks.init.celery_app import AugurCoreRepoCollectionTask
 from augur.application.db.data_parse import *
-from augur.tasks.github.util.github_data_access import GithubDataAccess
+from augur.tasks.github.util.github_data_access import GithubDataAccess, UrlNotFoundException
 from augur.tasks.github.util.github_random_key_auth import GithubRandomKeyAuth
 from augur.tasks.github.util.util import get_owner_repo
 from augur.tasks.util.worker_util import remove_duplicate_dicts
@@ -21,9 +22,6 @@ def collect_events(repo_git: str):
 
     logger = logging.getLogger(collect_events.__name__)
 
-    repo_obj = get_repo_by_repo_git(repo_git)
-    repo_id = repo_obj.repo_id
-
     owner, repo = get_owner_repo(repo_git)
 
     logger.debug(f"Collecting Github events for {owner}/{repo}")
@@ -31,22 +29,11 @@ def collect_events(repo_git: str):
     key_auth = GithubRandomKeyAuth(logger)
 
     if bulk_events_collection_endpoint_contains_all_data(key_auth, logger, owner, repo):
-        event_generator = bulk_collect_pr_and_issue_events(repo_git, logger, key_auth)
+        collection_strategy = BulkGithubEventCollection(logger)
     else:
-        event_generator = collect_pr_and_issues_events_by_number(repo_id, repo_git, logger, key_auth, f"{owner}/{repo}: Event task")
-
-    events = []
-    for event in event_generator:
-        events.append(event)
-
-        # making this a decent size since process_events retrieves all the issues and prs each time
-        if len(events) >= 500:
-            process_events(events, f"{owner}/{repo}: Event task", repo_id, logger)
-            events.clear()
-
-    if events:
-        process_events(events, f"{owner}/{repo}: Event task", repo_id, logger)
+        collection_strategy = ThoroughGithubEventCollection(logger)
+
+    collection_strategy.collect(repo_git, key_auth)
 
 
 def bulk_events_collection_endpoint_contains_all_data(key_auth, logger, owner, repo):
@@ -61,164 +48,304 @@ def bulk_events_collection_endpoint_contains_all_data(key_auth, logger, owner, r
 
     return page_count != 300
 
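+# Strategy note: BulkGithubEventCollection reads the repo-wide /issues/events
+# endpoint, which this module treats as truncated once it reports 300 pages;
+# ThoroughGithubEventCollection is the fallback that pages
+# /issues/{number}/events for every issue and pr individually.
+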
+class NotMappableException(Exception):
+    pass
 
-def bulk_collect_pr_and_issue_events(repo_git: str, logger, key_auth):
+class GithubEventCollection(ABC):
+
+    def __init__(self, logger):
+        self._logger = logger
+        self._tool_source = "Github events task"
+        self._tool_version = "2.0"
+        self._data_source = "Github API"
 
-    owner, repo = get_owner_repo(repo_git)
+    @abstractmethod
+    def collect(self, repo_git, key_auth):
+        pass
 
-    logger.debug(f"Collecting Github events for {owner}/{repo}")
+    def _insert_issue_events(self, events):
+        issue_event_natural_keys = ["issue_id", "issue_event_src_id"]
+        bulk_insert_dicts(self._logger, events, IssueEvent, issue_event_natural_keys)
 
-    url = f"https://api.github.com/repos/{owner}/{repo}/issues/events"
-
-    github_data_access = GithubDataAccess(key_auth, logger)
+    def _insert_pr_events(self, events):
+        pr_event_natural_keys = ["node_id"]
+        bulk_insert_dicts(self._logger, events, PullRequestEvent, pr_event_natural_keys)
 
-    return github_data_access.paginate_resource(url)
+    def _insert_contributors(self, contributors):
+        bulk_insert_dicts(self._logger, contributors, Contributor, ["cntrb_id"])
 
+    def _process_github_event_contributors(self, event):
 
-def collect_pr_and_issues_events_by_number(repo_id, repo_git: str, logger, key_auth, task_name) -> None:
+        if event["actor"]:
 
-    owner, repo = get_owner_repo(repo_git)
+            event_cntrb = extract_needed_contributor_data(event["actor"], self._tool_source, self._tool_version, self._data_source)
+            event["cntrb_id"] = event_cntrb["cntrb_id"]
 
-    # define logger for task
-    logger.debug(f"Collecting github events for {owner}/{repo}")
+        else:
+            event["cntrb_id"] = None
+            return event, None
+
+        return event, event_cntrb
 
-    engine = get_engine()
+class BulkGithubEventCollection(GithubEventCollection):
 
-    with engine.connect() as connection:
+    def __init__(self, logger):
 
-        query = text(f"""
-            (select pr_src_number as number from pull_requests WHERE repo_id={repo_id} order by pr_created_at desc)
-            UNION
-            (select gh_issue_number as number from issues WHERE repo_id={repo_id} order by created_at desc);
-        """)
+        self.task_name = "Bulk Github Event task"
+        self.repo_identifier = ""
 
-        result = connection.execute(query).fetchall()
-    numbers = [x[0] for x in result]
-    github_data_access = GithubDataAccess(key_auth, logger)
-    for number in numbers:
+        super().__init__(logger)
 
-        event_url = f"https://api.github.com/repos/{owner}/{repo}/issues/{number}/events"
-
-        yield from github_data_access.paginate_resource(event_url)
+    def collect(self, repo_git, key_auth):
+
+        repo_obj = get_repo_by_repo_git(repo_git)
+        repo_id = repo_obj.repo_id
+
+        owner, repo = get_owner_repo(repo_git)
+        self.repo_identifier = f"{owner}/{repo}"
 
-def process_events(events, task_name, repo_id, logger):
+        events = []
+        for event in self._collect_events(repo_git, key_auth):
+            events.append(event)
+
+            # keeping the batch a decent size since _process_events retrieves all the issues and prs each time
+            if len(events) >= 500:
+                self._process_events(events, repo_id)
+                events.clear()
 
-    tool_source = "Github events task"
-    tool_version = "2.0"
-    data_source = "Github API"
-
-    pr_event_dicts = []
-    issue_event_dicts = []
-    contributors = []
-
-
-    # create mapping from issue url to issue id of current issues
-    issue_url_to_id_map = {}
-    issues = get_issues_by_repo_id(repo_id)
-    for issue in issues:
-        issue_url_to_id_map[issue.issue_url] = issue.issue_id
-
-    # create mapping from pr url to pr id of current pull requests
-    pr_url_to_id_map = {}
-    prs = get_pull_requests_by_repo_id(repo_id)
-    for pr in prs:
-        pr_url_to_id_map[pr.pr_url] = pr.pull_request_id
-
-    not_mapable_event_count = 0
-    event_len = len(events)
-    for event in events:
-
-        event, contributor = process_github_event_contributors(logger, event, tool_source, tool_version, data_source)
-        #logger.info(f'This is the event pack: {event}')
-        # event_mapping_data is the pr or issue data needed to relate the event to an issue or pr
+        if events:
+            self._process_events(events, repo_id)
 
-        if 'issue' in event:
-            if event["issue"] is not None:
-                event_mapping_data = event["issue"]
-            else:
-                event_mapping_data = None
-                logger.warning(f'issue is not a value in event JSON: {event}')
+    def _collect_events(self, repo_git: str, key_auth):
+
+        owner, repo = get_owner_repo(repo_git)
+
+        url = f"https://api.github.com/repos/{owner}/{repo}/issues/events"
+
+        github_data_access = GithubDataAccess(key_auth, self._logger)
 
-        if event_mapping_data is None:
-            not_mapable_event_count += 1
-            continue
+        return github_data_access.paginate_resource(url)
 
-        pull_request = event_mapping_data.get('pull_request', None)
-        if pull_request:
-            pr_url = pull_request["url"]
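+    # Events from the bulk endpoint cover both issues and prs, so each event
+    # is routed through _is_pr_event below; events with a null "issue" payload
+    # cannot be mapped to either and are logged and skipped instead.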
+    def _process_events(self, events, repo_id):
+
+        issue_events = []
+        pr_events = []
+        not_mappable_events = []
+        for event in events:
             try:
-                pull_request_id = pr_url_to_id_map[pr_url]
+                if self._is_pr_event(event):
+                    pr_events.append(event)
+                else:
+                    issue_events.append(event)
+            except NotMappableException:
+                not_mappable_events.append(event)
 
-                # query = augur_db.session.query(PullRequest).filter(PullRequest.pr_url == pr_url)
-                # related_pr = execute_session_query(query, 'one')
-            except KeyError:
-                logger.warning(f"{task_name}: Could not find related pr. We were searching for: {pr_url}")
-                continue
+        if not_mappable_events:
+            self._logger.warning(f"{self.repo_identifier} - {self.task_name}: Unable to map these github events to an issue or pr: {not_mappable_events}")
 
-            pr_event_dicts.append(
-                extract_pr_event_data(event, pull_request_id, platform_id, repo_id,
-                    tool_source, tool_version, data_source)
-            )
+        self._process_issue_events(issue_events, repo_id)
+        self._process_pr_events(pr_events, repo_id)
 
-        else:
-            issue_url = event_mapping_data["url"]
+        update_issue_closed_cntrbs_by_repo_id(repo_id)
+
+    def _process_issue_events(self, issue_events, repo_id):
+
+        issue_event_dicts = []
+        contributors = []
+
+        issue_url_to_id_map = self._get_map_from_issue_url_to_id(repo_id)
+
+        for event in issue_events:
+
+            event, contributor = self._process_github_event_contributors(event)
+
+            issue_url = event["issue"]["url"]
 
             try:
                 issue_id = issue_url_to_id_map[issue_url]
-
-                # query = augur_db.session.query(Issue).filter(Issue.issue_url == issue_url)
-                # related_issue = execute_session_query(query, 'one')
             except KeyError:
-                logger.warning(f"{task_name}: Could not find related issue. We were searching for: {issue_url}")
+                self._logger.warning(f"{self.repo_identifier} - {self.task_name}: Could not find related issue. We were searching for: {issue_url}")
                 continue
 
             issue_event_dicts.append(
                 extract_issue_event_data(event, issue_id, platform_id, repo_id,
-                    tool_source, tool_version, data_source)
+                    self._tool_source, self._tool_version, self._data_source)
             )
-
-        # add contributor to list after porcessing the event,
-        # so if it fails processing for some reason the contributor is not inserted
-        # NOTE: contributor is none when there is no contributor data on the event
-        if contributor:
-            contributors.append(contributor)
 
-    # remove contributors that were found in the data more than once
-    contributors = remove_duplicate_dicts(contributors)
+            if contributor:
+                contributors.append(contributor)
+
+        contributors = remove_duplicate_dicts(contributors)
+
+        self._insert_contributors(contributors)
 
-    bulk_insert_dicts(logger, contributors, Contributor, ["cntrb_id"])
+        self._insert_issue_events(issue_event_dicts)
 
-    issue_events_len = len(issue_event_dicts)
-    pr_events_len = len(pr_event_dicts)
-    if event_len != (issue_events_len + pr_events_len):
+    def _process_pr_events(self, pr_events, repo_id):
+
+        pr_event_dicts = []
+        contributors = []
 
-        unassigned_events = event_len - issue_events_len - pr_events_len
+        pr_url_to_id_map = self._get_map_from_pr_url_to_id(repo_id)
 
-        logger.error(f"{task_name}: {event_len} events were processed, but {pr_events_len} pr events were found and related to a pr, and {issue_events_len} issue events were found and related to an issue. {not_mapable_event_count} events were not related to a pr or issue due to the api returning insufficient data. For some reason {unassigned_events} events were not able to be processed even when the api returned sufficient data. This is usually because pull requests or issues have not been collected, and the events are skipped because they cannot be related to a pr or issue")
+        for event in pr_events:
 
-    logger.info(f"{task_name}: Inserting {len(pr_event_dicts)} pr events and {len(issue_event_dicts)} issue events")
+            event, contributor = self._process_github_event_contributors(event)
 
-    # TODO: Could replace this with "id" but it isn't stored on the table for some reason
-    pr_event_natural_keys = ["node_id"]
-    bulk_insert_dicts(logger, pr_event_dicts, PullRequestEvent, pr_event_natural_keys)
+            pr_url = event["issue"]["pull_request"]["url"]
 
-    issue_event_natural_keys = ["issue_id", "issue_event_src_id"]
-    bulk_insert_dicts(logger, issue_event_dicts, IssueEvent, issue_event_natural_keys)
+            try:
+                pull_request_id = pr_url_to_id_map[pr_url]
+            except KeyError:
+                self._logger.warning(f"{self.repo_identifier} - {self.task_name}: Could not find related pr. We were searching for: {pr_url}")
+                continue
 
-    update_issue_closed_cntrbs_by_repo_id(repo_id)
+            pr_event_dicts.append(
+                extract_pr_event_data(event, pull_request_id, int(event['issue']["id"]), platform_id, repo_id,
+                    self._tool_source, self._tool_version, self._data_source)
+            )
 
-# TODO: Should we skip an event if there is no contributor to resolve it o
-def process_github_event_contributors(logger, event, tool_source, tool_version, data_source):
+            if contributor:
+                contributors.append(contributor)
 
-    if event["actor"]:
+        contributors = remove_duplicate_dicts(contributors)
 
-        event_cntrb = extract_needed_contributor_data(event["actor"], tool_source, tool_version, data_source)
-        event["cntrb_id"] = event_cntrb["cntrb_id"]
+        self._insert_contributors(contributors)
 
-    else:
-        event["cntrb_id"] = None
-        return event, None
+        self._insert_pr_events(pr_event_dicts)
+
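+    # Both maps below are rebuilt from the database on every batch; they let
+    # each event be related to its issue or pr with an in-memory lookup
+    # rather than a query per event.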
+    def _get_map_from_pr_url_to_id(self, repo_id):
+
+        pr_url_to_id_map = {}
+        prs = get_pull_requests_by_repo_id(repo_id)
+        for pr in prs:
+            pr_url_to_id_map[pr.pr_url] = pr.pull_request_id
+
+        return pr_url_to_id_map
 
-    return event, event_cntrb
+    def _get_map_from_issue_url_to_id(self, repo_id):
+
+        issue_url_to_id_map = {}
+        issues = get_issues_by_repo_id(repo_id)
+        for issue in issues:
+            issue_url_to_id_map[issue.issue_url] = issue.issue_id
+
+        return issue_url_to_id_map
+
+    def _is_pr_event(self, event):
+
+        if event["issue"] is None:
+            raise NotMappableException("Not mappable to pr or issue")
+
+        return event["issue"].get('pull_request', None) is not None
+
+class ThoroughGithubEventCollection(GithubEventCollection):
+
+    def __init__(self, logger):
+        super().__init__(logger)
+
+    def collect(self, repo_git, key_auth):
+
+        repo_obj = get_repo_by_repo_git(repo_git)
+        repo_id = repo_obj.repo_id
+
+        owner, repo = get_owner_repo(repo_git)
+        self.repo_identifier = f"{owner}/{repo}"
+
+        self._collect_and_process_issue_events(owner, repo, repo_id, key_auth)
+        self._collect_and_process_pr_events(owner, repo, repo_id, key_auth)
+
+    def _collect_and_process_issue_events(self, owner, repo, repo_id, key_auth):
+
+        engine = get_engine()
+
+        with engine.connect() as connection:
+
+            # TODO: Remove src id if it ends up not being needed
+            query = text(f"""
+                select issue_id, gh_issue_number as issue_number, gh_issue_id as gh_src_id from issues WHERE repo_id={repo_id} order by created_at desc;
+            """)
+
+            issue_result = connection.execute(query).fetchall()
+
+        events = []
+        contributors = []
+        github_data_access = GithubDataAccess(key_auth, self._logger)
+        for db_issue in issue_result:
+            issue = db_issue._asdict()
+
+            issue_number = issue["issue_number"]
+            event_url = f"https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}/events"
+
+            try:
+                for event in github_data_access.paginate_resource(event_url):
+
+                    event, contributor = self._process_github_event_contributors(event)
+
+                    if contributor:
+                        contributors.append(contributor)
+
+                    events.append(
+                        extract_issue_event_data(event, issue["issue_id"], platform_id, repo_id,
+                            self._tool_source, self._tool_version, self._data_source)
+                    )
+            except UrlNotFoundException:
+                self._logger.warning(f"{self.repo_identifier}: Url not found for {event_url}")
+
+            if len(events) > 500:
+                # dedupe contributors before inserting, matching the bulk
+                # strategy, and reset both batches once they are flushed
+                self._insert_contributors(remove_duplicate_dicts(contributors))
+                self._insert_issue_events(events)
+                events.clear()
+                contributors.clear()
+
+        if events:
+            self._insert_contributors(remove_duplicate_dicts(contributors))
+            self._insert_issue_events(events)
+            events.clear()
+
+    def _collect_and_process_pr_events(self, owner, repo, repo_id, key_auth):
+
+        engine = get_engine()
+
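+        # Github serves pr events through the issues events endpoint (every
+        # pull request is also an issue), so pr numbers are fetched here and
+        # paged through the same /issues/{number}/events url as above.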
+        with engine.connect() as connection:
+
+            query = text(f"""
+                select pull_request_id, pr_src_number as gh_pr_number, pr_src_id from pull_requests WHERE repo_id={repo_id} order by pr_created_at desc;
+            """)
+
+            pr_result = connection.execute(query).fetchall()
+
+        events = []
+        contributors = []
+        github_data_access = GithubDataAccess(key_auth, self._logger)
+        for db_pr in pr_result:
+            pr = db_pr._asdict()
+
+            pr_number = pr["gh_pr_number"]
+
+            event_url = f"https://api.github.com/repos/{owner}/{repo}/issues/{pr_number}/events"
+
+            for event in github_data_access.paginate_resource(event_url):
+
+                event, contributor = self._process_github_event_contributors(event)
+
+                if contributor:
+                    contributors.append(contributor)
+
+                events.append(
+                    extract_pr_event_data(event, pr["pull_request_id"], pr["pr_src_id"], platform_id, repo_id,
+                        self._tool_source, self._tool_version, self._data_source)
+                )
+
+            if len(events) > 500:
+                self._insert_contributors(remove_duplicate_dicts(contributors))
+                self._insert_pr_events(events)
+                events.clear()
+                contributors.clear()
+
+        if events:
+            self._insert_contributors(remove_duplicate_dicts(contributors))
+            self._insert_pr_events(events)
+            events.clear()