diff --git a/planemo/commands/cmd_list_invocations.py b/planemo/commands/cmd_list_invocations.py index 46b8528e8..f41b61d57 100644 --- a/planemo/commands/cmd_list_invocations.py +++ b/planemo/commands/cmd_list_invocations.py @@ -7,7 +7,10 @@ from planemo import options from planemo.cli import command_function from planemo.galaxy import profiles -from planemo.galaxy.api import get_invocations +from planemo.galaxy.api import ( + get_invocations, + gi, +) from planemo.galaxy.workflows import remote_runnable_to_workflow_id from planemo.io import ( error, @@ -25,24 +28,53 @@ @click.argument( "workflow_identifier", type=click.STRING, + required=False, + default="", +) +@click.option( + "--raw", + is_flag=True, + help="output will be a json structure.", + default=False, +) +@click.option( + "--max_items", + type=click.INT, + help="max number of items returned.", + default=100, +) +@click.option( + "--offset_items", + type=click.INT, + help="skip first X items.", + default=0, ) @options.profile_option(required=True) @command_function -def cli(ctx, workflow_identifier, **kwds): +def cli(ctx, workflow_identifier, raw, max_items, offset_items, **kwds): """ Get a list of invocations for a particular workflow ID or alias. """ - info(f"Looking for invocations for workflow {workflow_identifier}...") - runnable = for_runnable_identifier(ctx, workflow_identifier, kwds) + if not raw: + info(f"Looking for invocations for workflow {workflow_identifier}...") profile = profiles.ensure_profile(ctx, kwds.get("profile")) - assert runnable.is_remote_workflow_uri - workflow_id = remote_runnable_to_workflow_id(runnable) - + if workflow_identifier: + runnable = for_runnable_identifier(ctx, workflow_identifier, kwds) + assert runnable.is_remote_workflow_uri + workflow_id = remote_runnable_to_workflow_id(runnable) + else: + workflow_id = "" + gi_client = gi(None, profile["galaxy_url"], profile["galaxy_admin_key"] or profile["galaxy_user_key"]) invocations = get_invocations( - url=profile["galaxy_url"], - key=profile["galaxy_admin_key"] or profile["galaxy_user_key"], + gi=gi_client, workflow_id=workflow_id, + instance=True, + max_items=max_items, + offset_items=offset_items, ) + if raw: + print(json.dumps(invocations, indent=4, sort_keys=True)) + return if tabulate is not None: state_colors = { "ok": "\033[92m", # green @@ -50,34 +82,50 @@ def cli(ctx, workflow_identifier, **kwds): "error": "\033[91m", # red "paused": "\033[96m", # cyan "deleted": "\033[95m", # magenta + "deleting": "\033[95m", # magenta "deleted_new": "\033[95m", # magenta "new": "\033[96m", # cyan "queued": "\033[93m", # yellow + "skipped": "\033[90m", # gray } - print( - tabulate( - { - "Invocation ID": invocations.keys(), - "Jobs status": [ - ", ".join([f"{state_colors[k]}{v} jobs {k}\033[0m" for k, v in inv["states"].items()]) - for inv in invocations.values() - ], - "Invocation report URL": [ - "{}/workflows/invocations/report?id={}".format(profile["galaxy_url"].strip("/"), inv_id) - for inv_id in invocations - ], - "History URL": [ - "{}/histories/view?id={}".format( - profile["galaxy_url"].strip("/"), invocations[inv_id]["history_id"] - ) - for inv_id in invocations - ], - }, - headers="keys", + + grouped_invocations = {} + workflows = {} + for inv_id, inv in invocations.items(): + wf_id = inv["workflow_id"] + if wf_id not in grouped_invocations: + workflow = gi_client.workflows.show_workflow(workflow_id=wf_id, instance=True) + workflows[wf_id] = (workflow["name"], workflow["id"]) + grouped_invocations.setdefault(wf_id, {})[inv_id] = inv + for workflow_id, data in grouped_invocations.items(): + header = f"Workflow: {workflows[workflow_id][0]} : {profile['galaxy_url'].strip('/')}/workflows/run?id={workflows[workflow_id][1]}" + print(f"\n{header}") + print(len(header) * "=") + print( + tabulate( + { + "Invocation ID": data.keys(), + "Invocation report URL": [ + "{}/workflows/invocations/report?id={}".format(profile["galaxy_url"].strip("/"), inv_id) + for inv_id in data + ], + "History URL": [ + "{}/histories/view?id={}".format( + profile["galaxy_url"].strip("/"), invocations[inv_id]["history_id"] + ) + for inv_id in data + ], + "Jobs status": [ + ", ".join([f"{state_colors[k]}{v} jobs {k}\033[0m" for k, v in inv["states"].items()]) + for inv in data.values() + ], + }, + headers="keys", + ) ) - ) else: error("The tabulate package is not installed, invocations could not be listed correctly.") print(json.dumps(invocations, indent=4, sort_keys=True)) - info(f"{len(invocations)} invocations found.") + if not raw: + info(f"{len(invocations)} invocations found.") return diff --git a/planemo/galaxy/api.py b/planemo/galaxy/api.py index cbcea69a8..a8d68af02 100644 --- a/planemo/galaxy/api.py +++ b/planemo/galaxy/api.py @@ -116,12 +116,29 @@ def summarize_history(ctx, gi, history_id): print("|") -def get_invocations(url, key, workflow_id): - inv_gi = gi(None, url, key) - invocations = inv_gi.workflows.get_invocations(workflow_id) +def get_invocations(gi, workflow_id, instance=False, max_items=100, items_per_request=20, offset_items=0): + invocations = [] + while len(invocations) < max_items: + if workflow_id: + items = gi.invocations.get_invocations( + workflow_id, + limit=min(items_per_request, max_items), + offset=len(invocations) + offset_items, + ) + else: + items = gi.invocations.get_invocations( + instance=instance, + limit=min(items_per_request, max_items), + offset=len(invocations) + offset_items, + ) + if (items is None) or (len(items) == 0): + break + else: + invocations.extend(items) return { invocation["id"]: { - "states": inv_gi.invocations.get_invocation_summary(invocation["id"])["states"], + "states": gi.invocations.get_invocation_summary(invocation["id"])["states"], + "workflow_id": invocation["workflow_id"], "history_id": invocation["history_id"], } for invocation in invocations