Source code for schemathesis.models

import base64
import datetime
import http
from collections import Counter
from contextlib import contextmanager
from dataclasses import dataclass, field
from enum import Enum
from itertools import chain
from logging import LogRecord
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Generator,
    Generic,
    Iterator,
    List,
    NoReturn,
    Optional,
    Sequence,
    Tuple,
    Type,
    TypeVar,
    Union,
    cast,
)
from urllib.parse import quote, unquote, urljoin, urlparse, urlsplit, urlunsplit
from uuid import uuid4

import curlify
import requests.auth
import werkzeug
from hypothesis import event, note, reject
from hypothesis import strategies as st
from requests.structures import CaseInsensitiveDict
from starlette_testclient import TestClient as ASGIClient

from . import failures, serializers
from .auths import AuthStorage
from .constants import (
    DEFAULT_RESPONSE_TIMEOUT,
    SCHEMATHESIS_TEST_CASE_HEADER,
    SERIALIZERS_SUGGESTION_MESSAGE,
    USER_AGENT,
    CodeSampleStyle,
    DataGenerationMethod,
)
from .exceptions import (
    CheckFailed,
    FailureContext,
    InvalidSchema,
    SerializationNotPossible,
    deduplicate_failed_checks,
    get_grouped_exception,
    get_timeout_error,
)
from .hooks import GLOBAL_HOOK_DISPATCHER, HookContext, HookDispatcher, dispatch
from .parameters import Parameter, ParameterSet, PayloadAlternatives
from .serializers import Serializer, SerializerContext
from .types import Body, Cookies, FormData, Headers, NotSet, PathParameters, Query
from .utils import (
    IGNORED_HEADERS,
    NOT_SET,
    GenericResponse,
    WSGIResponse,
    copy_response,
    deprecated_property,
    fast_deepcopy,
    get_response_payload,
    maybe_set_assertion_message,
)

if TYPE_CHECKING:
    from .schemas import BaseSchema
    from .stateful import Stateful, StatefulTest


@dataclass
class CaseSource:
    """Data sources, used to generate a test case."""

    case: "Case"
    response: GenericResponse
    elapsed: float

    def partial_deepcopy(self) -> "CaseSource":
        return self.__class__(
            case=self.case.partial_deepcopy(), response=copy_response(self.response), elapsed=self.elapsed
        )


def cant_serialize(media_type: str) -> NoReturn:  # type: ignore
    """Reject the current example if we don't know how to send this data to the application."""
    event_text = f"Can't serialize data to `{media_type}`."
    note(f"{event_text} {SERIALIZERS_SUGGESTION_MESSAGE}")
    event(event_text)
    reject()  # type: ignore


