Skip to content

Commit

Permalink
Get almost the entire results-processor typed (#4060)
Browse files (browse the repository at this point in the history)
  • Loading branch information
gsnedders authored Oct 16, 2024
1 parent 716c11b commit d6339c4
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 92 deletions.
16 changes: 10 additions & 6 deletions results-processor/gsutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,34 @@
import logging
import re
import subprocess

from typing import List, Tuple

_log = logging.getLogger(__name__)


def _call(command):
def _call(command: List[str]) -> None:
_log.info('EXEC: %s', ' '.join(command))
subprocess.check_call(command)


def split_gcs_path(gcs_path):
def split_gcs_path(gcs_path: str) -> Tuple[str, str]:
"""Splits /bucket/path into (bucket, path)."""
match = re.match(r'/([^/]+)/(.*)', gcs_path)
assert match
return match.groups()
g = match.groups()
assert len(g) == 2
return g


def gs_to_public_url(gs_url):
def gs_to_public_url(gs_url: str) -> str:
"""Converts a gs:// URI to a HTTP URL."""
assert gs_url.startswith('gs://')
return gs_url.replace('gs://', 'https://storage.googleapis.com/', 1)


def copy(path1, path2, gzipped=False, quiet=True):
def copy(
path1: str, path2: str, gzipped: bool = False, quiet: bool = True
) -> None:
"""Copies path1 to path2 with gsutil cp.
Args:
Expand Down
26 changes: 15 additions & 11 deletions results-processor/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
import tempfile
import time
from http import HTTPStatus
from typing import Any, Callable, TypeVar, cast

import filelock
import flask
from flask.typing import ResponseReturnValue

import processor


# The file will be flock()'ed if a report is being processed.
LOCK_FILE = '/tmp/results-processor.lock'
# If the file above is locked, this timestamp file contains the UNIX timestamp
Expand All @@ -32,7 +33,7 @@
app = flask.Flask(__name__)


def _atomic_write(path, content):
def _atomic_write(path: str, content: str) -> None:
# Do not auto-delete the file because we will move it after closing it.
temp = tempfile.NamedTemporaryFile(mode='wt', delete=False)
temp.write(content)
Expand All @@ -41,12 +42,15 @@ def _atomic_write(path, content):
os.replace(temp.name, path)


def _serial_task(func):
F = TypeVar('F', bound=Callable[..., Any])


def _serial_task(func: F) -> F:
lock = filelock.FileLock(LOCK_FILE)

# It is important to use wraps() to preserve the original name & docstring.
@functools.wraps(func)
def decorated_func(*args, **kwargs):
def decorated_func(*args: object, **kwargs: object) -> object:
try:
with lock.acquire(timeout=1):
return func(*args, **kwargs)
Expand All @@ -55,24 +59,24 @@ def decorated_func(*args, **kwargs):
return ('A result is currently being processed.',
HTTPStatus.SERVICE_UNAVAILABLE)

return decorated_func
return cast(F, decorated_func)


def _internal_only(func):
def _internal_only(func: F) -> F:
@functools.wraps(func)
def decorated_func(*args, **kwargs):
def decorated_func(*args: object, **kwargs: object) -> object:
if (not app.debug and
# This header cannot be set by external requests.
# https://cloud.google.com/tasks/docs/creating-appengine-handlers?hl=en#reading_app_engine_task_request_headers
not flask.request.headers.get('X-AppEngine-QueueName')):
return ('External requests not allowed', HTTPStatus.FORBIDDEN)
return func(*args, **kwargs)

return decorated_func
return cast(F, decorated_func)


@app.route('/_ah/liveness_check')
def liveness_check():
def liveness_check() -> ResponseReturnValue:
lock = filelock.FileLock(LOCK_FILE)
try:
lock.acquire(timeout=0.1)
Expand All @@ -91,7 +95,7 @@ def liveness_check():


@app.route('/_ah/readiness_check')
def readiness_check():
def readiness_check() -> ResponseReturnValue:
lock = filelock.FileLock(LOCK_FILE)
try:
lock.acquire(timeout=0.1)
Expand All @@ -106,7 +110,7 @@ def readiness_check():
@app.route('/api/results/process', methods=['POST'])
@_internal_only
@_serial_task
def task_handler():
def task_handler() -> ResponseReturnValue:
_atomic_write(TIMESTAMP_FILE, str(time.time()))

task_id = flask.request.headers.get('X-AppEngine-TaskName')
Expand Down
27 changes: 14 additions & 13 deletions results-processor/mypy.ini
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
[mypy]
check_untyped_defs = True
disallow_any_generics = True
disallow_incomplete_defs = True
disallow_subclassing_any = True
#disallow_any_generics = True
disallow_untyped_calls = True
#disallow_untyped_defs = True
disallow_incomplete_defs = True
#check_untyped_defs = True
disallow_untyped_decorators = True
no_implicit_optional = True
disallow_untyped_defs = True
extra_checks = True
no_implicit_reexport = True
strict_equality = True
warn_redundant_casts = True
warn_unused_ignores = True
warn_return_any = True
warn_unused_configs = True
warn_unused_ignores = True

[mypy-filelock]
ignore_missing_imports = True

[mypy-google.cloud]
ignore_missing_imports = True
exclude = (?x)(
^.*_test\.py$
| ^test_.*\.py$
)

[mypy-requests]
ignore_missing_imports = True
untyped_calls_exclude = google.cloud.datastore
Loading

0 comments on commit d6339c4

Please sign in to comment.