Skip to content

Issue#32 #41

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
46 changes: 46 additions & 0 deletions benchmark/bm_write_queue_collector_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from collections import deque
import multiprocessing as mp

import numpy as np

from dcnum import write


# Spawn context, matching the start method dcnum uses for its workers.
mp_spawn = mp.get_context('spawn')


def setup():
    """Prepare the module-level fixtures for the collector benchmark.

    Creates ``num_batches`` batches of ``batch_size`` events and puts
    them on ``event_queue`` in shuffled frame order, so the collector
    has to sort them back into frame order.  Also allocates the shared
    ``feat_nevents`` array and the ``writer_dq`` output deque.
    """
    global event_queue
    global writer_dq
    global feat_nevents
    batch_size = 1000
    num_batches = 3
    num_events = batch_size * num_batches
    # Use the spawn context consistently; the queue is consumed by a
    # spawned QueueCollectorProcess (previously the default context was
    # mixed with the spawn context used for `feat_nevents`).
    event_queue = mp_spawn.Queue()
    writer_dq = deque()
    feat_nevents = mp_spawn.Array("i", num_events)

    # Seed the Generator itself for reproducibility; `np.random.seed`
    # only affects the legacy global RNG and would leave `default_rng()`
    # unseeded.
    rng = np.random.default_rng(42)
    # Shuffled frame indices within one batch; `replace=False` means
    # every index appears exactly once per batch.
    number_order = rng.choice(batch_size, size=batch_size, replace=False)

    # create a single sample event that is reused for every queue entry
    event = {
        "temp": np.atleast_1d(rng.normal(23)),
        "mask": rng.random((1, 80, 320)) > .5,
    }
    for ii in range(num_batches):
        for idx in number_order:
            event_queue.put((ii*batch_size + idx, event))


def main():
    """Run the collector over the fixtures prepared by :func:`setup`.

    Starts a ``QueueCollectorProcess`` on the module-level queue/deque
    and blocks until the process has finished.
    """
    proc = write.QueueCollectorProcess(
        event_queue=event_queue,
        writer_dq=writer_dq,
        feat_nevents=feat_nevents,
        write_threshold=500,
    )
    proc.start()
    proc.join()
45 changes: 26 additions & 19 deletions src/dcnum/logic/ctrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@
from ..meta import ppid
from ..read import HDF5Data, get_mapping_indices
from .._version import version, version_tuple
from ..write import (
DequeWriterThread, HDF5Writer, QueueCollectorThread, copy_features,
copy_metadata, create_with_basins, set_default_filter_kwargs
)

from dcnum.write import (DequeWriterThread, HDF5Writer, QueueCollectorThread,
QueueCollectorProcess, copy_features, copy_metadata,
create_with_basins, set_default_filter_kwargs)
from .job import DCNumPipelineJob
from .json_encoder import ExtendedJSONEncoder

Expand Down Expand Up @@ -207,7 +205,7 @@ def close(self, delete_temporary_files=True):
# is `rename`d to `self.job["path_out"]`.

def join(self, delete_temporary_files=True, *args, **kwargs):
    """Wait for the runner to finish, then release its resources.

    Parameters
    ----------
    delete_temporary_files: bool
        Passed on to :meth:`close` once the join has completed.

    Additional positional/keyword arguments are forwarded to the
    superclass ``join``.
    """
    # Must call `.join` on the super proxy; calling the proxy itself
    # (`super(...)(...)`) raises TypeError.
    super(DCNumJobRunner, self).join(*args, **kwargs)
    # Close only after join
    self.close(delete_temporary_files=delete_temporary_files)

Expand Down Expand Up @@ -409,7 +407,7 @@ def run_pipeline(self):
"hash": self.pphash,
},
"python": {
"build": ", ".join(platform.python_build()),
"build": ", "(platform.python_build()),
"implementation":
platform.python_implementation(),
"libraries": get_library_versions_dict([
Expand Down Expand Up @@ -754,23 +752,32 @@ def task_segment_extract(self):
debug=self.job["debug"])
thr_feat.start()

# Start the data collection thread
thr_coll = QueueCollectorThread(
event_queue=fe_kwargs["event_queue"],
writer_dq=writer_dq,
feat_nevents=fe_kwargs["feat_nevents"],
write_threshold=500,
)
thr_coll.start()
if self.job["debug"]:
# Start the data collection thread
wor_coll = QueueCollectorThread(
event_queue=fe_kwargs["event_queue"],
writer_dq=writer_dq,
feat_nevents=fe_kwargs["feat_nevents"],
write_threshold=500,
)
wor_coll.start()
else:
wor_coll = QueueCollectorProcess(
event_queue=fe_kwargs["event_queue"],
writer_dq=writer_dq,
feat_nevents=fe_kwargs["feat_nevents"],
write_threshold=500,
)
wor_coll.start()

data_size = len(self.dtin)
t0 = time.monotonic()

# So in principle we are done here. We do not have to do anything
# besides monitoring the progress.
while True:
counted_frames = thr_coll.written_frames
self.event_count = thr_coll.written_events
counted_frames = wor_coll.written_frames
self.event_count = wor_coll.written_events
td = time.monotonic() - t0
# set the current status
self._progress_ex = counted_frames / data_size
Expand All @@ -790,7 +797,7 @@ def task_segment_extract(self):
# Join the collector thread before the feature extractors. On
# compute clusters, we had problems with joining the feature
# extractors, maybe because the event_queue was not depleted.
join_thread_helper(thr=thr_coll,
join_thread_helper(thr=wor_coll,
timeout=600,
retries=10,
logger=self.logger,
Expand All @@ -807,7 +814,7 @@ def task_segment_extract(self):
logger=self.logger,
name="writer")

self.event_count = thr_coll.written_events
self.event_count = wor_coll.written_events
if self.event_count == 0:
self.logger.error(
f"No events found in {self.draw.path}! Please check the "
Expand Down
23 changes: 23 additions & 0 deletions src/dcnum/logic/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from ..read import HDF5Data
from ..segm import get_available_segmenters

from dcnum.write import (QueueCollectorThread, QueueCollectorProcess)


class DCNumPipelineJob:
def __init__(self,
Expand Down Expand Up @@ -82,6 +84,27 @@ def __init__(self,
Whether to set logging level to "DEBUG" and
use threads instead of processes
"""

def run(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this doing here? @Orange99

if self.use_process:
# Process-based implementation
wor_coll = QueueCollectorProcess(
event_queue=self.event_queue,
writer_dq=self.writer_dq,
feat_nevents=self.feat_nevents,
write_threshold=500,
)
else:
# Thread-based implementation
wor_coll = QueueCollectorThread(
event_queue=self.event_queue,
writer_dq=self.writer_dq,
feat_nevents=self.feat_nevents,
write_threshold=500,
)
wor_coll.start()
wor_coll.join()

if no_basins_in_output is not None:
warnings.warn("The `no_basins_in_output` keyword argument is "
"deprecated. Please use `basin_strategy` instead.")
Expand Down
2 changes: 1 addition & 1 deletion src/dcnum/write/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# flake8: noqa: F401
from .deque_writer_thread import DequeWriterThread
from .queue_collector_thread import EventStash, QueueCollectorThread
from .queue_collector_base import EventStash, QueueCollectorThread, QueueCollectorProcess
from .writer import (
HDF5Writer, copy_basins, copy_features, copy_metadata, create_with_basins,
set_default_filter_kwargs)
Loading
Loading