diff --git a/proxystore_ex/stream/__init__.py b/proxystore_ex/stream/__init__.py
new file mode 100644
index 0000000..9ae09d7
--- /dev/null
+++ b/proxystore_ex/stream/__init__.py
@@ -0,0 +1 @@
+"""ProxyStream extension modules."""
diff --git a/proxystore_ex/stream/shims/__init__.py b/proxystore_ex/stream/shims/__init__.py
new file mode 100644
index 0000000..2ea0cbf
--- /dev/null
+++ b/proxystore_ex/stream/shims/__init__.py
@@ -0,0 +1 @@
+"""Shim extensions for streaming."""
diff --git a/proxystore_ex/stream/shims/mofka.py b/proxystore_ex/stream/shims/mofka.py
new file mode 100644
index 0000000..88cc3aa
--- /dev/null
+++ b/proxystore_ex/stream/shims/mofka.py
@@ -0,0 +1,195 @@
+"""Mofka publisher and subscriber shims.
+
+Shims to the
+[`mofka`](https://github.com/mochi-hpc/mofka){target=_blank}
+package.
+"""
+
+from __future__ import annotations
+
+import json
+import sys
+
+import cloudpickle
+
+if sys.version_info >= (3, 11):  # pragma: >=3.11 cover
+    from typing import Self
+else:  # pragma: <3.11 cover
+    from typing_extensions import Self
+
+from proxystore.stream.events import dict_to_event
+from proxystore.stream.events import event_to_dict
+from proxystore.stream.events import EndOfStreamEvent
+from proxystore.stream.events import EventBatch
+from proxystore.stream.events import NewObjectEvent
+from proxystore.stream.events import NewObjectKeyEvent
+
+try:
+    import pymargo.core
+    from mochi.mofka.client import AdaptiveBatchSize
+    from mochi.mofka.client import DataDescriptor
+    from mochi.mofka.client import MofkaDriver
+    from mochi.mofka.client import Ordering
+    from pymargo.core import Engine
+
+    mofka_import_error = None
+
+except ImportError as e:  # pragma: no cover
+    mofka_import_error = e
+
+
+class MofkaPublisher:
+    """Mofka publisher shim.
+
+    Args:
+        protocol: Network protocol for communication (e.g., `tcp`, `na+sm`).
+        group_file: Bedrock-generated group file.
+    """
+
+    # TODO: drop the protocol and group file arguments and let users
+    # construct and provide the MofkaDriver themselves.
+    def __init__(self, protocol: str, group_file: str) -> None:
+        if mofka_import_error is not None:  # pragma: no cover
+            raise mofka_import_error
+
+        self._engine = Engine(protocol, pymargo.core.server)
+        self._driver = MofkaDriver(group_file, self._engine)
+        # The topic and producer are created lazily on the first call to
+        # send_events() so close() is safe even if nothing was published.
+        self._topic = None
+        self.producer = None
+
+    def close(self) -> None:
+        """Close this publisher."""
+        del self.producer
+        del self._topic
+        del self._driver
+        self._engine.finalize()
+        del self._engine
+
+    def send_events(self, events: EventBatch) -> None:
+        """Publish a batch of events to the stream.
+
+        Args:
+            events: Batch of events to publish. The target topic is read
+                from the batch itself.
+        """
+        topic = events.topic
+        batch_size = AdaptiveBatchSize
+        ordering = Ordering.Strict
+
+        self._topic = self._driver.open_topic(topic)
+        self.producer = self._topic.producer(
+            batch_size=batch_size,
+            ordering=ordering,
+        )
+
+        for event in events.events:
+            if isinstance(event, NewObjectEvent):
+                self.producer.push(
+                    metadata=json.dumps(event.metadata),
+                    data=cloudpickle.dumps(event.obj),
+                )
+            else:
+                self.producer.push(
+                    metadata=json.dumps(event_to_dict(event)),
+                    data=cloudpickle.dumps(''),
+                )
+
+        # TODO: figure out how to properly batch in Mofka.
+        self.producer.flush()
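+
+# A minimal usage sketch, mirroring the test added below; it assumes a
+# running Bedrock server that has written the 'mofka.json' group file
+# and a 'default' topic created via mofkactl:
+#
+#     from proxystore.stream.events import EventBatch, NewObjectEvent
+#
+#     publisher = MofkaPublisher(protocol='tcp', group_file='mofka.json')
+#     event = NewObjectEvent(topic='default', metadata={}, obj='hello')
+#     publisher.send_events(EventBatch(topic='default', events=[event]))
+#     publisher.close()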
+
+
+class MofkaSubscriber:
+    """Mofka subscriber shim.
+
+    This shim is an iterable object which will yield
+    [`EventBatch`][proxystore.stream.events.EventBatch] objects from the
+    stream, blocking on the next batch, until the stream is closed.
+
+    Args:
+        protocol: Network protocol for communication (e.g., `tcp`, `na+sm`).
+        group_file: Bedrock-generated group file.
+        topic_name: Name of the topic to subscribe to.
+        subscriber_name: Identifier for this subscriber.
+    """
+
+    def __init__(
+        self,
+        protocol: str,
+        group_file: str,
+        topic_name: str,
+        subscriber_name: str,
+    ) -> None:
+        if mofka_import_error is not None:  # pragma: no cover
+            raise mofka_import_error
+
+        self._engine = Engine(protocol, pymargo.core.server)
+        self._driver = MofkaDriver(group_file, self._engine)
+        self._topic_name = topic_name
+        self._topic = self._driver.open_topic(topic_name)
+        self.consumer = self._topic.consumer(
+            name=subscriber_name,
+            data_selector=self.data_selector,
+            data_broker=self.data_broker,
+        )
+
+    @staticmethod
+    def data_selector(
+        metadata: dict[str, int | float | str],
+        descriptor: DataDescriptor,
+    ) -> DataDescriptor:
+        """Mofka data selector implementation.
+
+        This data selector selects all events.
+
+        Args:
+            metadata: Event metadata.
+            descriptor: Pointer to the data belonging to the event.
+
+        Returns:
+            Pointer to the event data.
+        """
+        return descriptor
+
+    @staticmethod
+    def data_broker(
+        metadata: dict[str, int | float | str],
+        descriptor: DataDescriptor,
+    ) -> list[bytearray]:
+        """Mofka data broker implementation.
+
+        Creates the memory buffer into which the consumed event's data
+        is placed.
+
+        Args:
+            metadata: Event metadata.
+            descriptor: Pointer to the event data.
+
+        Returns:
+            Memory buffer sized to the event's data.
+        """
+        return [bytearray(descriptor.size)]
+
+    def __iter__(self) -> Self:
+        return self
+
+    def __next__(self) -> EventBatch:
+        return self.next_events()
+
+    def next_events(self) -> EventBatch:
+        """Block until the next batch of events arrives in the stream."""
+        event: EndOfStreamEvent | NewObjectKeyEvent | NewObjectEvent
+
+        pulled = self.consumer.pull().wait()
+        data = cloudpickle.loads(pulled.data[0])
+        metadata = json.loads(pulled.metadata)
+
+        try:
+            event = dict_to_event(metadata)
+        except Exception:
+            # Metadata which does not decode to a proxystore event is
+            # treated as a new object published directly to the stream.
+            event = NewObjectEvent(
+                topic=self._topic_name,
+                metadata=metadata,
+                obj=data,
+            )
+
+        return EventBatch(topic=self._topic_name, events=[event])
+
+    def close(self) -> None:
+        """Close this subscriber."""
+        del self.consumer
+        del self._topic
+        del self._driver
+        self._engine.finalize()
+        del self._engine
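+
+
+# A minimal consumption sketch, assuming the same Bedrock setup as the
+# publisher sketch above; iteration blocks on the next batch and runs
+# until the process stops consuming:
+#
+#     subscriber = MofkaSubscriber(
+#         protocol='tcp',
+#         group_file='mofka.json',
+#         topic_name='default',
+#         subscriber_name='sub1',
+#     )
+#     for batch in subscriber:
+#         for event in batch.events:
+#             ...  # process each decoded event
+#     subscriber.close()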
diff --git a/testing/configuration/mofka_config.json b/testing/configuration/mofka_config.json
new file mode 100644
index 0000000..02ecb3d
--- /dev/null
+++ b/testing/configuration/mofka_config.json
@@ -0,0 +1,35 @@
+{
+  "libraries": [
+    "libflock-bedrock-module.so",
+    "libyokan-bedrock-module.so",
+    "libwarabi-bedrock-module.so",
+    "libmofka-bedrock-module.so"
+  ],
+  "providers": [
+    {
+      "name": "group_manager",
+      "type": "flock",
+      "provider_id": 1,
+      "config": {
+        "bootstrap": "self",
+        "file": "mofka.json",
+        "group": {
+          "type": "static"
+        }
+      }
+    },
+    {
+      "name": "master",
+      "provider_id": 2,
+      "type": "yokan",
+      "tags": [
+        "mofka:master"
+      ],
+      "config": {
+        "database": {
+          "type": "map"
+        }
+      }
+    }
+  ]
+}
diff --git a/tests/conftest.py b/tests/conftest.py
index 746c702..37b2ef4 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -27,14 +27,9 @@
 sys.modules['ucp'] = ucp
 
 import proxystore
+import proxystore.store
 import pytest
 
-from testing.connectors import connectors
-from testing.connectors import daos_connector
-from testing.connectors import margo_connector
-from testing.connectors import ucx_connector
-from testing.connectors import zmq_connector
-
 
 @pytest.fixture(autouse=True)
 def _verify_no_registered_stores() -> Generator[None, None, None]:
diff --git a/tests/stream/__init__.py b/tests/stream/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/stream/shims/__init__.py b/tests/stream/shims/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/stream/shims/conftest.py b/tests/stream/shims/conftest.py
new file mode 100644
index 0000000..6824a0b
--- /dev/null
+++ b/tests/stream/shims/conftest.py
@@ -0,0 +1,50 @@
+from __future__ import annotations
+
+import os
+import subprocess
+import time
+
+import pytest
+
+
+@pytest.fixture(scope='module')
+def _conf_mofka():
+    """Start Bedrock and create a Mofka topic for this test module."""
+    protocol = 'tcp'
+    bedrock_conf = 'testing/configuration/mofka_config.json'
+    groupfile = 'mofka.json'
+    topic = 'default'
+
+    # Start Bedrock, which writes the group file on startup.
+    server = subprocess.Popen(
+        ['bedrock', protocol, '-c', bedrock_conf],
+        stdout=subprocess.DEVNULL,
+        stderr=subprocess.DEVNULL,
+    )
+
+    # Wait for the group file to exist before invoking mofkactl.
+    deadline = time.time() + 10
+    while not os.path.exists(groupfile):
+        if time.time() > deadline:  # pragma: no cover
+            raise TimeoutError('Bedrock did not create the group file.')
+        time.sleep(0.1)
+
+    # Create topic
+    cp = subprocess.run(
+        ['mofkactl', 'topic', 'create', topic, '--groupfile', groupfile],
+        check=False,
+    )
+    assert cp.returncode == 0
+
+    # Add an in-memory partition to the topic
+    cp = subprocess.run(
+        [
+            'mofkactl',
+            'partition',
+            'add',
+            topic,
+            '--type',
+            'memory',
+            '--rank',
+            '0',
+            '--groupfile',
+            groupfile,
+        ],
+        check=False,
+    )
+    assert cp.returncode == 0
+
+    yield
+
+    server.terminate()
+    server.wait()
+    os.remove(groupfile)
diff --git a/tests/stream/shims/mofka_test.py b/tests/stream/shims/mofka_test.py
new file mode 100644
index 0000000..219bcbf
--- /dev/null
+++ b/tests/stream/shims/mofka_test.py
@@ -0,0 +1,39 @@
+from __future__ import annotations
+
+import pytest
+
+from proxystore.stream.events import EventBatch
+from proxystore.stream.events import NewObjectEvent
+from proxystore_ex.stream.shims.mofka import MofkaPublisher
+from proxystore_ex.stream.shims.mofka import MofkaSubscriber
+
+
+@pytest.mark.usefixtures('_conf_mofka')
+def test_basic_publish_subscribe() -> None:
+    groupfile = 'mofka.json'
+    protocol = 'tcp'
+    topic = 'default'
+    subscriber_name = 'sub1'
+
+    publisher = MofkaPublisher(protocol=protocol, group_file=groupfile)
+    subscriber = MofkaSubscriber(
+        protocol=protocol,
+        group_file=groupfile,
+        topic_name=topic,
+        subscriber_name=subscriber_name,
+    )
+
+    messages: list[NewObjectEvent] = [
+        NewObjectEvent(topic=topic, metadata={'some_data': i}, obj=i)
+        for i in range(1, 4)
+    ]
+    events = EventBatch(topic=topic, events=messages)
+    publisher.send_events(events)
+
+    publisher.close()
+
+    for expected, received in zip(messages, subscriber):
+        assert received.events[0].obj == expected.obj
+
+    subscriber.close()
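+
+
+# Running this test requires the Mofka stack to be installed with the
+# `bedrock` and `mofkactl` binaries on PATH; the `_conf_mofka` fixture
+# starts the server from testing/configuration/mofka_config.json and
+# creates the 'default' topic. For example:
+#
+#     pytest tests/stream/shims/mofka_test.py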