-
Notifications
You must be signed in to change notification settings - Fork 851
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Evaluate new things for key rotation #3002
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,7 +12,7 @@ | |
from sqlalchemy.exc import OperationalError | ||
|
||
|
||
def test_connection(function_internet_connection): | ||
""" def test_connection(function_internet_connection): | ||
@click.pass_context | ||
def new_func(ctx, *args, **kwargs): | ||
usage = re.search(r"Usage:\s(.*)\s\[OPTIONS\]", str(ctx.get_usage())).groups()[0] | ||
|
@@ -34,7 +34,39 @@ def new_func(ctx, *args, **kwargs): | |
Consider setting http_proxy variables for limited access installations.") | ||
sys.exit(-1) | ||
|
||
return update_wrapper(new_func, function_internet_connection) | ||
return update_wrapper(new_func, function_internet_connection) """ | ||
|
||
import socket | ||
import sys | ||
import functools | ||
import click | ||
|
||
def has_internet(host="8.8.8.8", port=53, timeout=3):
    """Report whether a TCP connection to ``host:port`` can be opened.

    The defaults target a public DNS nameserver rather than a website URL:
    a site URL may be volatile from time to time, whereas a DNS nameserver
    has very high uptime requirements, which makes it a steadier probe.

    Args:
        host: Address to connect to.
        port: TCP port to connect to.
        timeout: Seconds to wait for the connection before giving up.

    Returns:
        bool: True if the connection was established; False if it timed
        out, the host could not be resolved, or any other socket error
        occurred.
    """
    try:
        # Give the timeout to this one connection instead of calling
        # socket.setdefaulttimeout(), which would silently change the timeout
        # of every socket created afterwards in the process. The ``with``
        # block closes the probe socket so it is not leaked.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # socket.timeout, socket.gaierror and socket.error are all OSError
        # (sub)classes, so this covers the same failures as before.
        return False
|
||
def test_connection(func):
    """Decorate a click command so it runs only when the internet is reachable.

    The returned wrapper receives the current click context. When
    ``has_internet()`` reports a connection, the wrapped function is invoked
    through that context and its result is returned. Otherwise the command's
    usage text and a connectivity hint are printed and the process exits
    with status -1.

    Args:
        func: The command callback to guard.

    Returns:
        The wrapping function, carrying ``func``'s metadata.
    """
    @functools.wraps(func)
    @click.pass_context
    def connection_guard(ctx, *args, **kwargs):
        # Happy path first: connectivity confirmed, hand over to the command.
        if has_internet():
            return ctx.invoke(func, *args, **kwargs)

        usage = ctx.get_usage().strip()
        message = (
            f"\n\n{usage}\nCommand setup failed.\n"
            "You are not connected to the internet.\n"
            "Please connect to proceed.\n"
            "Consider setting http_proxy variables if on a restricted network."
        )
        print(message)
        sys.exit(-1)

    return connection_guard
