Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Import cornice tests #3506

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 1 addition & 64 deletions kinto/core/cornice/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,11 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import logging
from functools import partial

from pyramid.events import NewRequest
from pyramid.httpexceptions import HTTPForbidden, HTTPNotFound
from pyramid.security import NO_PERMISSION_REQUIRED
from pyramid.settings import asbool, aslist

from kinto.core.cornice.errors import Errors # NOQA
from kinto.core.cornice.pyramidhook import (
handle_exceptions,
register_resource_views,
register_service_views,
wrap_request,
)
Expand All @@ -21,73 +15,16 @@
from kinto.core.cornice.util import ContentTypePredicate, current_service


logger = logging.getLogger("cornice")


def set_localizer_for_languages(event, available_languages, default_locale_name):
"""
Sets the current locale based on the incoming Accept-Language header, if
present, and sets a localizer attribute on the request object based on
the current locale.

To be used as an event handler, this function needs to be partially applied
with the available_languages and default_locale_name arguments. The
resulting function will be an event handler which takes an event object as
its only argument.
"""
request = event.request
if request.accept_language:
accepted = request.accept_language.lookup(available_languages, default=default_locale_name)
request._LOCALE_ = accepted


def setup_localization(config):
"""
Setup localization based on the available_languages and
pyramid.default_locale_name settings.

These settings are named after suggestions from the "Internationalization
and Localization" section of the Pyramid documentation.
"""
try:
config.add_translation_dirs("colander:locale/")
settings = config.get_settings()
available_languages = aslist(settings["available_languages"])
default_locale_name = settings.get("pyramid.default_locale_name", "en")
set_localizer = partial(
set_localizer_for_languages,
available_languages=available_languages,
default_locale_name=default_locale_name,
)
config.add_subscriber(set_localizer, NewRequest)
except ImportError: # pragma: no cover
# add_translation_dirs raises an ImportError if colander is not
# installed
pass
logger = logging.getLogger("kinto.core.cornice")


def includeme(config):
"""Include the Cornice definitions"""
# attributes required to maintain services
config.registry.cornice_services = {}

settings = config.get_settings()

# localization request subscriber must be set before first call
# for request.localizer (in wrap_request)
if settings.get("available_languages"):
setup_localization(config)

config.add_directive("add_cornice_service", register_service_views)
config.add_directive("add_cornice_resource", register_resource_views)
config.add_subscriber(wrap_request, NewRequest)
config.add_renderer("cornicejson", CorniceRenderer())
config.add_view_predicate("content_type", ContentTypePredicate)
config.add_request_method(current_service, reify=True)

if asbool(settings.get("handle_exceptions", True)):
config.add_view(handle_exceptions, context=Exception, permission=NO_PERMISSION_REQUIRED)
config.add_view(handle_exceptions, context=HTTPNotFound, permission=NO_PERMISSION_REQUIRED)
config.add_view(
handle_exceptions, context=HTTPForbidden, permission=NO_PERMISSION_REQUIRED
)
9 changes: 1 addition & 8 deletions kinto/core/cornice/cors.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@
import fnmatch
import functools

from pyramid.settings import asbool


