Source code for schemathesis.models

from __future__ import annotations

import datetime
import inspect
import textwrap
from collections import Counter
from contextlib import contextmanager
from dataclasses import dataclass, field
from enum import Enum
from functools import lru_cache, partial
from itertools import chain
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Generator,
    Generic,
    Iterator,
    Literal,
    NoReturn,
    Sequence,
    Type,
    TypeVar,
    cast,
)
from urllib.parse import quote, unquote, urljoin, urlsplit, urlunsplit

from . import serializers
from ._dependency_versions import IS_WERKZEUG_ABOVE_3
from ._override import CaseOverride
from .code_samples import CodeSampleStyle
from .constants import (
    NOT_SET,
    SCHEMATHESIS_TEST_CASE_HEADER,
    SERIALIZERS_SUGGESTION_MESSAGE,
    USER_AGENT,
)
from .exceptions import (
    CheckFailed,
    OperationSchemaError,
    SerializationNotPossible,
    SkipTest,
    UsageError,
    deduplicate_failed_checks,
    get_grouped_exception,
    maybe_set_assertion_message,
)
from .generation import DataGenerationMethod, GenerationConfig, generate_random_case_id
from .hooks import GLOBAL_HOOK_DISPATCHER, HookContext, HookDispatcher, dispatch
from .internal.checks import CheckContext
from .internal.copy import fast_deepcopy
from .internal.deprecation import deprecated_function, deprecated_property
from .internal.diff import diff
from .internal.output import prepare_response_payload
from .parameters import Parameter, ParameterSet, PayloadAlternatives
from .sanitization import sanitize_request, sanitize_response
from .transports import ASGITransport, RequestsTransport, WSGITransport, deserialize_payload, serialize_payload
from .types import Body, Cookies, FormData, Headers, NotSet, PathParameters, Query

if TYPE_CHECKING:
    import unittest
    from logging import LogRecord

    import requests.auth
    import werkzeug
    from hypothesis import strategies as st
    from requests.structures import CaseInsensitiveDict

    from .auths import AuthStorage
    from .failures import FailureContext
    from .internal.checks import CheckFunction
    from .schemas import BaseSchema
    from .serializers import Serializer
    from .stateful import Stateful, StatefulTest
    from .transports.responses import GenericResponse, WSGIResponse


@dataclass
class TransitionId:
    name: str
    status_code: str

    __slots__ = ("name", "status_code")


@dataclass
class CaseSource:
    """Data sources, used to generate a test case."""

    case: Case
    response: GenericResponse
    elapsed: float
    overrides_all_parameters: bool
    transition_id: TransitionId

    def partial_deepcopy(self) -> CaseSource:
        return self.__class__(
            case=self.case.partial_deepcopy(),
            response=self.response,
            elapsed=self.elapsed,
            overrides_all_parameters=self.overrides_all_parameters,
            transition_id=self.transition_id,
        )


def cant_serialize(media_type: str) -> NoReturn:  # type: ignore
    """Reject the current example if we don't know how to send this data to the application."""
    from hypothesis import event, note, reject

    event_text = f"Can't serialize data to `{media_type}`."
    note(f"{event_text} {SERIALIZERS_SUGGESTION_MESSAGE}")
    event(event_text)
    reject()  # type: ignore


@lru_cache
def get_request_signature() -> inspect.Signature:
    import requests

    return inspect.signature(requests.Request)


@dataclass()
class PreparedRequestData:
    method: str
    url: str
    body: str | bytes | None
    headers: Headers


def prepare_request_data(kwargs: dict[str, Any]) -> PreparedRequestData:
    """Prepare request data for generating code samples."""
    import requests

    kwargs = {key: value for key, value in kwargs.items() if key in get_request_signature().parameters}
    request = requests.Request(**kwargs).prepare()
    return PreparedRequestData(
        method=str(request.method), url=str(request.url), body=request.body, headers=dict(request.headers)
    )


class TestPhase(str, Enum):
    __test__ = False

    EXPLICIT = "explicit"
    COVERAGE = "coverage"
    GENERATE = "generate"


@dataclass
class GenerationMetadata:
    """Stores various information about how data is generated."""

    query: DataGenerationMethod | None
    path_parameters: DataGenerationMethod | None
    headers: DataGenerationMethod | None
    cookies: DataGenerationMethod | None
    body: DataGenerationMethod | None
    phase: TestPhase
    # Temporary attributes to carry info specific to the coverage phase
    description: str | None
    location: str | None
    parameter: str | None
    parameter_location: str | None

    __slots__ = (
        "query",
        "path_parameters",
        "headers",
        "cookies",
        "body",
        "phase",
        "description",
        "location",
        "parameter",
        "parameter_location",
    )


