IEEE.org     |     IEEE Xplore Digital Library     |     IEEE Standards     |     IEEE Spectrum     |     More Sites

Verified Commit 3032ef02 authored by Emi Simpson's avatar Emi Simpson
Browse files

[api] Switch to accepting `application/json` over `application/x-www-form-urlencoded`

parent 715f3894
Pipeline #1056 passed with stage
in 52 seconds
......@@ -2,15 +2,15 @@ from mystic.database import MalformedId, parse_alphaid, Project
from mystic.outcome import GenericAlert, Outcome, Outcomes, ProjectField, ProjectFieldKind
from mystic.queries import ProjectInfo, Query
from mystic.types import Backend, ProjectID, Request, UncheckedPID, UserID, Url
from mystic.types import Backend, ProjectID, WebRequest, UncheckedPID, UserID, Url
from mystic.sources import Source, SOURCE_PROCESSORS
from mystic import queries, outcome
from functools import reduce
from typing import Callable, cast, Collection, Iterable, List, Mapping, Tuple, TypeAlias
Action: TypeAlias = Callable[[Request], Outcomes | Query[Outcomes, Outcomes]]
AuthedProjectAction: TypeAlias = Callable[[Request, UncheckedPID, UserID], Outcomes | Query[Outcomes, Outcomes]]
Action: TypeAlias = Callable[[WebRequest], Outcomes | Query[Outcomes, Outcomes]]
AuthedProjectAction: TypeAlias = Callable[[WebRequest, UncheckedPID, UserID], Outcomes | Query[Outcomes, Outcomes]]
force_login: Tuple[outcome.ForceLogin] = (outcome.ForceLogin(),)
......@@ -48,7 +48,7 @@ def _fold_sources(sources: Iterable[Source]) -> Mapping[Backend, Collection[Url]
sources,
cast(Mapping[Backend, List[Url]], dict())) # An empty dict + type annotations
def delete_source(req: Request) -> Outcomes | Query[Outcomes, Outcomes]:
def delete_source(req: WebRequest) -> Outcomes | Query[Outcomes, Outcomes]:
"""
Remove a source with the ID in `form[source_id]` from a project the active user owns
"""
......@@ -74,7 +74,7 @@ def delete_source(req: Request) -> Outcomes | Query[Outcomes, Outcomes]:
# If the user wasn't logged in, send them to the login page
return (outcome.ForceLogin(),)
def add_source(req: Request, pid: UncheckedPID, user: UserID) -> Outcomes | Query[Outcomes, Outcomes]:
def add_source(req: WebRequest, pid: UncheckedPID, user: UserID) -> Outcomes | Query[Outcomes, Outcomes]:
"""
Add a source to a project the user owns
......@@ -145,7 +145,7 @@ def add_source(req: Request, pid: UncheckedPID, user: UserID) -> Outcomes | Quer
"This data source has already been added to this project"),
}[e],)))
def add_owner(req: Request, pid: UncheckedPID, user: UserID) -> Outcomes | Query[Outcomes, Outcomes]:
def add_owner(req: WebRequest, pid: UncheckedPID, user: UserID) -> Outcomes | Query[Outcomes, Outcomes]:
"""
Add another owner to one of the user's projects
......@@ -181,7 +181,7 @@ def add_owner(req: Request, pid: UncheckedPID, user: UserID) -> Outcomes | Query
"This user does not exist or hasn't logged in yet"),
}[error],)))
def edit_properties(req: Request, pid: UncheckedPID, user: UserID) -> Outcomes | Query[Outcomes, Outcomes]:
def edit_properties(req: WebRequest, pid: UncheckedPID, user: UserID) -> Outcomes | Query[Outcomes, Outcomes]:
"""
Edit the properties of one of the user's projects
......@@ -260,7 +260,7 @@ def edit_properties(req: Request, pid: UncheckedPID, user: UserID) -> Outcomes |
# report the missing fields
queries.Noop(missing_fields)))
def remove_owner(req: Request, pid: UncheckedPID, user: UserID) -> Outcomes | Query[Outcomes, Outcomes]:
def remove_owner(req: WebRequest, pid: UncheckedPID, user: UserID) -> Outcomes | Query[Outcomes, Outcomes]:
"""
Remove an owner from one of the user's projects
......@@ -287,7 +287,7 @@ def remove_owner(req: Request, pid: UncheckedPID, user: UserID) -> Outcomes | Qu
ProjectField(pid, ProjectFieldKind.Generic),
"That user is not an owner of this project"),)))
def delete_project(_: Request, pid: UncheckedPID, user: UserID) -> Outcomes | Query[Outcomes, Outcomes]:
def delete_project(_: WebRequest, pid: UncheckedPID, user: UserID) -> Outcomes | Query[Outcomes, Outcomes]:
"""
Delete one of the user's projects
......@@ -301,13 +301,13 @@ def delete_project(_: Request, pid: UncheckedPID, user: UserID) -> Outcomes | Qu
on_error=lambda _: (outcome.Error(ProjectField(pid, ProjectFieldKind.Generic),
"[INTERNAL ERROR] This project does not exist"),)))
def set_high_contrast(req: Request) -> Outcomes:
def set_high_contrast(req: WebRequest) -> Outcomes:
"""
Turns on high contrast mode
"""
return (outcome.AmendSession({'high_contrast': req.form.get("enabled") != 'False'}),)
def logout(_: Request) -> Outcomes:
def logout(_: WebRequest) -> Outcomes:
"""
Equivilent to :meth:`outcome.Logout()`
"""
......@@ -320,7 +320,7 @@ def auth_action(action: AuthedProjectAction) -> Action:
This allows using these actions as standard actions
"""
# unauthed_action is the standard signature for actions
def unauthed_action(req: Request) -> Outcomes | Query[Outcomes, Outcomes]:
def unauthed_action(req: WebRequest) -> Outcomes | Query[Outcomes, Outcomes]:
if req.user is None:
# If the user isn't logged in, send them to the login page
return force_login
......@@ -336,7 +336,7 @@ def auth_action(action: AuthedProjectAction) -> Action:
return action(req, project_id, req.user)
return unauthed_action
def try_get_pid(req: Request) -> outcome.Error | UncheckedPID:
def try_get_pid(req: WebRequest) -> outcome.Error | UncheckedPID:
"""
Attempt to determine the project ID specified in a :class:`Request`
"""
......@@ -390,7 +390,7 @@ ACTION_NAMES: Mapping[str, Action] = {
A mapping from action names to the corresponding actions
"""
def get_preserved_form_data(req: Request) -> Collection[Tuple[str, str]]:
def get_preserved_form_data(req: WebRequest) -> Collection[Tuple[str, str]]:
"""
Translate form data to flashable messages, preserving it between requests
"""
......@@ -402,7 +402,7 @@ def get_preserved_form_data(req: Request) -> Collection[Tuple[str, str]]:
else:
return cast(Collection[Tuple[str, str]], tuple())
def identify_action(req: Request) -> outcome.Error | Action:
def identify_action(req: WebRequest) -> outcome.Error | Action:
"""
Based on the form fields specified in the :class:`Request`, pick an action to run
"""
......
......@@ -17,29 +17,13 @@ from base64 import b64decode
from dataclasses import dataclass, field
from functools import wraps
from http import HTTPStatus
from typing import Callable, cast, Collection, Concatenate, Generic, Mapping, Optional, ParamSpec, TypeAlias, TypeGuard, TypeVar, Union
from typing import Callable, cast, Concatenate, Generic, Mapping, Optional, ParamSpec, TypeAlias, TypeGuard, TypeVar
from mystic import config
from mystic.coordination import CoordinatorFlow
from mystic.queries import Query
from mystic.query_driver import drive_query_to_end
from mystic.types import Backend, Request, Url
JsonPrimitive: TypeAlias = Union[str, int, float, bool, None, Backend, Url]
"""
A list of primitive types that can be serialized into JSON
Note: A few NewTypes are included here in order to allow typechecking within maps and
lists, which are invariant.
"""
JsonSerializable: TypeAlias = Union[Mapping[JsonPrimitive, 'JsonSerializable'], Collection['JsonSerializable'], JsonPrimitive]
"""
A type alias for *any* type which can be serialized into JSON
Builds off of the :class:`JsonPrimitive` type by adding mappings and collections as
supported types
"""
from mystic.types import ApiGetRequest, ApiJsonRequest, JsonParseError, JsonSerializable
D = TypeVar('D', bound=JsonSerializable, covariant=True)
@dataclass(frozen=True)
......@@ -131,9 +115,22 @@ def parse_session_token(token: str) -> Optional[bytes]:
except binascii.Error:
return None
def provide_local_models(endpoint: Callable[Concatenate[Request, P], R[JsonSerializable]]) -> Callable[P, Response]:
def make_response(endpoint: Callable[P, R[JsonSerializable]]) -> Callable[P, Response]:
"""
Allows a function to be defined in terms of :class:`Request` and :class:`R`
Convert a return type of :class:`R` into a flask-compatible :class:`Response`
The wrapped function may take any number of args
"""
@wraps(endpoint)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> Response:
return endpoint(*args, **kwargs).to_response()
return wrapper
E = TypeVar('E')
O = TypeVar('O')
def json_request(handle_error: Callable[[JsonParseError], E]) -> Callable[[Callable[Concatenate[ApiJsonRequest, P], O]], Callable[P, O | E]]:
"""
Provides access to a :class:`ApiJsonRequest` to the endpoint
This means, of course, that attempting to call the resulting function in a context
outside of a Flask context will produce an error. Generally, this decorator is meant
......@@ -142,16 +139,41 @@ def provide_local_models(endpoint: Callable[Concatenate[Request, P], R[JsonSeria
endpoints possible.
The wrapped function may take one or more args, but the first must be of type
:class:`Request`. The other arguments are preserved. The output type of the function
must be :class:`R`[:class:`JsonSerializable`], which will be converted to a standard
flask :class:`Response`.
:class:`ApiJsonRequest`. The other arguments are preserved.
In the event that the request does not have valid JSON, :class:`handle_error` will be
called in place of the function, and provided with details as to how the JSON was
invalid.
"""
def decorator(endpoint: Callable[Concatenate[ApiJsonRequest, P], O]) -> Callable[P, O | E]:
@wraps(endpoint)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> O | E:
req = ApiJsonRequest.build_request()
if isinstance(req, JsonParseError):
return handle_error(req)
else:
return endpoint(req, *args, **kwargs)
return wrapper
return decorator
def query_args_request(endpoint: Callable[Concatenate[ApiGetRequest, P], O]) -> Callable[P, O]:
"""
Provides access to a :class:`ApiGetRequest` to the endpoint
This means, of course, that attempting to call the resulting function in a context
outside of a Flask context will produce an error. Generally, this decorator is meant
to be used on endpoints, that is, functions which are bound to a Flask route. It is
part of a series of decorators available in this module to make writing pure flask
endpoints possible.
The wrapped function may take one or more args, but the first must be of type
:class:`ApiGetRequest`. The other arguments are preserved.
"""
@wraps(endpoint)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> Response:
return endpoint(Request.build_request(), *args, **kwargs).to_response()
def wrapper(*args: P.args, **kwargs: P.kwargs) -> O:
return endpoint(ApiGetRequest.build_request(), *args, **kwargs)
return wrapper
O = TypeVar('O')
def after_running_execute_queries(
is_output: Callable[[O | Query[O, O]], TypeGuard[O]],
) -> Callable[[Callable[P, O | Query[O, O]]], Callable[P, O]]:
......
......@@ -5,28 +5,27 @@ from http import HTTPStatus
from typing import Callable, Collection, Concatenate, NoReturn, ParamSpec, TypeAlias, TypeVar
from mystic import coordination, queries
from mystic.api import after_running_execute_coordinator_requests, after_running_execute_queries, is_r, parse_session_token, R, Request, provide_local_models
from mystic.api import after_running_execute_coordinator_requests, after_running_execute_queries, is_r, json_request, make_response, parse_session_token, query_args_request, R
from mystic.api.v1 import auth
from mystic.api.v1.errors import ApiErrorCode, BadSessionError, EndpointDNEError, ExpiredSessionError, MalformedArgumentError, MethodNotAllowedError, MissingFieldsError, NameTakenError, UnknownError
from mystic.api.v1.errors import ApiErrorCode, BadSessionError, EndpointDNEError, ExpiredSessionError, handle_json_parse_error, MalformedArgumentError, MethodNotAllowedError, MissingFieldsError, NameTakenError, UnknownError
from mystic.api.v1.types import Instance, Job, job_from_coordinator_job, lookup_backend, Project, project_from_database_project, User, user_from_database_user
from mystic.coordination import CoordinatorFlow
from mystic.queries import Query
from mystic.types import UncheckedPID, UserID
from mystic.types import ApiGetRequest, ApiJsonRequest, UncheckedPID, UserID
from mystic.utils import panic
bp = Blueprint('api_v1', __name__, url_prefix='/api/v1')
@provide_local_models
def error_404(_: Request, path: str) -> R[EndpointDNEError]:
@make_response
def error_404(path: str) -> R[EndpointDNEError]:
print(path)
return R(HTTPStatus.NOT_FOUND, EndpointDNEError(
code=ApiErrorCode.EndpointDNE,
message="This URL is not a valid endpoint! Check your spelling against the docs"),
headers={'Access-Control-Allow-Origin': '*'})
@provide_local_models
def error_405(r: Request, _: int) -> R[MethodNotAllowedError]:
assert r # Just to mark it as used
@make_response
def error_405(_: int) -> R[MethodNotAllowedError]:
return R(HTTPStatus.METHOD_NOT_ALLOWED, MethodNotAllowedError(
code=ApiErrorCode.MethodNotAllowed,
message="This method is not allowed for this endpoint"),
......@@ -52,7 +51,8 @@ def unk_err(msg: str) -> R[UnknownError]:
P = ParamSpec('P')
T = TypeVar('T')
def authenticate(func: Callable[Concatenate[Request, GetUser, P], T]) -> Callable[Concatenate[Request, P], T | R[BadSessionError]]:
Req = TypeVar('Req', ApiJsonRequest, ApiGetRequest)
def authenticate(func: Callable[Concatenate[Req, GetUser, P], T]) -> Callable[Concatenate[Req, P], T | R[BadSessionError]]:
"""
A decorator which wraps an authenticated endpoint
......@@ -68,7 +68,7 @@ def authenticate(func: Callable[Concatenate[Request, GetUser, P], T]) -> Callabl
include this in your documentation
"""
@wraps(func)
def wrapper(req: Request, *args: P.args, **kwargs: P.kwargs) -> T | R[BadSessionError]:
def wrapper(req: Req, *args: P.args, **kwargs: P.kwargs) -> T | R[BadSessionError]:
authorization = req.headers.get('Authorization', None)
# Case 1: The authization header wasn't even provided
if authorization is None:
......@@ -119,10 +119,11 @@ def authenticate(func: Callable[Concatenate[Request, GetUser, P], T]) -> Callabl
return wrapper
@bp.get('/whoami')
@provide_local_models
@make_response
@query_args_request
@after_running_execute_queries(is_r)
@authenticate
def whoami(_: Request, get_user: GetUser) -> Query[R[User], R[ExpiredSessionError]]:
def whoami(_: ApiGetRequest, get_user: GetUser) -> Query[R[User], R[ExpiredSessionError]]:
"""
Display some basic information about the active user
......@@ -138,9 +139,10 @@ def whoami(_: Request, get_user: GetUser) -> Query[R[User], R[ExpiredSessionErro
on_error=lambda _: panic(f'RetrieveUserInfo claimed the ID {uid} was invalid, but we just verified it with a GetUser'))))
@bp.get('/projects')
@provide_local_models
@query_args_request
@make_response
@after_running_execute_queries(is_r)
def get_projects(r: Request) -> R[MalformedArgumentError] | Query[R[Collection[Project]], NoReturn]:
def get_projects(r: ApiGetRequest) -> R[MalformedArgumentError] | Query[R[Collection[Project]], NoReturn]:
# Parse Limit
try:
......@@ -214,18 +216,34 @@ def get_projects(r: Request) -> R[MalformedArgumentError] | Query[R[Collection[
on_error=lambda noreturn: noreturn)
@bp.post('/project')
@provide_local_models
@make_response
@json_request(handle_json_parse_error)
@authenticate
@after_running_execute_queries(is_r)
def create_project(r: Request, get_user: GetUser) -> Query[R[Project], R[ExpiredSessionError | NameTakenError | UnknownError]] | R[MissingFieldsError]:
def create_project(r: ApiJsonRequest, get_user: GetUser) -> Query[R[Project], R[ExpiredSessionError | NameTakenError | UnknownError]] | R[MissingFieldsError | MalformedArgumentError]:
fields = ('name', 'slug', 'desc')
name, slug, desc = [r.form.get(field, None) for field in fields]
name, slug, desc = [r.json.get(field, None) for field in fields]
if name is None or slug is None or desc is None:
return R(HTTPStatus.BAD_REQUEST, MissingFieldsError(
code=ApiErrorCode.MissingFields,
message="The PUT /project endpoint requires the 'name', 'slug', and 'desc' fields, which were not all provided.",
missing_fields=[field for field in fields if field not in r.form]
missing_fields=[field for field in fields if field not in r.json]
))
elif not isinstance(name, str):
return R(HTTPStatus.BAD_REQUEST, MalformedArgumentError(
code=ApiErrorCode.MalformedArgument,
message='The "name" field must be a string',
field='name'))
elif not isinstance(slug, str):
return R(HTTPStatus.BAD_REQUEST, MalformedArgumentError(
code=ApiErrorCode.MalformedArgument,
message='The "slug" field must be a string',
field='slug'))
elif not isinstance(desc, str):
return R(HTTPStatus.BAD_REQUEST, MalformedArgumentError(
code=ApiErrorCode.MalformedArgument,
message='The "desc" field must be a string',
field='desc'))
return queries.Transaction( # We'll be running several queries in succession
queries.BoundQuery(
get_user, # First, validate the user's session and get their UID
......@@ -276,18 +294,18 @@ def create_project(r: Request, get_user: GetUser) -> Query[R[Project], R[Expired
read_only=False)
@bp.get('/project/<int:pid>/jobs')
@provide_local_models
@make_response
@after_running_execute_coordinator_requests(is_r)
def get_jobs(_: Request, pid: int) -> CoordinatorFlow[R[Collection[Job]]]:
def get_jobs(pid: int) -> CoordinatorFlow[R[Collection[Job]]]:
tag = f'p-{pid:x}'
return coordination.MappedFlow(
coordination.GetJobsByTag([tag]),
and_then=lambda jobs: R(HTTPStatus.OK, [job_from_coordinator_job(job) for job in jobs[tag]]))
@bp.get('/instance')
@provide_local_models
@make_response
@after_running_execute_coordinator_requests(is_r)
def instance_info(_: Request) -> CoordinatorFlow[R[Instance]]:
def instance_info() -> CoordinatorFlow[R[Instance]]:
return coordination.MappedFlow(
coordination.AvailableBackends(),
and_then=lambda backends: R(HTTPStatus.OK, Instance(
......
......@@ -9,10 +9,10 @@ from typing import Optional, TypedDict
from mystic import queries
from mystic.queries import direct_auth
from mystic.queries.direct_auth import ValidatePasswordError
from mystic.api import after_running_execute_queries, is_r, R, provide_local_models
from mystic.api.v1.errors import ApiErrorCode, BadPasswordError, InternalError, MissingFieldsError, UserDneError, NameTakenError
from mystic.api import after_running_execute_queries, is_r, json_request, make_response, R
from mystic.api.v1.errors import ApiErrorCode, BadPasswordError, handle_json_parse_error, InternalError, MalformedArgumentError, MissingFieldsError, UserDneError, NameTakenError
from mystic.queries import Query, direct_auth
from mystic.types import Request
from mystic.types import ApiJsonRequest
from mystic.utils import panic
class Auth(TypedDict):
......@@ -80,7 +80,7 @@ class AuthModule:
sso=self.saml_login_url is not None
)).to_response()
def post_login(self, req: Request) -> R[MissingFieldsError] | Query[R[Session], R[UserDneError | BadPasswordError | InternalError]]:
def post_login(self, req: ApiJsonRequest) -> R[MalformedArgumentError | MissingFieldsError] | Query[R[Session], R[UserDneError | BadPasswordError | InternalError]]:
"""
The `POST /api/v1/auth/direct` method
......@@ -89,17 +89,27 @@ class AuthModule:
"""
# Check that both required parameters are present
try:
username = req.form['username']
password = req.form['password']
username = req.json['username']
password = req.json['password']
except KeyError:
# If either is missing, tell the user about it with an error
missing = [f for f in ['username', 'password'] if f not in req.form]
missing = [f for f in ['username', 'password'] if f not in req.json]
return R(HTTPStatus.BAD_REQUEST, MissingFieldsError(
code = ApiErrorCode.MissingFields,
missing_fields = missing,
message =
f'The following fields were not included in the API request: {missing}'
))
if not isinstance(username, str):
return R(HTTPStatus.BAD_REQUEST, MalformedArgumentError(
code = ApiErrorCode.MalformedArgument,
message = f'The "username" field should be a string, but was "{type(username).__name__}"',
field = 'username'))
if not isinstance(password, str):
return R(HTTPStatus.BAD_REQUEST, MalformedArgumentError(
code = ApiErrorCode.MalformedArgument,
message = f'The "password" field should be a string, but was "{type(password).__name__}"',
field = 'password'))
# Begin constructing the query
return queries.BoundQuery(
queries.MappedQuery(
......@@ -137,23 +147,48 @@ class AuthModule:
on_error=lambda noreturn: noreturn)
)
def put_signup(self, req: Request) -> R[MissingFieldsError] | Query[R[Session], R[NameTakenError]]:
def put_signup(self, req: ApiJsonRequest) -> R[MissingFieldsError | MalformedArgumentError] | Query[R[Session], R[NameTakenError]]:
try:
# Validate that all of the keys we need are present
username = req.form['username']
password = req.form['password']
first_name = req.form['first']
last_name = req.form['last']
email = req.form['email']
username = req.json['username']
password = req.json['password']
first_name = req.json['first']
last_name = req.json['last']
email = req.json['email']
except KeyError:
# If any are missing, tell the user about it with an error
missing = [f for f in ['username', 'password', 'first', 'last', 'email'] if f not in req.form]
missing = [f for f in ['username', 'password', 'first', 'last', 'email'] if f not in req.json]
return R(HTTPStatus.BAD_REQUEST, MissingFieldsError(
code = ApiErrorCode.MissingFields,
missing_fields = missing,
message =
f'The following fields were not included in the API request: {missing}'
))
if not isinstance(username, str):
return R(HTTPStatus.BAD_REQUEST, MalformedArgumentError(
code = ApiErrorCode.MalformedArgument,
message = f'The "username" field should be a string, but was "{type(username).__name__}"',
field = 'username'))
if not isinstance(password, str):
return R(HTTPStatus.BAD_REQUEST, MalformedArgumentError(
code = ApiErrorCode.MalformedArgument,
message = f'The "password" field should be a string, but was "{type(password).__name__}"',
field = 'password'))
if not isinstance(first_name, str):
return R(HTTPStatus.BAD_REQUEST, MalformedArgumentError(
code = ApiErrorCode.MalformedArgument,
message = f'The "first" field should be a string, but was "{type(first_name).__name__}"',
field = 'first'))
if not isinstance(last_name, str):
return R(HTTPStatus.BAD_REQUEST, MalformedArgumentError(
code = ApiErrorCode.MalformedArgument,
message = f'The "last" field should be a string, but was "{type(last_name).__name__}"',
field = 'last'))
if not isinstance(email, str):
return R(HTTPStatus.BAD_REQUEST, MalformedArgumentError(
code = ApiErrorCode.MalformedArgument,
message = f'The "email" field should be a string, but was "{type(email).__name__}"',
field = 'email'))
# Some of these queries are interdependant, so bunch them in a transaction
return queries.Transaction(
......@@ -196,7 +231,15 @@ class AuthModule:
# Type comment needed due to flask tying weirdness
_ = bp.get('/')(self.get_auth_dict)
_ = bp.post('/direct')(provide_local_models(after_running_execute_queries(is_r)((self.post_login))))
_ = bp.put('/direct')(provide_local_models(after_running_execute_queries(is_r)((self.put_signup))))
_ = bp.post('/direct')(
make_response(
json_request(handle_json_parse_error)(
after_running_execute_queries(is_r)(
(self.post_login)))))
_ = bp.put('/direct')(
make_response(
json_request(handle_json_parse_error)(
after_running_execute_queries(is_r)(
self.put_signup))))
return bp
from enum import IntEnum, unique
from http import HTTPStatus
from typing import Collection, Literal, TypeAlias, TypedDict, Union
from mystic.api import R
from mystic.types import JsonParseError
@unique
class ApiErrorCode(IntEnum):
"""
......@@ -59,7 +63,17 @@ class ApiErrorCode(IntEnum):
MalformedArgument = 8
"""
The error code for :class:`MalformedArgument`
The error code for :class:`MalformedArgumentError`
"""
NoJson = 9
"""
The error code for :class:`NoJsonError`
"""
MalformedJson = 10
"""
The error code for :class:`MalformedJsonError`
"""
EndpointDNE = 404
......@@ -324,6 +338,40 @@ class MalformedArgumentError(ErrorTemplate):
The name of the field which was malformed
"""
class NoJsonError(ErrorTemplate):
"""
A call was made to an endpoint that expect a JSON request, but no JSON was received
This generally indicates that the MIME type on your request was invalid. The API
expects a MIME type of `application/json`. Requests with a MIME of
`application/x-www-form-urlencoded` or `application/form-multipart` will trigger this
error.
"""
code: Literal[ApiErrorCode.NoJson]
"""
A code identifying this as a :class:`NoJsonError` (see :class:`ApiErrorCode`)
"""
class MalformedJsonError(ErrorTemplate):
"""
The JSON in the request body was malformed
This indicates that either
- The JSON in the request was not up to spec OR
- The top-level JSON object was a string, int, or array rather than an object
"""
code: Literal[ApiErrorCode.MalformedJson]
"""
A code identifying this as a :class:`MalformedJsonError` (see :class:`ApiErrorCode`)
"""
sub_code: Literal[JsonParseError.INVALID_JSON, JsonParseError.JSON_NOT_DICT]
"""
1 if the JSON itself was malformed, 2 if it was valid, but was not a JSON object
"""
class EndpointDNEError(ErrorTemplate):
"""
A call was made to an endpoint which does not exist
......@@ -352,6 +400,24 @@ class MethodNotAllowedError(ErrorTemplate):
A code identifying this as a :class:`MethodNotAllowedError` (see :class:`ApiErrorCode`)
"""
def handle_json_parse_error(e: JsonParseError) -> R[NoJsonError | MalformedJsonError]:
"""
Convert a :class:`JsonParseError` into a :class:`R` wrapping an error type
"""
if e == JsonParseError.NOT_JSON:
return R(
HTTPStatus.BAD_REQUEST,
NoJsonError(
code=ApiErrorCode.NoJson,
message="This endpoint takes arguments in the form of a JSON object, but no JSON was provided. Please check the Content-Type of your request."))
else:
return R(
HTTPStatus.BAD_REQUEST,
MalformedJsonError(
code=ApiErrorCode.MalformedJson,
sub_code=e,
message="The JSON provided to this endpoint was malformed"))
ApiError: TypeAlias = Union[UnknownError, MissingFieldsError]
"""
The type of errors returned by the V1 API
......
from typing import Mapping, NewType, NamedTuple, Optional
from enum import IntEnum
from dataclasses import dataclass
from typing import cast, Collection, Mapping, NewType, NamedTuple, Optional, Union, TypeAlias
Backend = NewType('Backend', str)
"""
......@@ -10,26 +12,114 @@ Url = NewType('Url', str)
A `str`-like field exclusively for URLs
"""
JsonPrimitive: TypeAlias = Union[str, int, float, bool, None, Backend, Url]