Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions proxystore_ex/stream/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""ProxyStream extension modules."""
1 change: 1 addition & 0 deletions proxystore_ex/stream/shims/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Shim extensions for streaming."""
195 changes: 195 additions & 0 deletions proxystore_ex/stream/shims/mofka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
"""MofKa publisher and subscriber shims.

Shims to the
[`mofka`](https://github.com/mochi-hpc/mofka){target=_blank}
package.
"""

from __future__ import annotations

import cloudpickle
import json
import sys

if sys.version_info >= (3, 11): # pragma: >=3.11 cover
from typing import Self
else: # pragma: <3.11 cover
from typing_extensions import Self

from proxystore.stream.events import dict_to_event
from proxystore.stream.events import event_to_dict
from proxystore.stream.events import EndOfStreamEvent
from proxystore.stream.events import EventBatch
from proxystore.stream.events import NewObjectEvent
from proxystore.stream.events import NewObjectKeyEvent

try:
import pymargo.core
from mochi.mofka.client import DataDescriptor
from mochi.mofka.client import MofkaDriver
from mochi.mofka.client import AdaptiveBatchSize
from mochi.mofka.client import Ordering
from pymargo.core import Engine

mofka_import_error = None

except ImportError as e: # pragma: no cover
mofka_import_error = e


class MofkaPublisher:
    """Mofka publisher shim.

    Args:
        protocol: Network protocol for communication (e.g., `tcp`, `na+sm`).
        group_file: Bedrock generated group file.
    """

    # TODO: strip code of all of these and leave it to users to specify
    # themselves and just provide the driver.
    def __init__(self, protocol: str, group_file: str) -> None:
        if mofka_import_error is not None:  # pragma: no cover
            raise mofka_import_error

        self._engine = Engine(protocol, pymargo.core.server)
        self._driver = MofkaDriver(group_file, self._engine)
        # The topic and producer are created lazily in send_events() because
        # the topic name is only known from the event batch. Initialize them
        # here so close() is safe even if send_events() is never called.
        self._topic = None
        self.producer = None

    def close(self) -> None:
        """Close this publisher.

        Safe to call even if no events were ever published. References are
        dropped in dependency order (producer, topic, driver) before the
        Margo engine is finalized.
        """
        self.producer = None
        self._topic = None
        del self._driver
        self._engine.finalize()
        del self._engine

    def send_events(self, events: EventBatch) -> None:
        """Publish a batch of events to the stream.

        Args:
            events: Batch of events to publish. The batch's `topic`
                attribute selects the Mofka topic the events are pushed to.
        """
        batch_size = AdaptiveBatchSize
        ordering = Ordering.Strict

        self._topic = self._driver.open_topic(events.topic)
        self.producer = self._topic.producer(
            batch_size=batch_size,
            ordering=ordering,
        )

        for event in events.events:
            if isinstance(event, NewObjectEvent):
                self.producer.push(
                    metadata=json.dumps(event.metadata),
                    data=cloudpickle.dumps(event.obj),
                )
            else:
                # Non-object events carry no payload, so push a small
                # placeholder because Mofka expects a data buffer.
                self.producer.push(
                    metadata=json.dumps(event_to_dict(event)),
                    data=cloudpickle.dumps(''),
                )

        # TODO: figure out how to properly batch in mofka
        self.producer.flush()


class MofkaSubscriber:
    """Mofka subscriber shim.

    This shim is an iterable object which will yield
    [`EventBatch`][proxystore.stream.events.EventBatch] instances from the
    stream, blocking on the next event, until the stream is closed.

    Args:
        protocol: Network protocol for communication (e.g., `tcp`, `na+sm`).
        group_file: Bedrock generated group file.
        topic_name: Name of the topic to subscribe to.
        subscriber_name: Identifier for this current subscriber.
    """

    def __init__(
        self,
        protocol: str,
        group_file: str,
        topic_name: str,
        subscriber_name: str,
    ) -> None:
        if mofka_import_error is not None:  # pragma: no cover
            raise mofka_import_error

        self._engine = Engine(protocol, pymargo.core.server)
        self._driver = MofkaDriver(group_file, self._engine)
        # Keep the topic *name* separately: event types take the string
        # name as their topic field, not the Mofka topic object.
        self._topic_name = topic_name
        self._topic = self._driver.open_topic(topic_name)
        self.consumer = self._topic.consumer(
            name=subscriber_name,
            data_selector=self.data_selector,
            data_broker=self.data_broker,
        )

    @staticmethod
    def data_selector(
        metadata: dict[str, int | float | str],
        descriptor: DataDescriptor,
    ) -> DataDescriptor:
        """Mofka data selector implementation.

        This data selector returns all events.

        Args:
            metadata: Event metadata.
            descriptor: Pointer to the data belonging to the event.

        Returns:
            DataDescriptor: Pointer to event data.
        """
        return descriptor

    @staticmethod
    def data_broker(
        metadata: dict[str, int | float | str],
        descriptor: DataDescriptor,
    ) -> list[bytearray]:
        """Mofka data broker implementation.

        Creates a memory buffer into which the consumed event is placed.

        Args:
            metadata: Event metadata.
            descriptor: Pointer to data.

        Returns:
            list[bytearray]: Memory buffer of event size.
        """
        return [bytearray(descriptor.size)]

    def __iter__(self) -> Self:
        return self

    def __next__(self) -> EventBatch:
        return self.next_events()

    def next_events(self) -> EventBatch:
        """Pull and return the next event batch from the stream.

        Blocks until an event is available.

        Returns:
            EventBatch: Batch containing a single event. The event is the
                deserialized proxystore event when the metadata parses as
                one, otherwise a
                [`NewObjectEvent`][proxystore.stream.events.NewObjectEvent]
                wrapping the raw metadata and deserialized data payload.
        """
        metadata: EndOfStreamEvent | NewObjectKeyEvent | NewObjectEvent

        events = self.consumer.pull().wait()
        data = cloudpickle.loads(events.data[0])

        try:
            metadata = dict_to_event(json.loads(events.metadata))
        except Exception:
            # Metadata was not a serialized proxystore event; fall back to
            # wrapping the raw metadata and payload in a NewObjectEvent.
            metadata = NewObjectEvent(
                topic=self._topic_name,
                metadata=events.metadata,
                obj=data,
            )

        return EventBatch(topic=self._topic_name, events=[metadata])

    def close(self) -> None:
        """Close this subscriber."""
        del self.consumer
        del self._topic
        del self._driver
        self._engine.finalize()
        del self._engine
