diff --git a/benchmark/bm_write_queue_collector_process.py b/benchmark/bm_write_queue_collector_process.py new file mode 100644 index 0000000..214d0d3 --- /dev/null +++ b/benchmark/bm_write_queue_collector_process.py @@ -0,0 +1,46 @@ +from collections import deque +import multiprocessing as mp + +import numpy as np + +from dcnum import write + + +mp_spawn = mp.get_context('spawn') + + +def setup(): + global event_queue + global writer_dq + global feat_nevents + batch_size = 1000 + num_batches = 3 + num_events = batch_size * num_batches + event_queue = mp_spawn.Queue() + writer_dq = deque() + feat_nevents = mp_spawn.Array("i", num_events) + + # Shuffle the frame indices of one batch so that events arrive out of + # order; the generator is seeded to keep the benchmark reproducible. + rng = np.random.default_rng(42) + number_order = rng.choice(batch_size, size=batch_size, replace=False) + + # create a sample event + event = { + "temp": np.atleast_1d(rng.normal(23)), + "mask": rng.random((1, 80, 320)) > .5, + } + for ii in range(num_batches): + for idx in number_order: + event_queue.put((ii*batch_size + idx, event)) + + +def main(): + collector_process = write.QueueCollectorProcess( + event_queue=event_queue, + writer_dq=writer_dq, + feat_nevents=feat_nevents, + write_threshold=500, + ) + collector_process.start() + collector_process.join() diff --git a/src/dcnum/logic/ctrl.py b/src/dcnum/logic/ctrl.py index d52d543..417058b 100644 --- a/src/dcnum/logic/ctrl.py +++ b/src/dcnum/logic/ctrl.py @@ -26,11 +26,9 @@ from ..meta import ppid from ..read import HDF5Data, get_mapping_indices from .._version import version, version_tuple -from ..write import ( - DequeWriterThread, HDF5Writer, QueueCollectorThread, copy_features, - copy_metadata, create_with_basins, set_default_filter_kwargs -) - +from ..write import (DequeWriterThread, HDF5Writer, QueueCollectorThread, + QueueCollectorProcess, copy_features, copy_metadata, + create_with_basins, set_default_filter_kwargs) from .job import DCNumPipelineJob from .json_encoder import
ExtendedJSONEncoder @@ -754,14 +752,23 @@ def task_segment_extract(self): debug=self.job["debug"]) thr_feat.start() - # Start the data collection thread - thr_coll = QueueCollectorThread( - event_queue=fe_kwargs["event_queue"], - writer_dq=writer_dq, - feat_nevents=fe_kwargs["feat_nevents"], - write_threshold=500, - ) - thr_coll.start() + # Collect the events in a thread when debugging and in a separate + # process otherwise; both collector classes are created with the + # same arguments. + # NOTE(review): a spawned process receives pickled copies of its + # arguments; TODO confirm that `writer_dq` and the `written_*` + # counters read below are actually shared with that process. + if self.job["debug"]: + cls_coll = QueueCollectorThread + else: + cls_coll = QueueCollectorProcess + wor_coll = cls_coll( + event_queue=fe_kwargs["event_queue"], + writer_dq=writer_dq, + feat_nevents=fe_kwargs["feat_nevents"], + write_threshold=500, + ) + wor_coll.start() data_size = len(self.dtin) t0 = time.monotonic() @@ -769,8 +776,8 @@ def task_segment_extract(self): # So in principle we are done here. We do not have to do anything # besides monitoring the progress. while True: - counted_frames = thr_coll.written_frames - self.event_count = thr_coll.written_events + counted_frames = wor_coll.written_frames + self.event_count = wor_coll.written_events td = time.monotonic() - t0 # set the current status self._progress_ex = counted_frames / data_size @@ -790,7 +797,7 @@ def task_segment_extract(self): # Join the collector thread before the feature extractors.
On # compute clusters, we had problems with joining the feature # extractors, maybe because the event_queue was not depleted. - join_thread_helper(thr=thr_coll, + join_thread_helper(thr=wor_coll, timeout=600, retries=10, logger=self.logger, @@ -807,7 +814,7 @@ def task_segment_extract(self): logger=self.logger, name="writer") - self.event_count = thr_coll.written_events + self.event_count = wor_coll.written_events if self.event_count == 0: self.logger.error( f"No events found in {self.draw.path}! Please check the "
diff --git a/src/dcnum/write/__init__.py b/src/dcnum/write/__init__.py index 6be4830..087d8a6 100644 --- a/src/dcnum/write/__init__.py +++ b/src/dcnum/write/__init__.py @@ -1,6 +1,7 @@ # flake8: noqa: F401 from .deque_writer_thread import DequeWriterThread -from .queue_collector_thread import EventStash, QueueCollectorThread +from .queue_collector_base import ( + EventStash, QueueCollectorProcess, QueueCollectorThread) from .writer import ( HDF5Writer, copy_basins, copy_features, copy_metadata, create_with_basins, set_default_filter_kwargs) diff --git a/src/dcnum/write/queue_collector_thread.py b/src/dcnum/write/queue_collector_base.py similarity index 61% rename from src/dcnum/write/queue_collector_thread.py rename to src/dcnum/write/queue_collector_base.py index 1978f3d..7a24ac4 100644 --- a/src/dcnum/write/queue_collector_thread.py +++ b/src/dcnum/write/queue_collector_base.py @@ -1,71 +1,57 @@ -import logging from collections import deque -import queue +from typing import List +import time import multiprocessing as mp +import queue import threading -import time -from typing import List - import numpy as np +import logging + +mp_spawn = mp.get_context("spawn") class EventStash: def __init__(self, index_offset: int, feat_nevents: List[int]): - """Sort events into predefined arrays for bulk access + """Sort events into predefined arrays for bulk access. Parameters ---------- index_offset: - This is the index offset at which we are working on. - Normally, `feat_nevents` is just a slice of a larger - array and `index_offset` defines at which position - it is taken. + The index offset at which work is being done. + Typically, `feat_nevents` is a slice of a larger + array, and `index_offset` defines the position. feat_nevents: - List that defines how many events there are for each input - frame. If summed up, this defines `self.size`. + List indicating how many events exist per input frame.
+ The sum of these values defines `self.size`. """ self.events = {} - """Dictionary containing the event arrays""" - self.feat_nevents = feat_nevents - """List containing the number of events per input frame""" - self.nev_idx = np.cumsum(feat_nevents) - """Cumulative sum of `feat_nevents` for determining sorting offsets""" - self.size = int(np.sum(feat_nevents)) - """Number of events in this stash""" - self.num_frames = len(feat_nevents) - """Number of frames in this stash""" - self.index_offset = index_offset - """Global offset compared to the original data instance.""" - self.indices_for_data = np.zeros(self.size, dtype=np.uint32) - """Array containing the indices in the original data instance. - These indices correspond to the events in `events`. - """ - self._tracker = np.zeros(self.num_frames, dtype=bool) - """Private array that tracks the progress.""" def is_complete(self): - """Determine whether the event stash is complete (all events added)""" + """Determine whether the EventStash is complete + (all events have been added)""" + return np.all(self._tracker) def add_events(self, index, events): - """Add events to this stash + """Add events to this stash Parameters ---------- index: int - Global index (from input dataset) + Global index (from the input dataset) events: dict Event dictionary """ + idx_loc = index - self.index_offset if events: @@ -84,16 +70,18 @@ def add_events(self, index, events): self._tracker[idx_loc] = True def require_feature(self, feat, sample_data): - """Create a new empty feature array in `self.events` and return it + """Create a new empty feature array + in `self.events` and return it.
Parameters ---------- feat: Feature name sample_data: - Sample data for one event of the feature (used to determine - shape and dtype of the feature array) + Sample data for a feature event (to determine + the shape and data type of the feature array) """ + if feat not in self.events: sample_data = np.array(sample_data) event_shape = sample_data.shape @@ -104,7 +92,7 @@ def require_feature(self, feat, sample_data): return self.events[feat] -class QueueCollectorThread(threading.Thread): +class QueueCollectorBase: def __init__(self, event_queue: mp.Queue, writer_dq: deque, @@ -146,7 +134,7 @@ def __init__(self, output size could be 513 which is computed via `np.sum(feat_nevents[idx:idx+write_threshold])`. """ - super(QueueCollectorThread, self).__init__( + super(QueueCollectorBase, self).__init__( name="QueueCollector", *args, **kwargs) self.logger = logging.getLogger("dcnum.write.QueueCollector") @@ -194,7 +182,118 @@ def run(self): if len(cur_nevents) == 0: self.logger.info( "Reached dataset end (frame " + # `last_idx` is the size of the dataset in the end, + # because `len(cur_nevents)` is always added to it. + f"{last_idx} of {len(self.feat_nevents)})") + break + + # We have reached the writer threshold. This means the extractor + # has analyzed at least `write_threshold` frames (not events). + self.logger.debug(f"Current frame: {last_idx}") + + # Create an event stash + stash = EventStash( + index_offset=last_idx, + feat_nevents=cur_nevents + ) + + # First check whether there is a matching event from the buffer + # that we possibly populated earlier. + for ii in range(len(self.buffer_dq)): + idx, events = self.buffer_dq.popleft() + if last_idx <= idx < last_idx + self.write_threshold: + stash.add_events(index=idx, events=events) + else: + # Put it back into the buffer (this should not happen + # more than once unless you have many workers adding + # or some of the workers being slower/faster).
+ self.buffer_dq.append((idx, events)) + + if not stash.is_complete(): + # Now, get the data from the queue until we have everything + # that belongs to our chunk (this might also populate + # buffer_dq). + while True: + try: + idx, events = self.event_queue.get(timeout=.3) + except queue.Empty: + # No time.sleep here, because we are already using + # a timeout in event_queue.get. + continue + if last_idx <= idx < last_idx + self.write_threshold: + stash.add_events(index=idx, events=events) + else: + # Goes onto the buffer stack (might happen if a + # segmentation process was fast and got an event + # from the next slice (context: write_threshold)) + self.buffer_dq.append((idx, events)) + if stash.is_complete(): + break + + # Send the data from the stash to the writer. The stash has + # already put everything into the correct order. + for feat in stash.events: + self.writer_dq.append((feat, stash.events[feat])) + + # Now we also would like to add all the other information + # that was not in the events dictionaries. + + # This array contains indices for `data` corresponding to + # the events that we just saved. + indices = stash.indices_for_data + + # This is the unmapped index from the input HDF5Data instance. + # Unmapped means that this only enumerates HDF5Data, but since + # HDF5Data can be mapped, the index does not necessarily enumerate + # the underlying HDF5 file. Later on, we will have to convert this + # to the correct "basinmap0" feature + # (see `DCNumJobRunner.task_enforce_basin_strategy`) + self.writer_dq.append(("index_unmapped", + np.array(indices, dtype=np.uint32))) + + # Write the number of events. + self.writer_dq.append(("nevents", + # Get nevents for each event from the + # frame-based cur_nevents array. + np.array(stash.feat_nevents)[ + indices - stash.index_offset] + )) + # Update events/frames written (used for monitoring) + self.written_events += stash.size + self.written_frames += stash.num_frames + + # Increment current frame index.
+ last_idx += len(cur_nevents) + + self.logger.info(f"Counted {self.written_events} events") + self.logger.debug(f"Counted {self.written_frames} frames") + + +class QueueCollectorProcess(QueueCollectorBase, mp_spawn.Process): + def __init__(self, *args, **kwargs): + super(QueueCollectorProcess, self).__init__(*args, **kwargs) + + def run(self): + # We are not writing to `event_queue` so we can safely cancel + # our queue thread if we are told to stop. + # self.event_queue.cancel_join_thread() + # Indexes the current frame in the input HDF5Data instance. + last_idx = 0 + self.logger.debug("Started collector process") + while True: + # Slice of the shared nevents array. If it contains -1 values, + # this means that some of the frames have not yet been processed. + cur_nevents = self.feat_nevents[ + last_idx:last_idx + self.write_threshold] + if np.any(np.array(cur_nevents) < 0): + # We are not yet ready to write any new data to the queue. + time.sleep(.01) + continue + + if len(cur_nevents) == 0: + self.logger.info( + "Reached dataset end (frame " # `last_idx` is the size of the dataset in the end, # because `len(cur_nevents)` is always added to it. f"{last_idx} of {len(self.feat_nevents)})") @@ -280,3 +379,8 @@ def run(self): self.logger.info(f"Counted {self.written_events} events") self.logger.debug(f"Counted {self.written_frames} frames") + + +class QueueCollectorThread(QueueCollectorBase, threading.Thread): + def __init__(self, *args, **kwargs): + super(QueueCollectorThread, self).__init__(*args, **kwargs)