Skip to content

Commit e162382

Browse files
Andrei Neagu
and
Andrei Neagu
authored
✨ Add exporter code to storage (#7218)
Co-authored-by: Andrei Neagu <[email protected]>
1 parent 70409d2 commit e162382

File tree

47 files changed

+1301
-1005
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

47 files changed

+1301
-1005
lines changed

api/specs/web-server/_long_running_tasks.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from servicelib.long_running_tasks._models import TaskGet, TaskStatus
1414
from simcore_service_webserver._meta import API_VTAG
1515
from simcore_service_webserver.tasks._exception_handlers import (
16-
_TO_HTTP_ERROR_MAP as data_export_http_error_map,
16+
_TO_HTTP_ERROR_MAP as export_data_http_error_map,
1717
)
1818

1919
router = APIRouter(
@@ -23,9 +23,9 @@
2323
],
2424
)
2525

26-
_data_export_responses: dict[int | str, dict[str, Any]] = {
26+
_export_data_responses: dict[int | str, dict[str, Any]] = {
2727
i.status_code: {"model": EnvelopedError}
28-
for i in data_export_http_error_map.values()
28+
for i in export_data_http_error_map.values()
2929
}
3030

3131

@@ -34,7 +34,7 @@
3434
response_model=Envelope[list[TaskGet]],
3535
name="list_tasks",
3636
description="Lists all long running tasks",
37-
responses=_data_export_responses,
37+
responses=_export_data_responses,
3838
)
3939
def get_async_jobs(): ...
4040

