1
+ from collections .abc import Iterable
1
2
from functools import wraps
2
- from typing import Any , Callable , Iterable , List , Optional , Tuple , Type , Union
3
+ from typing import (
4
+ TYPE_CHECKING ,
5
+ Any ,
6
+ Awaitable ,
7
+ Callable ,
8
+ Dict ,
9
+ List ,
10
+ Optional ,
11
+ Tuple ,
12
+ Type ,
13
+ Union ,
14
+ )
15
+
16
+ from flask import Response , current_app , jsonify , request
17
+ from flask .typing import ResponseReturnValue , RouteCallable
18
+
19
+ try :
20
+ from flask_restful import ( # type: ignore
21
+ original_flask_make_response as make_response ,
22
+ )
23
+ except ImportError :
24
+ from flask import make_response
3
25
4
- from flask import Response , current_app , jsonify , make_response , request
5
26
from pydantic import BaseModel , ValidationError
6
27
from pydantic .tools import parse_obj_as
7
28
13
34
)
14
35
from .exceptions import ValidationError as FailedValidation
15
36
16
- try :
17
- from flask_restful import original_flask_make_response as make_response
18
- except ImportError :
19
- pass
37
+ if TYPE_CHECKING :
38
+ from pydantic .error_wrappers import ErrorDict
39
+
40
+
41
+ ModelResponseReturnValue = Union [ResponseReturnValue , BaseModel ]
42
+ ModelRouteCallable = Union [
43
+ Callable [..., ModelResponseReturnValue ],
44
+ Callable [..., Awaitable [ModelResponseReturnValue ]],
45
+ ]
20
46
21
47
22
48
def make_json_response (
23
49
content : Union [BaseModel , Iterable [BaseModel ]],
24
50
status_code : int ,
25
51
by_alias : bool ,
26
52
exclude_none : bool = False ,
27
- many : bool = False ,
28
53
) -> Response :
29
54
"""serializes model, creates JSON response with given status code"""
30
- if many :
31
- js = f"[{ ', ' .join ([model .json (exclude_none = exclude_none , by_alias = by_alias ) for model in content ])} ]"
55
+ if not isinstance (content , BaseModel ):
56
+ js = "["
57
+ js += ", " .join (
58
+ [
59
+ model .json (exclude_none = exclude_none , by_alias = by_alias )
60
+ for model in content
61
+ ]
62
+ )
63
+ js += "]"
32
64
else :
33
65
js = content .json (exclude_none = exclude_none , by_alias = by_alias )
34
66
response = make_response (js , status_code )
@@ -56,9 +88,9 @@ def validate_many_models(model: Type[BaseModel], content: Any) -> List[BaseModel
56
88
return [model (** fields ) for fields in content ]
57
89
except TypeError :
58
90
# iteration through `content` fails
59
- err = [
91
+ err : List [ "ErrorDict" ] = [
60
92
{
61
- "loc" : [ "root" ] ,
93
+ "loc" : ( "root" ,) ,
62
94
"msg" : "is not an array of objects" ,
63
95
"type" : "type_error.array" ,
64
96
}
@@ -68,30 +100,53 @@ def validate_many_models(model: Type[BaseModel], content: Any) -> List[BaseModel
68
100
raise ManyModelValidationError (ve .errors ())
69
101
70
102
71
- def validate_path_params (func : Callable , kwargs : dict ) -> Tuple [dict , list ]:
72
- errors = []
103
+ def validate_path_params (
104
+ func : ModelRouteCallable , kwargs : Dict [str , Any ]
105
+ ) -> Tuple [Dict [str , Any ], List ["ErrorDict" ]]:
106
+ errors : List ["ErrorDict" ] = []
73
107
validated = {}
74
108
for name , type_ in func .__annotations__ .items ():
75
109
if name in {"query" , "body" , "form" , "return" }:
76
110
continue
77
111
try :
78
112
value = parse_obj_as (type_ , kwargs .get (name ))
79
113
validated [name ] = value
80
- except ValidationError as e :
81
- err = e .errors ()[0 ]
82
- err ["loc" ] = [ name ]
114
+ except ValidationError as error :
115
+ err = error .errors ()[0 ]
116
+ err ["loc" ] = ( name ,)
83
117
errors .append (err )
84
118
kwargs = {** kwargs , ** validated }
85
119
return kwargs , errors
86
120
87
121
88
- def get_body_dict (** params ) :
89
- data = request .get_json (** params )
122
+ def get_body_dict (** params : Dict [ str , Any ]) -> Any :
123
+ data = request .get_json (** params ) # type: ignore
90
124
if data is None and params .get ("silent" ):
91
125
return {}
92
126
return data
93
127
94
128
129
+ def _ensure_model_kwarg (
130
+ kwarg_name : str ,
131
+ from_validate : Optional [Type [BaseModel ]],
132
+ func : ModelRouteCallable ,
133
+ ) -> Tuple [Optional [Type [BaseModel ]], bool ]:
134
+ """Get model information either from wrapped function or validate kwargs."""
135
+ in_func_kwargs = func .__annotations__ .get (kwarg_name )
136
+ if in_func_kwargs is None :
137
+ return from_validate , False
138
+ assert isinstance (in_func_kwargs , type ) and issubclass (
139
+ in_func_kwargs , BaseModel
140
+ ), "Model in function arguments needs to be a BaseModel."
141
+
142
+ # Ensure that the most "detailed" model is used.
143
+ if from_validate is None :
144
+ return in_func_kwargs , True
145
+ if issubclass (in_func_kwargs , from_validate ):
146
+ return in_func_kwargs , True
147
+ return from_validate , True
148
+
149
+
95
150
def validate (
96
151
body : Optional [Type [BaseModel ]] = None ,
97
152
query : Optional [Type [BaseModel ]] = None ,
@@ -100,7 +155,7 @@ def validate(
100
155
response_many : bool = False ,
101
156
request_body_many : bool = False ,
102
157
response_by_alias : bool = False ,
103
- get_json_params : Optional [dict ] = None ,
158
+ get_json_params : Optional [Dict [ str , Any ] ] = None ,
104
159
form : Optional [Type [BaseModel ]] = None ,
105
160
):
106
161
"""
@@ -163,105 +218,93 @@ def test_route_kwargs(query:Query, body:Body, form:Form):
163
218
-> that will render JSON response with serialized MyModel instance
164
219
"""
165
220
166
- def decorate (func : Callable ) -> Callable :
221
+ def decorate (func : ModelRouteCallable ) -> RouteCallable :
167
222
@wraps (func )
168
- def wrapper (* args , ** kwargs ):
169
- q , b , f , err = None , None , None , {}
170
- kwargs , path_err = validate_path_params (func , kwargs )
171
- if path_err :
172
- err ["path_params" ] = path_err
173
- query_in_kwargs = func .__annotations__ .get ("query" )
174
- query_model = query_in_kwargs or query
175
- if query_model :
223
+ def wrapper (* args : Any , ** kwargs : Dict [str , Any ]) -> ResponseReturnValue :
224
+ q , b , f , err = None , None , None , FailedValidation ()
225
+ func_kwargs , path_err = validate_path_params (func , kwargs )
226
+ if len (path_err ) > 0 :
227
+ err .path_params = path_err
228
+ query_model , query_in_kwargs = _ensure_model_kwarg ("query" , query , func )
229
+ if query_model is not None :
176
230
query_params = convert_query_params (request .args , query_model )
177
231
try :
178
232
q = query_model (** query_params )
179
233
except ValidationError as ve :
180
- err ["query_params" ] = ve .errors ()
181
- body_in_kwargs = func .__annotations__ .get ("body" )
182
- body_model = body_in_kwargs or body
183
- if body_model :
234
+ err .query_params = ve .errors ()
235
+ body_model , body_in_kwargs = _ensure_model_kwarg ("body" , body , func )
236
+ if body_model is not None :
184
237
body_params = get_body_dict (** (get_json_params or {}))
185
- if "__root__" in body_model .__fields__ :
186
- try :
187
- b = body_model (__root__ = body_params ).__root__
188
- except ValidationError as ve :
189
- err ["body_params" ] = ve .errors ()
190
- elif request_body_many :
191
- try :
238
+ try :
239
+ if "__root__" in body_model .__fields__ :
240
+ b = body_model (__root__ = body_params ).__root__ # type: ignore
241
+ elif request_body_many :
192
242
b = validate_many_models (body_model , body_params )
193
- except ManyModelValidationError as e :
194
- err ["body_params" ] = e .errors ()
195
- else :
196
- try :
243
+ else :
197
244
b = body_model (** body_params )
198
- except TypeError :
199
- content_type = request .headers .get ("Content-Type" , "" ).lower ()
200
- media_type = content_type .split (";" )[0 ]
201
- if media_type != "application/json" :
202
- return unsupported_media_type_response (content_type )
203
- else :
204
- raise JsonBodyParsingError ()
205
- except ValidationError as ve :
206
- err ["body_params" ] = ve .errors ()
207
- form_in_kwargs = func .__annotations__ .get ("form" )
208
- form_model = form_in_kwargs or form
209
- if form_model :
245
+ except (ValidationError , ManyModelValidationError ) as error :
246
+ err .body_params = error .errors ()
247
+ except TypeError as error :
248
+ content_type = request .headers .get ("Content-Type" , "" ).lower ()
249
+ media_type = content_type .split (";" )[0 ]
250
+ if media_type != "application/json" :
251
+ return unsupported_media_type_response (content_type )
252
+ else :
253
+ raise JsonBodyParsingError () from error
254
+ form_model , form_in_kwargs = _ensure_model_kwarg ("form" , form , func )
255
+ if form_model is not None :
210
256
form_params = request .form
211
- if "__root__" in form_model .__fields__ :
212
- try :
213
- f = form_model (__root__ = form_params ).__root__
214
- except ValidationError as ve :
215
- err ["form_params" ] = ve .errors ()
216
- else :
217
- try :
257
+ try :
258
+ if "__root__" in form_model .__fields__ :
259
+ f = form_model (__root__ = form_params ).__root__ # type: ignore
260
+ else :
218
261
f = form_model (** form_params )
219
- except TypeError :
220
- content_type = request .headers .get ("Content-Type" , "" ).lower ()
221
- media_type = content_type .split (";" )[0 ]
222
- if media_type != "multipart/form-data" :
223
- return unsupported_media_type_response (content_type )
224
- else :
225
- raise JsonBodyParsingError
226
- except ValidationError as ve :
227
- err [ " form_params" ] = ve .errors ()
228
- request .query_params = q
229
- request .body_params = b
230
- request .form_params = f
262
+ except TypeError as error :
263
+ content_type = request .headers .get ("Content-Type" , "" ).lower ()
264
+ media_type = content_type .split (";" )[0 ]
265
+ if media_type != "multipart/form-data" :
266
+ return unsupported_media_type_response (content_type )
267
+ else :
268
+ raise JsonBodyParsingError () from error
269
+ except ValidationError as ve :
270
+ err . form_params = ve .errors ()
271
+ request .query_params = q # type: ignore
272
+ request .body_params = b # type: ignore
273
+ request .form_params = f # type: ignore
231
274
if query_in_kwargs :
232
- kwargs ["query" ] = q
275
+ func_kwargs ["query" ] = q
233
276
if body_in_kwargs :
234
- kwargs ["body" ] = b
277
+ func_kwargs ["body" ] = b
235
278
if form_in_kwargs :
236
- kwargs ["form" ] = f
279
+ func_kwargs ["form" ] = f
237
280
238
- if err :
281
+ if err . check () :
239
282
if current_app .config .get (
240
283
"FLASK_PYDANTIC_VALIDATION_ERROR_RAISE" , False
241
284
):
242
- raise FailedValidation ( ** err )
285
+ raise err
243
286
else :
244
287
status_code = current_app .config .get (
245
288
"FLASK_PYDANTIC_VALIDATION_ERROR_STATUS_CODE" , 400
246
289
)
247
290
return make_response (
248
- jsonify ({"validation_error" : err }),
249
- status_code
291
+ jsonify ({"validation_error" : err .to_dict ()}), status_code
250
292
)
251
- res = func (* args , ** kwargs )
293
+ res : ModelResponseReturnValue = current_app .ensure_sync (func )(
294
+ * args , ** func_kwargs
295
+ )
252
296
253
297
if response_many :
254
- if is_iterable_of_models (res ):
255
- return make_json_response (
256
- res ,
257
- on_success_status ,
258
- by_alias = response_by_alias ,
259
- exclude_none = exclude_none ,
260
- many = True ,
261
- )
262
- else :
298
+ if not is_iterable_of_models (res ):
263
299
raise InvalidIterableOfModelsException (res )
264
300
301
+ return make_json_response (
302
+ res , # type: ignore # Iterability and type is ensured above.
303
+ on_success_status ,
304
+ by_alias = response_by_alias ,
305
+ exclude_none = exclude_none ,
306
+ )
307
+
265
308
if isinstance (res , BaseModel ):
266
309
return make_json_response (
267
310
res ,
@@ -275,23 +318,29 @@ def wrapper(*args, **kwargs):
275
318
and len (res ) in [2 , 3 ]
276
319
and isinstance (res [0 ], BaseModel )
277
320
):
278
- headers = None
321
+ headers : Optional [
322
+ Union [Dict [str , Any ], Tuple [Any , ...], List [Any ]]
323
+ ] = None
279
324
status = on_success_status
280
325
if isinstance (res [1 ], (dict , tuple , list )):
281
326
headers = res [1 ]
282
- elif len (res ) == 3 and isinstance (res [2 ], (dict , tuple , list )):
283
- status = res [1 ]
284
- headers = res [2 ]
285
- else :
327
+ elif isinstance (res [1 ], int ):
286
328
status = res [1 ]
287
329
330
+ # Following type ignores should be fixed once
331
+ # https://github.com/python/mypy/issues/1178 is fixed.
332
+ if len (res ) == 3 and isinstance (
333
+ res [2 ], (dict , tuple , list ) # type: ignore[misc]
334
+ ):
335
+ headers = res [2 ] # type: ignore[misc]
336
+
288
337
ret = make_json_response (
289
338
res [0 ],
290
339
status ,
291
340
exclude_none = exclude_none ,
292
341
by_alias = response_by_alias ,
293
342
)
294
- if headers :
343
+ if headers is not None :
295
344
ret .headers .update (headers )
296
345
return ret
297
346
0 commit comments