diff --git a/services/data/postgres_async_db.py b/services/data/postgres_async_db.py index dbde13e6..5d166f15 100644 --- a/services/data/postgres_async_db.py +++ b/services/data/postgres_async_db.py @@ -201,7 +201,7 @@ async def _init(self, create_triggers: bool): ) async def get_records(self, filter_dict={}, fetch_single=False, - ordering: List[str] = None, limit: int = 0, expanded=False, + ordering: List[str] = None, limit: int = 0, offset: int = 0, expanded=False, cur: aiopg.Cursor = None) -> DBResponse: conditions = [] values = [] @@ -211,7 +211,7 @@ async def get_records(self, filter_dict={}, fetch_single=False, response, _ = await self.find_records( conditions=conditions, values=values, fetch_single=fetch_single, - order=ordering, limit=limit, expanded=expanded, cur=cur + order=ordering, limit=limit, offset=offset, expanded=expanded, cur=cur ) return response @@ -572,8 +572,8 @@ async def get_flow(self, flow_id: str): filter_dict = {"flow_id": flow_id} return await self.get_records(filter_dict=filter_dict, fetch_single=True) - async def get_all_flows(self): - return await self.get_records() + async def get_all_flows(self, limit:int = 0, offset: int = 0): + return await self.get_records(limit=limit, offset=offset) class AsyncRunTablePostgres(AsyncPostgresTable): @@ -606,9 +606,9 @@ async def get_run(self, flow_id: str, run_id: str, expanded: bool = False, cur: return await self.get_records(filter_dict=filter_dict, fetch_single=True, expanded=expanded, cur=cur) - async def get_all_runs(self, flow_id: str): + async def get_all_runs(self, flow_id: str, limit:int=0, offset:int=0): filter_dict = {"flow_id": flow_id} - return await self.get_records(filter_dict=filter_dict) + return await self.get_records(filter_dict=filter_dict, limit=limit, offset=offset) async def update_heartbeat(self, flow_id: str, run_id: str): run_key, run_value = translate_run_key(run_id) @@ -661,11 +661,11 @@ async def add_step(self, step_object: StepRow): } return await self.create_record(dict) - async def get_steps(self, flow_id: str, run_id: str): + async def get_steps(self, flow_id: str, run_id: str, limit:int=0, offset:int=0): run_id_key, run_id_value = translate_run_key(run_id) filter_dict = {"flow_id": flow_id, run_id_key: run_id_value} - return await self.get_records(filter_dict=filter_dict) + return await self.get_records(filter_dict=filter_dict, limit=limit, offset=offset) async def get_step(self, flow_id: str, run_id: str, step_name: str): run_id_key, run_id_value = translate_run_key(run_id) @@ -705,14 +705,14 @@ async def add_task(self, task: TaskRow, fill_heartbeat=False): } return await self.create_record(dict) - async def get_tasks(self, flow_id: str, run_id: str, step_name: str): + async def get_tasks(self, flow_id: str, run_id: str, step_name: str, limit:int=0, offset:int=0): run_id_key, run_id_value = translate_run_key(run_id) filter_dict = { "flow_id": flow_id, run_id_key: run_id_value, "step_name": step_name, } - return await self.get_records(filter_dict=filter_dict) + return await self.get_records(filter_dict=filter_dict, limit=limit, offset=offset) async def get_task(self, flow_id: str, run_id: str, step_name: str, task_id: str, expanded: bool = False): @@ -795,14 +795,14 @@ async def add_metadata( } return await self.create_record(dict) - async def get_metadata_in_runs(self, flow_id: str, run_id: str): + async def get_metadata_in_runs(self, flow_id: str, run_id: str, limit:int=0, offset:int=0): run_id_key, run_id_value = translate_run_key(run_id) filter_dict = {"flow_id": flow_id, run_id_key: run_id_value} - return await self.get_records(filter_dict=filter_dict) + return await self.get_records(filter_dict=filter_dict, limit=limit, offset=offset) async def get_metadata( - self, flow_id: str, run_id: int, step_name: str, task_id: str + self, flow_id: str, run_id: int, step_name: str, task_id: str, limit: int = 0, offset: int = 0 ): run_id_key, run_id_value = translate_run_key(run_id) task_id_key, task_id_value = translate_task_key(task_id) @@ -812,9 +812,9 @@ async def get_metadata( "step_name": step_name, task_id_key: task_id_value, } - return await self.get_records(filter_dict=filter_dict) + return await self.get_records(filter_dict=filter_dict, limit=limit, offset=offset) - async def get_filtered_task_pathspecs(self, flow_id: str, run_id: str, step_name: str, field_name: str, pattern: str): + async def get_filtered_task_pathspecs(self, flow_id: str, run_id: str, step_name: str, field_name: str, pattern: str, limit:int = 0, offset: int = 0): """ Returns a list of task pathspecs that match the given field_name and regexp pattern for the value """ @@ -844,6 +844,8 @@ async def get_filtered_task_pathspecs(self, flow_id: str, run_id: str, step_name ) T {where} {order_by} + {limit} + {offset} """ select_sql = sql_template.format( @@ -851,7 +853,9 @@ async def get_filtered_task_pathspecs(self, flow_id: str, run_id: str, step_name table_name=self.table_name, where="WHERE {}".format(" AND ".join(conditions)), order_by="ORDER BY task_id", - select_columns=",".join(["flow_id, run_number, run_id, step_name, task_name, task_id"]) + select_columns=",".join(["flow_id, run_number, run_id, step_name, task_name, task_id"]), + limit="LIMIT {}".format(limit) if limit else "", + offset= "OFFSET {}".format(offset) if offset else "" ).strip() db_response, pagination = await self.execute_sql(select_sql=select_sql, values=values, serialize=False) @@ -922,16 +926,16 @@ async def add_artifact( } return await self.create_record(dict) - async def get_artifacts_in_runs(self, flow_id: str, run_id: int): + async def get_artifacts_in_runs(self, flow_id: str, run_id: int, limit:int=0, offset:int=0): run_id_key, run_id_value = translate_run_key(run_id) filter_dict = { "flow_id": flow_id, run_id_key: run_id_value, } return await self.get_records(filter_dict=filter_dict, - ordering=self.ordering) + ordering=self.ordering, limit=limit,offset=offset) - async def get_artifact_in_steps(self, flow_id: str, run_id: int, step_name: str): + async def get_artifact_in_steps(self, flow_id: str, run_id: int, step_name: str, limit:int=0, offset:int=0): run_id_key, run_id_value = translate_run_key(run_id) filter_dict = { "flow_id": flow_id, @@ -939,10 +943,10 @@ async def get_artifact_in_steps(self, flow_id: str, run_id: int, step_name: str) "step_name": step_name, } return await self.get_records(filter_dict=filter_dict, - ordering=self.ordering) + ordering=self.ordering, limit=limit,offset=offset) async def get_artifact_in_task( - self, flow_id: str, run_id: int, step_name: str, task_id: int + self, flow_id: str, run_id: int, step_name: str, task_id: int, limit:int=0, offset:int=0 ): run_id_key, run_id_value = translate_run_key(run_id) task_id_key, task_id_value = translate_task_key(task_id) @@ -953,7 +957,7 @@ async def get_artifact_in_task( task_id_key: task_id_value, } return await self.get_records(filter_dict=filter_dict, - ordering=self.ordering) + ordering=self.ordering, limit=limit,offset=offset) async def get_artifact( self, flow_id: str, run_id: int, step_name: str, task_id: int, name: str diff --git a/services/metadata_service/api/artifact.py b/services/metadata_service/api/artifact.py index 40d60a39..a8176943 100644 --- a/services/metadata_service/api/artifact.py +++ b/services/metadata_service/api/artifact.py @@ -10,6 +10,7 @@ format_response, handle_exceptions, http_500, + parse_pagination_params ) import json @@ -216,9 +217,10 @@ async def get_artifacts_by_task(self, request): run_number = request.match_info.get("run_number") step_name = request.match_info.get("step_name") task_id = request.match_info.get("task_id") + limit, offset = parse_pagination_params(request) db_response = await self._async_table.get_artifact_in_task( - flow_id, run_number, step_name, task_id + flow_id, run_number, step_name, task_id, limit, offset ) if db_response.response_code == 200: db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response) @@ -277,9 +279,10 @@ async def get_artifacts_by_task_attempt(self, request): step_name = request.match_info.get("step_name") task_id = request.match_info.get("task_id") attempt_id = request.match_info.get("attempt_id") + limit, offset = parse_pagination_params(request) db_response = await self._async_table.get_artifact_in_task( - flow_id, run_number, step_name, task_id + flow_id, run_number, step_name, task_id, limit, offset ) if db_response.response_code == 200: db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response) @@ -333,9 +336,10 @@ async def get_artifacts_by_step(self, request): flow_id = request.match_info.get("flow_id") run_number = request.match_info.get("run_number") step_name = request.match_info.get("step_name") + limit, offset = parse_pagination_params(request) db_response = await self._async_table.get_artifact_in_steps( - flow_id, run_number, step_name + flow_id, run_number, step_name, limit, offset ) if db_response.response_code == 200: db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response) @@ -376,8 +380,9 @@ async def get_artifacts_by_run(self, request): """ flow_id = request.match_info.get("flow_id") run_number = request.match_info.get("run_number") + limit, offset = parse_pagination_params(request) - db_response = await self._async_table.get_artifacts_in_runs(flow_id, run_number) + db_response = await self._async_table.get_artifacts_in_runs(flow_id, run_number, limit, offset) if db_response.response_code == 200: db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response) filtered_body = filter_artifacts_for_latest_attempt(db_response.body) diff --git a/services/metadata_service/api/flow.py b/services/metadata_service/api/flow.py index 633a1f65..f68cea6a 100644 --- a/services/metadata_service/api/flow.py +++ b/services/metadata_service/api/flow.py @@ -2,7 +2,7 @@ from services.data.postgres_async_db import AsyncPostgresDB from services.utils import read_body from services.metadata_service.api.utils import format_response, \ - handle_exceptions + handle_exceptions, parse_pagination_params import asyncio @@ -77,6 +77,16 @@ async def get_flow(self, request): description: "flow_id" required: true type: "string" + - name: "_limit" + in: "query" + description: "Limit for the number of results" + required: false + type: "integer" + - name: "_page" + in: "query" + description: "Page of results to return" + required: false + type: "integer" produces: - text/plain responses: @@ -107,4 +117,6 @@ async def get_all_flows(self, request): "405": description: invalid HTTP Method """ - return await self._async_table.get_all_flows() + limit, offset = parse_pagination_params(request) + + return await self._async_table.get_all_flows(limit=limit, offset=offset) diff --git a/services/metadata_service/api/metadata.py b/services/metadata_service/api/metadata.py index fb7fa6be..feddae45 100644 --- a/services/metadata_service/api/metadata.py +++ b/services/metadata_service/api/metadata.py @@ -2,7 +2,7 @@ import json from services.utils import read_body from services.metadata_service.api.utils import format_response, \ - handle_exceptions + handle_exceptions, parse_pagination_params import asyncio from services.data.postgres_async_db import AsyncPostgresDB @@ -73,8 +73,10 @@ async def get_metadata(self, request): run_number = request.match_info.get("run_number") step_name = request.match_info.get("step_name") task_id = request.match_info.get("task_id") + limit, offset = parse_pagination_params(request) + return await self._async_table.get_metadata( - flow_name, run_number, step_name, task_id + flow_name, run_number, step_name, task_id, limit, offset ) @format_response @@ -106,8 +108,10 @@ async def get_metadata_by_run(self, request): """ flow_name = request.match_info.get("flow_id") run_number = request.match_info.get("run_number") + limit, offset = parse_pagination_params(request) + return await self._async_table.get_metadata_in_runs( - flow_name, run_number + flow_name, run_number, limit, offset ) async def create_metadata(self, request): diff --git a/services/metadata_service/api/run.py b/services/metadata_service/api/run.py index 3f26eed9..c0889614 100644 --- a/services/metadata_service/api/run.py +++ b/services/metadata_service/api/run.py @@ -5,7 +5,7 @@ from services.data.models import RunRow from services.utils import has_heartbeat_capable_version_tag, read_body from services.metadata_service.api.utils import format_response, \ - handle_exceptions + handle_exceptions, parse_pagination_params from services.data.postgres_async_db import AsyncPostgresDB @@ -82,7 +82,9 @@ async def get_all_runs(self, request): description: invalid HTTP Method """ flow_name = request.match_info.get("flow_id") - return await self._async_table.get_all_runs(flow_name) + limit, offset = parse_pagination_params(request) + + return await self._async_table.get_all_runs(flow_id=flow_name, limit=limit, offset=offset) @format_response @handle_exceptions diff --git a/services/metadata_service/api/step.py b/services/metadata_service/api/step.py index 5f3b7d0e..ce22207a 100644 --- a/services/metadata_service/api/step.py +++ b/services/metadata_service/api/step.py @@ -2,7 +2,7 @@ from services.data.tagging_utils import apply_run_tags_to_db_response from services.utils import read_body from services.metadata_service.api.utils import format_response, \ - handle_exceptions + handle_exceptions, parse_pagination_params from services.data.postgres_async_db import AsyncPostgresDB @@ -55,7 +55,9 @@ async def get_steps(self, request): """ flow_id = request.match_info.get("flow_id") run_number = request.match_info.get("run_number") - db_response = await self._async_table.get_steps(flow_id, run_number) + limit, offset = parse_pagination_params(request) + + db_response = await self._async_table.get_steps(flow_id=flow_id, run_id=run_number, limit=limit, offset=offset) db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response) return db_response diff --git a/services/metadata_service/api/task.py b/services/metadata_service/api/task.py index 40f42706..c62ab9e7 100644 --- a/services/metadata_service/api/task.py +++ b/services/metadata_service/api/task.py @@ -3,7 +3,7 @@ from services.data.tagging_utils import apply_run_tags_to_db_response from services.utils import has_heartbeat_capable_version_tag, read_body from services.metadata_service.api.utils import format_response, \ - handle_exceptions + handle_exceptions, parse_pagination_params import json from aiohttp import web import asyncio @@ -77,8 +77,9 @@ async def get_tasks(self, request): flow_id = request.match_info.get("flow_id") run_number = request.match_info.get("run_number") step_name = request.match_info.get("step_name") + limit, offset = parse_pagination_params(request) - db_response = await self._async_table.get_tasks(flow_id, run_number, step_name) + db_response = await self._async_table.get_tasks(flow_id, run_number, step_name, limit, offset) db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response) return db_response @@ -125,12 +126,13 @@ async def get_filtered_tasks(self, request): flow_id = request.match_info.get("flow_id") run_number = request.match_info.get("run_number") step_name = request.match_info.get("step_name") + limit, offset = parse_pagination_params(request) # possible filters metadata_field = request.query.get("metadata_field_name", None) pattern = request.query.get("pattern", None) - db_response, _ = await self._async_metadata_table.get_filtered_task_pathspecs(flow_id, run_number, step_name, metadata_field, pattern) + db_response, _ = await self._async_metadata_table.get_filtered_task_pathspecs(flow_id, run_number, step_name, metadata_field, pattern, limit, offset) return db_response @format_response diff --git a/services/metadata_service/api/utils.py b/services/metadata_service/api/utils.py index 8e916436..eda11203 100644 --- a/services/metadata_service/api/utils.py +++ b/services/metadata_service/api/utils.py @@ -65,3 +65,11 @@ async def wrapper(*args, **kwargs): return http_500(str(err)) return wrapper + + +def parse_pagination_params(request): + page = int(request.query.get("_page", 1)) + limit = int(request.query.get("_limit", 0)) + + offset = limit * (page - 1) + return limit, offset