@@ -44,7 +44,7 @@ def get_async_jobs(): ...
4444
response_model=Envelope[TaskStatus],
4545
name="get_task_status",
4646
description="Retrieves the status of a task",
47-
responses=_data_export_responses,
47+
responses=_export_data_responses,
4848
)
4949
def get_async_job_status(
5050
_path_params: Annotated[_PathParam, Depends()],
@@ -55,7 +55,7 @@ def get_async_job_status(
5555
"/tasks/{task_id}",
5656
name="cancel_and_delete_task",
5757
description="Cancels and deletes a task",
58-
responses=_data_export_responses,
58+
responses=_export_data_responses,
5959
status_code=status.HTTP_204_NO_CONTENT,
6060
)
6161
def abort_async_job(
@@ -67,7 +67,7 @@ def abort_async_job(
6767
"/tasks/{task_id}/result",
6868
name="get_task_result",
6969
description="Retrieves the result of a task",
70-
responses=_data_export_responses,
70+
responses=_export_data_responses,
7171
)
7272
def get_async_job_result(
7373
_path_params: Annotated[_PathParam, Depends()],

api/specs/web-server/_storage.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
from simcore_service_webserver._meta import API_VTAG
3737
from simcore_service_webserver.storage.schemas import DatasetMetaData, FileMetaData
3838
from simcore_service_webserver.tasks._exception_handlers import (
39-
_TO_HTTP_ERROR_MAP as data_export_http_error_map,
39+
_TO_HTTP_ERROR_MAP as export_data_http_error_map,
4040
)
4141

4242
router = APIRouter(
@@ -221,9 +221,9 @@ async def is_completed_upload_file(
221221

222222

223223
# data export
224-
_data_export_responses: dict[int | str, dict[str, Any]] = {
224+
_export_data_responses: dict[int | str, dict[str, Any]] = {
225225
i.status_code: {"model": EnvelopedError}
226-
for i in data_export_http_error_map.values()
226+
for i in export_data_http_error_map.values()
227227
}
228228

229229

@@ -232,7 +232,7 @@ async def is_completed_upload_file(
232232
response_model=Envelope[TaskGet],
233233
name="export_data",
234234
description="Export data",
235-
responses=_data_export_responses,
235+
responses=_export_data_responses,
236236
)
237-
async def export_data(data_export: DataExportPost, location_id: LocationID):
237+
async def export_data(export_data: DataExportPost, location_id: LocationID):
238238
"""Trigger data export. Returns async job id for getting status and results"""

packages/aws-library/src/aws_library/s3/_constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
MULTIPART_COPY_THRESHOLD: Final[ByteSize] = TypeAdapter(ByteSize).validate_python(
1010
"100MiB"
1111
)
12+
STREAM_READER_CHUNK_SIZE: Final[ByteSize] = TypeAdapter(ByteSize).validate_python(
13+
"10MiB"
14+
)
1215

1316
PRESIGNED_LINK_MAX_SIZE: Final[ByteSize] = TypeAdapter(ByteSize).validate_python("5GiB")
1417
S3_MAX_FILE_SIZE: Final[ByteSize] = TypeAdapter(ByteSize).validate_python("5TiB")

packages/aws-library/tests/test_s3_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
from aiohttp import ClientSession
2828
from aws_library.s3._client import _AWS_MAX_ITEMS_PER_PAGE, S3ObjectKey, SimcoreS3API
2929
from aws_library.s3._constants import (
30-
MULTIPART_COPY_THRESHOLD,
3130
MULTIPART_UPLOADS_MIN_TOTAL_SIZE,
31+
STREAM_READER_CHUNK_SIZE,
3232
)
3333
from aws_library.s3._errors import (
3434
S3BucketInvalidError,
@@ -1902,7 +1902,7 @@ async def test_workflow_compress_s3_objects_and_local_files_in_a_single_archive_
19021902
get_zip_bytes_iter(
19031903
archive_entries,
19041904
progress_bar=progress_bar,
1905-
chunk_size=MULTIPART_COPY_THRESHOLD,
1905+
chunk_size=STREAM_READER_CHUNK_SIZE,
19061906
)
19071907
),
19081908
)

packages/models-library/src/models_library/api_schemas_rpc_async_jobs/exceptions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,5 @@ class JobAbortedError(BaseAsyncjobRpcError):
2727

2828
class JobError(BaseAsyncjobRpcError):
2929
msg_template: str = (
30-
"Job {job_id} failed with exception type {exc_type} and message {exc_msg}"
30+
"Job '{job_id}' failed with exception type '{exc_type}' and message: {exc_msg}"
3131
)

packages/models-library/src/models_library/api_schemas_storage/data_export_async_jobs.py renamed to packages/models-library/src/models_library/api_schemas_storage/export_data_async_jobs.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,6 @@
11
# pylint: disable=R6301
22

33
from common_library.errors_classes import OsparcErrorMixin
4-
from models_library.projects_nodes_io import LocationID, StorageFileID
5-
from pydantic import BaseModel, Field
6-
7-
8-
class DataExportTaskStartInput(BaseModel):
9-
location_id: LocationID
10-
file_and_folder_ids: list[StorageFileID] = Field(..., min_length=1)
11-
124

135
### Exceptions
146

packages/models-library/src/models_library/api_schemas_webserver/storage.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,12 @@
33

44
from pydantic import BaseModel, Field
55

6-
from ..api_schemas_storage.data_export_async_jobs import DataExportTaskStartInput
76
from ..api_schemas_storage.storage_schemas import (
87
DEFAULT_NUMBER_OF_PATHS_PER_PAGE,
98
MAX_NUMBER_OF_PATHS_PER_PAGE,
109
)
11-
from ..projects_nodes_io import LocationID, StorageFileID
12-
from ..rest_pagination import (
13-
CursorQueryParameters,
14-
)
10+
from ..projects_nodes_io import LocationID
11+
from ..rest_pagination import CursorQueryParameters
1512
from ._base import InputSchema
1613

1714

@@ -40,11 +37,8 @@ class BatchDeletePathsBodyParams(InputSchema):
4037
paths: set[Path]
4138

4239

43-
class DataExportPost(InputSchema):
44-
paths: list[StorageFileID]
40+
PathToExport = Path
4541

46-
def to_rpc_schema(self, location_id: LocationID) -> DataExportTaskStartInput:
47-
return DataExportTaskStartInput(
48-
file_and_folder_ids=self.paths,
49-
location_id=location_id,
50-
)
42+
43+
class DataExportPost(InputSchema):
44+
paths: list[PathToExport]

packages/models-library/src/models_library/basic_regex.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
55
SEE tests_basic_regex.py for examples
66
"""
7+
78
# TODO: for every pattern we should have a formatter function
89
# NOTE: some sites to manualy check ideas
910
# https://regex101.com/
@@ -45,7 +46,9 @@
4546
)
4647

4748
# Storage basic file ID
48-
SIMCORE_S3_FILE_ID_RE = rf"^(api|({UUID_RE_BASE}))\/({UUID_RE_BASE})\/(.+)$"
49+
SIMCORE_S3_FILE_ID_RE = rf"^(exports\/\d+\/{UUID_RE_BASE}\.zip)|((api|({UUID_RE_BASE}))\/({UUID_RE_BASE})\/(.+)$)"
50+
51+
4952
SIMCORE_S3_DIRECTORY_ID_RE = rf"^({UUID_RE_BASE})\/({UUID_RE_BASE})\/(.+)\/$"
5053

5154
# S3 - AWS bucket names [https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html]

packages/models-library/src/models_library/rabbitmq_messages.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,9 @@ class ProgressType(StrAutoEnum):
9595

9696

9797
class ProgressMessageMixin(RabbitMessageBase):
98-
channel_name: Literal[
98+
channel_name: Literal["simcore.services.progress.v2"] = (
9999
"simcore.services.progress.v2"
100-
] = "simcore.services.progress.v2"
100+
)
101101
progress_type: ProgressType = (
102102
ProgressType.COMPUTATION_RUNNING
103103
) # NOTE: backwards compatible
@@ -118,9 +118,9 @@ def routing_key(self) -> str | None:
118118

119119

120120
class InstrumentationRabbitMessage(RabbitMessageBase, NodeMessageBase):
121-
channel_name: Literal[
121+
channel_name: Literal["simcore.services.instrumentation"] = (
122122
"simcore.services.instrumentation"
123-
] = "simcore.services.instrumentation"
123+
)
124124
metrics: str
125125
service_uuid: NodeID
126126
service_type: str
@@ -210,9 +210,9 @@ def routing_key(self) -> str | None:
210210

211211

212212
class RabbitResourceTrackingStartedMessage(RabbitResourceTrackingBaseMessage):
213-
message_type: Literal[
213+
message_type: Literal[RabbitResourceTrackingMessageType.TRACKING_STARTED] = (
214214
RabbitResourceTrackingMessageType.TRACKING_STARTED
215-
] = RabbitResourceTrackingMessageType.TRACKING_STARTED
215+
)
216216

217217
wallet_id: WalletID | None
218218
wallet_name: str | None
@@ -250,9 +250,9 @@ class RabbitResourceTrackingStartedMessage(RabbitResourceTrackingBaseMessage):
250250

251251

252252
class RabbitResourceTrackingHeartbeatMessage(RabbitResourceTrackingBaseMessage):
253-
message_type: Literal[
253+
message_type: Literal[RabbitResourceTrackingMessageType.TRACKING_HEARTBEAT] = (
254254
RabbitResourceTrackingMessageType.TRACKING_HEARTBEAT
255-
] = RabbitResourceTrackingMessageType.TRACKING_HEARTBEAT
255+
)
256256

257257

258258
class SimcorePlatformStatus(StrAutoEnum):
@@ -261,9 +261,9 @@ class SimcorePlatformStatus(StrAutoEnum):
261261

262262

263263
class RabbitResourceTrackingStoppedMessage(RabbitResourceTrackingBaseMessage):
264-
message_type: Literal[
264+
message_type: Literal[RabbitResourceTrackingMessageType.TRACKING_STOPPED] = (
265265
RabbitResourceTrackingMessageType.TRACKING_STOPPED
266-
] = RabbitResourceTrackingMessageType.TRACKING_STOPPED
266+
)
267267

268268
simcore_platform_status: SimcorePlatformStatus = Field(
269269
...,
@@ -297,9 +297,9 @@ class CreditsLimit(IntEnum):
297297

298298

299299
class WalletCreditsLimitReachedMessage(RabbitMessageBase):
300-
channel_name: Literal[
300+
channel_name: Literal["io.simcore.service.wallets-credit-limit-reached"] = (
301301
"io.simcore.service.wallets-credit-limit-reached"
302-
] = "io.simcore.service.wallets-credit-limit-reached"
302+
)
303303
created_at: datetime.datetime = Field(
304304
default_factory=lambda: arrow.utcnow().datetime,
305305
description="message creation datetime",

packages/models-library/tests/test_project_nodes_io.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
# pylint:disable=unused-argument
33
# pylint:disable=redefined-outer-name
44

5-
from typing import Any
5+
from typing import Any, Final
6+
from uuid import UUID
67

78
import pytest
89
from faker import Faker
@@ -11,9 +12,14 @@
1112
DatCoreFileLink,
1213
SimCoreFileLink,
1314
SimcoreS3DirectoryID,
15+
SimcoreS3FileID,
1416
)
17+
from models_library.users import UserID
1518
from pydantic import TypeAdapter, ValidationError
1619

20+
UUID_0: Final[str] = f"{UUID(int=0)}"
21+
USER_ID_0: Final[UserID] = 0
22+
1723

1824
@pytest.fixture()
1925
def minimal_simcore_file_link(faker: Faker) -> dict[str, Any]:
@@ -115,9 +121,6 @@ def test_store_discriminator():
115121
assert isinstance(rawgraph_node.inputs["input_1"], PortLink)
116122

117123

118-
UUID_0: str = "00000000-0000-0000-0000-000000000000"
119-
120-
121124
def test_simcore_s3_directory_id():
122125
# the only allowed path is the following
123126
result = TypeAdapter(SimcoreS3DirectoryID).validate_python(
@@ -180,3 +183,18 @@ def test_simcore_s3_directory_get_parent():
180183
SimcoreS3DirectoryID._get_parent( # noqa SLF001
181184
"/hello/object/", parent_index=4
182185
)
186+
187+
188+
@pytest.mark.parametrize(
189+
"object_key",
190+
[
191+
f"api/{UUID_0}/some-random-file.png",
192+
f"exports/{USER_ID_0}/{UUID_0}.zip",
193+
f"{UUID_0}/{UUID_0}/some-random-file.png",
194+
f"api/{UUID_0}/some-path/some-random-file.png",
195+
f"{UUID_0}/{UUID_0}/some-path/some-random-file.png",
196+
],
197+
)
198+
def test_simcore_s3_file_id_accepted_patterns(object_key: str):
199+
file_id = TypeAdapter(SimcoreS3FileID).validate_python(object_key)
200+
assert f"{file_id}" == object_key

packages/models-library/tests/test_rabbit_messages.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
from typing import Union
2-
31
import pytest
42
from faker import Faker
53
from models_library.progress_bar import ProgressReport
@@ -41,6 +39,6 @@
4139
)
4240
async def test_raw_message_parsing(raw_data: str, class_type: type):
4341
result = TypeAdapter(
44-
Union[ProgressRabbitMessageNode, ProgressRabbitMessageProject]
42+
ProgressRabbitMessageNode | ProgressRabbitMessageProject
4543
).validate_json(raw_data)
46-
assert type(result) == class_type
44+
assert type(result) is class_type

packages/service-library/src/servicelib/archiving_utils/_tdqm_utils.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
from typing import Final
22

3-
from pydantic import ByteSize, NonNegativeFloat, NonNegativeInt
3+
from pydantic import ByteSize, NonNegativeInt
44

5-
_UNIT_MULTIPLIER: Final[NonNegativeFloat] = 1024.0
65
TQDM_FILE_OPTIONS: Final[dict] = {
76
"unit": "byte",
87
"unit_scale": True,

packages/service-library/src/servicelib/bytes_iters/_stream_zip.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
1+
import logging
12
from collections.abc import AsyncIterable
23
from datetime import UTC, datetime
34
from stat import S_IFREG
45
from typing import TypeAlias
56

67
from models_library.bytes_iters import BytesIter, DataSize
7-
from stream_zip import ZIP_32, AsyncMemberFile, async_stream_zip
8+
from stream_zip import ZIP_64, AsyncMemberFile, async_stream_zip
89

910
from ..progress_bar import ProgressBarData
1011
from ._models import BytesStreamer
1112

13+
_logger = logging.getLogger(__name__)
14+
1215
FileNameInArchive: TypeAlias = str
1316
ArchiveFileEntry: TypeAlias = tuple[FileNameInArchive, BytesStreamer]
1417
ArchiveEntries: TypeAlias = list[ArchiveFileEntry]
@@ -22,7 +25,7 @@ async def _member_files_iter(
2225
file_name,
2326
datetime.now(UTC),
2427
S_IFREG | 0o600,
25-
ZIP_32,
28+
ZIP_64,
2629
byte_streamer.with_progress_bytes_iter(progress_bar=progress_bar),
2730
)
2831

0 commit comments

Comments (0)