CORS_PARAMETERS = (
"cors_headers",
"cors_enabled",
"cors_origins",
"cors_credentials",
"cors_max_age",
Expand Down Expand Up @@ -95,11 +92,7 @@ def ensure_origin(service, request, response=None, **kwargs):
origin = request.headers.get("Origin")

if not origin:
always_cors = asbool(request.registry.settings.get("cornice.always_cors"))
# With this setting, if the service origins has "*", then
# always return CORS headers.
origins = getattr(service, "cors_origins", [])
if always_cors and "*" in origins:
if "*" in service.cors_origins:
origin = "*"

if origin:
Expand Down
26 changes: 2 additions & 24 deletions kinto/core/cornice/errors.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,18 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import json

from pyramid.i18n import TranslationString


class Errors(list):
"""Holds Request errors"""

def __init__(self, status=400, localizer=None):
def __init__(self, status=400):
self.status = status
self.localizer = localizer
super(Errors, self).__init__()
super().__init__()

def add(self, location, name=None, description=None, **kw):
"""Registers a new error."""
allowed = ("body", "querystring", "url", "header", "path", "cookies", "method")
if location != "" and location not in allowed:
raise ValueError("%r not in %s" % (location, allowed))

if isinstance(description, TranslationString) and self.localizer:
description = self.localizer.translate(description)

self.append(dict(location=location, name=name, description=description, **kw))

@classmethod
def from_json(cls, string):
"""Transforms a json string into an `Errors` instance"""
obj = json.loads(string.decode())
return Errors.from_list(obj.get("errors", []))

@classmethod
def from_list(cls, obj):
"""Transforms a python list into an `Errors` instance"""
errors = Errors()
for error in obj:
errors.add(**error)
return errors
46 changes: 4 additions & 42 deletions kinto/core/cornice/pyramidhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from pyramid.exceptions import PredicateMismatch
from pyramid.httpexceptions import (
HTTPException,
HTTPMethodNotAllowed,
HTTPNotAcceptable,
HTTPUnsupportedMediaType,
Expand All @@ -25,7 +24,6 @@
from kinto.core.cornice.util import (
content_type_matches,
current_service,
is_string,
match_accept_header,
match_content_type_header,
to_list,
Expand Down Expand Up @@ -107,32 +105,13 @@ def _fallback_view(request):

def apply_filters(request, response):
if request.matched_route is not None:
# do some sanity checking on the response using filters
# do some sanity checking on the response
service = current_service(request)
if service is not None:
kwargs, ob = getattr(request, "cornice_args", ({}, None))
for _filter in kwargs.get("filters", []):
if is_string(_filter) and ob is not None:
_filter = getattr(ob, _filter)
try:
response = _filter(response, request)
except TypeError:
response = _filter(response)
if service.cors_enabled:
apply_cors_post_request(service, request, response)

apply_cors_post_request(service, request, response)
return response


def handle_exceptions(exc, request):
# At this stage, the checks done by the validators had been removed because
# a new response started (the exception), so we need to do that again.
if not isinstance(exc, HTTPException):
raise
request.info["cors_checked"] = False
return apply_filters(request, exc)


def add_nosniff_header(request, response):
"""IE has some rather unfortunately content-type-sniffing behaviour
that can be used to trigger XSS attacks via a JSON API, as described here:
Expand Down Expand Up @@ -184,7 +163,7 @@ def register_service_views(config, service):

# before doing anything else, register a view for the OPTIONS method
# if we need to
if service.cors_enabled and "OPTIONS" not in service.defined_methods:
if "OPTIONS" not in service.defined_methods:
service.add_view(
"options", view=get_cors_preflight_view(service), permission=NO_PERMISSION_REQUIRED
)
Expand All @@ -195,7 +174,6 @@ def register_service_views(config, service):

# Cornice-specific arguments that pyramid does not know about
cornice_parameters = (
"filters",
"validators",
"schema",
"klass",
Expand Down Expand Up @@ -237,9 +215,7 @@ def register_service_views(config, service):
args[item] = copy.deepcopy(args[item])

args["request_method"] = method

if service.cors_enabled:
args["validators"].insert(0, cors_validator)
args["validators"].insert(0, cors_validator)

decorated_view = decorate_view(view, dict(args), method, route_args)

Expand Down Expand Up @@ -357,17 +333,3 @@ def _mungle_view_args(args, predicate_list):
else:
# otherwise argument value is just a scalar
args[kind] = value


def register_resource_views(config, resource):
"""Register a resource and it's views.

:param config:
The pyramid configuration object that will be populated.
:param resource:
The resource class containing the definitions
"""
services = resource._services

for service in services.values():
config.add_cornice_service(service)
17 changes: 8 additions & 9 deletions kinto/core/cornice/renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,6 @@
from pyramid.response import Response


def bytes_adapter(obj, request):
"""Convert bytes objects to strings for json error renderer."""
if isinstance(obj, bytes):
return obj.decode("utf8")
return obj


class JSONError(exc.HTTPError):
def __init__(self, serializer, serializer_kw, errors, status=400):
body = {"status": "error", "errors": errors}
Expand All @@ -18,6 +11,13 @@ def __init__(self, serializer, serializer_kw, errors, status=400):
self.content_type = "application/json"


def bytes_adapter(obj, request):
"""Convert bytes objects to strings for json error renderer."""
if isinstance(obj, bytes):
return obj.decode("utf8")
return obj


class CorniceRenderer(JSON):
"""We implement JSON serialization by extending Pyramid's default
JSON rendering machinery using our own custom Content-Type logic `[1]`_.
Expand All @@ -34,8 +34,7 @@ class CorniceRenderer(JSON):
acceptable = ("application/json", "text/plain")

def __init__(self, *args, **kwargs):
"""Adds a `bytes` adapter by default."""
super(CorniceRenderer, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
self.add_adapter(bytes, bytes_adapter)

def render_errors(self, request):
Expand Down
Loading
Loading