diff --git a/app/__init__.py b/app/__init__.py index ecf0bb9a..da083678 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -1,9 +1,9 @@ import flask -from app.server import routes, json_response +from app.server import routes def create_app() -> flask.Flask: app = flask.Flask(__name__) - app.response_class = json_response.JsonResponse app.register_blueprint(routes.routes) + app.config["RESTX_MASK_SWAGGER"] = False # disable X-Fields header in swagger return app diff --git a/app/db/model.py b/app/db/model.py index 750f7004..9a436b02 100644 --- a/app/db/model.py +++ b/app/db/model.py @@ -9,6 +9,9 @@ from sqlalchemy_repr import RepresentableBase from app.db import DBSession +from flask_restx import fields +from typing import NamedTuple, Dict + Base = declarative_base(cls=RepresentableBase) # sqlalchemy magic base class. @@ -74,6 +77,24 @@ def from_string(cls, name: str): raise NotImplementedError(f"Unknown ImportStatus enum {name}") +# Raw is the flask-restx base class for "a json-serializable field". +ModelDefinition = Dict[str, Type[fields.Raw]] + + +# Note: this should really be a namedtuple but for https://github.com/noirbizarre/flask-restplus/issues/364 +# This is an easy fix in flask-restx if we decide to go this route. +class ImportStatusResponse: + def __init__(self, id: str, status: str): + self.id = id + self.status = status + + @classmethod + def get_model(cls) -> ModelDefinition: + return { + "id": fields.String, + "status": fields.String } + + # This is mypy shenanigans so functions inside the Import class can return an instance of type Import. # It's basically a forward declaration of the type. 
ImportT = TypeVar('ImportT', bound='Import') @@ -101,7 +122,6 @@ def truncate(self, key, value): return value[:max_len] return value - def __init__(self, workspace_name: str, workspace_ns: str, workspace_uuid: str, submitter: str, import_url: str, filetype: str): self.id = str(uuid.uuid4()) self.workspace_name = workspace_name @@ -134,3 +154,6 @@ def update_status_exclusively(cls, id: str, current_status: ImportStatus, new_st def write_error(self, msg: str) -> None: self.error_message = msg self.status = ImportStatus.Error + + def to_status_response(self) -> ImportStatusResponse: + return ImportStatusResponse(self.id, self.status.name) diff --git a/app/health.py b/app/health.py index d5ecbf77..4698a6ec 100644 --- a/app/health.py +++ b/app/health.py @@ -1,29 +1,40 @@ -import flask -import json -import logging -from sqlalchemy.orm.exc import NoResultFound -from typing import Dict +from flask_restx import fields -from app.auth import user_auth from app.db import db, model -from app.db.model import ImportStatus from app.external import sam, rawls -from app.util import exceptions -def handle_health_check() -> flask.Response: - +class HealthResponse: + def __init__(self, db_health: bool, rawls_health: bool, sam_health: bool): + self.ok = all([db_health, rawls_health, sam_health]) + self.subsystems = { + "db": db_health, + "rawls": rawls_health, + "sam": sam_health + } + + @classmethod + def get_model(cls, api) -> model.ModelDefinition: + return { + "ok": fields.Boolean, + "subsystems": fields.Nested(api.model('SubsystemModel', { + "db": fields.Boolean, + "rawls": fields.Boolean, + "sam": fields.Boolean + })) + } + + +def handle_health_check() -> HealthResponse: sam_health = sam.check_health() rawls_health = rawls.check_health() db_health = check_health() - isvc_health = all([sam_health, rawls_health, db_health]) - - return flask.make_response((json.dumps({"ok": isvc_health, "subsystems": {"db": db_health, "rawls": rawls_health, "sam": sam_health}}), 200)) + return 
HealthResponse(db_health, rawls_health, sam_health) def check_health() -> bool: with db.session_ctx() as sess: res = sess.execute("select true").rowcount - return bool(res) \ No newline at end of file + return bool(res) diff --git a/app/new_import.py b/app/new_import.py index e72b56c6..9de7da9d 100644 --- a/app/new_import.py +++ b/app/new_import.py @@ -1,33 +1,12 @@ import flask -import jsonschema -import logging from app import translate -from app.util import exceptions from app.db import db, model from app.external import sam, pubsub from app.auth import user_auth -NEW_IMPORT_SCHEMA = { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "path": { - "type": "string" - }, - "filetype": { - "type": "string", - "enum": list(translate.FILETYPE_TRANSLATORS.keys()) - } - }, - "required": ["path", "filetype"] -} - -schema_validator = jsonschema.Draft7Validator(NEW_IMPORT_SCHEMA) - - -def handle(request: flask.Request, ws_ns: str, ws_name: str) -> flask.Response: +def handle(request: flask.Request, ws_ns: str, ws_name: str) -> model.ImportStatusResponse: access_token = user_auth.extract_auth_token(request) user_info = sam.validate_user(access_token) @@ -37,12 +16,6 @@ def handle(request: flask.Request, ws_ns: str, ws_name: str) -> flask.Response: # make sure the user is allowed to import to this workspace workspace_uuid = user_auth.workspace_uuid_with_auth(ws_ns, ws_name, access_token, "write") - try: # now validate that the input is correctly shaped - schema_validator.validate(request_json) - except jsonschema.ValidationError as ve: - logging.info("Got malformed JSON.") - raise exceptions.BadJsonException(ve.message) - import_url = request_json["path"] # and validate the input's path @@ -62,4 +35,4 @@ def handle(request: flask.Request, ws_ns: str, ws_name: str) -> flask.Response: pubsub.publish_self({"action": "translate", "import_id": new_import_id}) - return flask.make_response((str(new_import_id), 201)) + return 
new_import.to_status_response() diff --git a/app/server/json_response.py b/app/server/json_response.py deleted file mode 100644 index b1b036b3..00000000 --- a/app/server/json_response.py +++ /dev/null @@ -1,5 +0,0 @@ -from flask import Response - - -class JsonResponse(Response): - default_mimetype = "application/json" diff --git a/app/server/routes.py b/app/server/routes.py index e8a11caa..54de5595 100644 --- a/app/server/routes.py +++ b/app/server/routes.py @@ -1,44 +1,82 @@ import flask +from flask_restx import Api, Resource, fields import json import humps -from typing import Dict, Callable +from typing import Dict, Callable, Any from app import new_import, translate, status, health +from app.db import model import app.auth.service_auth from app.server.requestutils import httpify_excs, pubsubify_excs routes = flask.Blueprint('import-service', __name__, '/') +authorizations = { + 'Bearer': { + "type": "apiKey", + "name": "Authorization", + "in": "header", + "description": "Use your GCP auth token, i.e. `gcloud auth print-access-token`. Required scopes are [openid, email, profile]. Write `Bearer ` in the box." 
+ } +} -@routes.route('/<ws_ns>/<ws_name>/imports', methods=["POST"]) -@httpify_excs -def create_import(ws_ns, ws_name) -> flask.Response: - """Accept an import request""" - return new_import.handle(flask.request, ws_ns, ws_name) - - -@routes.route('/<ws_ns>/<ws_name>/imports/<import_id>', methods=["GET"]) -@httpify_excs -def import_status(ws_ns, ws_name, import_id) -> flask.Response: - """Return the status of an import job""" - return status.handle_get_import_status(flask.request, ws_ns, ws_name, import_id) - - -@routes.route('/<ws_ns>/<ws_name>/imports', methods=["GET"]) -@httpify_excs -def import_status_workspace(ws_ns, ws_name) -> flask.Response: - """Return the status of import jobs in a workspace""" - return status.handle_list_import_status(flask.request, ws_ns, ws_name) - - -@routes.route('/health', methods=["GET"]) -@httpify_excs -def health_check() -> flask.Response: - return health.handle_health_check() +api = Api(routes, version='1.0', title='Import Service', + description='import service', + authorizations=authorizations, + security=[{"Bearer": []}]) + +ns = api.namespace('/', description='import handling') + + +new_import_model = ns.model("NewImport", + {"path": fields.String(required=True), + "filetype": fields.String(enum=list(translate.FILETYPE_TRANSLATORS.keys()), required=True)}) +import_status_response_model = ns.model("ImportStatusResponse", model.ImportStatusResponse.get_model()) +health_response_model = ns.model("HealthResponse", health.HealthResponse.get_model(api)) + + +@ns.route('/<workspace_project>/<workspace_name>/imports/<import_id>') +@ns.param('workspace_project', 'Workspace project') +@ns.param('workspace_name', 'Workspace name') +@ns.param('import_id', 'Import id') +class SpecificImport(Resource): + @httpify_excs + @ns.marshal_with(import_status_response_model) + def get(self, workspace_project, workspace_name, import_id): + """Return status for this import.""" + return status.handle_get_import_status(flask.request, workspace_project, workspace_name, import_id) + + +@ns.route('/<workspace_project>/<workspace_name>/imports') +@ns.param('workspace_project', 'Workspace 
project') +@ns.param('workspace_name', 'Workspace name') +class Imports(Resource): + @httpify_excs + @ns.expect(new_import_model, validate=True) + @ns.marshal_with(import_status_response_model, code=201) + def post(self, workspace_project, workspace_name): + """Accept an import request.""" + return new_import.handle(flask.request, workspace_project, workspace_name), 201 + + @httpify_excs + @ns.marshal_with(import_status_response_model, code=200, as_list=True) + def get(self, workspace_project, workspace_name): + """Return all imports in the workspace.""" + return status.handle_list_import_status(flask.request, workspace_project, workspace_name) + + +@ns.route('/health') +class Health(Resource): + @httpify_excs + @api.doc(security=None) + @ns.marshal_with(health_response_model, code=200) + def get(self): + """Return whether we and all dependent subsystems are healthy.""" + return health.handle_health_check(), 200 # Dispatcher for pubsub messages. -pubsub_dispatch: Dict[str, Callable[[Dict[str, str]], flask.Response]] = { +pubsub_dispatch: Dict[str, Callable[[Dict[str, str]], Any]] = { "translate": translate.handle, "status": status.external_update_status } @@ -46,13 +84,15 @@ def health_check() -> flask.Response: # This particular URL, though weird, can be secured using GCP magic. 
# See https://cloud.google.com/pubsub/docs/push#authenticating_standard_and_urls -@routes.route('/_ah/push-handlers/receive_messages', methods=['POST']) -@pubsubify_excs -def pubsub_receive() -> flask.Response: - app.auth.service_auth.verify_pubsub_jwt(flask.request) - - envelope = json.loads(flask.request.data.decode('utf-8')) - attributes = envelope['message']['attributes'] - - # humps.decamelize turns camelCase to snake_case in dict keys - return pubsub_dispatch[attributes["action"]](humps.decamelize(attributes)) +@ns.route('/_ah/push-handlers/receive_messages', doc=False) +class PubSub(Resource): + @pubsubify_excs + @ns.marshal_with(import_status_response_model, code=200) + def post(self) -> flask.Response: + app.auth.service_auth.verify_pubsub_jwt(flask.request) + + envelope = json.loads(flask.request.data.decode('utf-8')) + attributes = envelope['message']['attributes'] + + # humps.decamelize turns camelCase to snake_case in dict keys + return pubsub_dispatch[attributes["action"]](humps.decamelize(attributes)) diff --git a/app/status.py b/app/status.py index c979a0b8..185222f6 100644 --- a/app/status.py +++ b/app/status.py @@ -2,7 +2,7 @@ import json import logging from sqlalchemy.orm.exc import NoResultFound -from typing import Dict +from typing import Dict, List from app.auth import user_auth from app.db import db, model @@ -11,7 +11,7 @@ from app.util import exceptions -def handle_get_import_status(request: flask.Request, ws_ns: str, ws_name: str, import_id: str) -> flask.Response: +def handle_get_import_status(request: flask.Request, ws_ns: str, ws_name: str, import_id: str) -> model.ImportStatusResponse: access_token = user_auth.extract_auth_token(request) sam.validate_user(access_token) @@ -24,12 +24,12 @@ def handle_get_import_status(request: flask.Request, ws_ns: str, ws_name: str, i filter(model.Import.workspace_namespace == ws_ns).\ filter(model.Import.workspace_name == ws_name).\ filter(model.Import.id == import_id).one() - return 
flask.make_response((json.dumps({"id": imprt.id, "status": imprt.status.name}), 200)) + return imprt.to_status_response() except NoResultFound: raise exceptions.NotFoundException(message=f"Import {import_id} not found") -def handle_list_import_status(request: flask.Request, ws_ns: str, ws_name: str) -> flask.Response: +def handle_list_import_status(request: flask.Request, ws_ns: str, ws_name: str) -> List[model.ImportStatusResponse]: running_only = "running_only" in request.args access_token = user_auth.extract_auth_token(request) @@ -44,12 +44,12 @@ def handle_list_import_status(request: flask.Request, ws_ns: str, ws_name: str) filter(model.Import.workspace_name == ws_name) q = q.filter(model.Import.status.in_(ImportStatus.running_statuses())) if running_only else q import_list = q.order_by(model.Import.submit_time.desc()).all() - import_statuses = [{"id": imprt.id, "status": imprt.status.name} for imprt in import_list] + import_statuses = [imprt.to_status_response() for imprt in import_list] - return flask.make_response((json.dumps(import_statuses), 200)) + return import_statuses -def external_update_status(msg: Dict[str, str]) -> flask.Response: +def external_update_status(msg: Dict[str, str]) -> model.ImportStatusResponse: """A trusted external service has told us to update the status for this import. Change the status, but sanely. 
It's possible that pub/sub might deliver this message more than once, so we need to account for that too.""" @@ -84,4 +84,5 @@ def external_update_status(msg: Dict[str, str]) -> flask.Response: if not update_successful: logging.warning(f"Failed to update status for import {import_id}: expected {current_status}, got {imp.status}.") - return flask.make_response("ok") + # This goes back to Pub/Sub, nobody reads it + return model.ImportStatusResponse(import_id, new_status.name) diff --git a/app/tests/test_health.py b/app/tests/test_health.py new file mode 100644 index 00000000..3287c3a8 --- /dev/null +++ b/app/tests/test_health.py @@ -0,0 +1,59 @@ +import flask.testing +import jsonschema +import pytest +from unittest import mock + +from app import new_import, translate +from app.util import exceptions +from app.db import db +from app.db.model import * + + +@pytest.fixture(scope="function") +def sam_ok(monkeypatch): + """Makes us think that Sam is fine.""" + monkeypatch.setattr("app.external.sam.check_health", + mock.MagicMock(return_value=True)) + + +@pytest.fixture(scope="function") +def rawls_ok(monkeypatch): + """Makes us think that Rawls is fine.""" + monkeypatch.setattr("app.external.rawls.check_health", + mock.MagicMock(return_value=True)) + + +@pytest.fixture(scope="function") +def db_ok(monkeypatch): + """Makes us think that our db is fine.""" + monkeypatch.setattr("app.health.check_health", + mock.MagicMock(return_value=True)) + + +@pytest.fixture(scope="function") +def rawls_bad(monkeypatch): + """Makes us think that Rawls is dead.""" + monkeypatch.setattr("app.external.rawls.check_health", + mock.MagicMock(return_value=False)) + + +@pytest.mark.usefixtures("sam_ok", "rawls_ok", "db_ok") +def test_everything_ok(client): + resp = client.get('/health') + assert resp.status_code == 200 + + assert resp.json["ok"] + assert resp.json["subsystems"]["rawls"] + assert resp.json["subsystems"]["sam"] + assert resp.json["subsystems"]["db"] + + 
+@pytest.mark.usefixtures("sam_ok", "rawls_bad", "db_ok") +def test_one_subsystem_died(client): + resp = client.get('/health') + assert resp.status_code == 200 + + assert not resp.json["ok"] + assert not resp.json["subsystems"]["rawls"] + assert resp.json["subsystems"]["sam"] + assert resp.json["subsystems"]["db"] diff --git a/app/tests/test_new_import.py b/app/tests/test_new_import.py index ee859b0d..f802d07b 100644 --- a/app/tests/test_new_import.py +++ b/app/tests/test_new_import.py @@ -9,12 +9,8 @@ from app.db.model import * -def test_schema_valid(): - jsonschema.Draft7Validator.check_schema(new_import.NEW_IMPORT_SCHEMA) - - good_json = {"path": f"https://{translate.VALID_NETLOCS[0]}/some/path", "filetype": "pfb"} -good_headers = {"Authorization": "Bearer ya29.blahblah"} +good_headers = {"Authorization": "Bearer ya29.blahblah", "Accept": "application/json"} @pytest.mark.usefixtures("sam_valid_user", "user_has_ws_access", "pubsub_publish", "pubsub_fake_env") @@ -24,9 +20,10 @@ def test_golden_path(client): # response contains the job ID, check it's actually in the database sess = db.get_session() - dbres = sess.query(Import).filter(Import.id == resp.get_data(as_text=True)).all() + id = resp.json["id"] + dbres = sess.query(Import).filter(Import.id == id).all() assert len(dbres) == 1 - assert dbres[0].id == str(resp.get_data(as_text=True)) + assert dbres[0].id == id assert resp.headers["Content-Type"] == "application/json" diff --git a/app/tests/test_status.py b/app/tests/test_status.py index dcf52e4c..e8f2fdc4 100644 --- a/app/tests/test_status.py +++ b/app/tests/test_status.py @@ -15,11 +15,11 @@ def test_get_import_status(client): new_import_resp = client.post('/namespace/name/imports', json=good_json, headers=good_headers) assert new_import_resp.status_code == 201 - import_id = new_import_resp.get_data(as_text=True) + import_id = new_import_resp.json["id"] - resp = client.get('/namespace/name/imports/{}'.format(import_id), headers=good_headers) + resp = 
client.get(f'/namespace/name/imports/{import_id}', headers=good_headers) assert resp.status_code == 200 - assert resp.get_json(force=True) == {'id': import_id, 'status': ImportStatus.Pending.name} + assert resp.json == {'id': import_id, 'status': ImportStatus.Pending.name} @pytest.mark.usefixtures("sam_valid_user", "user_has_ws_access", "pubsub_publish", "pubsub_fake_env") @@ -32,11 +32,11 @@ def test_get_import_status_404(client): @pytest.mark.usefixtures("sam_valid_user", "user_has_ws_access", "pubsub_publish", "pubsub_fake_env") def test_get_all_import_status(client): - import_id = client.post('/namespace/name/imports', json=good_json, headers=good_headers).get_data(as_text=True) + import_id = client.post('/namespace/name/imports', json=good_json, headers=good_headers).json["id"] resp = client.get('/namespace/name/imports', headers=good_headers) assert resp.status_code == 200 - assert resp.get_json(force=True) == [{"id": import_id, "status": ImportStatus.Pending.name}] + assert resp.json == [{"id": import_id, "status": ImportStatus.Pending.name}] @pytest.mark.usefixtures("sam_valid_user", "user_has_ws_access", "pubsub_publish", "pubsub_fake_env") @@ -52,16 +52,16 @@ def test_get_all_running_when_none(client): resp = client.get('/namespace/name/imports?running_only', headers=good_headers) assert resp.status_code == 200 - assert resp.get_json(force=True) == [] + assert resp.json == [] @pytest.mark.usefixtures("sam_valid_user", "user_has_ws_access", "pubsub_publish", "pubsub_fake_env") def test_get_all_running_with_one(client): - import_id = client.post('/namespace/name/imports', json=good_json, headers=good_headers).get_data(as_text=True) + import_id = client.post('/namespace/name/imports', json=good_json, headers=good_headers).json["id"] resp = client.get('/namespace/name/imports?running_only', headers=good_headers) assert resp.status_code == 200 - assert resp.get_json(force=True) == [{"id": import_id, "status": ImportStatus.Pending.name}] + assert resp.json == 
[{"id": import_id, "status": ImportStatus.Pending.name}] @pytest.mark.usefixtures("incoming_valid_pubsub") diff --git a/app/translate.py b/app/translate.py index 9a70612e..4a338757 100644 --- a/app/translate.py +++ b/app/translate.py @@ -26,7 +26,7 @@ VALID_NETLOCS = ["gen3-pfb-export.s3.amazonaws.com", "storage.googleapis.com"] -def handle(msg: Dict[str, str]) -> flask.Response: +def handle(msg: Dict[str, str]) -> ImportStatusResponse: import_id = msg["import_id"] with db.session_ctx() as sess: # flip the status to Translating, and then get the row @@ -37,7 +37,7 @@ def handle(msg: Dict[str, str]) -> flask.Response: # this import wasn't in pending. most likely this means that the pubsub message we got was delivered twice, # and some other GAE instance has picked it up and is happily processing it. happy translating, friendo! logging.info(f"Failed to update status exclusively for translating import {import_id}: expected Pending, got {import_details.status}. PubSub probably delivered this message twice.") - return flask.make_response("ok") + return import_details.to_status_response() dest_file = f'{os.environ.get("BATCH_UPSERT_BUCKET")}/{import_details.id}.rawlsUpsert' @@ -76,7 +76,7 @@ def handle(msg: Dict[str, str]) -> flask.Response: "upsertFile": dest_file }) - return flask.make_response("ok") + return ImportStatusResponse(import_id, ImportStatus.ReadyForUpsert.name) def _stream_translate(source: IO, dest: IO, translator: Translator) -> None: diff --git a/mypy.ini b/mypy.ini index 57b11d51..973308df 100644 --- a/mypy.ini +++ b/mypy.ini @@ -37,3 +37,6 @@ ignore_missing_imports = True [mypy-humps] ignore_missing_imports = True + +[mypy-flask_restx] +ignore_missing_imports = True diff --git a/requirements.txt b/requirements.txt index af1707d7..8269db7f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -24,3 +24,4 @@ gcsfs==0.6.0 memunit==0.5.0 psutil==5.6.7 pyhumps==1.3.1 +flask-restx==0.1.1