{% set annotations = true %}
{% include '_header.py.jinja' %}
{% from '_utils.py.jinja' import is_async, maybe_async_def, maybe_await, recursive_types, active_provider with context %}
# -- template actions.py.jinja --
from typing import TypeVar
import warnings

from . import types, errors, bases
from ._compat import model_parse
from ._constants import CREATE_MANY_SKIP_DUPLICATES_UNSUPPORTED

if TYPE_CHECKING:
    from .client import {{ names.client_class(is_async) }}
    from .bases import _PrismaModel


_PrismaModelT = TypeVar('_PrismaModelT', bound='_PrismaModel')

{#
  Render the entries of a sample `where` unique filter for the docstring examples.

  When the model has an id field this emits `'<id field name>': <sample data>,`,
  otherwise it emits a placeholder comment line naming the model.

  TODO: support sampling combined unique constraints
#}
{% macro sample_where_unique(model) %}
{% set id_field = model.id_field %}
{% if id_field %}'{{ id_field.name }}': {{ id_field.get_sample_data() }},{% else %}# {{ model.name }} where unique filter
{% endif %}
{% endmacro %}
{#
  Shared "Raises" entries that are interpolated into the method docstrings below.

  `clean_multiline` is defined outside this template - presumably it normalises the
  surrounding whitespace so each snippet can be embedded inline (TODO confirm).
  NOTE(review): `filter_error_doc` is not referenced anywhere in this template.
#}
{% set base_error_doc = clean_multiline('''
        prisma.errors.PrismaError
            Catch all for every exception raised by Prisma Client Python
''')
%}
{% set query_error_doc = clean_multiline('''
        prisma.errors.MissingRequiredValueError
            Value is required but was not found
''')
%}
{% set filter_error_doc = clean_multiline('''
        prisma.errors.RecordNotFoundError
            A record is required to exist but was not found
''')
%}
{% for model in dmmf.datamodel.models %}
{% set RawModelType = "prisma.models.%s" % model.name %}
{% set include_doc = 'Specifies which relations should be loaded on the returned %s model' % model.name %}

{% set ModelType = '_PrismaModelT' %}
class {{ model.name }}Actions(Generic[{{ ModelType }}]):
    __slots__ = (
        '_client',
        '_model',
    )

    def __init__(self, client: {{ names.client_class(is_async) }}, model: Type[{{ ModelType }}]) -> None:
        """Bind these actions to a client and to the model class results are parsed into.

        Parameters
        ----------
        client
            Client that every query made through these actions is executed with
        model
            Model class passed along with each query and used to parse returned records
        """
        self._model = model
        self._client = client

    {% if active_provider != 'mongodb' %}
    {{ maybe_async_def }}query_raw(
        self,
        query: LiteralString,
        *args: Any,
    ) -> List[{{ ModelType }}]:
        """Execute a raw SQL query

        The call is forwarded to the client's `query_raw()` together with the
        model class these actions are bound to.

        Parameters
        ----------
        query
            The raw SQL query string to be executed
        *args
            Parameters to be passed to the SQL query, these MUST be used over
            string formatting to avoid an SQL injection vulnerability

        Returns
        -------
        List[{{ RawModelType }}]
            The records returned by the SQL query

        Raises
        ------
        prisma.errors.RawQueryError
            This could be due to invalid syntax, mismatched number of parameters or any other error
        {{ base_error_doc }}

        Example
        -------
        {% set field = model.sampler().get_field() %}
        ```py
        {{ model.plural_name }} = {{ maybe_await }}{{ model.name }}.prisma().query_raw(
            'SELECT * FROM {{ model.name }} WHERE {{ field.name }} = {{ sql_param() }}',
            {{ field.get_sample_data() }},
        )
        ```
        """
        return {{ maybe_await }}self._client.query_raw(query, *args, model=self._model)

    {{ maybe_async_def }}query_first(
        self,
        query: LiteralString,
        *args: Any,
    ) -> Optional[{{ ModelType }}]:
        """Execute a raw SQL query, returning the first result

        The call is forwarded to the client's `query_first()` together with the
        model class these actions are bound to.

        Parameters
        ----------
        query
            The raw SQL query string to be executed
        *args
            Parameters to be passed to the SQL query, these MUST be used over
            string formatting to avoid an SQL injection vulnerability

        Returns
        -------
        {{ RawModelType }}
            The first record returned by the SQL query
        None
            The raw SQL query did not return any records

        Raises
        ------
        prisma.errors.RawQueryError
            This could be due to invalid syntax, mismatched number of parameters or any other error
        {{ base_error_doc }}

        Example
        -------
        {% set field = model.sampler().get_field() %}
        ```py
        {{ model.instance_name }} = {{ maybe_await }}{{ model.name }}.prisma().query_first(
            'SELECT * FROM {{ model.name }} WHERE {{ field.name }} = {{ sql_param() }}',
            {{ field.get_sample_data() }},
        )
        ```
        """
        return {{ maybe_await }}self._client.query_first(query, *args, model=self._model)
    {% endif %}

    {{ maybe_async_def }}create(
        self,
        data: types.{{ model.name }}CreateInput,
        include: Optional[types.{{ model.name }}Include] = None,
    ) -> {{ ModelType }}:
        """Insert a single {{ model.name }} record into the database.

        Parameters
        ----------
        data
            Field values for the new {{ model.name }} record
        include
            {{ include_doc }}

        Returns
        -------
        {{ RawModelType }}
            The newly created {{ model.name }} record

        Raises
        ------
        {{ query_error_doc }}
        {{ base_error_doc }}

        Example
        -------
        ```py
        # create a {{ model.name }} record from just the required fields
        {{ model.instance_name }} = {{ maybe_await }}{{ model.name }}.prisma().create(
            data={
                # data to create a {{ model.name }} record
                {% for field in model.scalar_fields %}
                {% if field.required_on_create %}
                '{{ field.name }}': {{ field.get_sample_data() }},
                {% endif %}
                {% endfor %}
            },
        )
        ```
        """
        response = {{ maybe_await }}self._client._execute(
            method='create',
            model=self._model,
            arguments={
                'data': data,
                'include': include,
            },
        )
        created = response['data']['result']
        return model_parse(self._model, created)

    {{ maybe_async_def }}create_many(
        self,
        data: List[types.{{ model.name }}CreateWithoutRelationsInput],
        *,
        skip_duplicates: Optional[bool] = None,
    ) -> int:
        """Create multiple {{ model.name }} records at once.

        The `skip_duplicates` argument is *not* supported by every database provider,
        passing it while connected to an unsupported provider raises an error before
        any query is sent.

        Parameters
        ----------
        data
            List of {{ model.name }} record data
        skip_duplicates
            Boolean flag for ignoring unique constraint errors

        Returns
        -------
        int
            The total number of records created

        Raises
        ------
        prisma.errors.UnsupportedDatabaseError
            `skip_duplicates` was given but the active database provider does not support it
        prisma.errors.UniqueViolationError
            A unique constraint check has failed, these can be ignored with the `skip_duplicates` argument
        {{ query_error_doc }}
        {{ base_error_doc }}

        Example
        -------
        ```py
        total = {{ maybe_await }}{{ model.name }}.prisma().create_many(
            data=[
                {% for _ in range(2) %}
                {
                    # data to create a {{ model.name }} record
                    {% for field in model.scalar_fields %}
                    {% if field.required_on_create %}
                    '{{ field.name }}': {{ field.get_sample_data() }},
                    {% endif %}
                    {% endfor %}
                },
                {% endfor %}
            ],
            skip_duplicates=True,
        )
        ```
        """
        # only the `skip_duplicates` flag is gated on the provider, a plain
        # `create_many()` call is always forwarded to the query engine
        if skip_duplicates and self._client._active_provider in CREATE_MANY_SKIP_DUPLICATES_UNSUPPORTED:
            raise errors.UnsupportedDatabaseError(self._client._active_provider, 'create_many_skip_duplicates')

        resp = {{ maybe_await }}self._client._execute(
            method='create_many',
            model=self._model,
            arguments={
                'data': data,
                'skipDuplicates': skip_duplicates,
            },
            # the engine only reports how many records were written
            root_selection=['count'],
        )
        return int(resp['data']['result']['count'])

    {{ maybe_async_def }}delete(
        self,
        where: types.{{ model.name }}WhereUniqueInput,
        include: Optional[types.{{ model.name }}Include] = None,
    ) -> Optional[{{ ModelType }}]:
        """Remove a single {{ model.name }} record from the database.

        Parameters
        ----------
        where
            Unique {{ model.name }} filter selecting the record to delete
        include
            {{ include_doc }}

        Returns
        -------
        {{ RawModelType }}
            The {{ model.name }} record that was deleted
        None
            No record matched the filter so nothing was deleted

        Raises
        ------
        {{ base_error_doc }}
        {{ query_error_doc }}

        Example
        -------
        ```py
        {{ model.instance_name }} = {{ maybe_await }}{{ model.name }}.prisma().delete(
            where={
                {{ sample_where_unique(model) }}
            },
        )
        ```
        """
        try:
            response = {{ maybe_await }}self._client._execute(
                method='delete',
                model=self._model,
                arguments={
                    'where': where,
                    'include': include,
                },
            )
        except errors.RecordNotFoundError:
            # a missing record is reported to the caller as `None` instead of an error
            return None
        else:
            return model_parse(self._model, response['data']['result'])

    {{ maybe_async_def }}find_unique(
        self,
        where: types.{{ model.name }}WhereUniqueInput,
        include: Optional[types.{{ model.name }}Include] = None,
    ) -> Optional[{{ ModelType }}]:
        """Look up a single {{ model.name }} record by a unique filter.

        Parameters
        ----------
        where
            Unique {{ model.name }} filter identifying the record
        include
            {{ include_doc }}

        Returns
        -------
        {{ RawModelType }}
            The matching {{ model.name }} record
        None
            No record matched the given filter

        Raises
        ------
        {{ base_error_doc }}
        {{ query_error_doc }}

        Example
        -------
        ```py
        {{ model.instance_name }} = {{ maybe_await }}{{ model.name }}.prisma().find_unique(
            where={
                {{ sample_where_unique(model) }}
            },
        )
        ```
        """
        response = {{ maybe_await }}self._client._execute(
            method='find_unique',
            model=self._model,
            arguments={
                'where': where,
                'include': include,
            },
        )
        record = response['data']['result']
        if record is not None:
            return model_parse(self._model, record)
        return None

    {{ maybe_async_def }}find_unique_or_raise(
        self,
        where: types.{{ model.name }}WhereUniqueInput,
        include: Optional[types.{{ model.name }}Include] = None,
    ) -> {{ ModelType }}:
        """Look up a single {{ model.name }} record by a unique filter, raising
        `RecordNotFoundError` when no record matches.

        Parameters
        ----------
        where
            Unique {{ model.name }} filter identifying the record
        include
            {{ include_doc }}

        Returns
        -------
        {{ RawModelType }}
            The matching {{ model.name }} record

        Raises
        ------
        prisma.errors.RecordNotFoundError
            No record was found
        {{ base_error_doc }}
        {{ query_error_doc }}

        Example
        -------
        ```py
        {{ model.instance_name }} = {{ maybe_await }}{{ model.name }}.prisma().find_unique_or_raise(
            where={
                {{ sample_where_unique(model) }}
            },
        )
        ```
        """
        response = {{ maybe_await }}self._client._execute(
            method='find_unique_or_raise',
            model=self._model,
            arguments={
                'where': where,
                'include': include,
            },
        )
        record = response['data']['result']
        return model_parse(self._model, record)

    {{ maybe_async_def }}find_many(
        self,
        take: Optional[int] = None,
        skip: Optional[int] = None,
        where: Optional[types.{{ model.name }}WhereInput] = None,
        cursor: Optional[types.{{ model.name }}WhereUniqueInput] = None,
        include: Optional[types.{{ model.name }}Include] = None,
        order: Optional[Union[types.{{ model.name }}OrderByInput, List[types.{{ model.name }}OrderByInput]]] = None,
        distinct: Optional[List[types.{{ model.name }}ScalarFieldKeys]] = None,
    ) -> List[{{ ModelType }}]:
        """Fetch every {{ model.name }} record matching the given arguments.

        When nothing matches, an empty list is returned.

        Parameters
        ----------
        take
            Upper bound on the number of {{ model.name }} records returned
        skip
            Number of leading results to ignore
        where
            {{ model.name }} filter used to select records
        cursor
            Position in the list to start returning results from, (typically an ID field)
        include
            {{ include_doc }}
        order
            Field(s) to order the returned {{ model.name }} records by
        distinct
            Filter {{ model.name }} records by either a single distinct field or distinct combinations of fields

        Returns
        -------
        List[{{ RawModelType }}]
            All of the {{ model.name }} records that were found

        Raises
        ------
        {{ base_error_doc }}

        Example
        -------
        ```py
        # find the first 10 {{ model.name }} records
        {{ model.plural_name }} = {{ maybe_await }}{{ model.name }}.prisma().find_many(take=10)

        {% set field = model.sampler().get_field() %}
        # find the first 5 {{ model.name }} records ordered by the {{ field.name }} field
        {{ model.plural_name }} = {{ maybe_await }}{{ model.name }}.prisma().find_many(
            take=5,
            order={
                '{{ field.name }}': 'desc',
            },
        )
        ```
        """
        response = {{ maybe_await }}self._client._execute(
            method='find_many',
            model=self._model,
            arguments={
                'take': take,
                'skip': skip,
                'where': where,
                'order_by': order,
                'cursor': cursor,
                'include': include,
                'distinct': distinct,
            },
        )
        records = response['data']['result']
        return [model_parse(self._model, record) for record in records]

    {{ maybe_async_def }}find_first(
        self,
        skip: Optional[int] = None,
        where: Optional[types.{{ model.name }}WhereInput] = None,
        cursor: Optional[types.{{ model.name }}WhereUniqueInput] = None,
        include: Optional[types.{{ model.name }}Include] = None,
        order: Optional[Union[types.{{ model.name }}OrderByInput, List[types.{{ model.name }}OrderByInput]]] = None,
        distinct: Optional[List[types.{{ model.name }}ScalarFieldKeys]] = None,
    ) -> Optional[{{ ModelType }}]:
        """Fetch the first {{ model.name }} record matching the given arguments.

        Parameters
        ----------
        skip
            Number of leading records to ignore
        where
            {{ model.name }} filter used to select the record
        cursor
            Position in the list to start returning results from, (typically an ID field)
        include
            {{ include_doc }}
        order
            Field(s) to order the {{ model.name }} records by
        distinct
            Filter {{ model.name }} records by either a single distinct field or distinct combinations of fields

        Returns
        -------
        {{ RawModelType }}
            The first {{ model.name }} record that matched the given arguments
        None
            Nothing matched the given arguments

        Raises
        ------
        {{ base_error_doc }}

        Example
        -------
        ```py
        {% set field = model.sampler().get_field() %}
        # find the second {{ model.name }} record ordered by the {{ field.name }} field
        {{ model.instance_name }} = {{ maybe_await }}{{ model.name }}.prisma().find_first(
            skip=1,
            order={
                '{{ field.name }}': 'desc',
            },
        )
        ```
        """
        response = {{ maybe_await }}self._client._execute(
            method='find_first',
            model=self._model,
            arguments={
                'skip': skip,
                'where': where,
                'order_by': order,
                'cursor': cursor,
                'include': include,
                'distinct': distinct,
            },
        )
        record = response['data']['result']
        if record is not None:
            return model_parse(self._model, record)

        return None

    {{ maybe_async_def }}find_first_or_raise(
        self,
        skip: Optional[int] = None,
        where: Optional[types.{{ model.name }}WhereInput] = None,
        cursor: Optional[types.{{ model.name }}WhereUniqueInput] = None,
        include: Optional[types.{{ model.name }}Include] = None,
        order: Optional[Union[types.{{ model.name }}OrderByInput, List[types.{{ model.name }}OrderByInput]]] = None,
        distinct: Optional[List[types.{{ model.name }}ScalarFieldKeys]] = None,
    ) -> {{ ModelType }}:
        """Fetch the first {{ model.name }} record matching the given arguments, raising
        `RecordNotFoundError` when nothing matches.

        Parameters
        ----------
        skip
            Number of leading records to ignore
        where
            {{ model.name }} filter used to select the record
        cursor
            Position in the list to start returning results from, (typically an ID field)
        include
            {{ include_doc }}
        order
            Field(s) to order the {{ model.name }} records by
        distinct
            Filter {{ model.name }} records by either a single distinct field or distinct combinations of fields

        Returns
        -------
        {{ RawModelType }}
            The first {{ model.name }} record that matched the given arguments

        Raises
        ------
        prisma.errors.RecordNotFoundError
            No record was found
        {{ base_error_doc }}

        Example
        -------
        ```py
        {% set field = model.sampler().get_field() %}
        # find the second {{ model.name }} record ordered by the {{ field.name }} field
        {{ model.instance_name }} = {{ maybe_await }}{{ model.name }}.prisma().find_first_or_raise(
            skip=1,
            order={
                '{{ field.name }}': 'desc',
            },
        )
        ```
        """
        response = {{ maybe_await }}self._client._execute(
            method='find_first_or_raise',
            model=self._model,
            arguments={
                'skip': skip,
                'where': where,
                'order_by': order,
                'cursor': cursor,
                'include': include,
                'distinct': distinct,
            },
        )
        record = response['data']['result']
        return model_parse(self._model, record)

    {{ maybe_async_def }}update(
        self,
        data: types.{{ model.name }}UpdateInput,
        where: types.{{ model.name }}WhereUniqueInput,
        include: Optional[types.{{ model.name }}Include] = None,
    ) -> Optional[{{ ModelType }}]:
        """Modify a single existing {{ model.name }} record.

        Parameters
        ----------
        data
            The {{ model.name }} field values to change
        where
            Unique {{ model.name }} filter selecting the record to update
        include
            {{ include_doc }}

        Returns
        -------
        {{ RawModelType }}
            The {{ model.name }} record after the update was applied
        None
            No record matched the filter so nothing was updated

        Raises
        ------
        {{ base_error_doc }}

        Example
        -------
        ```py
        {{ model.instance_name }} = {{ maybe_await }}{{ model.name }}.prisma().update(
            where={
                {{ sample_where_unique(model) }}
            },
            data={
                # data to update the {{ model.name }} record to
            },
        )
        ```
        """
        try:
            response = {{ maybe_await }}self._client._execute(
                method='update',
                model=self._model,
                arguments={
                    'data': data,
                    'where': where,
                    'include': include,
                },
            )
        except errors.RecordNotFoundError:
            # a missing record is reported to the caller as `None` instead of an error
            return None
        else:
            return model_parse(self._model, response['data']['result'])

    {{ maybe_async_def }}upsert(
        self,
        where: types.{{ model.name }}WhereUniqueInput,
        data: types.{{ model.name }}UpsertInput,
        include: Optional[types.{{ model.name }}Include] = None,
    ) -> {{ ModelType }}:
        """Update an existing {{ model.name }} record or create a new one.

        Parameters
        ----------
        where
            Unique {{ model.name }} filter selecting the record to create / update
        data
            Mapping with the `create` and `update` values to apply
        include
            {{ include_doc }}

        Returns
        -------
        {{ RawModelType }}
            The {{ model.name }} record that was created or updated

        Raises
        ------
        {{ base_error_doc }}
        {{ query_error_doc }}

        Example
        -------
        ```py
        {% if model.id_field %}
        {% set id_sample = model.id_field.get_sample_data() %}
        {{ model.instance_name }} = {{ maybe_await }}{{ model.name }}.prisma().upsert(
            where={
                '{{ model.id_field.name }}': {{ id_sample }},
            },
            data={
                'create': {
                    '{{ model.id_field.name }}': {{ id_sample }},
                    {% for field in model.scalar_fields %}
                    {% if field.required_on_create and not field.name == model.id_field.name %}
                    '{{ field.name }}': {{ field.get_sample_data(increment=False) }},
                    {% endif %}
                    {% endfor %}
                },
                'update': {
                    {% for field in model.scalar_fields %}
                    {% if field.required_on_create and not field.name == model.id_field.name %}
                    '{{ field.name }}': {{ field.get_sample_data(increment=False) }},
                    {% endif %}
                    {% endfor %}
                },
            },
        )
        {% else %}
        {{ model.instance_name }} = {{ maybe_await }}{{ model.name }}.prisma().upsert(
            where={
                # {{ model.name }} where unique filter
            },
            data={
                'create': {
                    # {{ model.name }} data to be set if the record does not exist
                },
                'update': {
                    # {{ model.name }} data to be set if the record does exist
                },
            },
        )
        {% endif %}
        ```
        """
        # the combined `data` argument is split into separate top-level
        # `create` and `update` query arguments
        on_create = data.get('create')
        on_update = data.get('update')
        response = {{ maybe_await }}self._client._execute(
            method='upsert',
            model=self._model,
            arguments={
                'where': where,
                'include': include,
                'create': on_create,
                'update': on_update,
            },
        )
        record = response['data']['result']
        return model_parse(self._model, record)

    {{ maybe_async_def }}update_many(
        self,
        data: types.{{ model.name }}UpdateManyMutationInput,
        where: types.{{ model.name }}WhereInput,
    ) -> int:
        """Apply the same update to every matching {{ model.name }} record.

        Parameters
        ----------
        data
            The {{ model.name }} field values to set on each selected record
        where
            Filter selecting the {{ model.name }} records to update

        Returns
        -------
        int
            How many {{ model.name }} records were updated

        Raises
        ------
        {{ base_error_doc }}

        Example
        -------
        ```py
        # update all {{ model.name }} records
        total = {{ maybe_await }}{{ model.name }}.prisma().update_many(
            data={
                {% set field = model.sampler().get_field() %}
                '{{ field.name }}': {{ field.get_sample_data() }}
            },
            where={}
        )
        ```
        """
        response = {{ maybe_await }}self._client._execute(
            method='update_many',
            model=self._model,
            arguments={
                'data': data,
                'where': where,
            },
            root_selection=['count'],
        )
        updated = response['data']['result']['count']
        return int(updated)

    @overload
    {{ maybe_async_def }}count(
        self,
        select: None = None,
        take: Optional[int] = None,
        skip: Optional[int] = None,
        where: Optional[types.{{ model.name }}WhereInput] = None,
        cursor: Optional[types.{{ model.name }}WhereUniqueInput] = None,
    ) -> int:
        {# We add the docstring here as Pylance / VSCode picks up docstrings
           from the overloads of a function.

           We also add the docstring to the final implementation function so that it
           can also be accessed at runtime.
        #}
        {% macro count_doc() %}
"""Count the number of {{ model.name }} records present in the database

        Parameters
        ----------
        select
            Select the {{ model.name }} fields to be counted
        take
            Limit the maximum result
        skip
            Ignore the first N records
        where
            {{ model.name }} filter to find records
        cursor
            Specifies the position in the list to start counting results from, (typically an ID field)

        Returns
        -------
        int
            The total number of records found, returned if `select` is not given

        prisma.types.{{ model.name }}CountAggregateOutput
            Data returned when `select` is used, the fields present in this dictionary will
            match the fields passed in the `select` argument

        Raises
        ------
        {{ base_error_doc }}

        Example
        -------
        ```py
        # total: int
        total = {{ maybe_await }}{{ model.name }}.prisma().count()

        # results: prisma.types.{{ model.name }}CountAggregateOutput
        results = {{ maybe_await }}{{ model.name }}.prisma().count(
            select={
                '_all': True,
                '{{ model.sampler().get_field().name }}': True,
            },
        )
        ```
        """
        {% endmacro %}
        {{ count_doc() }}

    @overload
    {{ maybe_async_def }}count(
        self,
        select: types.{{ model.name }}CountAggregateInput,
        take: Optional[int] = None,
        skip: Optional[int] = None,
        where: Optional[types.{{ model.name }}WhereInput] = None,
        cursor: Optional[types.{{ model.name }}WhereUniqueInput] = None,
    ) -> types.{{ model.name }}CountAggregateOutput:
        # Typing-only overload: when `select` is given the result is the
        # aggregate mapping rather than a plain int.
        ...

    {{ maybe_async_def }}count(
        self,
        select: Optional[types.{{ model.name }}CountAggregateInput] = None,
        take: Optional[int] = None,
        skip: Optional[int] = None,
        where: Optional[types.{{ model.name }}WhereInput] = None,
        cursor: Optional[types.{{ model.name }}WhereUniqueInput] = None,
    ) -> Union[int, types.{{ model.name }}CountAggregateOutput]:
        {{ count_doc() }}
        # TODO: this selection building should be moved to the QueryBuilder
        #
        # note the distinction between checking the truthiness of `select` here and
        # `select is None` later is to handle the case that the given select
        # dictionary is empty, this is a limitation of our types.
        if select:
            # share the selection builder used by `group_by()` so that both
            # methods render the `_count` selection in exactly the same way
            root_selection = [_select_fields('_count', select)]
        else:
            root_selection = ['_count { _all }']

        resp = {{ maybe_await }}self._client._execute(
            method='count',
            model=self._model,
            arguments={
                'take': take,
                'skip': skip,
                'where': where,
                'cursor': cursor,
            },
            root_selection=root_selection,
        )
        counts = resp['data']['result']['_count']

        # an empty (but not `None`) select still returns the aggregate mapping
        if select is None:
            return cast(int, counts['_all'])

        return cast(types.{{ model.name }}CountAggregateOutput, counts)

    {{ maybe_async_def }}delete_many(
        self,
        where: Optional[types.{{ model.name }}WhereInput] = None,
    ) -> int:
        """Remove every {{ model.name }} record matching the given filter.

        Parameters
        ----------
        where
            Optional {{ model.name }} filter selecting the records to delete

        Returns
        -------
        int
            How many {{ model.name }} records were deleted

        Raises
        ------
        {{ base_error_doc }}

        Example
        -------
        ```py
        # delete all {{ model.name }} records
        total = {{ maybe_await }}{{ model.name }}.prisma().delete_many()
        ```
        """
        response = {{ maybe_await }}self._client._execute(
            method='delete_many',
            model=self._model,
            arguments={
                'where': where,
            },
            root_selection=['count'],
        )
        deleted = response['data']['result']['count']
        return int(deleted)

    # TODO: make this easier to work with safely, currently output fields are typed as
    #       not required, we should refactor the return type
    # TODO: consider returning a Dict where the keys are a Tuple of the `by` selection
    # TODO: statically type that the order argument is required when take or skip are present
    {{ maybe_async_def }}group_by(
        self,
        {% if recursive_types %}
        by: List['types.{{ model.name }}ScalarFieldKeysT'],
        {% else %}
        by: List['types.{{ model.name }}ScalarFieldKeys'],
        {% endif %}
        *,
        where: Optional['types.{{ model.name }}WhereInput'] = None,
        take: Optional[int] = None,
        skip: Optional[int] = None,
        avg: Optional['types.{{ model.name }}AvgAggregateInput'] = None,
        sum: Optional['types.{{ model.name }}SumAggregateInput'] = None,
        min: Optional['types.{{ model.name }}MinAggregateInput'] = None,
        max: Optional['types.{{ model.name }}MaxAggregateInput'] = None,
        having: Optional['types.{{ model.name }}ScalarWhereWithAggregatesInput'] = None,
        count: Optional[Union[bool, 'types.{{ model.name }}CountAggregateInput']] = None,
        {% if recursive_types %}
        order: Optional[Union[Mapping['types.{{ model.name }}ScalarFieldKeysT', 'types.SortOrder'], List[Mapping['types.{{ model.name }}ScalarFieldKeysT', 'types.SortOrder']]]] = None,
        {% else %}
        order: Optional[Union[Mapping['types.{{ model.name }}ScalarFieldKeys', 'types.SortOrder'], List[Mapping['types.{{ model.name }}ScalarFieldKeys', 'types.SortOrder']]]] = None,
        {% endif %}
    ) -> List['types.{{ model.name }}GroupByOutput']:
        """Group {{ model.name }} records by one or more field values and perform aggregations
        on each group such as finding the average.

        Parameters
        ----------
        by
            List of scalar {{ model.name }} fields to group records by
        where
            {{ model.name }} filter to select records
        take
            Limit the maximum number of {{ model.name }} records returned,
            requires `order` to be given as well
        skip
            Ignore the first N records, requires `order` to be given as well
        avg
            Adds the average of all values of the specified fields to the `_avg` field
            in the returned data.
        sum
            Adds the sum of all values of the specified fields to the `_sum` field
            in the returned data.
        min
            Adds the smallest available value for the specified fields to the `_min` field
            in the returned data.
        max
            Adds the largest available value for the specified fields to the `_max` field
            in the returned data.
        count
            Adds counts to the `_count` field in the returned data, either pass `True`
            to select `_all` or a selection of the fields to be counted.
        having
            Allows you to filter groups by an aggregate value - for example only return
            groups having an average age less than 50.
        order
            Lets you order the returned list by any property that is also present in `by`.
            Only **one** field is allowed at a time.

        Returns
        -------
        List[prisma.types.{{ model.name }}GroupByOutput]
            A list of dictionaries representing the {{ model.name }} record,
            this will also have additional fields present if aggregation arguments
            are used (see the above parameters)

        Raises
        ------
        TypeError
            `take` or `skip` was given without `order`
        {{ base_error_doc }}

        Example
        -------
        ```py
        {% set field = model.sampler().get_field().name %}
        # group {{ model.name }} records by {{ field }} values
        # and count how many records are in each group
        results = {{ maybe_await }}{{ model.name }}.prisma().group_by(
            ['{{ field }}'],
            count=True,
        )
        ```
        """
        # pagination arguments are rejected up front when no ordering was given
        if order is None and take is not None:
            raise TypeError('Missing argument: \'order\' which is required when \'take\' is present')

        if order is None and skip is not None:
            raise TypeError('Missing argument: \'order\' which is required when \'skip\' is present')

        # the grouped fields themselves are always selected, each requested
        # aggregation then adds its own nested selection
        root_selection: List[str] = [*by]

        if avg is not None:
            root_selection.append(_select_fields('_avg', avg))

        if min is not None:
            root_selection.append(_select_fields('_min', min))

        if sum is not None:
            root_selection.append(_select_fields('_sum', sum))

        if max is not None:
            root_selection.append(_select_fields('_max', max))

        # `None` and `False` both fall through without selecting any counts
        if count is True:
            root_selection.append('_count { _all }')
        elif isinstance(count, dict):
            root_selection.append(_select_fields('_count', count))

        response = {{ maybe_await }}self._client._execute(
            method='group_by',
            model=self._model,
            arguments={
                'by': by,
                'take': take,
                'skip': skip,
                'where': where,
                'having': having,
                'orderBy': order,
            },
            root_selection=root_selection,
        )
        return response['data']['result']  # type: ignore[no-any-return]

{% endfor %}


def _select_fields(root: str, select: Mapping[str, Any]) -> str:
    """Helper to build a GraphQL selection string

    This is a work around until field selection is added to the query builder.
    """
    {% raw %}
    return root + ' {{ {0} }}'.format(' '.join(k for k, v in select.items() if v is True))
    {% endraw %}


from . import models