|
||
### end modification | ||
|
||
def test_db_connection(function_db_connection): | ||
@click.pass_context | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,9 +10,16 @@ | |
from augur.application.db.util import execute_session_query | ||
from augur.application.db.lib import bulk_insert_dicts, get_session | ||
from augur.tasks.github.util.github_random_key_auth import GithubRandomKeyAuth | ||
|
||
|
||
|
||
import time | ||
import logging | ||
import random | ||
from sqlalchemy.exc import OperationalError | ||
from psycopg2.errors import DeadlockDetected | ||
from sqlalchemy.orm import sessionmaker | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [pylint] reported by reviewdog 🐶 |
||
from my_project.database import get_session # Adjust import as needed | ||
from my_project.models import Contributor # Adjust import as needed | ||
|
||
""" | ||
@celery.task | ||
def process_contributors(): | ||
|
||
|
@@ -64,8 +71,86 @@ | |
|
||
logger.info(f"Enriching {len(enriched_contributors)} contributors") | ||
bulk_insert_dicts(enriched_contributors, Contributor, ["cntrb_id"]) | ||
""" | ||
|
||
|
||
@celery.task
def process_contributors():
    """Enrich stored GitHub contributors with their creation/update dates.

    Selects every ``Contributor`` whose ``data_source`` is "Github API" and
    whose ``cntrb_created_at`` and ``cntrb_last_used`` are both NULL, fetches
    ``https://api.github.com/users/<cntrb_login>`` for each one, and copies
    the response's ``created_at`` / ``updated_at`` values into
    ``cntrb_created_at`` / ``cntrb_last_used``. The enriched rows are handed
    to ``_insert_with_retries`` in batches.

    Contributors for which no data is returned are skipped with a warning.
    Returns ``None`` in every case.
    """
    logger = logging.getLogger(process_contributors.__name__)

    data_source = "Github API"
    batch_size = 50  # Adjust batch size as needed
    max_retries = 5

    key_auth = GithubRandomKeyAuth(logger)

    with get_session() as session:
        query = session.query(Contributor).filter(
            Contributor.data_source == data_source,
            Contributor.cntrb_created_at.is_(None),
            Contributor.cntrb_last_used.is_(None)
        )
        contributors = execute_session_query(query, 'all')

    if not contributors:
        logger.info("No contributors to enrich...returning...")
        return

    contributors_len = len(contributors)
    logger.info(f"Length of contributors to enrich: {contributors_len}")

    enriched_contributors = []

    for index, contributor in enumerate(contributors):
        logger.info(f"Processing Contributor {index + 1} of {contributors_len}")

        # Work on a copy of the attribute dict. Deleting "_sa_instance_state"
        # from the instance's own __dict__ would strip SQLAlchemy's
        # bookkeeping from the live ORM object.
        contributor_dict = dict(contributor.__dict__)
        contributor_dict.pop("_sa_instance_state", None)

        login = contributor_dict['cntrb_login']
        url = f"https://api.github.com/users/{login}"
        data = retrieve_dict_data(url, key_auth, logger)

        if data is None:
            logger.warning(f"Unable to get contributor data for: {login}")
            continue

        contributor_dict.update({
            "cntrb_created_at": data["created_at"],
            "cntrb_last_used": data["updated_at"]
        })
        enriched_contributors.append(contributor_dict)

        # Process in batches to reduce deadlocks
        if len(enriched_contributors) >= batch_size:
            _insert_with_retries(enriched_contributors, max_retries, logger)
            enriched_contributors = []

    # Insert remaining contributors
    if enriched_contributors:
        _insert_with_retries(enriched_contributors, max_retries, logger)
|
||
def _insert_with_retries(contributors_batch, max_retries, logger):
    """Insert a batch of contributor dicts, retrying with exponential backoff.

    Calls ``bulk_insert_dicts(contributors_batch, Contributor, ["cntrb_id"])``.
    When the call raises ``OperationalError`` or ``DeadlockDetected`` it is
    retried after a wait of ``2 ** attempt_index`` seconds plus up to one
    second of random jitter. Any other exception is logged and ends the
    attempts immediately; it is not re-raised.

    Args:
        contributors_batch: List of contributor dicts to insert.
        max_retries: Maximum number of insert attempts. A value of zero or
            less means no attempt is made.
        logger: Logger used for retry warnings and failure reports.

    Returns:
        bool: True if the batch was inserted, False if every attempt failed
        or an unexpected error occurred.
    """
    for attempt in range(1, max_retries + 1):
        try:
            bulk_insert_dicts(contributors_batch, Contributor, ["cntrb_id"])
        except (OperationalError, DeadlockDetected):
            if attempt == max_retries:
                # No attempt left: sleeping through another backoff would
                # only delay the failure report below.
                break
            wait_time = 2 ** (attempt - 1) + random.uniform(0, 1)  # Exponential backoff
            logger.warning(f"Deadlock detected, retrying in {wait_time:.2f} seconds... (Attempt {attempt}/{max_retries})")
            time.sleep(wait_time)
        except Exception as e:
            # Not a retryable database error. Report it as what it is and
            # stop, rather than falling through to the "max retries" message.
            logger.error(f"Unexpected error during batch insert: {e}")
            return False
        else:
            return True  # Exit function if successful

    logger.error("Max retries reached. Some records may not have been inserted.")
    return False
|
||
def retrieve_dict_data(url: str, key_auth, logger): | ||
|
||
|
@@ -84,7 +169,7 @@ | |
|
||
if "message" in page_data: | ||
|
||
if page_data['message'] == "Not Found": | ||
Check warning on line 172 in augur/tasks/github/contributors.py
|
||
logger.info( | ||
"Github repo was not found or does not exist for endpoint: " | ||
f"{response.url}\n" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[pylint] reported by reviewdog 🐶
E0001: Parsing failed: 'unexpected character after line continuation character (augur.application.cli.init, line 38)' (syntax-error)