Skip to content

WIP: Implement TUS upload for invocation imports #20359

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion lib/galaxy/managers/model_stores.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from typing import (
Optional,
Union,
Expand Down Expand Up @@ -334,8 +335,13 @@ def create_objects_from_store(
allow_library_creation=for_library,
)
user_context = ModelStoreUserContext(app, galaxy_user) if galaxy_user is not None else None
source = payload.store_content_uri or payload.store_dict
if isinstance(source, str) and source.startswith("tus://"):
session_id = source.split("tus://", 1)[-1]
upload_store = app.config.tus_upload_store or app.config.new_file_path
source = f"file://{os.path.abspath(os.path.join(upload_store, session_id))}"
model_import_store = source_to_import_store(
payload.store_content_uri or payload.store_dict,
source=source,
app=app,
import_options=import_options,
model_store_format=payload.model_store_format,
Expand Down
21 changes: 21 additions & 0 deletions lib/galaxy_test/base/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import pytest
import requests
from tusclient import client
from typing_extensions import Protocol

from galaxy.util.properties import get_from_env
Expand Down Expand Up @@ -40,6 +41,8 @@
"result_backend": CELERY_BACKEND,
}

TEST_TUS_CHUNK_SIZE = 1024


@pytest.fixture(scope="session")
def celery_config():
Expand Down Expand Up @@ -247,6 +250,24 @@ def patch(self, *args, **kwds):
def put(self, *args, **kwds):
return self._put(*args, **kwds)

def tus_upload(self, path: str, endpoint: str) -> str:
"""Upload a file to the given endpoint using TUS protocol."""
base_url = self.get_api_url(endpoint)
headers: Dict[str, str] = {}
if self.api_key:
headers["x-api-key"] = self.api_key
my_client = client.TusClient(base_url, headers=headers)
storage = None
metadata: Dict[str, str] = {}

uploader = my_client.uploader(path, metadata=metadata, url_storage=storage)
uploader.chunk_size = TEST_TUS_CHUNK_SIZE
uploader.upload()
upload_session_url = uploader.url
assert upload_session_url
tus_session_id = upload_session_url.rsplit("/", 1)[1] # type: ignore[unreachable]
return tus_session_id


class AnonymousGalaxyInteractor(ApiTestInteractor):
def __init__(self, test_case):
Expand Down
12 changes: 11 additions & 1 deletion lib/galaxy_test/base/populators.py
Original file line number Diff line number Diff line change
Expand Up @@ -2113,7 +2113,12 @@ def create_invocation_from_store_raw(
store_dict: Optional[Dict[str, Any]] = None,
store_path: Optional[str] = None,
model_store_format: Optional[str] = None,
archive_path: Optional[str] = None,
) -> Response:
if archive_path is not None:

tus_session_id = self.galaxy_interactor.tus_upload(archive_path, "upload/resumable_upload")
store_path = f"tus://{tus_session_id}"
url = "invocations/from_store"
payload = _store_payload(store_dict=store_dict, store_path=store_path, model_store_format=model_store_format)
payload["history_id"] = history_id
Expand All @@ -2126,9 +2131,14 @@ def create_invocation_from_store(
store_dict: Optional[Dict[str, Any]] = None,
store_path: Optional[str] = None,
model_store_format: Optional[str] = None,
archive_path: Optional[str] = None,
) -> Response:
create_response = self.create_invocation_from_store_raw(
history_id, store_dict=store_dict, store_path=store_path, model_store_format=model_store_format
history_id,
store_dict=store_dict,
store_path=store_path,
model_store_format=model_store_format,
archive_path=archive_path,
)
api_asserts.assert_status_code_is_ok(create_response)
return create_response.json()
Expand Down
17 changes: 2 additions & 15 deletions test/integration/test_job_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
import io
import os
import tempfile
from typing import Dict

import requests
from sqlalchemy import select
from tusclient import client

from galaxy import model
from galaxy.model.base import ensure_object_added_to_session
Expand All @@ -35,7 +33,6 @@

TEST_INPUT_TEXT = "test input content\n"
TEST_FILE_IO = io.StringIO("some initial text data")
TEST_TUS_CHUNK_SIZE = 1024


class TestJobFilesIntegration(integration_util.IntegrationTestCase):
Expand Down Expand Up @@ -135,24 +132,14 @@ def test_write_with_tus(self):
path = self._app.object_store.get_filename(output_hda.dataset)
assert path

upload_url = self._api_url(f"job_files/resumable_upload?job_key={job_key}", use_key=False)
headers: Dict[str, str] = {}
my_client = client.TusClient(upload_url, headers=headers)

storage = None
metadata: Dict[str, str] = {}
t_file = tempfile.NamedTemporaryFile("w")
t_file.write("some initial text data")
t_file.flush()

input_path = t_file.name

uploader = my_client.uploader(input_path, metadata=metadata, url_storage=storage)
uploader.chunk_size = TEST_TUS_CHUNK_SIZE
uploader.upload()
upload_session_url = uploader.url
assert upload_session_url
tus_session_id = upload_session_url.rsplit("/", 1)[1] # type: ignore[unreachable]
endpoint_url = f"job_files/resumable_upload?job_key={job_key}"
tus_session_id = self.galaxy_interactor.tus_upload(input_path, endpoint_url)

data = {"path": path, "job_key": job_key, "session_id": tus_session_id}
post_url = self._api_url(f"jobs/{job_id}/files", use_key=False)
Expand Down
20 changes: 14 additions & 6 deletions test/integration/test_workflow_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Any,
cast,
Dict,
Optional,
)