35 changes: 35 additions & 0 deletions testing/configuration/mofka_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"libraries": [
"libflock-bedrock-module.so",
"libyokan-bedrock-module.so",
"libwarabi-bedrock-module.so",
"libmofka-bedrock-module.so"
],
"providers": [
{
"name": "group_manager",
"type": "flock",
"provider_id": 1,
"config": {
"bootstrap": "self",
"file": "mofka.json",
"group": {
"type": "static"
}
}
},
{
"name": "master",
"provider_id": 2,
"type": "yokan",
"tags": [
"mofka:master"
],
"config": {
"database": {
"type": "map"
}
}
}
]
}
7 changes: 1 addition & 6 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,9 @@
sys.modules['ucp'] = ucp

import proxystore
import proxystore.store
import pytest

from testing.connectors import connectors
from testing.connectors import daos_connector
from testing.connectors import margo_connector
from testing.connectors import ucx_connector
from testing.connectors import zmq_connector


@pytest.fixture(autouse=True)
def _verify_no_registered_stores() -> Generator[None, None, None]:
Expand Down
Empty file added tests/stream/__init__.py
Empty file.
Empty file added tests/stream/shims/__init__.py
Empty file.
50 changes: 50 additions & 0 deletions tests/stream/shims/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from __future__ import annotations

import os
import subprocess

import pytest


@pytest.fixture(scope='module')
def _conf_mofka():
    """Start a Bedrock server and create a Mofka topic for the module.

    Launches a Bedrock server process, creates the `default` topic with a
    single in-memory partition, yields to the tests, then terminates the
    server and removes the generated group file.
    """
    protocol = 'tcp'
    bedrock_conf = 'testing/configuration/mofka_config.json'
    groupfile = 'mofka.json'
    topic = 'default'

    # Start Bedrock. Keep the handle so the server can be terminated during
    # teardown rather than leaking past the test module.
    bedrock = subprocess.Popen(
        ['bedrock', protocol, '-c', bedrock_conf],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    try:
        # Create topic
        cp = subprocess.run(
            ['mofkactl', 'topic', 'create', topic, '--groupfile', groupfile],
            check=False,
        )
        assert cp.returncode == 0

        # Add partition
        cp = subprocess.run(
            [
                'mofkactl',
                'partition',
                'add',
                topic,
                '--type',
                'memory',
                '--rank',
                '0',
                '--groupfile',
                groupfile,
            ],
            check=False,
        )
        assert cp.returncode == 0

        yield
    finally:
        # Always stop the server and clean up, even if setup asserts fail.
        bedrock.terminate()
        bedrock.wait()
        try:
            os.remove(groupfile)
        except FileNotFoundError:
            # Group file is only written once Bedrock starts successfully.
            pass
39 changes: 39 additions & 0 deletions tests/stream/shims/mofka_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from __future__ import annotations


import pytest

from proxystore.stream.events import EventBatch
from proxystore.stream.events import NewObjectEvent
from proxystore.ex.stream.shims.mofka import MofkaPublisher
from proxystore.ex.stream.shims.mofka import MofkaSubscriber


@pytest.mark.usefixtures('_conf_mofka')
def test_basic_publish_subscribe() -> None:
    """Round-trip a few objects through the Mofka publisher and subscriber."""
    group_file = 'mofka.json'
    protocol = 'tcp'
    topic = 'default'

    publisher = MofkaPublisher(protocol=protocol, group_file=group_file)
    subscriber = MofkaSubscriber(
        protocol=protocol,
        group_file=group_file,
        topic_name=topic,
        subscriber_name='sub1',
    )

    sent: list[NewObjectEvent] = [
        NewObjectEvent(topic=topic, metadata={'some_data': value}, obj=value)
        for value in range(1, 4)
    ]
    publisher.send_events(EventBatch(topic=topic, events=sent))

    publisher.close()

    for expected, batch in zip(sent, subscriber):
        assert batch.events[0].obj == expected.obj

    subscriber.close()
Loading