@dataclass(repr=False)
class Case:
    """A single test case parameters."""

    operation: APIOperation
    # Time spent on generation of this test case
    generation_time: float
    # Unique test case identifier
    id: str = field(default_factory=generate_random_case_id, compare=False)
    path_parameters: PathParameters | None = None
    headers: CaseInsensitiveDict | None = None
    cookies: Cookies | None = None
    query: Query | None = None
    # By default, there is no body, but we can't use `None` as the default value because it clashes with `null`
    # which is a valid payload.
    body: Body | NotSet = NOT_SET
    # The media type for cases with a payload. For example, "application/json"
    media_type: str | None = None
    source: CaseSource | None = None

    meta: GenerationMetadata | None = None

    # The way the case was generated (None for manually crafted ones)
    data_generation_method: DataGenerationMethod | None = None
    _auth: requests.auth.AuthBase | None = None
    _has_explicit_auth: bool = False
    _explicit_method: str | None = None

    def __post_init__(self) -> None:
        self._original_path_parameters = self.path_parameters.copy() if self.path_parameters else None
        self._original_headers = self.headers.copy() if self.headers else None
        self._original_cookies = self.cookies.copy() if self.cookies else None
        self._original_query = self.query.copy() if self.query else None

    def _has_generated_component(self, name: str) -> bool:
        assert name in ["path_parameters", "headers", "cookies", "query"]
        if self.meta is None:
            return False
        return getattr(self.meta, name) is not None

    def _get_diff(self, component: Literal["path_parameters", "headers", "query", "cookies"]) -> dict[str, Any]:
        original = getattr(self, f"_original_{component}")
        current = getattr(self, component)
        if not (current and original):
            return {}
        original_value = original if self._has_generated_component(component) else {}
        return diff(original_value, current)

    def __repr__(self) -> str:
        parts = [f"{self.__class__.__name__}("]
        first = True
        for name in ("path_parameters", "headers", "cookies", "query", "body"):
            value = getattr(self, name)
            if value is not None and not isinstance(value, NotSet):
                if first:
                    first = False
                else:
                    parts.append(", ")
                parts.extend((name, "=", repr(value)))
        return "".join(parts) + ")"

    def __hash__(self) -> int:
        return hash(self.as_curl_command({SCHEMATHESIS_TEST_CASE_HEADER: "0"}))

    @property
    def _override(self) -> CaseOverride:
        return CaseOverride(
            path_parameters=self._get_diff("path_parameters"),
            headers=self._get_diff("headers"),
            query=self._get_diff("query"),
            cookies=self._get_diff("cookies"),
        )

    def _repr_pretty_(self, *args: Any, **kwargs: Any) -> None: ...

    @deprecated_property(removed_in="4.0", replacement="`operation`")
    def endpoint(self) -> APIOperation:
        return self.operation

    @property
    def path(self) -> str:
        return self.operation.path

    @property
    def full_path(self) -> str:
        return self.operation.full_path

    @property
    def method(self) -> str:
        return self._explicit_method.upper() if self._explicit_method else self.operation.method.upper()

    @property
    def base_url(self) -> str | None:
        return self.operation.base_url

    @property
    def app(self) -> Any:
        return self.operation.app

    def set_source(
        self,
        response: GenericResponse,
        case: Case,
        elapsed: float,
        overrides_all_parameters: bool,
        transition_id: TransitionId,
    ) -> None:
        self.source = CaseSource(
            case=case,
            response=response,
            elapsed=elapsed,
            overrides_all_parameters=overrides_all_parameters,
            transition_id=transition_id,
        )

    @property
    def formatted_path(self) -> str:
        try:
            return self.path.format(**self.path_parameters or {})
        except KeyError as exc:
            # This may happen when a path template has a placeholder for variable "X", but parameter "X" is not defined
            # in the parameters list.
            # When `exc` is formatted, it is the missing key name in quotes. E.g. 'id'
            raise OperationSchemaError(f"Path parameter {exc} is not defined") from exc
        except (IndexError, ValueError) as exc:
            # A single unmatched `}` inside the path template may cause this
            raise OperationSchemaError(f"Malformed path template: `{self.path}`\n\n  {exc}") from exc

    def get_full_base_url(self) -> str | None:
        """Create a full base url, adding "localhost" for WSGI apps."""
        parts = urlsplit(self.base_url)
        if not parts.hostname:
            path = cast(str, parts.path or "")
            return urlunsplit(("http", "localhost", path or "", "", ""))
        return self.base_url

    def prepare_code_sample_data(self, headers: dict[str, Any] | None) -> PreparedRequestData:
        base_url = self.get_full_base_url()
        kwargs = RequestsTransport().serialize_case(self, base_url=base_url, headers=headers)
        return prepare_request_data(kwargs)

    def get_code_to_reproduce(
        self,
        headers: dict[str, Any] | None = None,
        request: requests.PreparedRequest | None = None,
        verify: bool = True,
    ) -> str:
        """Construct a Python code to reproduce this case with `requests`."""
        if request is not None:
            request_data = prepare_request_data(
                {
                    "method": request.method,
                    "url": request.url,
                    "headers": request.headers,
                    "data": request.body,
                }
            )
        else:
            request_data = self.prepare_code_sample_data(headers)
        return CodeSampleStyle.python.generate(
            method=request_data.method,
            url=request_data.url,
            body=request_data.body,
            headers=dict(self.headers) if self.headers is not None else None,
            verify=verify,
            extra_headers=request_data.headers,
        )

    def as_curl_command(self, headers: dict[str, Any] | None = None, verify: bool = True) -> str:
        """Construct a curl command for a given case."""
        request_data = self.prepare_code_sample_data(headers)
        return CodeSampleStyle.curl.generate(
            method=request_data.method,
            url=request_data.url,
            body=request_data.body,
            headers=dict(self.headers) if self.headers is not None else None,
            verify=verify,
            extra_headers=request_data.headers,
        )

    def _get_base_url(self, base_url: str | None = None) -> str:
        if base_url is None:
            if self.base_url is not None:
                base_url = self.base_url
            else:
                raise ValueError(
                    "Base URL is required as `base_url` argument in `call` or should be specified "
                    "in the schema constructor as a part of Schema URL."
                )
        return base_url

    def _get_headers(self, headers: dict[str, str] | None = None) -> CaseInsensitiveDict:
        from requests.structures import CaseInsensitiveDict

        final_headers = self.headers.copy() if self.headers is not None else CaseInsensitiveDict()
        if headers:
            final_headers.update(headers)
        final_headers.setdefault("User-Agent", USER_AGENT)
        final_headers.setdefault(SCHEMATHESIS_TEST_CASE_HEADER, self.id)
        return final_headers

    def _get_serializer(self, media_type: str | None = None) -> Serializer | None:
        """Get a serializer for the payload, if there is any."""
        input_media_type = media_type or self.media_type
        if input_media_type is not None:
            media_type = serializers.get_first_matching_media_type(input_media_type)
            if media_type is None:
                # This media type is set manually. Otherwise, it should have been rejected during the data generation
                raise SerializationNotPossible.for_media_type(input_media_type)
            # SAFETY: It is safe to assume that serializer will be found, because `media_type` returned above
            # is registered. This intentionally ignores cases with concurrent serializers registry modification.
            cls = cast(Type[serializers.Serializer], serializers.get(media_type))
            return cls()
        return None

    def _get_body(self) -> Body | NotSet:
        return self.body

    @deprecated_function(removed_in="4.0", replacement="Case.as_transport_kwargs")
    def as_requests_kwargs(self, base_url: str | None = None, headers: dict[str, str] | None = None) -> dict[str, Any]:
        """Convert the case into a dictionary acceptable by requests."""
        return RequestsTransport().serialize_case(self, base_url=base_url, headers=headers)

    def as_transport_kwargs(self, base_url: str | None = None, headers: dict[str, str] | None = None) -> dict[str, Any]:
        """Convert the test case into a dictionary acceptable by the underlying transport call."""
        return self.operation.schema.transport.serialize_case(self, base_url=base_url, headers=headers)

    def call(
        self,
        base_url: str | None = None,
        session: requests.Session | None = None,
        headers: dict[str, Any] | None = None,
        params: dict[str, Any] | None = None,
        cookies: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> GenericResponse:
        hook_context = HookContext(operation=self.operation)
        dispatch("before_call", hook_context, self)
        response = self.operation.schema.transport.send(
            self, session=session, base_url=base_url, headers=headers, params=params, cookies=cookies, **kwargs
        )
        dispatch("after_call", hook_context, self, response)
        return response

    @deprecated_function(removed_in="4.0", replacement="Case.as_transport_kwargs")
    def as_werkzeug_kwargs(self, headers: dict[str, str] | None = None) -> dict[str, Any]:
        """Convert the case into a dictionary acceptable by werkzeug.Client."""
        return WSGITransport(self.app).serialize_case(self, headers=headers)

    @deprecated_function(removed_in="4.0", replacement="Case.call")
    def call_wsgi(
        self,
        app: Any = None,
        headers: dict[str, str] | None = None,
        query_string: dict[str, str] | None = None,
        **kwargs: Any,
    ) -> WSGIResponse:
        application = app or self.app
        if application is None:
            raise RuntimeError(
                "WSGI application instance is required. "
                "Please, set `app` argument in the schema constructor or pass it to `call_wsgi`"
            )
        hook_context = HookContext(operation=self.operation)
        dispatch("before_call", hook_context, self)
        response = WSGITransport(application).send(self, headers=headers, params=query_string, **kwargs)
        dispatch("after_call", hook_context, self, response)
        return response

    @deprecated_function(removed_in="4.0", replacement="Case.call")
    def call_asgi(
        self,
        app: Any = None,
        base_url: str | None = None,
        headers: dict[str, str] | None = None,
        **kwargs: Any,
    ) -> requests.Response:
        application = app or self.app
        if application is None:
            raise RuntimeError(
                "ASGI application instance is required. "
                "Please, set `app` argument in the schema constructor or pass it to `call_asgi`"
            )
        hook_context = HookContext(operation=self.operation)
        dispatch("before_call", hook_context, self)
        response = ASGITransport(application).send(self, base_url=base_url, headers=headers, **kwargs)
        dispatch("after_call", hook_context, self, response)
        return response

    def validate_response(
        self,
        response: GenericResponse,
        checks: tuple[CheckFunction, ...] = (),
        additional_checks: tuple[CheckFunction, ...] = (),
        excluded_checks: tuple[CheckFunction, ...] = (),
        code_sample_style: str | None = None,
        headers: dict[str, Any] | None = None,
    ) -> None:
        """Validate application response.

        By default, all available checks will be applied.

        :param response: Application response.
        :param checks: A tuple of check functions that accept ``response`` and ``case``.
        :param additional_checks: A tuple of additional checks that will be executed after ones from the ``checks``
            argument.
        :param excluded_checks: Checks excluded from the default ones.
        :param code_sample_style: Controls the style of code samples for failure reproduction.
        """
        __tracebackhide__ = True
        from requests.structures import CaseInsensitiveDict

        from .checks import ALL_CHECKS
        from .internal.checks import wrap_check
        from .transports.responses import get_payload, get_reason

        if checks:
            _checks = tuple(wrap_check(check) for check in checks)
        else:
            _checks = checks
        if additional_checks:
            _additional_checks = tuple(wrap_check(check) for check in additional_checks)
        else:
            _additional_checks = additional_checks

        checks = _checks or ALL_CHECKS
        checks = tuple(check for check in checks if check not in excluded_checks)
        additional_checks = tuple(check for check in _additional_checks if check not in excluded_checks)
        failed_checks = []
        ctx = CheckContext(
            override=self._override, auth=None, headers=CaseInsensitiveDict(headers) if headers else None
        )
        for check in chain(checks, additional_checks):
            copied_case = self.partial_deepcopy()
            try:
                check(ctx, response, copied_case)
            except AssertionError as exc:
                maybe_set_assertion_message(exc, check.__name__)
                failed_checks.append(exc)
        failed_checks = list(deduplicate_failed_checks(failed_checks))
        if failed_checks:
            exception_cls = get_grouped_exception(self.operation.verbose_name, *failed_checks)
            formatted = ""
            for idx, failed in enumerate(failed_checks, 1):
                if isinstance(failed, CheckFailed) and failed.context is not None:
                    title = failed.context.title
                    if failed.context.message:
                        message = failed.context.message
                    else:
                        message = None
                else:
                    title, message = failed.args
                formatted += "\n\n"
                formatted += f"{idx}. {title}"
                if message is not None:
                    formatted += "\n\n"
                    formatted += textwrap.indent(message, prefix="    ")

            status_code = response.status_code
            reason = get_reason(status_code)
            formatted += f"\n\n[{response.status_code}] {reason}:"
            payload = get_payload(response)
            if not payload:
                formatted += "\n\n    <EMPTY>"
            else:
                payload = prepare_response_payload(payload, config=self.operation.schema.output_config)
                payload = textwrap.indent(f"\n`{payload}`", prefix="    ")
                formatted += f"\n{payload}"
            code_sample_style = (
                CodeSampleStyle.from_str(code_sample_style)
                if code_sample_style is not None
                else self.operation.schema.code_sample_style
            )
            verify = getattr(response, "verify", True)
            if self.operation.schema.sanitize_output:
                sanitize_request(response.request)
                sanitize_response(response)
            code_message = self._get_code_message(code_sample_style, response.request, verify=verify)
            raise exception_cls(
                f"{formatted}\n\n" f"{code_message}",
                causes=tuple(failed_checks),
            )

    def _get_code_message(
        self, code_sample_style: CodeSampleStyle, request: requests.PreparedRequest, verify: bool
    ) -> str:
        if code_sample_style == CodeSampleStyle.python:
            code = self.get_code_to_reproduce(request=request, verify=verify)
        elif code_sample_style == CodeSampleStyle.curl:
            code = self.as_curl_command(headers=dict(request.headers), verify=verify)
        else:
            raise ValueError(f"Unknown code sample style: {code_sample_style.name}")
        return f"Reproduce with: \n\n    {code}\n"

    def call_and_validate(
        self,
        base_url: str | None = None,
        session: requests.Session | None = None,
        headers: dict[str, Any] | None = None,
        checks: tuple[CheckFunction, ...] = (),
        additional_checks: tuple[CheckFunction, ...] = (),
        excluded_checks: tuple[CheckFunction, ...] = (),
        code_sample_style: str | None = None,
        **kwargs: Any,
    ) -> requests.Response:
        __tracebackhide__ = True
        response = self.call(base_url, session, headers, **kwargs)
        self.validate_response(
            response,
            checks,
            code_sample_style=code_sample_style,
            headers=headers,
            additional_checks=additional_checks,
            excluded_checks=excluded_checks,
        )
        return response

    def _get_url(self, base_url: str | None) -> str:
        base_url = self._get_base_url(base_url)
        formatted_path = self.formatted_path.lstrip("/")
        if not base_url.endswith("/"):
            base_url += "/"
        return unquote(urljoin(base_url, quote(formatted_path)))

    def get_full_url(self) -> str:
        """Make a full URL to the current API operation, including query parameters."""
        import requests

        base_url = self.base_url or "http://127.0.0.1"
        kwargs = RequestsTransport().serialize_case(self, base_url=base_url)
        request = requests.Request(**kwargs)
        prepared = requests.Session().prepare_request(request)  # type: ignore
        return cast(str, prepared.url)

    def partial_deepcopy(self) -> Case:
        return self.__class__(
            operation=self.operation.partial_deepcopy(),
            data_generation_method=self.data_generation_method,
            media_type=self.media_type,
            source=self.source if self.source is None else self.source.partial_deepcopy(),
            path_parameters=fast_deepcopy(self.path_parameters),
            headers=fast_deepcopy(self.headers),
            cookies=fast_deepcopy(self.cookies),
            query=fast_deepcopy(self.query),
            body=fast_deepcopy(self.body),
            meta=self.meta,
            generation_time=self.generation_time,
            id=self.id,
            _auth=self._auth,
            _has_explicit_auth=self._has_explicit_auth,
        )


@contextmanager
def cookie_handler(client: werkzeug.Client, cookies: Cookies | None) -> Generator[None, None, None]:
    """Set cookies required for a call."""
    if not cookies:
        yield
    else:
        for key, value in cookies.items():
            if IS_WERKZEUG_ABOVE_3:
                client.set_cookie(key=key, value=value, domain="localhost")
            else:
                client.set_cookie("localhost", key=key, value=value)
        yield
        for key in cookies:
            if IS_WERKZEUG_ABOVE_3:
                client.delete_cookie(key=key, domain="localhost")
            else:
                client.delete_cookie("localhost", key=key)


P = TypeVar("P", bound=Parameter)
D = TypeVar("D", bound=dict)


@dataclass
class OperationDefinition(Generic[D]):
    """A wrapper to store not resolved API operation definitions.

    To prevent recursion errors we need to store definitions without resolving references. But operation definitions
    itself can be behind a reference (when there is a ``$ref`` in ``paths`` values), therefore we need to store this
    scope change to have a proper reference resolving later.
    """

    raw: D
    resolved: D
    scope: str

    __slots__ = ("raw", "resolved", "scope")

    def _repr_pretty_(self, *args: Any, **kwargs: Any) -> None: ...


C = TypeVar("C", bound=Case)


[docs]@dataclass(eq=False) class APIOperation(Generic[P, C]): """A single operation defined in an API. You can get one via a ``schema`` instance. .. code-block:: python # Get the POST /items operation operation = schema["/items"]["POST"] """ # `path` does not contain `basePath` # Example <scheme>://<host>/<basePath>/users - "/users" is path # https://swagger.io/docs/specification/2-0/api-host-and-base-path/ path: str method: str definition: OperationDefinition = field(repr=False) schema: BaseSchema verbose_name: str = None # type: ignore app: Any = None base_url: str | None = None path_parameters: ParameterSet[P] = field(default_factory=ParameterSet) headers: ParameterSet[P] = field(default_factory=ParameterSet) cookies: ParameterSet[P] = field(default_factory=ParameterSet) query: ParameterSet[P] = field(default_factory=ParameterSet) body: PayloadAlternatives[P] = field(default_factory=PayloadAlternatives) case_cls: type[C] = Case # type: ignore def __post_init__(self) -> None: if self.verbose_name is None: self.verbose_name = f"{self.method.upper()} {self.full_path}" # type: ignore @property def full_path(self) -> str: return self.schema.get_full_path(self.path) @property def links(self) -> dict[str, dict[str, Any]]: return self.schema.get_links(self) @property def tags(self) -> list[str] | None: return self.schema.get_tags(self) def iter_parameters(self) -> Iterator[P]: """Iterate over all operation's parameters.""" return chain(self.path_parameters, self.headers, self.cookies, self.query) def _lookup_container(self, location: str) -> ParameterSet[P] | PayloadAlternatives[P] | None: return { "path": self.path_parameters, "header": self.headers, "cookie": self.cookies, "query": self.query, "body": self.body, }.get(location) def add_parameter(self, parameter: P) -> None: """Add a new processed parameter to an API operation. :param parameter: A parameter that will be used with this operation. :rtype: None """ # If the parameter has a typo, then by default, there will be an error from `jsonschema` earlier. # But if the user wants to skip schema validation, we choose to ignore a malformed parameter. # In this case, we still might generate some tests for an API operation, but without this parameter, # which is better than skip the whole operation from testing. container = self._lookup_container(parameter.location) if container is not None: container.add(parameter) def get_parameter(self, name: str, location: str) -> P | None: container = self._lookup_container(location) if container is not None: return container.get(name) return None
[docs] def as_strategy( self, hooks: HookDispatcher | None = None, auth_storage: AuthStorage | None = None, data_generation_method: DataGenerationMethod = DataGenerationMethod.default(), generation_config: GenerationConfig | None = None, **kwargs: Any, ) -> st.SearchStrategy: """Turn this API operation into a Hypothesis strategy.""" strategy = self.schema.get_case_strategy( self, hooks, auth_storage, data_generation_method, generation_config=generation_config, **kwargs ) def _apply_hooks(dispatcher: HookDispatcher, _strategy: st.SearchStrategy[Case]) -> st.SearchStrategy[Case]: context = HookContext(self) for hook in dispatcher.get_all_by_name("before_generate_case"): _strategy = hook(context, _strategy) for hook in dispatcher.get_all_by_name("filter_case"): hook = partial(hook, context) _strategy = _strategy.filter(hook) for hook in dispatcher.get_all_by_name("map_case"): hook = partial(hook, context) _strategy = _strategy.map(hook) for hook in dispatcher.get_all_by_name("flatmap_case"): hook = partial(hook, context) _strategy = _strategy.flatmap(hook) return _strategy strategy = _apply_hooks(GLOBAL_HOOK_DISPATCHER, strategy) strategy = _apply_hooks(self.schema.hooks, strategy) if hooks is not None: strategy = _apply_hooks(hooks, strategy) return strategy
def get_security_requirements(self) -> list[str]: return self.schema.get_security_requirements(self) def get_strategies_from_examples( self, as_strategy_kwargs: dict[str, Any] | None = None ) -> list[st.SearchStrategy[Case]]: """Get examples from the API operation.""" return self.schema.get_strategies_from_examples(self, as_strategy_kwargs=as_strategy_kwargs) def get_stateful_tests(self, response: GenericResponse, stateful: Stateful | None) -> Sequence[StatefulTest]: return self.schema.get_stateful_tests(response, self, stateful) def get_parameter_serializer(self, location: str) -> Callable | None: """Get a function that serializes parameters for the given location. It handles serializing data into various `collectionFormat` options and similar. Note that payload is handled by this function - it is handled by serializers. """ return self.schema.get_parameter_serializer(self, location) def prepare_multipart(self, form_data: FormData) -> tuple[list | None, dict[str, Any] | None]: return self.schema.prepare_multipart(form_data, self) def get_request_payload_content_types(self) -> list[str]: return self.schema.get_request_payload_content_types(self) def _get_default_media_type(self) -> str: # If the user wants to send payload, then there should be a media type, otherwise the payload is ignored media_types = self.get_request_payload_content_types() if len(media_types) == 1: # The only available option return media_types[0] media_types_repr = ", ".join(media_types) raise UsageError( "Can not detect appropriate media type. " "You can either specify one of the defined media types " f"or pass any other media type available for serialization. Defined media types: {media_types_repr}" ) def partial_deepcopy(self) -> APIOperation: return self.__class__( path=self.path, # string, immutable method=self.method, # string, immutable definition=fast_deepcopy(self.definition), schema=self.schema.clone(), # shallow copy verbose_name=self.verbose_name, # string, immutable app=self.app, # not deepcopyable base_url=self.base_url, # string, immutable path_parameters=fast_deepcopy(self.path_parameters), headers=fast_deepcopy(self.headers), cookies=fast_deepcopy(self.cookies), query=fast_deepcopy(self.query), body=fast_deepcopy(self.body), ) def clone(self, **components: Any) -> APIOperation: """Create a new instance of this API operation with updated components.""" return self.__class__( path=self.path, method=self.method, verbose_name=self.verbose_name, definition=self.definition, schema=self.schema, app=self.app, base_url=self.base_url, path_parameters=components["path_parameters"], query=components["query"], headers=components["headers"], cookies=components["cookies"], body=components["body"], )
[docs] def make_case( self, *, path_parameters: PathParameters | None = None, headers: Headers | None = None, cookies: Cookies | None = None, query: Query | None = None, body: Body | NotSet = NOT_SET, media_type: str | None = None, ) -> C: """Create a new example for this API operation. The main use case is constructing Case instances completely manually, without data generation. """ return self.schema.make_case( case_cls=self.case_cls, operation=self, path_parameters=path_parameters, headers=headers, cookies=cookies, query=query, body=body, media_type=media_type, )
@property def operation_reference(self) -> str: path = self.path.replace("~", "~0").replace("/", "~1") return f"#/paths/{path}/{self.method}"
[docs] def validate_response(self, response: GenericResponse) -> bool | None: """Validate API response for conformance. :raises CheckFailed: If the response does not conform to the API schema. """ return self.schema.validate_response(self, response)
[docs] def is_response_valid(self, response: GenericResponse) -> bool: """Validate API response for conformance.""" try: self.validate_response(response) return True except CheckFailed: return False
def get_raw_payload_schema(self, media_type: str) -> dict[str, Any] | None: return self.schema._get_payload_schema(self.definition.raw, media_type) def get_resolved_payload_schema(self, media_type: str) -> dict[str, Any] | None: return self.schema._get_payload_schema(self.definition.resolved, media_type)
# backward-compatibility Endpoint = APIOperation class Status(str, Enum): """Status of an action or multiple actions.""" success = "success" failure = "failure" error = "error" skip = "skip" @dataclass(repr=False) class Check: """Single check run result.""" name: str value: Status response: GenericResponse | None elapsed: float example: Case message: str | None = None # Failure-specific context context: FailureContext | None = None request: requests.PreparedRequest | None = None @dataclass(repr=False) class Request: """Request data extracted from `Case`.""" method: str uri: str body: str | None body_size: int | None headers: Headers @classmethod def from_case(cls, case: Case, session: requests.Session) -> Request: """Create a new `Request` instance from `Case`.""" import requests base_url = case.get_full_base_url() kwargs = RequestsTransport().serialize_case(case, base_url=base_url) request = requests.Request(**kwargs) prepared = session.prepare_request(request) # type: ignore return cls.from_prepared_request(prepared) @classmethod def from_prepared_request(cls, prepared: requests.PreparedRequest) -> Request: """A prepared request version is already stored in `requests.Response`.""" body = prepared.body if isinstance(body, str): # can be a string for `application/x-www-form-urlencoded` body = body.encode("utf-8") # these values have `str` type at this point uri = cast(str, prepared.url) method = cast(str, prepared.method) return cls( uri=uri, method=method, headers={key: [value] for (key, value) in prepared.headers.items()}, body=serialize_payload(body) if body is not None else body, body_size=len(body) if body is not None else None, ) def deserialize_body(self) -> bytes | None: """Deserialize the request body. `Request` should be serializable to JSON, therefore body is encoded as base64 string to support arbitrary binary data. """ return deserialize_payload(self.body) @dataclass(repr=False) class Response: """Unified response data.""" status_code: int message: str headers: dict[str, list[str]] body: str | None body_size: int | None encoding: str | None http_version: str elapsed: float verify: bool @classmethod def from_requests(cls, response: requests.Response) -> Response: """Create a response from requests.Response.""" raw = response.raw raw_headers = raw.headers if raw is not None else {} headers = {name: response.raw.headers.getlist(name) for name in raw_headers.keys()} # Similar to http.client:319 (HTTP version detection in stdlib's `http` package) version = raw.version if raw is not None else 10 http_version = "1.0" if version == 10 else "1.1" def is_empty(_response: requests.Response) -> bool: # Assume the response is empty if: # - no `Content-Length` header # - no chunks when iterating over its content return "Content-Length" not in headers and list(_response.iter_content()) == [] body = None if is_empty(response) else serialize_payload(response.content) return cls( status_code=response.status_code, message=response.reason, body=body, body_size=len(response.content) if body is not None else None, encoding=response.encoding, headers=headers, http_version=http_version, elapsed=response.elapsed.total_seconds(), verify=getattr(response, "verify", True), ) @classmethod def from_wsgi(cls, response: WSGIResponse, elapsed: float) -> Response: """Create a response from WSGI response.""" from .transports.responses import get_reason message = get_reason(response.status_code) headers = {name: response.headers.getlist(name) for name in response.headers.keys()} # Note, this call ensures that `response.response` is a sequence, which is needed for comparison data = response.get_data() body = None if response.response == [] else serialize_payload(data) encoding: str | None if body is not None: # Werkzeug <3.0 had `charset` attr, newer versions always have UTF-8 encoding = response.mimetype_params.get("charset", getattr(response, "charset", "utf-8")) else: encoding = None return cls( status_code=response.status_code, message=message, body=body, body_size=len(data) if body is not None else None, encoding=encoding, headers=headers, http_version="1.1", elapsed=elapsed, verify=True, ) def deserialize_body(self) -> bytes | None: """Deserialize the response body. `Response` should be serializable to JSON, therefore body is encoded as base64 string to support arbitrary binary data. """ return deserialize_payload(self.body) TIMEZONE = datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo @dataclass class Interaction: """A single interaction with the target app.""" request: Request response: Response | None checks: list[Check] status: Status data_generation_method: DataGenerationMethod phase: TestPhase | None # `description` & `location` are related to metadata about this interaction # NOTE: It will be better to keep it in a separate attribute description: str | None location: str | None parameter: str | None parameter_location: str | None recorded_at: str = field(default_factory=lambda: datetime.datetime.now(TIMEZONE).isoformat()) @classmethod def from_requests( cls, case: Case, response: requests.Response | None, status: Status, checks: list[Check], headers: dict[str, Any] | None, session: requests.Session | None, ) -> Interaction: if response is not None: prepared = response.request request = Request.from_prepared_request(prepared) else: import requests if session is None: session = requests.Session() session.headers.update(headers or {}) request = Request.from_case(case, session) return cls( request=request, response=Response.from_requests(response) if response is not None else None, status=status, checks=checks, data_generation_method=cast(DataGenerationMethod, case.data_generation_method), phase=case.meta.phase if case.meta is not None else None, description=case.meta.description if case.meta is not None else None, location=case.meta.location if case.meta is not None else None, parameter=case.meta.parameter if case.meta is not None else None, parameter_location=case.meta.parameter_location if case.meta is not None else None, ) @classmethod def from_wsgi( cls, case: Case, response: WSGIResponse | None, headers: dict[str, Any], elapsed: float | None, status: Status, checks: list[Check], ) -> Interaction: import requests session = requests.Session() session.headers.update(headers) return cls( request=Request.from_case(case, session), response=Response.from_wsgi(response, elapsed) if response is not None and elapsed is not None else None, status=status, checks=checks, data_generation_method=cast(DataGenerationMethod, case.data_generation_method), phase=case.meta.phase if case.meta is not None else None, description=case.meta.description if case.meta is not None else None, location=case.meta.location if case.meta is not None else None, parameter=case.meta.parameter if case.meta is not None else None, parameter_location=case.meta.parameter_location if case.meta is not None else None, ) @dataclass(repr=False) class TestResult: """Result of a single test.""" __test__ = False method: str path: str verbose_name: str data_generation_method: list[DataGenerationMethod] checks: list[Check] = field(default_factory=list) errors: list[Exception] = field(default_factory=list) interactions: list[Interaction] = field(default_factory=list) logs: list[LogRecord] = field(default_factory=list) is_errored: bool = False is_flaky: bool = False is_skipped: bool = False skip_reason: str | None = None is_executed: bool = False # DEPRECATED: Seed is the same per test run seed: int | None = None def _repr_pretty_(self, *args: Any, **kwargs: Any) -> None: ... def mark_errored(self) -> None: self.is_errored = True def mark_flaky(self) -> None: self.is_flaky = True def mark_skipped(self, exc: SkipTest | unittest.case.SkipTest | None) -> None: self.is_skipped = True if exc is not None: self.skip_reason = str(exc) def mark_executed(self) -> None: self.is_executed = True @property def has_errors(self) -> bool: return bool(self.errors) @property def has_failures(self) -> bool: return any(check.value == Status.failure for check in self.checks) @property def has_logs(self) -> bool: return bool(self.logs) def add_success(self, name: str, example: Case, response: GenericResponse, elapsed: float) -> Check: check = Check( name=name, value=Status.success, response=response, elapsed=elapsed, example=example, request=None ) self.checks.append(check) return check def add_failure( self, name: str, example: Case, response: GenericResponse | None, elapsed: float, message: str, context: FailureContext | None, request: requests.PreparedRequest | None = None, ) -> Check: check = Check( name=name, value=Status.failure, response=response, elapsed=elapsed, example=example, message=message, context=context, request=request, ) self.checks.append(check) return check def add_error(self, exception: Exception) -> None: self.errors.append(exception) def store_requests_response( self, case: Case, response: requests.Response | None, status: Status, checks: list[Check], headers: dict[str, Any] | None, session: requests.Session | None, ) -> None: self.interactions.append(Interaction.from_requests(case, response, status, checks, headers, session)) def store_wsgi_response( self, case: Case, response: WSGIResponse | None, headers: dict[str, Any], elapsed: float | None, status: Status, checks: list[Check], ) -> None: self.interactions.append(Interaction.from_wsgi(case, response, headers, elapsed, status, checks)) @dataclass(repr=False) class TestResultSet: """Set of multiple test results.""" __test__ = False seed: int | None results: list[TestResult] = field(default_factory=list) generic_errors: list[OperationSchemaError] = field(default_factory=list) warnings: list[str] = field(default_factory=list) def _repr_pretty_(self, *args: Any, **kwargs: Any) -> None: ... def __iter__(self) -> Iterator[TestResult]: return iter(self.results) @property def is_empty(self) -> bool: """If the result set contains no results.""" return len(self.results) == 0 and len(self.generic_errors) == 0 @property def has_failures(self) -> bool: """If any result has any failures.""" return any(result.has_failures for result in self) @property def has_errors(self) -> bool: """If any result has any errors.""" return self.errored_count > 0 @property def has_logs(self) -> bool: """If any result has any captured logs.""" return any(result.has_logs for result in self) def _count(self, predicate: Callable) -> int: return sum(1 for result in self if predicate(result)) @property def passed_count(self) -> int: return self._count(lambda result: not result.has_errors and not result.is_skipped and not result.has_failures) @property def skipped_count(self) -> int: return self._count(lambda result: result.is_skipped) @property def failed_count(self) -> int: return self._count(lambda result: result.has_failures and not result.is_errored) @property def errored_count(self) -> int: return self._count(lambda result: result.has_errors or result.is_errored) + len(self.generic_errors) @property def total(self) -> dict[str, dict[str | Status, int]]: """An aggregated statistic about test results.""" output: dict[str, dict[str | Status, int]] = {} for item in self.results: for check in item.checks: output.setdefault(check.name, Counter()) output[check.name][check.value] += 1 output[check.name]["total"] += 1 # Avoid using Counter, since its behavior could harm in other places: # `if not total["unknown"]:` - this will lead to the branch execution # It is better to let it fail if there is a wrong key return {key: dict(value) for key, value in output.items()} def append(self, item: TestResult) -> None: """Add a new item to the results list.""" self.results.append(item) def add_warning(self, warning: str) -> None: """Add a new warning to the warnings list.""" self.warnings.append(warning)