diff --git a/augur/application/cli/__init__.py b/augur/application/cli/__init__.py
index 00f41a553..5afee22f8 100644
--- a/augur/application/cli/__init__.py
+++ b/augur/application/cli/__init__.py
@@ -12,7 +12,7 @@
 from sqlalchemy.exc import OperationalError
 
-def test_connection(function_internet_connection):
+""" def test_connection(function_internet_connection):
 
     @click.pass_context
     def new_func(ctx, *args, **kwargs):
         usage = re.search(r"Usage:\s(.*)\s\[OPTIONS\]", str(ctx.get_usage())).groups()[0]
@@ -34,7 +34,39 @@ def new_func(ctx, *args, **kwargs):
                 Consider setting http_proxy variables for limited access installations.")
             sys.exit(-1)
 
-    return update_wrapper(new_func, function_internet_connection)
+    return update_wrapper(new_func, function_internet_connection) """
+
+import socket
+import sys
+import functools
+import click
+
+def has_internet(host="8.8.8.8", port=53, timeout=3):
+    """Check internet connectivity by attempting a socket connection to a public DNS server."""
+    try:
+        socket.setdefaulttimeout(timeout)
+        socket.create_connection((host, port))
+        return True
+    except (socket.timeout, socket.gaierror, socket.error):
+        return False
+
+def test_connection(func):
+    """Decorator that checks internet connectivity before executing a command."""
+    @functools.wraps(func)
+    @click.pass_context
+    def wrapper(ctx, *args, **kwargs):
+        if not has_internet():
+            usage = ctx.get_usage().strip()
+            print(f"\n\n{usage}\nCommand setup failed.\n"
+                  "You are not connected to the internet.\n"
+                  "Please connect to proceed.\n"
+                  "Consider setting http_proxy variables if on a restricted network.")
+            sys.exit(-1)
+        return ctx.invoke(func, *args, **kwargs)
+
+    return wrapper
+
+### end modification
 
 def test_db_connection(function_db_connection):
 
     @click.pass_context
diff --git a/augur/tasks/github/contributors.py b/augur/tasks/github/contributors.py
index f4fa7165d..cbac5d559 100644
--- a/augur/tasks/github/contributors.py
+++ b/augur/tasks/github/contributors.py
@@ -10,9 +10,13 @@
 from augur.application.db.util import execute_session_query
 from augur.application.db.lib import bulk_insert_dicts, get_session
 from augur.tasks.github.util.github_random_key_auth import GithubRandomKeyAuth
-
-
-
+import time
+import logging
+import random
+from sqlalchemy.exc import OperationalError
+from psycopg2.errors import DeadlockDetected
+
+"""
 
 @celery.task
 def process_contributors():
@@ -64,8 +68,90 @@ def process_contributors():
         logger.info(f"Enriching {len(enriched_contributors)} contributors")
         bulk_insert_dicts(enriched_contributors, Contributor, ["cntrb_id"])
 
+    """
+
+
+@celery.task
+def process_contributors():
+
+    logger = logging.getLogger(process_contributors.__name__)
+
+    tool_source = "Contributors task"
+    tool_version = "2.0"
+    data_source = "Github API"
+
+    key_auth = GithubRandomKeyAuth(logger)
+
+    with get_session() as session:
+        query = session.query(Contributor).filter(
+            Contributor.data_source == data_source,
+            Contributor.cntrb_created_at.is_(None),
+            Contributor.cntrb_last_used.is_(None)
+        )
+        contributors = execute_session_query(query, 'all')
+
+    contributors_len = len(contributors)
+
+    if contributors_len == 0:
+        logger.info("No contributors to enrich...returning...")
+        return
+
+    logger.info(f"Length of contributors to enrich: {contributors_len}")
+
+    batch_size = 50  # Adjust batch size as needed
+    max_retries = 5
+    enriched_contributors = []
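+
+    # Batching note: committing in chunks of batch_size keeps each transaction
+    # small, which shortens lock hold times and narrows the window for the
+    # Postgres deadlocks this rewrite works around.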
+
+    for index, contributor in enumerate(contributors):
+        logger.info(f"Processing Contributor {index + 1} of {contributors_len}")
+
+        contributor_dict = contributor.__dict__
+        del contributor_dict["_sa_instance_state"]
+        url = f"https://api.github.com/users/{contributor_dict['cntrb_login']}"
+        data = retrieve_dict_data(url, key_auth, logger)
+
+        if data is None:
+            logger.warning(f"Unable to get contributor data for: {contributor_dict['cntrb_login']}")
+            continue
+
+        new_contributor_data = {
+            "cntrb_created_at": data["created_at"],
+            "cntrb_last_used": data["updated_at"]
+        }
+
+        contributor_dict.update(new_contributor_data)
+        enriched_contributors.append(contributor_dict)
+
+        # Process in batches to reduce deadlocks
+        if len(enriched_contributors) >= batch_size:
+            _insert_with_retries(enriched_contributors, max_retries, logger)
+            enriched_contributors = []
+
+    # Insert any remaining contributors
+    if enriched_contributors:
+        _insert_with_retries(enriched_contributors, max_retries, logger)
+
+
+def _insert_with_retries(contributors_batch, max_retries, logger):
+    """Handles deadlocks by retrying transactions with exponential backoff."""
+    retries = 0
+    while retries < max_retries:
+        try:
+            bulk_insert_dicts(contributors_batch, Contributor, ["cntrb_id"])
+            return  # Exit function if successful
+        except (OperationalError, DeadlockDetected):
+            wait_time = 2 ** retries + random.uniform(0, 1)  # Exponential backoff with jitter
+            logger.warning(f"Deadlock detected, retrying in {wait_time:.2f} seconds... (Attempt {retries + 1}/{max_retries})")
+            time.sleep(wait_time)
+            retries += 1
+        except Exception as e:
+            logger.error(f"Unexpected error during batch insert: {e}")
+            return  # Non-deadlock error already logged; skip the retry-exhaustion message
+
+    logger.error("Max retries reached. Some records may not have been inserted.")
+
 
 def retrieve_dict_data(url: str, key_auth, logger):
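For illustration, a minimal standalone sketch of the retry-with-backoff pattern that _insert_with_retries applies; insert_with_retries, flaky_insert, and the RuntimeError stand-in below are hypothetical and not part of the patch, which catches OperationalError / DeadlockDetected around bulk_insert_dicts:

    import random
    import time

    def insert_with_retries(insert_fn, batch, max_retries=5):
        """Retry insert_fn(batch) on failure, sleeping 2**attempt + jitter between tries."""
        for attempt in range(max_retries):
            try:
                insert_fn(batch)
                return True  # success: stop retrying
            except RuntimeError:  # stand-in for OperationalError / DeadlockDetected
                wait = 2 ** attempt + random.uniform(0, 1)  # ~1s, ~2s, ~4s, ~8s, ~16s
                time.sleep(wait)
        return False  # retries exhausted

    # Example: an insert that deadlocks twice, then succeeds on the third try.
    calls = {"n": 0}

    def flaky_insert(batch):
        calls["n"] += 1
        if calls["n"] < 3:
            raise RuntimeError("simulated deadlock")

    assert insert_with_retries(flaky_insert, [{"cntrb_id": 1}])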