@dataclass(repr=False)
class Case:
    """A single test case parameters."""

    operation: "APIOperation"
    path_parameters: Optional[PathParameters] = None
    headers: Optional[CaseInsensitiveDict] = None
    cookies: Optional[Cookies] = None
    query: Optional[Query] = None
    # By default, there is no body, but we can't use `None` as the default value because it clashes with `null`
    # which is a valid payload.
    body: Union[Body, NotSet] = NOT_SET

    source: Optional[CaseSource] = None
    # The media type for cases with a payload. For example, "application/json"
    media_type: Optional[str] = None
    # The way the case was generated (None for manually crafted ones)
    data_generation_method: Optional[DataGenerationMethod] = None
    # Unique test case identifier
    id: str = field(default_factory=lambda: uuid4().hex, compare=False)
    _auth: Optional[requests.auth.AuthBase] = None

    def __repr__(self) -> str:
        parts = [f"{self.__class__.__name__}("]
        first = True
        for name in ("path_parameters", "headers", "cookies", "query", "body"):
            value = getattr(self, name)
            if value is not None and not isinstance(value, NotSet):
                if first:
                    first = False
                else:
                    parts.append(", ")
                parts.extend((name, "=", repr(value)))
        return "".join(parts) + ")"

    def __hash__(self) -> int:
        return hash(self.as_curl_command({SCHEMATHESIS_TEST_CASE_HEADER: "0"}))

    @deprecated_property(removed_in="4.0", replacement="operation")
    def endpoint(self) -> "APIOperation":
        return self.operation

    @property
    def path(self) -> str:
        return self.operation.path

    @property
    def full_path(self) -> str:
        return self.operation.full_path

    @property
    def method(self) -> str:
        return self.operation.method.upper()

    @property
    def base_url(self) -> Optional[str]:
        return self.operation.base_url

    @property
    def app(self) -> Any:
        return self.operation.app

    def set_source(self, response: GenericResponse, case: "Case", elapsed: float) -> None:
        self.source = CaseSource(case=case, response=response, elapsed=elapsed)

    @property
    def formatted_path(self) -> str:
        try:
            return self.path.format(**self.path_parameters or {})
        except KeyError as exc:
            # This may happen when a path template has a placeholder for variable "X", but parameter "X" is not defined
            # in the parameters list.
            # When `exc` is formatted, it is the missing key name in quotes. E.g. 'id'
            raise InvalidSchema(f"Path parameter {exc} is not defined") from exc
        except ValueError as exc:
            # A single unmatched `}` inside the path template may cause this
            raise InvalidSchema(f"Malformed path template: `{self.path}`\n\n  {exc}") from exc

    def get_full_base_url(self) -> Optional[str]:
        """Create a full base url, adding "localhost" for WSGI apps."""
        parts = urlsplit(self.base_url)
        if not parts.hostname:
            path = cast(str, parts.path or "")
            return urlunsplit(("http", "localhost", path or "", "", ""))
        return self.base_url

    def get_code_to_reproduce(
        self, headers: Optional[Dict[str, Any]] = None, request: Optional[requests.PreparedRequest] = None
    ) -> str:
        """Construct a Python code to reproduce this case with `requests`."""
        if request is not None:
            kwargs: Dict[str, Any] = {
                "method": request.method,
                "url": request.url,
                "headers": request.headers,
                "data": request.body,
            }
        else:
            base_url = self.get_full_base_url()
            kwargs = self.as_requests_kwargs(base_url)
        if headers:
            final_headers = kwargs["headers"] or {}
            final_headers.update(headers)
            kwargs["headers"] = final_headers
        kwargs["headers"] = {key: value for key, value in kwargs["headers"].items() if key not in IGNORED_HEADERS}
        method = kwargs["method"].lower()

        def should_display(key: str, value: Any) -> bool:
            if key in ("method", "url"):
                return False
            # Parameters are either absent because they are not defined or are optional
            return value not in (None, {})

        printed_kwargs = ", ".join(
            f"{key}={repr(value)}" for key, value in kwargs.items() if should_display(key, value)
        )
        url = _escape_single_quotes(kwargs["url"])
        args_repr = f"'{url}'"
        if printed_kwargs:
            args_repr += f", {printed_kwargs}"
        return f"requests.{method}({args_repr})"

    def as_curl_command(self, headers: Optional[Dict[str, Any]] = None) -> str:
        """Construct a curl command for a given case."""
        base_url = self.get_full_base_url()
        kwargs = self.as_requests_kwargs(base_url)
        if headers:
            final_headers = kwargs["headers"] or {}
            final_headers.update(headers)
            kwargs["headers"] = final_headers
        request = requests.Request(**kwargs)
        prepared = request.prepare()
        if isinstance(prepared.body, bytes):
            # Note, it may be not sufficient to reproduce the error :(
            prepared.body = prepared.body.decode("utf-8", errors="replace")
        for header in tuple(prepared.headers):
            if header in IGNORED_HEADERS and header not in (self.headers or {}):
                del prepared.headers[header]
        return curlify.to_curl(prepared)

    def _get_base_url(self, base_url: Optional[str] = None) -> str:
        if base_url is None:
            if self.base_url is not None:
                base_url = self.base_url
            else:
                raise ValueError(
                    "Base URL is required as `base_url` argument in `call` or should be specified "
                    "in the schema constructor as a part of Schema URL."
                )
        return base_url

    def _get_headers(self, headers: Optional[Dict[str, str]] = None) -> CaseInsensitiveDict:
        final_headers = self.headers.copy() if self.headers is not None else CaseInsensitiveDict()
        if headers:
            final_headers.update(headers)
        final_headers.setdefault("User-Agent", USER_AGENT)
        final_headers.setdefault(SCHEMATHESIS_TEST_CASE_HEADER, self.id)
        return final_headers

    def _get_serializer(self) -> Optional[Serializer]:
        """Get a serializer for the payload, if there is any."""
        if self.media_type is not None:
            media_type = serializers.get_first_matching_media_type(self.media_type)
            if media_type is None:
                # This media type is set manually. Otherwise, it should have been rejected during the data generation
                raise SerializationNotPossible.for_media_type(self.media_type)
            # SAFETY: It is safe to assume that serializer will be found, because `media_type` returned above
            # is registered. This intentionally ignores cases with concurrent serializers registry modification.
            cls = cast(Type[serializers.Serializer], serializers.get(media_type))
            return cls()
        return None

    def as_requests_kwargs(
        self, base_url: Optional[str] = None, headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Convert the case into a dictionary acceptable by requests."""
        final_headers = self._get_headers(headers)
        if self.media_type and self.media_type != "multipart/form-data" and not isinstance(self.body, NotSet):
            # `requests` will handle multipart form headers with the proper `boundary` value.
            if "content-type" not in {header.lower() for header in final_headers}:
                final_headers["Content-Type"] = self.media_type
        base_url = self._get_base_url(base_url)
        formatted_path = self.formatted_path.lstrip("/")
        if not base_url.endswith("/"):
            base_url += "/"
        url = unquote(urljoin(base_url, quote(formatted_path)))
        extra: Dict[str, Any]
        serializer = self._get_serializer()
        if serializer is not None and not isinstance(self.body, NotSet):
            context = SerializerContext(case=self)
            extra = serializer.as_requests(context, self.body)
        else:
            extra = {}
        if self._auth is not None:
            extra["auth"] = self._auth
        additional_headers = extra.pop("headers", None)
        if additional_headers:
            # Additional headers, needed for the serializer
            for key, value in additional_headers.items():
                final_headers.setdefault(key, value)
        return {
            "method": self.method,
            "url": url,
            "cookies": self.cookies,
            "headers": final_headers,
            "params": self.query,
            **extra,
        }

    def call(
        self,
        base_url: Optional[str] = None,
        session: Optional[requests.Session] = None,
        headers: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
        cookies: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> requests.Response:
        """Make a network call with `requests`."""
        hook_context = HookContext(operation=self.operation)
        dispatch("before_call", hook_context, self)
        data = self.as_requests_kwargs(base_url, headers)
        data.update(kwargs)
        if params is not None:
            _merge_dict_to(data, "params", params)
        if cookies is not None:
            _merge_dict_to(data, "cookies", cookies)
        data.setdefault("timeout", DEFAULT_RESPONSE_TIMEOUT / 1000)
        if session is None:
            validate_vanilla_requests_kwargs(data)
            session = requests.Session()
            close_session = True
        else:
            close_session = False
        try:
            with self.operation.schema.ratelimit():
                response = session.request(**data)  # type: ignore
        except requests.Timeout as exc:
            timeout = 1000 * data["timeout"]  # It is defined and not empty, since the exception happened
            request = cast(requests.PreparedRequest, exc.request)
            code_message = self._get_code_message(self.operation.schema.code_sample_style, request)
            raise get_timeout_error(timeout)(
                f"\n\n1. Request timed out after {timeout:.2f}ms\n\n----------\n\n{code_message}",
                context=failures.RequestTimeout(timeout=timeout),
            ) from None
        dispatch("after_call", hook_context, self, response)
        if close_session:
            session.close()
        return response

    def as_werkzeug_kwargs(self, headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Convert the case into a dictionary acceptable by werkzeug.Client."""
        final_headers = self._get_headers(headers)
        if self.media_type and not isinstance(self.body, NotSet):
            # If we need to send a payload, then the Content-Type header should be set
            final_headers["Content-Type"] = self.media_type
        extra: Dict[str, Any]
        serializer = self._get_serializer()
        if serializer is not None and not isinstance(self.body, NotSet):
            context = SerializerContext(case=self)
            extra = serializer.as_werkzeug(context, self.body)
        else:
            extra = {}
        return {
            "method": self.method,
            "path": self.operation.schema.get_full_path(self.formatted_path),
            # Convert to a regular dictionary, as we use `CaseInsensitiveDict` which is not supported by Werkzeug
            "headers": dict(final_headers),
            "query_string": self.query,
            **extra,
        }

    def call_wsgi(
        self,
        app: Any = None,
        headers: Optional[Dict[str, str]] = None,
        query_string: Optional[Dict[str, str]] = None,
        **kwargs: Any,
    ) -> WSGIResponse:
        application = app or self.app
        if application is None:
            raise RuntimeError(
                "WSGI application instance is required. "
                "Please, set `app` argument in the schema constructor or pass it to `call_wsgi`"
            )
        hook_context = HookContext(operation=self.operation)
        dispatch("before_call", hook_context, self)
        data = self.as_werkzeug_kwargs(headers)
        if query_string is not None:
            _merge_dict_to(data, "query_string", query_string)
        client = werkzeug.Client(application, WSGIResponse)
        with cookie_handler(client, self.cookies), self.operation.schema.ratelimit():
            response = client.open(**data, **kwargs)
        requests_kwargs = self.as_requests_kwargs(base_url=self.get_full_base_url(), headers=headers)
        response.request = requests.Request(**requests_kwargs).prepare()
        dispatch("after_call", hook_context, self, response)
        return response

    def call_asgi(
        self,
        app: Any = None,
        base_url: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
        **kwargs: Any,
    ) -> requests.Response:
        application = app or self.app
        if application is None:
            raise RuntimeError(
                "ASGI application instance is required. "
                "Please, set `app` argument in the schema constructor or pass it to `call_asgi`"
            )
        if base_url is None:
            base_url = self.get_full_base_url()
        client = ASGIClient(application)

        return self.call(base_url=base_url, session=client, headers=headers, **kwargs)

    def validate_response(
        self,
        response: GenericResponse,
        checks: Tuple["CheckFunction", ...] = (),
        additional_checks: Tuple["CheckFunction", ...] = (),
        excluded_checks: Tuple["CheckFunction", ...] = (),
        code_sample_style: Optional[str] = None,
    ) -> None:
        """Validate application response.

        By default, all available checks will be applied.

        :param response: Application response.
        :param checks: A tuple of check functions that accept ``response`` and ``case``.
        :param additional_checks: A tuple of additional checks that will be executed after ones from the ``checks``
            argument.
        :param code_sample_style: Controls the style of code samples for failure reproduction.
        """
        __tracebackhide__ = True
        from .checks import ALL_CHECKS

        checks = checks or ALL_CHECKS
        checks = tuple(check for check in checks if check not in excluded_checks)
        additional_checks = tuple(check for check in additional_checks if check not in excluded_checks)
        failed_checks = []
        for check in chain(checks, additional_checks):
            copied_case = self.partial_deepcopy()
            copied_response = copy_response(response)
            try:
                check(copied_response, copied_case)
            except AssertionError as exc:
                maybe_set_assertion_message(exc, check.__name__)
                failed_checks.append(exc)
        failed_checks = list(deduplicate_failed_checks(failed_checks))
        if failed_checks:
            exception_cls = get_grouped_exception(self.operation.verbose_name, *failed_checks)
            formatted_failures = "\n\n".join(f"{idx}. {error.args[0]}" for idx, error in enumerate(failed_checks, 1))
            code_sample_style = (
                CodeSampleStyle.from_str(code_sample_style)
                if code_sample_style is not None
                else self.operation.schema.code_sample_style
            )
            code_message = self._get_code_message(code_sample_style, response.request)
            payload = get_response_payload(response)
            raise exception_cls(
                f"\n\n{formatted_failures}\n\n"
                f"----------\n\n"
                f"Response status: {response.status_code}\n"
                f"Response payload: `{payload}`\n\n"
                f"{code_message}",
                causes=tuple(failed_checks),
            )

    def _get_code_message(self, code_sample_style: CodeSampleStyle, request: requests.PreparedRequest) -> str:
        if code_sample_style == CodeSampleStyle.python:
            code = self.get_code_to_reproduce(request=request)
            return f"Run this Python code to reproduce this response: \n\n    {code}\n"
        if code_sample_style == CodeSampleStyle.curl:
            code = self.as_curl_command(headers=dict(request.headers))
            return f"Run this cURL command to reproduce this response: \n\n    {code}\n"
        raise ValueError(f"Unknown code sample style: {code_sample_style.name}")

    def call_and_validate(
        self,
        base_url: Optional[str] = None,
        session: Optional[requests.Session] = None,
        headers: Optional[Dict[str, Any]] = None,
        checks: Tuple["CheckFunction", ...] = (),
        code_sample_style: Optional[str] = None,
        **kwargs: Any,
    ) -> requests.Response:
        __tracebackhide__ = True
        response = self.call(base_url, session, headers, **kwargs)
        self.validate_response(response, checks, code_sample_style=code_sample_style)
        return response

    def get_full_url(self) -> str:
        """Make a full URL to the current API operation, including query parameters."""
        base_url = self.base_url or "http://localhost"
        kwargs = self.as_requests_kwargs(base_url)
        request = requests.Request(**kwargs)
        prepared = requests.Session().prepare_request(request)  # type: ignore
        return cast(str, prepared.url)

    def partial_deepcopy(self) -> "Case":
        return self.__class__(
            operation=self.operation.partial_deepcopy(),
            data_generation_method=self.data_generation_method,
            media_type=self.media_type,
            source=self.source if self.source is None else self.source.partial_deepcopy(),
            path_parameters=fast_deepcopy(self.path_parameters),
            headers=fast_deepcopy(self.headers),
            cookies=fast_deepcopy(self.cookies),
            query=fast_deepcopy(self.query),
            body=fast_deepcopy(self.body),
        )


def _merge_dict_to(data: Dict[str, Any], data_key: str, new: Dict[str, Any]) -> None:
    original = data[data_key] or {}
    for key, value in new.items():
        original[key] = value
    data[data_key] = original


def validate_vanilla_requests_kwargs(data: Dict[str, Any]) -> None:
    """Check arguments for `requests.Session.request`.

    Some arguments can be valid for cases like ASGI integration, but at the same time they won't work for the regular
    `requests` calls. In such cases we need to avoid an obscure error message, that comes from `requests`.
    """
    url = data["url"]
    if not urlparse(url).netloc:
        raise RuntimeError(
            "The URL should be absolute, so Schemathesis knows where to send the data. \n"
            f"If you use the ASGI integration, please supply your test client "
            f"as the `session` argument to `call`.\nURL: {url}"
        )


def _escape_single_quotes(url: str) -> str:
    """Escape single quotes in a string, so it is usable as in generated Python code.

    The usual ``str.replace`` is not suitable as it may convert already escaped quotes to not-escaped.
    """
    result = []
    escape = False
    for char in url:
        if escape:
            result.append(char)
            escape = False
        elif char == "\\":
            result.append(char)
            escape = True
        elif char == "'":
            result.append("\\'")
        else:
            result.append(char)
    return "".join(result)


@contextmanager
def cookie_handler(client: werkzeug.Client, cookies: Optional[Cookies]) -> Generator[None, None, None]:
    """Set cookies required for a call."""
    if not cookies:
        yield
    else:
        for key, value in cookies.items():
            client.set_cookie("localhost", key, value)
        yield
        for key in cookies:
            client.delete_cookie("localhost", key)


P = TypeVar("P", bound=Parameter)
D = TypeVar("D", bound=dict)


@dataclass
class OperationDefinition(Generic[P, D]):
    """A wrapper to store not resolved API operation definitions.

    To prevent recursion errors we need to store definitions without resolving references. But operation definitions
    itself can be behind a reference (when there is a ``$ref`` in ``paths`` values), therefore we need to store this
    scope change to have a proper reference resolving later.
    """

    raw: D
    resolved: D
    scope: str
    parameters: Sequence[P]

    def __contains__(self, item: Union[str, int]) -> bool:
        return item in self.resolved

    def __getitem__(self, item: Union[str, int]) -> Union[None, bool, float, str, list, Dict[str, Any]]:
        return self.resolved[item]

    def get(self, item: Union[str, int], default: Any = None) -> Union[None, bool, float, str, list, Dict[str, Any]]:
        return self.resolved.get(item, default)


C = TypeVar("C", bound=Case)


[docs]@dataclass(eq=False) class APIOperation(Generic[P, C]): """A single operation defined in an API. You can get one via a ``schema`` instance. .. code-block:: python # Get the POST /items operation operation = schema["/items"]["POST"] """ # `path` does not contain `basePath` # Example <scheme>://<host>/<basePath>/users - "/users" is path # https://swagger.io/docs/specification/2-0/api-host-and-base-path/ path: str method: str definition: OperationDefinition = field(repr=False) schema: "BaseSchema" verbose_name: str = None # type: ignore app: Any = None base_url: Optional[str] = None path_parameters: ParameterSet[P] = field(default_factory=ParameterSet) headers: ParameterSet[P] = field(default_factory=ParameterSet) cookies: ParameterSet[P] = field(default_factory=ParameterSet) query: ParameterSet[P] = field(default_factory=ParameterSet) body: PayloadAlternatives[P] = field(default_factory=PayloadAlternatives) case_cls: Type[C] = Case # type: ignore def __post_init__(self) -> None: if self.verbose_name is None: self.verbose_name = f"{self.method.upper()} {self.full_path}" # type: ignore @property def full_path(self) -> str: return self.schema.get_full_path(self.path) @property def links(self) -> Dict[str, Dict[str, Any]]: return self.schema.get_links(self) def iter_parameters(self) -> Iterator[P]: """Iterate over all operation's parameters.""" return chain(self.path_parameters, self.headers, self.cookies, self.query) def _lookup_container(self, location: str) -> Union[ParameterSet[P], PayloadAlternatives[P], None]: return { "path": self.path_parameters, "header": self.headers, "cookie": self.cookies, "query": self.query, "body": self.body, }.get(location) def add_parameter(self, parameter: P) -> None: """Add a new processed parameter to an API operation. :param parameter: A parameter that will be used with this operation. :rtype: None """ # If the parameter has a typo, then by default, there will be an error from `jsonschema` earlier. # But if the user wants to skip schema validation, we choose to ignore a malformed parameter. # In this case, we still might generate some tests for an API operation, but without this parameter, # which is better than skip the whole operation from testing. container = self._lookup_container(parameter.location) if container is not None: container.add(parameter) def get_parameter(self, name: str, location: str) -> Optional[P]: container = self._lookup_container(location) if container is not None: return container.get(name) return None
[docs] def as_strategy( self, hooks: Optional["HookDispatcher"] = None, auth_storage: Optional[AuthStorage] = None, data_generation_method: DataGenerationMethod = DataGenerationMethod.default(), **kwargs: Any, ) -> st.SearchStrategy: """Turn this API operation into a Hypothesis strategy.""" strategy = self.schema.get_case_strategy(self, hooks, auth_storage, data_generation_method, **kwargs) def _apply_hooks(dispatcher: HookDispatcher, _strategy: st.SearchStrategy[Case]) -> st.SearchStrategy[Case]: for hook in dispatcher.get_all_by_name("before_generate_case"): _strategy = hook(HookContext(self), _strategy) return _strategy strategy = _apply_hooks(GLOBAL_HOOK_DISPATCHER, strategy) strategy = _apply_hooks(self.schema.hooks, strategy) if hooks is not None: strategy = _apply_hooks(hooks, strategy) return strategy
def get_security_requirements(self) -> List[str]: return self.schema.get_security_requirements(self) def get_strategies_from_examples(self) -> List[st.SearchStrategy[Case]]: """Get examples from the API operation.""" return self.schema.get_strategies_from_examples(self) def get_stateful_tests(self, response: GenericResponse, stateful: Optional["Stateful"]) -> Sequence["StatefulTest"]: return self.schema.get_stateful_tests(response, self, stateful) def get_parameter_serializer(self, location: str) -> Optional[Callable]: """Get a function that serializes parameters for the given location. It handles serializing data into various `collectionFormat` options and similar. Note that payload is handled by this function - it is handled by serializers. """ return self.schema.get_parameter_serializer(self, location) def prepare_multipart(self, form_data: FormData) -> Tuple[Optional[List], Optional[Dict[str, Any]]]: return self.schema.prepare_multipart(form_data, self) def get_request_payload_content_types(self) -> List[str]: return self.schema.get_request_payload_content_types(self) def partial_deepcopy(self) -> "APIOperation": return self.__class__( path=self.path, # string, immutable method=self.method, # string, immutable definition=fast_deepcopy(self.definition), schema=self.schema.clone(), # shallow copy verbose_name=self.verbose_name, # string, immutable app=self.app, # not deepcopyable base_url=self.base_url, # string, immutable path_parameters=fast_deepcopy(self.path_parameters), headers=fast_deepcopy(self.headers), cookies=fast_deepcopy(self.cookies), query=fast_deepcopy(self.query), body=fast_deepcopy(self.body), ) def clone(self, **components: Any) -> "APIOperation": """Create a new instance of this API operation with updated components.""" return self.__class__( path=self.path, method=self.method, verbose_name=self.verbose_name, definition=self.definition, schema=self.schema, app=self.app, base_url=self.base_url, path_parameters=components["path_parameters"], query=components["query"], headers=components["headers"], cookies=components["cookies"], body=components["body"], )
[docs] def make_case( self, *, path_parameters: Optional[PathParameters] = None, headers: Optional[Headers] = None, cookies: Optional[Cookies] = None, query: Optional[Query] = None, body: Union[Body, NotSet] = NOT_SET, media_type: Optional[str] = None, ) -> C: """Create a new example for this API operation. The main use case is constructing Case instances completely manually, without data generation. """ return self.schema.make_case( case_cls=self.case_cls, operation=self, path_parameters=path_parameters, headers=headers, cookies=cookies, query=query, body=body, media_type=media_type, )
@property def operation_reference(self) -> str: path = self.path.replace("~", "~0").replace("/", "~1") return f"#/paths/{path}/{self.method}"
[docs] def validate_response(self, response: GenericResponse) -> None: """Validate API response for conformance. :raises CheckFailed: If the response does not conform to the API schema. """ return self.schema.validate_response(self, response)
[docs] def is_response_valid(self, response: GenericResponse) -> bool: """Validate API response for conformance.""" try: self.validate_response(response) return True except CheckFailed: return False
def get_raw_payload_schema(self, media_type: str) -> Optional[Dict[str, Any]]: return self.schema._get_payload_schema(self.definition.raw, media_type) def get_resolved_payload_schema(self, media_type: str) -> Optional[Dict[str, Any]]: return self.schema._get_payload_schema(self.definition.resolved, media_type)
# backward-compatibility Endpoint = APIOperation class Status(str, Enum): """Status of an action or multiple actions.""" success = "success" failure = "failure" error = "error" skip = "skip" @dataclass(repr=False) class Check: """Single check run result.""" name: str value: Status response: Optional[GenericResponse] elapsed: float example: Case message: Optional[str] = None # Failure-specific context context: Optional[FailureContext] = None request: Optional[requests.PreparedRequest] = None @dataclass(repr=False) class Request: """Request data extracted from `Case`.""" method: str uri: str body: Optional[str] headers: Headers @classmethod def from_case(cls, case: Case, session: requests.Session) -> "Request": """Create a new `Request` instance from `Case`.""" base_url = case.get_full_base_url() kwargs = case.as_requests_kwargs(base_url) request = requests.Request(**kwargs) prepared = session.prepare_request(request) # type: ignore return cls.from_prepared_request(prepared) @classmethod def from_prepared_request(cls, prepared: requests.PreparedRequest) -> "Request": """A prepared request version is already stored in `requests.Response`.""" body = prepared.body if isinstance(body, str): # can be a string for `application/x-www-form-urlencoded` body = body.encode("utf-8") # these values have `str` type at this point uri = cast(str, prepared.url) method = cast(str, prepared.method) return cls( uri=uri, method=method, headers={key: [value] for (key, value) in prepared.headers.items()}, body=serialize_payload(body) if body is not None else body, ) def serialize_payload(payload: bytes) -> str: return base64.b64encode(payload).decode() @dataclass(repr=False) class Response: """Unified response data.""" status_code: int message: str headers: Dict[str, List[str]] body: Optional[str] encoding: Optional[str] http_version: str elapsed: float @classmethod def from_requests(cls, response: requests.Response) -> "Response": """Create a response from requests.Response.""" headers = {name: response.raw.headers.getlist(name) for name in response.raw.headers.keys()} # Similar to http.client:319 (HTTP version detection in stdlib's `http` package) http_version = "1.0" if response.raw.version == 10 else "1.1" def is_empty(_response: requests.Response) -> bool: # Assume the response is empty if: # - no `Content-Length` header # - no chunks when iterating over its content return "Content-Length" not in headers and list(_response.iter_content()) == [] body = None if is_empty(response) else serialize_payload(response.content) return cls( status_code=response.status_code, message=response.reason, body=body, encoding=response.encoding, headers=headers, http_version=http_version, elapsed=response.elapsed.total_seconds(), ) @classmethod def from_wsgi(cls, response: WSGIResponse, elapsed: float) -> "Response": """Create a response from WSGI response.""" message = http.client.responses.get(response.status_code, "UNKNOWN") headers = {name: response.headers.getlist(name) for name in response.headers.keys()} # Note, this call ensures that `response.response` is a sequence, which is needed for comparison data = response.get_data() body = None if response.response == [] else serialize_payload(data) encoding: Optional[str] if body is not None: encoding = response.mimetype_params.get("charset", response.charset) else: encoding = None return cls( status_code=response.status_code, message=message, body=body, encoding=encoding, headers=headers, http_version="1.1", elapsed=elapsed, ) @dataclass class Interaction: """A single interaction with the target app.""" request: Request response: Response checks: List[Check] status: Status data_generation_method: DataGenerationMethod recorded_at: str = field(default_factory=lambda: datetime.datetime.now().isoformat()) @classmethod def from_requests( cls, case: Case, response: requests.Response, status: Status, checks: List[Check] ) -> "Interaction": return cls( request=Request.from_prepared_request(response.request), response=Response.from_requests(response), status=status, checks=checks, data_generation_method=cast(DataGenerationMethod, case.data_generation_method), ) @classmethod def from_wsgi( cls, case: Case, response: WSGIResponse, headers: Dict[str, Any], elapsed: float, status: Status, checks: List[Check], ) -> "Interaction": session = requests.Session() session.headers.update(headers) return cls( request=Request.from_case(case, session), response=Response.from_wsgi(response, elapsed), status=status, checks=checks, data_generation_method=cast(DataGenerationMethod, case.data_generation_method), ) @dataclass(repr=False) class TestResult: """Result of a single test.""" __test__ = False method: str path: str verbose_name: str data_generation_method: List[DataGenerationMethod] checks: List[Check] = field(default_factory=list) errors: List[Tuple[Exception, Optional[Case]]] = field(default_factory=list) interactions: List[Interaction] = field(default_factory=list) logs: List[LogRecord] = field(default_factory=list) is_errored: bool = False is_flaky: bool = False is_skipped: bool = False is_executed: bool = False seed: Optional[int] = None # To show a proper reproduction code if an error happens and there is no way to get actual headers that were # sent over the network. Or there could be no actual requests at all overridden_headers: Optional[Dict[str, Any]] = None def mark_errored(self) -> None: self.is_errored = True def mark_flaky(self) -> None: self.is_flaky = True def mark_skipped(self) -> None: self.is_skipped = True def mark_executed(self) -> None: self.is_executed = True @property def has_errors(self) -> bool: return bool(self.errors) @property def has_failures(self) -> bool: return any(check.value == Status.failure for check in self.checks) @property def has_logs(self) -> bool: return bool(self.logs) def add_success(self, name: str, example: Case, response: GenericResponse, elapsed: float) -> Check: check = Check( name=name, value=Status.success, response=response, elapsed=elapsed, example=example, request=None ) self.checks.append(check) return check def add_failure( self, name: str, example: Case, response: Optional[GenericResponse], elapsed: float, message: str, context: Optional[FailureContext], request: Optional[requests.PreparedRequest] = None, ) -> Check: check = Check( name=name, value=Status.failure, response=response, elapsed=elapsed, example=example, message=message, context=context, request=request, ) self.checks.append(check) return check def add_error(self, exception: Exception, example: Optional[Case] = None) -> None: self.errors.append((exception, example)) def store_requests_response( self, case: Case, response: requests.Response, status: Status, checks: List[Check] ) -> None: self.interactions.append(Interaction.from_requests(case, response, status, checks)) def store_wsgi_response( self, case: Case, response: WSGIResponse, headers: Dict[str, Any], elapsed: float, status: Status, checks: List[Check], ) -> None: self.interactions.append(Interaction.from_wsgi(case, response, headers, elapsed, status, checks)) @dataclass(repr=False) class TestResultSet: """Set of multiple test results.""" __test__ = False results: List[TestResult] = field(default_factory=list) generic_errors: List[InvalidSchema] = field(default_factory=list) warnings: List[str] = field(default_factory=list) def __iter__(self) -> Iterator[TestResult]: return iter(self.results) @property def is_empty(self) -> bool: """If the result set contains no results.""" return len(self.results) == 0 and len(self.generic_errors) == 0 @property def has_failures(self) -> bool: """If any result has any failures.""" return any(result.has_failures for result in self) @property def has_errors(self) -> bool: """If any result has any errors.""" return self.errored_count > 0 @property def has_logs(self) -> bool: """If any result has any captured logs.""" return any(result.has_logs for result in self) def _count(self, predicate: Callable) -> int: return sum(1 for result in self if predicate(result)) @property def passed_count(self) -> int: return self._count(lambda result: not result.has_errors and not result.is_skipped and not result.has_failures) @property def skipped_count(self) -> int: return self._count(lambda result: result.is_skipped) @property def failed_count(self) -> int: return self._count(lambda result: result.has_failures and not result.is_errored) @property def errored_count(self) -> int: return self._count(lambda result: result.has_errors or result.is_errored) + len(self.generic_errors) @property def total(self) -> Dict[str, Dict[Union[str, Status], int]]: """An aggregated statistic about test results.""" output: Dict[str, Dict[Union[str, Status], int]] = {} for item in self.results: for check in item.checks: output.setdefault(check.name, Counter()) output[check.name][check.value] += 1 output[check.name]["total"] += 1 # Avoid using Counter, since its behavior could harm in other places: # `if not total["unknown"]:` - this will lead to the branch execution # It is better to let it fail if there is a wrong key return {key: dict(value) for key, value in output.items()} def append(self, item: TestResult) -> None: """Add a new item to the results list.""" self.results.append(item) def add_warning(self, warning: str) -> None: """Add a new warning to the warnings list.""" self.warnings.append(warning) CheckFunction = Callable[[GenericResponse, Case], Optional[bool]]