from galaxy.util.compression_utils import CompressedFile
Expand Down Expand Up @@ -52,6 +53,9 @@ def test_export_import_invocation_collection_input_uris_bag_zip(self):
def test_export_import_invocation_collection_input_sts(self):
self._test_export_import_invocation_collection_input(False)

def test_export_import_invocation_with_input_as_output_tus(self):
self._test_export_import_invocation_with_input_as_output(False, use_upload=True)

def test_export_import_invocation_with_input_as_output_uris(self):
self._test_export_import_invocation_with_input_as_output(True)

Expand Down Expand Up @@ -107,10 +111,10 @@ def _test_export_import_invocation_collection_input(self, use_uris, model_store_

self._rerun_imported_workflow(summary, invocation_details)

def _test_export_import_invocation_with_input_as_output(self, use_uris):
def _test_export_import_invocation_with_input_as_output(self, use_uris, use_upload=False):
with self.dataset_populator.test_history() as history_id:
summary = self._run_workflow_with_inputs_as_outputs(history_id)
invocation_details = self._export_and_import_workflow_invocation(summary, use_uris)
invocation_details = self._export_and_import_workflow_invocation(summary, use_uris, use_upload=use_upload)
output_values = invocation_details["output_values"]
assert len(output_values) == 1
assert "wf_output_param" in output_values
Expand Down Expand Up @@ -171,24 +175,28 @@ def test_export_import_invocation_with_copied_hdca_and_database_operation_tool(s
assert len(contents) == len(original_contents) == 5

def _export_and_import_workflow_invocation(
self, summary: RunJobsSummary, use_uris: bool = True, model_store_format="tgz"
self, summary: RunJobsSummary, use_uris: bool = True, use_upload=False, model_store_format="tgz"
) -> Dict[str, Any]:
invocation_id = summary.invocation_id
extension = model_store_format or "tgz"
if use_uris:
archive_path: Optional[str] = None
if use_uris or use_upload:
uri = f"gxfiles://posix_test/invocation.{extension}"
self.workflow_populator.download_invocation_to_uri(invocation_id, uri, extension=extension)
root = self.root_dir
invocation_path = os.path.join(root, f"invocation.{extension}")
assert os.path.exists(invocation_path)
uri = invocation_path
if use_uris:
uri = invocation_path
else:
archive_path = invocation_path
else:
temp_tar = self.workflow_populator.download_invocation_to_store(invocation_id, extension=extension)
uri = temp_tar

with self.dataset_populator.test_history() as history_id:
response = self.workflow_populator.create_invocation_from_store(
history_id, store_path=uri, model_store_format=model_store_format
history_id, store_path=uri, model_store_format=model_store_format, archive_path=archive_path
)

imported_invocation_details = self._assert_one_invocation_created_and_get_details(response)
Expand Down
Loading