From 9c364f129b5980def6234c527c7c9a28dfaa9dd2 Mon Sep 17 00:00:00 2001 From: Sasha Meister Date: Wed, 23 Oct 2024 15:57:51 +0000 Subject: [PATCH 01/13] Created a new abstract data type DataSource to extend I/O formats Signed-off-by: Sasha Meister --- sdp/data_units/data_entry.py | 31 +++++++++++ sdp/data_units/manifest.py | 90 ++++++++++++++++++++++++++++++++ sdp/data_units/stream.py | 15 ++++++ sdp/processors/base_processor.py | 88 ++++++++++++------------------- sdp/run_processors.py | 38 +++----------- 5 files changed, 175 insertions(+), 87 deletions(-) create mode 100644 sdp/data_units/data_entry.py create mode 100644 sdp/data_units/manifest.py create mode 100644 sdp/data_units/stream.py diff --git a/sdp/data_units/data_entry.py b/sdp/data_units/data_entry.py new file mode 100644 index 00000000..5b55e6f7 --- /dev/null +++ b/sdp/data_units/data_entry.py @@ -0,0 +1,31 @@ +from dataclasses import dataclass, field +from typing import Any, List, Dict, Optional, Iterable +from abc import ABC, abstractmethod + +@dataclass +class DataEntry: + """A wrapper for data entry + any additional metrics.""" + + data: Optional[Dict] # can be None to drop the entry + metrics: Any = None + +class DataSource(ABC): + def __init__(self, source: Any): + self.source = source + self.number_of_entries = 0 + self.total_duration = 0.0 + self.metrics = [] + + @abstractmethod + def read(self, **kwargs) -> List[Dict]: + pass + + @abstractmethod + def write(self, data_entries: Iterable): + for data_entry in data_entries: + self._add_metrics(data_entry) + + def _add_metrics(self, data_entry: DataEntry): + self.metrics.append(data_entry.metrics) + self.number_of_entries += 1 + self.total_duration += data_entry.data.get("duration", 0) diff --git a/sdp/data_units/manifest.py b/sdp/data_units/manifest.py new file mode 100644 index 00000000..470ce77e --- /dev/null +++ b/sdp/data_units/manifest.py @@ -0,0 +1,90 @@ +from typing import List, Dict +from uuid import uuid4 +import json +import os +from tqdm import tqdm +from omegaconf import OmegaConf + +from sdp.data_units.data_entry import DataEntry, DataSource + +class Manifest(DataSource): + def __init__(self, filepath: str): + self.write_mode = "w" + super().__init__(filepath) + + def read(self, in_memory_chunksize: int): + manifest_chunk = [] + for idx, data_entry in enumerate(self._read_manifest(), 1): + manifest_chunk.append(data_entry) + if idx % in_memory_chunksize == 0: + yield manifest_chunk + manifest_chunk = [] + if manifest_chunk: + yield manifest_chunk + + def _read_manifest(self): + if self.source is None: + raise NotImplementedError("Override this method if the processor creates initial manifest") + + with open(self.source, 'r', encoding = 'utf8') as input_file: + for line in input_file: + yield json.loads(line) + + def write(self, data: List[DataEntry]): + with open(self.source, self.write_mode, encoding = 'utf8') as fout: + for data_entry in tqdm(data): + self._add_metrics(data_entry) + json.dump(data_entry.data, fout, ensure_ascii=False) + fout.write("\n") + + self.write_mode = 'a' + + +def set_manifests(processors_cfgs: List[Dict], cfg: List[Dict], tmp_dir: str): + processors_cfgs_to_init = [] + + cfg = OmegaConf.to_container(cfg) + + # special check for the first processor. 
+ # In case user selected something that does not start from + # manifest creation we will try to infer the input from previous + # output file + if processors_cfgs[0] is not cfg['processors'][0] and "input_manifest_file" not in processors_cfgs[0]: + # locating starting processor + for idx, processor in enumerate(cfg['processors']): + if processor is processors_cfgs[0]: # we don't do a copy, so can just check object ids + if "output_manifest_file" in cfg['processors'][idx - 1]: + processors_cfgs[0]["input_manifest_file"] = Manifest(cfg['processors'][idx - 1]["output_manifest_file"]) + break + + for idx, _processor_cfg in enumerate(processors_cfgs): + processor_cfg = OmegaConf.to_container(_processor_cfg) + # we assume that each processor defines "output_manifest_file" + # and "input_manifest_file" keys, which can be optional. In case they + # are missing, we create tmp files here for them + # (1) first use a temporary file for the "output_manifest_file" if it is unspecified + if "output_manifest_file" in processor_cfg: + processor_cfg["output"] = Manifest(processor_cfg["output_manifest_file"]) + else: + tmp_file_path = os.path.join(tmp_dir, str(uuid4())) + processor_cfg["output"] = Manifest(tmp_file_path) + + processor_cfg.pop("output_manifest_file") + + # (2) then link the current processor's output_manifest_file to the next processor's input_manifest_file + # if it hasn't been specified (and if you are not on the last processor) + if "input_manifest_file" in processor_cfg: + print('A' * 100) + processor_cfg["input"] = Manifest(processor_cfg["input_manifest_file"]) + else: + if idx > 0: + processor_cfg["input"] = Manifest(processors_cfgs[idx - 1]["output"].filepath) + + processor_cfg.pop("input_manifest_file") + processors_cfgs_to_init.append(processor_cfg) + + print("=" * 100) + print(processors_cfgs_to_init) + print("=" * 100) + + return processors_cfgs_to_init \ No newline at end of file diff --git a/sdp/data_units/stream.py b/sdp/data_units/stream.py new file mode 100644 index 00000000..9757da65 --- /dev/null +++ b/sdp/data_units/stream.py @@ -0,0 +1,15 @@ +from io import BytesIO + +class Stream: + def __init__(self): + self.stream = BytesIO() + + def __enter__(self): + return self.stream + + def __exit__(self, exc_type, exc_value, traceback): + self.stream.seek(0) + + +def set_streams(): + pass \ No newline at end of file diff --git a/sdp/processors/base_processor.py b/sdp/processors/base_processor.py index da550714..daa53c35 100644 --- a/sdp/processors/base_processor.py +++ b/sdp/processors/base_processor.py @@ -18,21 +18,17 @@ import os import time from abc import ABC, abstractmethod -from dataclasses import dataclass -from typing import Any, Dict, List, Optional +from typing import Union, Dict, List, Optional from tqdm import tqdm from tqdm.contrib.concurrent import process_map +from concurrent.futures import ThreadPoolExecutor +from multiprocessing import Process, Queue from sdp.logging import logger - - -@dataclass -class DataEntry: - """A wrapper for data entry + any additional metrics.""" - - data: Optional[Dict] # can be None to drop the entry - metrics: Any = None +from sdp.data_units.data_entry import DataEntry +from sdp.data_units.manifest import Manifest +from sdp.data_units.stream import Stream class BaseProcessor(ABC): @@ -57,15 +53,15 @@ class BaseProcessor(ABC): as ``input_manifest_file``. 
""" - def __init__(self, output_manifest_file: str, input_manifest_file: Optional[str] = None): + def __init__(self, output: Union[Manifest | Stream], input: Optional[Union[Manifest | Stream]] = None): - if output_manifest_file and input_manifest_file and (output_manifest_file == input_manifest_file): + if output and input and (output == input): # we cannot have the same input and output manifest file specified because we need to be able to # read from the input_manifest_file and write to the output_manifest_file at the same time - raise ValueError("A processor's specified input_manifest_file and output_manifest_file cannot be the same") + raise ValueError("A processor's specified input and output cannot be the same.") - self.output_manifest_file = output_manifest_file - self.input_manifest_file = input_manifest_file + self.output = output + self.input = input @abstractmethod def process(self): @@ -73,6 +69,7 @@ def process(self): pass def test(self): + assert type(self.input) == type(self.output), f"Input ({type(self.input)}) and output ({type(self.output)}) types do not match." """This method can be used to perform "runtime" tests. This can be any kind of self-consistency tests, but are usually @@ -131,6 +128,10 @@ def __init__( if self.test_cases is None: self.test_cases = [] + print("*" * 100) + print(self.__dict__) + print("*" * 100) + def process(self): """Parallelized implementation of the data processing. @@ -179,13 +180,14 @@ def process(self): """ self.prepare() - os.makedirs(os.path.dirname(self.output_manifest_file), exist_ok=True) - metrics = [] - - with open(self.output_manifest_file, "wt", encoding="utf8") as fout: - for manifest_chunk in self._chunk_manifest(): - # this will unroll all inner lists - data = itertools.chain( + #os.makedirs(os.path.dirname(self.output_manifest_file), exist_ok=True) + + print("-" * 100) + print(self.input) + print("-" * 100) + + for manifest_chunk in self.input.read(self.in_memory_chunksize): + data = itertools.chain( *process_map( self.process_dataset_entry, manifest_chunk, @@ -193,16 +195,12 @@ def process(self): chunksize=self.chunksize, ) ) - for data_entry in tqdm(data): - metrics.append(data_entry.metrics) - if data_entry.data is None: - continue - json.dump(data_entry.data, fout, ensure_ascii=False) - self.number_of_entries += 1 - self.total_duration += data_entry.data.get("duration", 0) - fout.write("\n") - - self.finalize(metrics) + + self.output.write(data) + self.number_of_entries = self.output.number_of_entries + self.total_duration = self.output.total_duration + + self.finalize(self.output.metrics) def prepare(self): """Can be used in derived classes to prepare the processing in any way. @@ -211,30 +209,6 @@ def prepare(self): starting processing the data. """ - def _chunk_manifest(self): - """Splits the manifest into smaller chunks defined by ``in_memory_chunksize``.""" - manifest_chunk = [] - for idx, data_entry in enumerate(self.read_manifest(), 1): - manifest_chunk.append(data_entry) - if idx % self.in_memory_chunksize == 0: - yield manifest_chunk - manifest_chunk = [] - if len(manifest_chunk) > 0: - yield manifest_chunk - - def read_manifest(self): - """Reading the input manifest file. - - .. note:: - This function should be overridden in the "initial" class creating - manifest to read from the original source of data. 
- """ - if self.input_manifest_file is None: - raise NotImplementedError("Override this method if the processor creates initial manifest") - - with open(self.input_manifest_file, "rt", encoding="utf8") as fin: - for line in fin: - yield json.loads(line) @abstractmethod def process_dataset_entry(self, data_entry) -> List[DataEntry]: @@ -299,6 +273,8 @@ def finalize(self, metrics: List): def test(self): """Applies processing to "test_cases" and raises an error in case of mismatch.""" + super().test() + for test_case in self.test_cases: generated_outputs = self.process_dataset_entry(test_case["input"].copy()) expected_outputs = ( diff --git a/sdp/run_processors.py b/sdp/run_processors.py index b9002de6..da29c4f5 100644 --- a/sdp/run_processors.py +++ b/sdp/run_processors.py @@ -15,13 +15,13 @@ import logging import os import tempfile -import uuid from typing import List import hydra from omegaconf import OmegaConf, open_dict from sdp.logging import logger +from sdp.data_units.manifest import set_manifests # registering new resolvers to simplify config files OmegaConf.register_new_resolver("subfield", lambda node, field: node[field]) @@ -108,37 +108,13 @@ def run_processors(cfg): # let's build all processors first to automatically check # for errors in parameters with tempfile.TemporaryDirectory() as tmp_dir: - # special check for the first processor. - # In case user selected something that does not start from - # manifest creation we will try to infer the input from previous - # output file - if processors_cfgs[0] is not cfg.processors[0] and "input_manifest_file" not in processors_cfgs[0]: - # locating starting processor - for idx, processor in enumerate(cfg.processors): - if processor is processors_cfgs[0]: # we don't do a copy, so can just check object ids - if "output_manifest_file" in cfg.processors[idx - 1]: - with open_dict(processors_cfgs[0]): - processors_cfgs[0]["input_manifest_file"] = cfg.processors[idx - 1]["output_manifest_file"] - break - - for idx, processor_cfg in enumerate(processors_cfgs): - logger.info('=> Building processor "%s"', processor_cfg["_target_"]) - - # we assume that each processor defines "output_manifest_file" - # and "input_manifest_file" keys, which can be optional. 
In case they - # are missing, we create tmp files here for them - # (1) first use a temporary file for the "output_manifest_file" if it is unspecified - if "output_manifest_file" not in processor_cfg: - tmp_file_path = os.path.join(tmp_dir, str(uuid.uuid4())) - with open_dict(processor_cfg): - processor_cfg["output_manifest_file"] = tmp_file_path - - # (2) then link the current processor's output_manifest_file to the next processor's input_manifest_file - # if it hasn't been specified (and if you are not on the last processor) - if idx != len(processors_cfgs) - 1 and "input_manifest_file" not in processors_cfgs[idx + 1]: - with open_dict(processors_cfgs[idx + 1]): - processors_cfgs[idx + 1]["input_manifest_file"] = processor_cfg["output_manifest_file"] + #use_streams = cfg.get("use_streams", False) + + processors_cfgs = set_manifests(processors_cfgs = processors_cfgs, + cfg = cfg, + tmp_dir = tmp_dir) + for processor_cfg in processors_cfgs: processor = hydra.utils.instantiate(processor_cfg) # running runtime tests to fail right-away if something is not # matching users expectations From 0d795a557b73f1540f689e27f8c626376c1241e0 Mon Sep 17 00:00:00 2001 From: Sasha Meister Date: Wed, 23 Oct 2024 16:08:09 +0000 Subject: [PATCH 02/13] Removed debugging print() Signed-off-by: Sasha Meister --- sdp/data_units/manifest.py | 8 +++----- sdp/processors/base_processor.py | 9 --------- 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/sdp/data_units/manifest.py b/sdp/data_units/manifest.py index 470ce77e..1471dd43 100644 --- a/sdp/data_units/manifest.py +++ b/sdp/data_units/manifest.py @@ -31,6 +31,9 @@ def _read_manifest(self): yield json.loads(line) def write(self, data: List[DataEntry]): + if self.write_mode == "w": + os.makedirs(os.path.dirname(self.source), exist_ok=True) + with open(self.source, self.write_mode, encoding = 'utf8') as fout: for data_entry in tqdm(data): self._add_metrics(data_entry) @@ -74,7 +77,6 @@ def set_manifests(processors_cfgs: List[Dict], cfg: List[Dict], tmp_dir: str): # (2) then link the current processor's output_manifest_file to the next processor's input_manifest_file # if it hasn't been specified (and if you are not on the last processor) if "input_manifest_file" in processor_cfg: - print('A' * 100) processor_cfg["input"] = Manifest(processor_cfg["input_manifest_file"]) else: if idx > 0: @@ -83,8 +85,4 @@ def set_manifests(processors_cfgs: List[Dict], cfg: List[Dict], tmp_dir: str): processor_cfg.pop("input_manifest_file") processors_cfgs_to_init.append(processor_cfg) - print("=" * 100) - print(processors_cfgs_to_init) - print("=" * 100) - return processors_cfgs_to_init \ No newline at end of file diff --git a/sdp/processors/base_processor.py b/sdp/processors/base_processor.py index daa53c35..a27bf94d 100644 --- a/sdp/processors/base_processor.py +++ b/sdp/processors/base_processor.py @@ -127,10 +127,6 @@ def __init__( # need to convert to list to avoid errors in iteration over None if self.test_cases is None: self.test_cases = [] - - print("*" * 100) - print(self.__dict__) - print("*" * 100) def process(self): """Parallelized implementation of the data processing. 
@@ -180,11 +176,6 @@ def process(self): """ self.prepare() - #os.makedirs(os.path.dirname(self.output_manifest_file), exist_ok=True) - - print("-" * 100) - print(self.input) - print("-" * 100) for manifest_chunk in self.input.read(self.in_memory_chunksize): data = itertools.chain( From 629908584340ed7bac2338b1fc258590a7e17e45 Mon Sep 17 00:00:00 2001 From: Sasha Meister Date: Wed, 23 Oct 2024 16:50:43 +0000 Subject: [PATCH 03/13] Sources setting functions refactoring Signed-off-by: Sasha Meister --- sdp/data_units/data_entry.py | 2 +- sdp/data_units/manifest.py | 26 ++++++++++++++++-- sdp/data_units/set_sources.py | 52 +++++++++++++++++++++++++++++++++++ sdp/run_processors.py | 5 ++-- 4 files changed, 79 insertions(+), 6 deletions(-) create mode 100644 sdp/data_units/set_sources.py diff --git a/sdp/data_units/data_entry.py b/sdp/data_units/data_entry.py index 5b55e6f7..fc957c13 100644 --- a/sdp/data_units/data_entry.py +++ b/sdp/data_units/data_entry.py @@ -1,4 +1,4 @@ -from dataclasses import dataclass, field +from dataclasses import dataclass from typing import Any, List, Dict, Optional, Iterable from abc import ABC, abstractmethod diff --git a/sdp/data_units/manifest.py b/sdp/data_units/manifest.py index 1471dd43..ae31b4be 100644 --- a/sdp/data_units/manifest.py +++ b/sdp/data_units/manifest.py @@ -43,7 +43,7 @@ def write(self, data: List[DataEntry]): self.write_mode = 'a' -def set_manifests(processors_cfgs: List[Dict], cfg: List[Dict], tmp_dir: str): +def set_manifests2(processors_cfgs: List[Dict], cfg: List[Dict], tmp_dir: str): processors_cfgs_to_init = [] cfg = OmegaConf.to_container(cfg) @@ -85,4 +85,26 @@ def set_manifests(processors_cfgs: List[Dict], cfg: List[Dict], tmp_dir: str): processor_cfg.pop("input_manifest_file") processors_cfgs_to_init.append(processor_cfg) - return processors_cfgs_to_init \ No newline at end of file + return processors_cfgs_to_init + +def set_manifests(processor_cfg, previous_output, tmp_dir): + if "output_manifest_file" in processor_cfg: + processor_cfg["output"] = Manifest(processor_cfg["output_manifest_file"]) + else: + tmp_file_path = os.path.join(tmp_dir, str(uuid4())) + processor_cfg["output"] = Manifest(tmp_file_path) + + processor_cfg.pop("output_manifest_file") + + # (2) then link the current processor's output_manifest_file to the next processor's input_manifest_file + # if it hasn't been specified (and if you are not on the last processor) + if "input_manifest_file" in processor_cfg: + processor_cfg["input"] = Manifest(processor_cfg["input_manifest_file"]) + else: + if type(previous_output) is Manifest: + processor_cfg["input"] = previous_output + else: + return ValueError() + + processor_cfg.pop("input_manifest_file") + return processor_cfg \ No newline at end of file diff --git a/sdp/data_units/set_sources.py b/sdp/data_units/set_sources.py new file mode 100644 index 00000000..5c58596b --- /dev/null +++ b/sdp/data_units/set_sources.py @@ -0,0 +1,52 @@ +from omegaconf import OmegaConf +from typing import Any, List, Dict, Optional, Iterable + +from sdp.data_units.manifest import Manifest, set_manifests +from sdp.data_units.stream import Stream, set_streams + + +def set_sources(processors_cfgs: List[Dict], cfg: List[Dict], tmp_dir: str, use_streams: bool = False): + processors_cfgs_to_run = [] + + cfg = OmegaConf.to_container(cfg) + + # special check for the first processor. 
+ # In case user selected something that does not start from + # manifest creation we will try to infer the input from previous + # output file + + if processors_cfgs[0] is not cfg['processors'][0] and "input_manifest_file" not in processors_cfgs[0]: + # locating starting processor + for idx, processor in enumerate(cfg['processors']): + if processor is processors_cfgs[0]: # we don't do a copy, so can just check object ids + if "output_manifest_file" in cfg['processors'][idx - 1]: + processors_cfgs[0]["input_manifest_file"] = Manifest(cfg['processors'][idx - 1]["output_manifest_file"]) + break + + previous_output = None + for idx, _processor_cfg in enumerate(processors_cfgs): + processor_cfg = OmegaConf.to_container(_processor_cfg) + + if _processor_cfg["_target_"] == "": + pass + + elif not use_streams: + processor_cfg = set_manifests(processor_cfg, previous_output, tmp_dir) + + else: + if ("output_manifest_file" in processor_cfg or + "input_manifest_file" in processor_cfg): + + if ("input_stream" in processor_cfg or + "output_stream" in processor_cfg): + + raise ValueError() + + processor_cfg = set_manifests(processor_cfg, previous_output, tmp_dir) + else: + processor_cfg = set_streams(processor_cfg, previous_output) + + processors_cfgs_to_run.append(processor_cfg) + previous_output = processor_cfg['output'] + + return processors_cfgs_to_run \ No newline at end of file diff --git a/sdp/run_processors.py b/sdp/run_processors.py index da29c4f5..cd1fa95a 100644 --- a/sdp/run_processors.py +++ b/sdp/run_processors.py @@ -22,6 +22,7 @@ from sdp.logging import logger from sdp.data_units.manifest import set_manifests +from sdp.data_units.set_sources import set_sources # registering new resolvers to simplify config files OmegaConf.register_new_resolver("subfield", lambda node, field: node[field]) @@ -110,9 +111,7 @@ def run_processors(cfg): with tempfile.TemporaryDirectory() as tmp_dir: #use_streams = cfg.get("use_streams", False) - processors_cfgs = set_manifests(processors_cfgs = processors_cfgs, - cfg = cfg, - tmp_dir = tmp_dir) + processors_cfgs = set_sources(processors_cfgs, cfg, tmp_dir, use_streams=False) for processor_cfg in processors_cfgs: processor = hydra.utils.instantiate(processor_cfg) From ff9c1e5938cd4b27095de389a8c38aba25dbb576 Mon Sep 17 00:00:00 2001 From: Sasha Meister Date: Wed, 23 Oct 2024 16:52:13 +0000 Subject: [PATCH 04/13] Removed function duplicates Signed-off-by: Sasha Meister --- sdp/data_units/manifest.py | 44 -------------------------------------- 1 file changed, 44 deletions(-) diff --git a/sdp/data_units/manifest.py b/sdp/data_units/manifest.py index ae31b4be..1c0853b6 100644 --- a/sdp/data_units/manifest.py +++ b/sdp/data_units/manifest.py @@ -43,50 +43,6 @@ def write(self, data: List[DataEntry]): self.write_mode = 'a' -def set_manifests2(processors_cfgs: List[Dict], cfg: List[Dict], tmp_dir: str): - processors_cfgs_to_init = [] - - cfg = OmegaConf.to_container(cfg) - - # special check for the first processor. 
- # In case user selected something that does not start from - # manifest creation we will try to infer the input from previous - # output file - if processors_cfgs[0] is not cfg['processors'][0] and "input_manifest_file" not in processors_cfgs[0]: - # locating starting processor - for idx, processor in enumerate(cfg['processors']): - if processor is processors_cfgs[0]: # we don't do a copy, so can just check object ids - if "output_manifest_file" in cfg['processors'][idx - 1]: - processors_cfgs[0]["input_manifest_file"] = Manifest(cfg['processors'][idx - 1]["output_manifest_file"]) - break - - for idx, _processor_cfg in enumerate(processors_cfgs): - processor_cfg = OmegaConf.to_container(_processor_cfg) - # we assume that each processor defines "output_manifest_file" - # and "input_manifest_file" keys, which can be optional. In case they - # are missing, we create tmp files here for them - # (1) first use a temporary file for the "output_manifest_file" if it is unspecified - if "output_manifest_file" in processor_cfg: - processor_cfg["output"] = Manifest(processor_cfg["output_manifest_file"]) - else: - tmp_file_path = os.path.join(tmp_dir, str(uuid4())) - processor_cfg["output"] = Manifest(tmp_file_path) - - processor_cfg.pop("output_manifest_file") - - # (2) then link the current processor's output_manifest_file to the next processor's input_manifest_file - # if it hasn't been specified (and if you are not on the last processor) - if "input_manifest_file" in processor_cfg: - processor_cfg["input"] = Manifest(processor_cfg["input_manifest_file"]) - else: - if idx > 0: - processor_cfg["input"] = Manifest(processors_cfgs[idx - 1]["output"].filepath) - - processor_cfg.pop("input_manifest_file") - processors_cfgs_to_init.append(processor_cfg) - - return processors_cfgs_to_init - def set_manifests(processor_cfg, previous_output, tmp_dir): if "output_manifest_file" in processor_cfg: processor_cfg["output"] = Manifest(processor_cfg["output_manifest_file"]) From 2b1f01a8232d59aee492e842bf28bdf2e40c27bb Mon Sep 17 00:00:00 2001 From: Sasha Meister Date: Thu, 24 Oct 2024 21:23:30 +0000 Subject: [PATCH 05/13] StreamsSetter (draft) Signed-off-by: Sasha Meister --- sdp/data_units/stream.py | 184 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 176 insertions(+), 8 deletions(-) diff --git a/sdp/data_units/stream.py b/sdp/data_units/stream.py index 9757da65..8efb640e 100644 --- a/sdp/data_units/stream.py +++ b/sdp/data_units/stream.py @@ -1,15 +1,183 @@ from io import BytesIO +from typing import List +import pickle +from tqdm import tqdm -class Stream: +from sdp.data_units.data_entry import DataSource, DataEntry +from sdp.data_units.manifest import Manifest +class Stream(DataSource): def __init__(self): - self.stream = BytesIO() + self.rw_amount = 0 + self.rw_limit = 1 + + super().__init__(BytesIO()) - def __enter__(self): - return self.stream + def rw_control(func): + def wrapper(self, *args, **kwargs): + self.source.seek(0) + func(self, *args, **kwargs) + if self.rw_amount >= self.rw_limit: + self.reset() + return wrapper + + @rw_control + def read(self): + return pickle.load(self.source) + + @rw_control + def write(self, data: List[DataEntry]): + for data_entry in tqdm(data): + self._add_metrics(data_entry) + pickle.dump(data_entry, self.source) + self.source.flush() + + def reset(self): + self.source.truncate(0) + - def __exit__(self, exc_type, exc_value, traceback): - self.stream.seek(0) +class StreamsSetter: + def __init__(self, processors_cfgs): + self.processors_cfgs = processors_cfgs + 
self.reference_prefix = "stream" + + def resolve_stream(self, reference: str): + reference = reference.replace(self.reference_prefix, "") + if reference == "init": + return Stream() + + current_item = self.processors_cfgs + key_chain = reference.split('.') + for key in key_chain: + try: + key = int(key) + except ValueError: + continue + + current_item = current_item[key] + + if not isinstance(current_item, Stream): + raise ValueError() + + return current_item + + def is_manifest_to_stream(self, processor_idx): + processor_cfg = self.processors_cfgs[processor_idx] + + if processor_cfg['_target_'] == "sdp.processors.ManifestToStream": + if "input_manifest_file" not in processor_cfg: + if not ("output" in self.processors_cfgs[processor_idx - 1] and + isinstance(self.processors_cfgs[processor_idx - 1]["output"], Manifest)): + raise ValueError() + + if "output_stream" in processor_cfg: + if processor_cfg["output_stream"] != f"{self.reference_prefix}:init": + raise ValueError() + + return True + else: + return False + + def as_manifest_to_stream(self, processor_idx): + processor_cfg = self.processors_cfgs[processor_idx] + + if "input_manifest_file" in processor_cfg: + input_manifest = Manifest(processor_cfg.pop("input_manifest_file")) + else: + input_manifest = self.processors_cfgs[processor_idx - 1]['output'] + + processor_cfg["input"] = input_manifest + if "output_stream" in processor_cfg: + output_stream = self.resolve_stream(processor_cfg.pop("output_stream")) + else: + output_stream = Stream() + + processor_cfg["output"] = output_stream + return processor_cfg -def set_streams(): - pass \ No newline at end of file + def is_stream_to_manifest(self, processor_idx): + processor_cfg = self.processors_cfgs[processor_idx] + + if self.processors_cfgs[processor_idx]['_target_'] == "sdp.processors.StreamToManifest": + if "input_stream" in processor_cfg: + if processor_cfg["input_stream"] == f"{self.reference_prefix}:init": + raise ValueError() + else: + if not ("output" in self.processors_cfgs[processor_idx - 1] and + isinstance(self.processors_cfgs[processor_idx - 1]["output"], Stream)): + raise ValueError() + + return True + else: + return False + + def as_stream_to_manifest(self, processor_idx): + processor_cfg = self.processors_cfgs[processor_idx] + if "input_stream" in processor_cfg: + input_stream = self.resolve_stream(processor_cfg.pop("input_stream")) + else: + input_stream = self.processors_cfgs[processor_idx - 1]["output"] + + processor_cfg["input"] = input_stream + + if "output_manifest_file" in processor_cfg: + output_manifest = Manifest(processor_cfg.pop("output_manifest_file")) + else: + output_manifest = Manifest() + + processor_cfg["output"] = output_manifest + return processor_cfg + + def traverse_processor(self, cfg): + if isinstance(cfg, list): + for i, item in enumerate(cfg): + cfg[i] = self.traverse_processor(item) + elif isinstance(cfg, dict): + for key, value in cfg.items(): + cfg[key] = self.traverse_processor(value) + elif isinstance(cfg, str) and cfg.startswith(self.reference_prefix): + cfg = self.resolve_stream(cfg) + + return cfg + + def is_stream_resolvable(self, processor_idx): + processor_cfg = self.processors_cfgs[processor_idx] + + if "input_stream" in processor_cfg: + if not processor_cfg["input_stream"].startswith(self.reference_prefix): + raise ValueError() + + if processor_cfg["input_stream"] == f"{self.reference_prefix}:init": + raise ValueError() + + else: + if not(hasattr(self.processors_cfgs[processor_idx - 1], "output") and + 
isinstance(self.processors_cfgs[processor_idx - 1]["output"], Stream) + ): + raise ValueError() + + if "output_stream" in processor_cfg: + if processor_cfg["output_stream"] != f"{self.reference_prefix}:init": + raise ValueError() + + return True + + def set_processor_streams(self, processor_idx: int): + if self.is_manifest_to_stream(processor_idx): + processor_cfg = self.as_manifest_to_stream(processor_idx) + + elif self.is_stream_to_manifest(processor_idx): + processor_cfg = self.as_stream_to_manifest(processor_idx) + + elif self.is_stream_resolvable(processor_idx): + processor_cfg = self.processors_cfgs(processor_idx) + processor_cfg = self.traverse_processor(processor_cfg) + + processor_cfg["input"] = processor_cfg.pop("input_stream") + processor_cfg["output"] = processor_cfg.pop("input_stream", Stream()) + + self.processors_cfgs[processor_idx] = processor_cfg + return processor_cfg + + #raise ValueError("Expected a Stream object for 'input'") + #Manifest() without path -> auto tmp \ No newline at end of file From 45e4291a261e926d03e1098d40de00a348a2194a Mon Sep 17 00:00:00 2001 From: Sasha Meister Date: Wed, 30 Oct 2024 13:08:50 +0000 Subject: [PATCH 06/13] Streams 1st working version Signed-off-by: Sasha Meister --- main.py | 15 ++- sdp/data_units/abc_unit.py | 31 ++++++ sdp/data_units/cache.py | 18 +++ sdp/data_units/data_entry.py | 26 +---- sdp/data_units/manifest.py | 75 ++++++++----- sdp/data_units/set_sources.py | 52 --------- sdp/data_units/stream.py | 99 +++++++++++------ sdp/processors/__init__.py | 2 + sdp/processors/base_processor.py | 2 +- sdp/processors/stream/adapters.py | 38 +++++++ sdp/run_processors.py | 179 +++++++++++++++--------------- 11 files changed, 307 insertions(+), 230 deletions(-) create mode 100644 sdp/data_units/abc_unit.py create mode 100644 sdp/data_units/cache.py delete mode 100644 sdp/data_units/set_sources.py create mode 100644 sdp/processors/stream/adapters.py diff --git a/main.py b/main.py index 7fba0dc0..006d7e90 100644 --- a/main.py +++ b/main.py @@ -15,14 +15,23 @@ import sys import hydra +from omegaconf import OmegaConf -from sdp.run_processors import run_processors +from sdp.run_processors import SDPRunner + +OmegaConf.register_new_resolver("subfield", lambda node, field: node[field]) +OmegaConf.register_new_resolver("not", lambda x: not x) +OmegaConf.register_new_resolver("equal", lambda field, value: field == value) @hydra.main(version_base=None) def main(cfg): - run_processors(cfg) - + processors_to_run = cfg.get("processors_to_run", "all") + sdp = SDPRunner(cfg.processors, processors_to_run) + + use_streams = cfg.get("use_streams", False) + sdp.run(use_streams) + if __name__ == "__main__": # hacking the arguments to always disable hydra's output diff --git a/sdp/data_units/abc_unit.py b/sdp/data_units/abc_unit.py new file mode 100644 index 00000000..62c48bd5 --- /dev/null +++ b/sdp/data_units/abc_unit.py @@ -0,0 +1,31 @@ +from abc import ABC, abstractmethod +from typing import List, Dict, Any, Iterable +from sdp.data_units.data_entry import DataEntry + + +class DataSource(ABC): + def __init__(self, source: Any): + self.source = source + self.number_of_entries = 0 + self.total_duration = 0.0 + self.metrics = [] + + @abstractmethod + def read(self, *args, **kwargs) -> List[Dict]: + pass + + @abstractmethod + def write(self, data_entries: Iterable): + for data_entry in data_entries: + self._add_metrics(data_entry) + + def _add_metrics(self, data_entry: DataEntry): + self.metrics.append(data_entry.metrics) + self.number_of_entries += 1 + 
self.total_duration += data_entry.data.get("duration", 0) + + +class DataSetter(ABC): + def __init__(self, processors_cfgs: List[Dict]): + self.processors_cfgs = processors_cfgs + diff --git a/sdp/data_units/cache.py b/sdp/data_units/cache.py new file mode 100644 index 00000000..dcd63810 --- /dev/null +++ b/sdp/data_units/cache.py @@ -0,0 +1,18 @@ +import os +from tempfile import TemporaryDirectory +from uuid import uuid4 + +class CacheDir: + def __init__(self, cache_dirpath: str = None, prefix: str = None, suffix: str = None): + if cache_dirpath: + os.makedirs(cache_dirpath, exist_ok=True) + self.cache_dir = TemporaryDirectory(dir = cache_dirpath, prefix = prefix, suffix = suffix) + + def make_tmp_filepath(self): + return os.path.join(self.cache_dir, uuid4()) + + def cleanup(self): + self.cache_dir.cleanup() + + +CACHE_DIR = CacheDir() \ No newline at end of file diff --git a/sdp/data_units/data_entry.py b/sdp/data_units/data_entry.py index fc957c13..61cec04f 100644 --- a/sdp/data_units/data_entry.py +++ b/sdp/data_units/data_entry.py @@ -1,31 +1,9 @@ from dataclasses import dataclass -from typing import Any, List, Dict, Optional, Iterable -from abc import ABC, abstractmethod +from typing import Any, Dict, Optional @dataclass class DataEntry: """A wrapper for data entry + any additional metrics.""" data: Optional[Dict] # can be None to drop the entry - metrics: Any = None - -class DataSource(ABC): - def __init__(self, source: Any): - self.source = source - self.number_of_entries = 0 - self.total_duration = 0.0 - self.metrics = [] - - @abstractmethod - def read(self, **kwargs) -> List[Dict]: - pass - - @abstractmethod - def write(self, data_entries: Iterable): - for data_entry in data_entries: - self._add_metrics(data_entry) - - def _add_metrics(self, data_entry: DataEntry): - self.metrics.append(data_entry.metrics) - self.number_of_entries += 1 - self.total_duration += data_entry.data.get("duration", 0) + metrics: Any = None \ No newline at end of file diff --git a/sdp/data_units/manifest.py b/sdp/data_units/manifest.py index 1c0853b6..cb0010b4 100644 --- a/sdp/data_units/manifest.py +++ b/sdp/data_units/manifest.py @@ -1,25 +1,32 @@ from typing import List, Dict -from uuid import uuid4 import json import os from tqdm import tqdm -from omegaconf import OmegaConf -from sdp.data_units.data_entry import DataEntry, DataSource +from sdp.data_units.data_entry import DataEntry +from sdp.data_units.abc_unit import DataSource, DataSetter +from sdp.data_units.cache import CacheDir, CACHE_DIR class Manifest(DataSource): - def __init__(self, filepath: str): + def __init__(self, filepath: str = None, cache_dir: CacheDir = CACHE_DIR): self.write_mode = "w" + + if not filepath: + filepath = cache_dir.make_tmp_filepath() + super().__init__(filepath) - def read(self, in_memory_chunksize: int): + def read(self, in_memory_chunksize: int = None): manifest_chunk = [] for idx, data_entry in enumerate(self._read_manifest(), 1): + if not in_memory_chunksize: + yield data_entry + manifest_chunk.append(data_entry) - if idx % in_memory_chunksize == 0: + if in_memory_chunksize and idx % in_memory_chunksize == 0: yield manifest_chunk manifest_chunk = [] - if manifest_chunk: + if in_memory_chunksize and manifest_chunk: yield manifest_chunk def _read_manifest(self): @@ -42,25 +49,41 @@ def write(self, data: List[DataEntry]): self.write_mode = 'a' - -def set_manifests(processor_cfg, previous_output, tmp_dir): - if "output_manifest_file" in processor_cfg: - processor_cfg["output"] = 
Manifest(processor_cfg["output_manifest_file"]) - else: - tmp_file_path = os.path.join(tmp_dir, str(uuid4())) - processor_cfg["output"] = Manifest(tmp_file_path) +class ManifestsSetter(DataSetter): + def __init__(self, processors_cfgs: List[Dict]): + super().__init__(processors_cfgs) - processor_cfg.pop("output_manifest_file") + def is_manifest_resolvable(self, processor_idx: int): + processor_cfg = self.processors_cfgs[processor_idx] + + if "input_manifest_file" not in processor_cfg: + if processor_idx == 0: + pass + + if not("output" in self.processors_cfgs[processor_idx - 1] and + isinstance(self.processors_cfgs[processor_idx - 1]["output"]) is Manifest): + raise ValueError() + + def set_processor_manifests(self, processor_idx: int): + self.is_manifest_resolvable(processor_idx) + + processor_cfg = self.processors_cfgs[processor_idx] - # (2) then link the current processor's output_manifest_file to the next processor's input_manifest_file - # if it hasn't been specified (and if you are not on the last processor) - if "input_manifest_file" in processor_cfg: - processor_cfg["input"] = Manifest(processor_cfg["input_manifest_file"]) - else: - if type(previous_output) is Manifest: - processor_cfg["input"] = previous_output + if "input_manifest_file" in processor_cfg: + input_manifest = Manifest(processor_cfg.pop("input_manifest_file")) else: - return ValueError() - - processor_cfg.pop("input_manifest_file") - return processor_cfg \ No newline at end of file + #1 st processor + input_manifest = self.processors_cfgs[processor_idx - 1]["output"] + + processor_cfg["input"] = input_manifest + + if "output_manifest_file" in processor_cfg: + output_manifest = Manifest(processor_cfg.pop("output_manifest_file")) + else: + output_manifest = Manifest() + + processor_cfg["output"] = output_manifest + + self.processors_cfgs[processor_idx] = processor_cfg + return processor_cfg + diff --git a/sdp/data_units/set_sources.py b/sdp/data_units/set_sources.py deleted file mode 100644 index 5c58596b..00000000 --- a/sdp/data_units/set_sources.py +++ /dev/null @@ -1,52 +0,0 @@ -from omegaconf import OmegaConf -from typing import Any, List, Dict, Optional, Iterable - -from sdp.data_units.manifest import Manifest, set_manifests -from sdp.data_units.stream import Stream, set_streams - - -def set_sources(processors_cfgs: List[Dict], cfg: List[Dict], tmp_dir: str, use_streams: bool = False): - processors_cfgs_to_run = [] - - cfg = OmegaConf.to_container(cfg) - - # special check for the first processor. 
- # In case user selected something that does not start from - # manifest creation we will try to infer the input from previous - # output file - - if processors_cfgs[0] is not cfg['processors'][0] and "input_manifest_file" not in processors_cfgs[0]: - # locating starting processor - for idx, processor in enumerate(cfg['processors']): - if processor is processors_cfgs[0]: # we don't do a copy, so can just check object ids - if "output_manifest_file" in cfg['processors'][idx - 1]: - processors_cfgs[0]["input_manifest_file"] = Manifest(cfg['processors'][idx - 1]["output_manifest_file"]) - break - - previous_output = None - for idx, _processor_cfg in enumerate(processors_cfgs): - processor_cfg = OmegaConf.to_container(_processor_cfg) - - if _processor_cfg["_target_"] == "": - pass - - elif not use_streams: - processor_cfg = set_manifests(processor_cfg, previous_output, tmp_dir) - - else: - if ("output_manifest_file" in processor_cfg or - "input_manifest_file" in processor_cfg): - - if ("input_stream" in processor_cfg or - "output_stream" in processor_cfg): - - raise ValueError() - - processor_cfg = set_manifests(processor_cfg, previous_output, tmp_dir) - else: - processor_cfg = set_streams(processor_cfg, previous_output) - - processors_cfgs_to_run.append(processor_cfg) - previous_output = processor_cfg['output'] - - return processors_cfgs_to_run \ No newline at end of file diff --git a/sdp/data_units/stream.py b/sdp/data_units/stream.py index 8efb640e..bcf3b22a 100644 --- a/sdp/data_units/stream.py +++ b/sdp/data_units/stream.py @@ -1,9 +1,9 @@ from io import BytesIO from typing import List import pickle -from tqdm import tqdm -from sdp.data_units.data_entry import DataSource, DataEntry +from sdp.data_units.data_entry import DataEntry +from sdp.data_units.abc_unit import DataSource, DataSetter from sdp.data_units.manifest import Manifest class Stream(DataSource): def __init__(self): @@ -15,64 +15,80 @@ def __init__(self): def rw_control(func): def wrapper(self, *args, **kwargs): self.source.seek(0) - func(self, *args, **kwargs) + result = func(self, *args, **kwargs) + self.rw_amount += 1 if self.rw_amount >= self.rw_limit: self.reset() - return wrapper + return result + return wrapper @rw_control - def read(self): - return pickle.load(self.source) + def read(self, *args, **kwargs): + self.source.seek(0) + data = [pickle.load(self.source)] + return data @rw_control def write(self, data: List[DataEntry]): - for data_entry in tqdm(data): - self._add_metrics(data_entry) - pickle.dump(data_entry, self.source) - self.source.flush() + self.source.seek(0) + data = list(data) + for entry in data: + self._add_metrics(entry) + + pickle.dump([entry.data for entry in data], self.source) def reset(self): self.source.truncate(0) -class StreamsSetter: +class StreamsSetter(DataSetter): def __init__(self, processors_cfgs): - self.processors_cfgs = processors_cfgs - self.reference_prefix = "stream" - + super().__init__(processors_cfgs) + self.reference_stream_prefix = "stream" + def resolve_stream(self, reference: str): - reference = reference.replace(self.reference_prefix, "") + reference = reference.replace(self.reference_stream_prefix + ":", "") if reference == "init": return Stream() current_item = self.processors_cfgs key_chain = reference.split('.') for key in key_chain: - try: + if key.isdigit(): key = int(key) - except ValueError: - continue + + #TODO: replace io_stream fields to "input" / "output" + if isinstance(key, str) and key.endswith("_stream"): + key = key.replace("_stream", "") + current_item = 
current_item[key] if not isinstance(current_item, Stream): raise ValueError() + current_item.rw_limit += 1 return current_item - def is_manifest_to_stream(self, processor_idx): + def is_manifest_to_stream(self, processor_idx, dry_run: bool = False): processor_cfg = self.processors_cfgs[processor_idx] if processor_cfg['_target_'] == "sdp.processors.ManifestToStream": if "input_manifest_file" not in processor_cfg: if not ("output" in self.processors_cfgs[processor_idx - 1] and - isinstance(self.processors_cfgs[processor_idx - 1]["output"], Manifest)): + isinstance(self.processors_cfgs[processor_idx - 1]["output"], Manifest)): + if dry_run: + return False + raise ValueError() if "output_stream" in processor_cfg: - if processor_cfg["output_stream"] != f"{self.reference_prefix}:init": + if processor_cfg["output_stream"] != f"{self.reference_stream_prefix}:init": + if dry_run: + return False + raise ValueError() - + return True else: return False @@ -95,16 +111,22 @@ def as_manifest_to_stream(self, processor_idx): processor_cfg["output"] = output_stream return processor_cfg - def is_stream_to_manifest(self, processor_idx): + def is_stream_to_manifest(self, processor_idx, dry_run: bool = False): processor_cfg = self.processors_cfgs[processor_idx] if self.processors_cfgs[processor_idx]['_target_'] == "sdp.processors.StreamToManifest": if "input_stream" in processor_cfg: - if processor_cfg["input_stream"] == f"{self.reference_prefix}:init": + if processor_cfg["input_stream"] == f"{self.reference_stream_prefix}:init": + if dry_run: + return False + raise ValueError() else: if not ("output" in self.processors_cfgs[processor_idx - 1] and isinstance(self.processors_cfgs[processor_idx - 1]["output"], Stream)): + if dry_run: + return False + raise ValueError() return True @@ -117,6 +139,7 @@ def as_stream_to_manifest(self, processor_idx): input_stream = self.resolve_stream(processor_cfg.pop("input_stream")) else: input_stream = self.processors_cfgs[processor_idx - 1]["output"] + input_stream.rw_limit += 1 processor_cfg["input"] = input_stream @@ -135,29 +158,41 @@ def traverse_processor(self, cfg): elif isinstance(cfg, dict): for key, value in cfg.items(): cfg[key] = self.traverse_processor(value) - elif isinstance(cfg, str) and cfg.startswith(self.reference_prefix): + elif isinstance(cfg, str) and cfg.startswith(self.reference_stream_prefix): cfg = self.resolve_stream(cfg) return cfg - def is_stream_resolvable(self, processor_idx): + def is_stream_resolvable(self, processor_idx, dry_run: bool = False): processor_cfg = self.processors_cfgs[processor_idx] if "input_stream" in processor_cfg: - if not processor_cfg["input_stream"].startswith(self.reference_prefix): + if not processor_cfg["input_stream"].startswith(self.reference_stream_prefix): + if dry_run: + return False + raise ValueError() - if processor_cfg["input_stream"] == f"{self.reference_prefix}:init": + if processor_cfg["input_stream"] == f"{self.reference_stream_prefix}:init": + if dry_run: + return False + raise ValueError() else: if not(hasattr(self.processors_cfgs[processor_idx - 1], "output") and isinstance(self.processors_cfgs[processor_idx - 1]["output"], Stream) ): + if dry_run: + return False + raise ValueError() if "output_stream" in processor_cfg: - if processor_cfg["output_stream"] != f"{self.reference_prefix}:init": + if processor_cfg["output_stream"] != f"{self.reference_stream_prefix}:init": + if dry_run: + return False + raise ValueError() return True @@ -170,14 +205,14 @@ def set_processor_streams(self, processor_idx: int): 
processor_cfg = self.as_stream_to_manifest(processor_idx) elif self.is_stream_resolvable(processor_idx): - processor_cfg = self.processors_cfgs(processor_idx) + processor_cfg = self.processors_cfgs[processor_idx] processor_cfg = self.traverse_processor(processor_cfg) processor_cfg["input"] = processor_cfg.pop("input_stream") - processor_cfg["output"] = processor_cfg.pop("input_stream", Stream()) + processor_cfg["output"] = processor_cfg.pop("output_stream", Stream()) self.processors_cfgs[processor_idx] = processor_cfg - return processor_cfg + #return processor_cfg #raise ValueError("Expected a Stream object for 'input'") #Manifest() without path -> auto tmp \ No newline at end of file diff --git a/sdp/processors/__init__.py b/sdp/processors/__init__.py index fdafb521..c8b7505b 100644 --- a/sdp/processors/__init__.py +++ b/sdp/processors/__init__.py @@ -103,3 +103,5 @@ ) from sdp.processors.nemo.asr_inference import ASRInference from sdp.processors.nemo.pc_inference import PCInference + +from sdp.processors.stream.adapters import StreamToManifest, ManifestToStream \ No newline at end of file diff --git a/sdp/processors/base_processor.py b/sdp/processors/base_processor.py index a27bf94d..0c4bf7af 100644 --- a/sdp/processors/base_processor.py +++ b/sdp/processors/base_processor.py @@ -176,7 +176,7 @@ def process(self): """ self.prepare() - + for manifest_chunk in self.input.read(self.in_memory_chunksize): data = itertools.chain( *process_map( diff --git a/sdp/processors/stream/adapters.py b/sdp/processors/stream/adapters.py new file mode 100644 index 00000000..4a52967a --- /dev/null +++ b/sdp/processors/stream/adapters.py @@ -0,0 +1,38 @@ +from sdp.processors.base_processor import BaseProcessor +from sdp.data_units.data_entry import DataEntry +from sdp.data_units.stream import Stream +from sdp.data_units.manifest import Manifest + + +class ManifestToStream(BaseProcessor): + def __init__(self, + output: Stream, + input: Manifest): + + super().__init__(output = output, + input = input) + + def process(self): + data = [DataEntry(data_entry) for data_entry in self.input.read()] + self.output.write(data) + + def test(self): + assert type(self.input) is Manifest, "" + assert type(self.output) is Stream, "" + + +class StreamToManifest(BaseProcessor): + def __init__(self, + output: Manifest, + input: Stream): + + super().__init__(output = output, + input = input) + + def process(self): + data = [DataEntry(data) for data in self.input.read()[-1]] + self.output.write(data) + + def test(self): + assert type(self.input) is Stream, "" + assert type(self.output) is Manifest, "" \ No newline at end of file diff --git a/sdp/run_processors.py b/sdp/run_processors.py index cd1fa95a..980f510e 100644 --- a/sdp/run_processors.py +++ b/sdp/run_processors.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,22 +13,16 @@ # limitations under the License. 
import logging -import os -import tempfile -from typing import List - +from omegaconf import OmegaConf import hydra -from omegaconf import OmegaConf, open_dict - -from sdp.logging import logger -from sdp.data_units.manifest import set_manifests -from sdp.data_units.set_sources import set_sources +import yaml +import traceback -# registering new resolvers to simplify config files -OmegaConf.register_new_resolver("subfield", lambda node, field: node[field]) -OmegaConf.register_new_resolver("not", lambda x: not x) -OmegaConf.register_new_resolver("equal", lambda field, value: field == value) +from sdp.data_units.cache import CACHE_DIR +from sdp.data_units.manifest import ManifestsSetter +from sdp.data_units.stream import StreamsSetter +from sdp.logging import logger # customizing logger logger.setLevel(logging.INFO) @@ -43,84 +37,85 @@ logger.addHandler(handler) logger.propagate = False - -def select_subset(input_list: List, select_str: str) -> List: - """This function parses a string and selects objects based on that. - - The string is expected to be a valid representation of Python slice. The - only difference with using an actual slice is that we are always returning - a list, never a single element. See examples below for more details. - - Examples:: - - >>> processors_to_run = [1, 2, 3, 4, 5] - >>> select_subset(processors_to_run, "3:") # to exclude first 3 objects - [4, 5] - - >>> select_subset(processors_to_run, ":-1") # to select all but last - [1, 2, 3, 4] - - >>> select_subset(processors_to_run, "2:5") # to select 3rd to 5th - [3, 4, 5] - - >>> # note that unlike normal slice, we still return a list here - >>> select_subset(processors_to_run, "0") # to select only the first - [1] - - >>> select_subset(processors_to_run, "-1") # to select only the last - [5] - - Args: - input_list (list): input list to select objects from. - select_str (str): string representing Python slice. 
-
-    Returns:
-        list: a subset of the input according to the ``select_str``
-
-    """
-    if ":" not in select_str:
-        selected_objects = [input_list[int(select_str)]]
-    else:
-        slice_obj = slice(*map(lambda x: int(x.strip()) if x.strip() else None, select_str.split(":")))
-        selected_objects = input_list[slice_obj]
-    return selected_objects
-
-
-def run_processors(cfg):
-    logger.info(f"Hydra config: {OmegaConf.to_yaml(cfg)}")
-    processors_to_run = cfg.get("processors_to_run", "all")
-
-    if processors_to_run == "all":
-        processors_to_run = ":"
-    selected_cfgs = select_subset(cfg.processors, processors_to_run)
-    # filtering out any processors that have should_run=False
-    processors_cfgs = []
-    for processor_cfg in selected_cfgs:
-        with open_dict(processor_cfg):
-            should_run = processor_cfg.pop("should_run", True)
-            if should_run:
-                processors_cfgs.append(processor_cfg)
-
-    logger.info(
-        "Specified to run the following processors: %s ",
-        [cfg["_target_"] for cfg in processors_cfgs],
-    )
-    processors = []
-    # let's build all processors first to automatically check
-    # for errors in parameters
-    with tempfile.TemporaryDirectory() as tmp_dir:
-        #use_streams = cfg.get("use_streams", False)
-
-        processors_cfgs = set_sources(processors_cfgs, cfg, tmp_dir, use_streams=False)
-
-        for processor_cfg in processors_cfgs:
+class SDPRunner(ManifestsSetter, StreamsSetter):
+    def __init__(self, processors_cfgs: OmegaConf, processors_to_select: str):
+        self.processors_from_cfg = processors_cfgs
+        self.processors_cfgs = self.select_processors_to_run(processors_to_select)
+        self.processors = []
+
+        super().__init__(self.processors_cfgs)
+
+    def select_processors_to_run(self, processors_to_select: str):
+        selected_cfgs = []
+        if processors_to_select == "all":
+            selected_cfgs = self.processors_from_cfg[:]
+        elif ":" not in processors_to_select:
+            selected_cfgs = [self.processors_from_cfg[int(processors_to_select)]]
+        else:
+            slice_obj = slice(*map(lambda x: int(x.strip()) if x.strip() else None, processors_to_select.split(":")))
+            selected_cfgs = self.processors_from_cfg[slice_obj]
+
+        processors_cfgs = []
+        for processor_cfg in selected_cfgs:
+            processor_cfg = OmegaConf.to_container(processor_cfg)
+            should_run = processor_cfg.pop("should_run", True)
+            if should_run:
+                processors_cfgs.append(processor_cfg)
+
+        return processors_cfgs
+
+    def infer_init_input(self):
+        if (self.processors_cfgs[0] is not self.processors_from_cfg[0] and
+            "input_manifest_file" not in self.processors_cfgs[0]):
+
+            for processor_idx, processor_cfg in enumerate(self.processors_from_cfg):
+                if processor_cfg is self.processors_cfgs[0]:
+                    if "output_manifest_file" in self.processors_from_cfg[processor_idx - 1]:
+                        self.processors_cfgs[0]["input_manifest_file"] = self.processors_from_cfg[processor_idx - 1]["output_manifest_file"]
+                        break
+
+    def set(self, use_streams: bool = False):
+        self.infer_init_input()
+
+        for processor_idx in range(len(self.processors_cfgs)):
+            if not use_streams:
+                self.set_processor_manifests(processor_idx)
+
+            else:
+                if (self.is_manifest_to_stream(processor_idx, dry_run = True) or
+                    self.is_stream_to_manifest(processor_idx, dry_run = True) or
+                    self.is_stream_resolvable(processor_idx, dry_run = True)):
+                    self.set_processor_streams(processor_idx)
+                else:
+                    self.set_processor_manifests(processor_idx)
+
+    def build_processors(self):
+        for processor_cfg in self.processors_cfgs:
             processor = hydra.utils.instantiate(processor_cfg)
-            # running runtime tests to fail right-away if something is not
-            # matching users expectations
+
self.processors.append(processor) + + + def test_processors(self): + for processor in self.processors: processor.test() - processors.append(processor) - - for processor in processors: - # TODO: add proper str method to all classes for good display - logger.info('=> Running processor "%s"', processor) - processor.process() + + def run(self, use_streams: bool = False): + try: + self.set(use_streams = use_streams) + logger.info( + "Specified to run the following processors:\n %s", + (yaml.dump(self.processors_cfgs, default_flow_style=False)), + ) + + self.build_processors() + self.test_processors() + + for processor in self.processors: + logger.info('=> Running processor "%s"', processor) + processor.process() + + except Exception: + print(f"An error occurred: {traceback.format_exc()}") + + finally: + CACHE_DIR.cleanup() \ No newline at end of file From 24c341c604bccd71d7fb973cbe48f19b71205dc1 Mon Sep 17 00:00:00 2001 From: Sasha Meister Date: Wed, 30 Oct 2024 13:24:42 +0000 Subject: [PATCH 07/13] Moved whole cfg handling to SDPRunner Signed-off-by: Sasha Meister --- main.py | 7 ++----- sdp/run_processors.py | 16 +++++++++------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/main.py b/main.py index 006d7e90..be6ca091 100644 --- a/main.py +++ b/main.py @@ -26,11 +26,8 @@ @hydra.main(version_base=None) def main(cfg): - processors_to_run = cfg.get("processors_to_run", "all") - sdp = SDPRunner(cfg.processors, processors_to_run) - - use_streams = cfg.get("use_streams", False) - sdp.run(use_streams) + sdp = SDPRunner(cfg) + sdp.run() if __name__ == "__main__": diff --git a/sdp/run_processors.py b/sdp/run_processors.py index 980f510e..d4e7fb32 100644 --- a/sdp/run_processors.py +++ b/sdp/run_processors.py @@ -38,10 +38,12 @@ logger.propagate = False class SDPRunner(ManifestsSetter, StreamsSetter): - def __init__(self, processors_cfgs: OmegaConf, processors_to_select: str): - self.processors_from_cfg = processors_cfgs - self.processors_cfgs = self.select_processors_to_run(processors_to_select) + def __init__(self, cfg: OmegaConf): + self.processors_from_cfg = cfg.processors + self.processors_cfgs = self.select_processors_to_run(cfg.get("processors_to_run", "all")) self.processors = [] + + self.use_streams = cfg.get("use_streams", False) super().__init__(self.processors_cfgs) @@ -74,11 +76,11 @@ def infer_init_input(self): self.processors_cfgs[0]["input_manifest_file"] = self.processors_from_cfg[processor_idx - 1]["output_manifest_file"] break - def set(self, use_streams: bool = False): + def set(self): self.infer_init_input() for processor_idx in range(len(self.processors_cfgs)): - if not use_streams: + if not self.use_streams: self.set_processor_manifests(processor_idx) else: @@ -99,9 +101,9 @@ def test_processors(self): for processor in self.processors: processor.test() - def run(self, use_streams: bool = False): + def run(self): try: - self.set(use_streams = use_streams) + self.set() logger.info( "Specified to run the following processors:\n %s", (yaml.dump(self.processors_cfgs, default_flow_style=False)), From 8e087d394a0daf3634dd4d575be0911b49879f2e Mon Sep 17 00:00:00 2001 From: Sasha Meister Date: Thu, 31 Oct 2024 16:20:02 +0000 Subject: [PATCH 08/13] Some small fixes Signed-off-by: Sasha Meister --- sdp/data_units/cache.py | 2 +- sdp/data_units/manifest.py | 13 +++++++++---- sdp/run_processors.py | 3 ++- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/sdp/data_units/cache.py b/sdp/data_units/cache.py index dcd63810..030c60fa 100644 --- 
a/sdp/data_units/cache.py +++ b/sdp/data_units/cache.py @@ -9,7 +9,7 @@ def __init__(self, cache_dirpath: str = None, prefix: str = None, suffix: str = self.cache_dir = TemporaryDirectory(dir = cache_dirpath, prefix = prefix, suffix = suffix) def make_tmp_filepath(self): - return os.path.join(self.cache_dir, uuid4()) + return os.path.join(self.cache_dir.name, str(uuid4())) def cleanup(self): self.cache_dir.cleanup() diff --git a/sdp/data_units/manifest.py b/sdp/data_units/manifest.py index cb0010b4..5b8f80b4 100644 --- a/sdp/data_units/manifest.py +++ b/sdp/data_units/manifest.py @@ -55,13 +55,15 @@ def __init__(self, processors_cfgs: List[Dict]): def is_manifest_resolvable(self, processor_idx: int): processor_cfg = self.processors_cfgs[processor_idx] + print(processor_idx) + print(processor_cfg) if "input_manifest_file" not in processor_cfg: if processor_idx == 0: - pass + return - if not("output" in self.processors_cfgs[processor_idx - 1] and - isinstance(self.processors_cfgs[processor_idx - 1]["output"]) is Manifest): + if not ("output" in self.processors_cfgs[processor_idx - 1] and + isinstance(self.processors_cfgs[processor_idx - 1]["output"], Manifest)): raise ValueError() def set_processor_manifests(self, processor_idx: int): @@ -73,7 +75,10 @@ def set_processor_manifests(self, processor_idx: int): input_manifest = Manifest(processor_cfg.pop("input_manifest_file")) else: #1 st processor - input_manifest = self.processors_cfgs[processor_idx - 1]["output"] + if processor_idx == 0: + input_manifest = None + else: + input_manifest = self.processors_cfgs[processor_idx - 1]["output"] processor_cfg["input"] = input_manifest diff --git a/sdp/run_processors.py b/sdp/run_processors.py index d4e7fb32..777f80c5 100644 --- a/sdp/run_processors.py +++ b/sdp/run_processors.py @@ -38,7 +38,8 @@ logger.propagate = False class SDPRunner(ManifestsSetter, StreamsSetter): - def __init__(self, cfg: OmegaConf): + def __init__(self, cfg: OmegaConf): + OmegaConf.resolve(cfg) self.processors_from_cfg = cfg.processors self.processors_cfgs = self.select_processors_to_run(cfg.get("processors_to_run", "all")) self.processors = [] From a7b0ad2f0832750e5c9458e0d913269144249e93 Mon Sep 17 00:00:00 2001 From: Sasha Meister Date: Thu, 31 Oct 2024 16:53:40 +0000 Subject: [PATCH 09/13] If data_entry.data is None - don't save Signed-off-by: Sasha Meister --- sdp/data_units/abc_unit.py | 7 ++++--- sdp/data_units/manifest.py | 13 ++++++------- sdp/data_units/stream.py | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/sdp/data_units/abc_unit.py b/sdp/data_units/abc_unit.py index 62c48bd5..40c47c85 100644 --- a/sdp/data_units/abc_unit.py +++ b/sdp/data_units/abc_unit.py @@ -20,10 +20,11 @@ def write(self, data_entries: Iterable): self._add_metrics(data_entry) def _add_metrics(self, data_entry: DataEntry): - self.metrics.append(data_entry.metrics) + if data_entry.metrics is not None: + self.metrics.append(data_entry.metrics) + if data_entry.data is not None: + self.total_duration += data_entry.data.get("duration", 0) self.number_of_entries += 1 - self.total_duration += data_entry.data.get("duration", 0) - class DataSetter(ABC): def __init__(self, processors_cfgs: List[Dict]): diff --git a/sdp/data_units/manifest.py b/sdp/data_units/manifest.py index 5b8f80b4..3c28f3ca 100644 --- a/sdp/data_units/manifest.py +++ b/sdp/data_units/manifest.py @@ -44,9 +44,10 @@ def write(self, data: List[DataEntry]): with open(self.source, self.write_mode, encoding = 'utf8') as fout: for data_entry in tqdm(data): 
self._add_metrics(data_entry) - json.dump(data_entry.data, fout, ensure_ascii=False) - fout.write("\n") - + if data_entry.data: + json.dump(data_entry.data, fout, ensure_ascii=False) + fout.write("\n") + self.write_mode = 'a' class ManifestsSetter(DataSetter): @@ -55,11 +56,9 @@ def __init__(self, processors_cfgs: List[Dict]): def is_manifest_resolvable(self, processor_idx: int): processor_cfg = self.processors_cfgs[processor_idx] - print(processor_idx) - print(processor_cfg) if "input_manifest_file" not in processor_cfg: - if processor_idx == 0: + if processor_idx == 0: ## ToDo return if not ("output" in self.processors_cfgs[processor_idx - 1] and @@ -76,7 +75,7 @@ def set_processor_manifests(self, processor_idx: int): else: #1 st processor if processor_idx == 0: - input_manifest = None + input_manifest = None ##ToDo else: input_manifest = self.processors_cfgs[processor_idx - 1]["output"] diff --git a/sdp/data_units/stream.py b/sdp/data_units/stream.py index bcf3b22a..50963583 100644 --- a/sdp/data_units/stream.py +++ b/sdp/data_units/stream.py @@ -35,7 +35,7 @@ def write(self, data: List[DataEntry]): for entry in data: self._add_metrics(entry) - pickle.dump([entry.data for entry in data], self.source) + pickle.dump([entry.data for entry in data if entry.data], self.source) def reset(self): self.source.truncate(0) From 65193a16dd296349ceebd955c62cade45e610b4e Mon Sep 17 00:00:00 2001 From: Sasha Meister Date: Fri, 1 Nov 2024 08:32:17 +0000 Subject: [PATCH 10/13] removed useless seek(0) in Stream methods Signed-off-by: Sasha Meister --- sdp/data_units/stream.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdp/data_units/stream.py b/sdp/data_units/stream.py index 50963583..67eb5b80 100644 --- a/sdp/data_units/stream.py +++ b/sdp/data_units/stream.py @@ -24,13 +24,11 @@ def wrapper(self, *args, **kwargs): @rw_control def read(self, *args, **kwargs): - self.source.seek(0) data = [pickle.load(self.source)] return data @rw_control def write(self, data: List[DataEntry]): - self.source.seek(0) data = list(data) for entry in data: self._add_metrics(entry) From b5dbdbd0c0b8984fb3781d0465420a5aea94a709 Mon Sep 17 00:00:00 2001 From: Sasha Meister Date: Fri, 1 Nov 2024 09:00:34 +0000 Subject: [PATCH 11/13] get_resolvable_link method in DataSetter Signed-off-by: Sasha Meister --- sdp/data_units/abc_unit.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdp/data_units/abc_unit.py b/sdp/data_units/abc_unit.py index 40c47c85..971d6a2c 100644 --- a/sdp/data_units/abc_unit.py +++ b/sdp/data_units/abc_unit.py @@ -29,4 +29,7 @@ def _add_metrics(self, data_entry: DataEntry): class DataSetter(ABC): def __init__(self, processors_cfgs: List[Dict]): self.processors_cfgs = processors_cfgs + + def get_resolvable_link(*args): + return f"${{{'.' 
+ '.'.join(list(map(str, args)))}}}"

From 5ffb837ac200cca7077c898e442de32d77eeee26 Mon Sep 17 00:00:00 2001
From: Sasha Meister
Date: Sun, 3 Nov 2024 18:31:05 +0000
Subject: [PATCH 12/13] Multiple bug fixes, I/O type method modifications

Signed-off-by: Sasha Meister
---
 sdp/data_units/abc_unit.py        | 32 +++++++++-----
 sdp/data_units/manifest.py        | 71 ++++++++++++++++++-------------
 sdp/data_units/stream.py          | 49 +++++++++++++--------
 sdp/processors/base_processor.py  |  6 +--
 sdp/processors/stream/adapters.py | 10 ++---
 sdp/run_processors.py             |  2 +-
 6 files changed, 105 insertions(+), 65 deletions(-)

diff --git a/sdp/data_units/abc_unit.py b/sdp/data_units/abc_unit.py
index 971d6a2c..c18fdc0a 100644
--- a/sdp/data_units/abc_unit.py
+++ b/sdp/data_units/abc_unit.py
@@ -6,23 +6,36 @@ class DataSource(ABC):
     def __init__(self, source: Any):
         self.source = source
-        self.number_of_entries = 0
+        self.number_of_entries = 0 
         self.total_duration = 0.0
         self.metrics = []
+
     @abstractmethod
-    def read(self, *args, **kwargs) -> List[Dict]:
+    def read_entry(self) -> Dict:
         pass
-    
+
     @abstractmethod
-    def write(self, data_entries: Iterable):
-        for data_entry in data_entries:
-            self._add_metrics(data_entry)
+    def read_entries(self, in_memory_chunksize: int = None) -> List[Dict]:
+        pass
 
-    def _add_metrics(self, data_entry: DataEntry):
+    @abstractmethod
+    def write_entry(self, data_entry: DataEntry):
+        pass
+
+    @abstractmethod
+    def write_entries(self, data_entries: List[DataEntry]):
+        pass
+
+    #@abstractmethod
+    #def write(self, data_entries: Iterable):
+    #    for data_entry in data_entries:
+    #        self._add_metrics(data_entry)
+
+    def update_metrics(self, data_entry: DataEntry):
         if data_entry.metrics is not None:
             self.metrics.append(data_entry.metrics)
-        if data_entry.data is not None:
+        if isinstance(data_entry.data, dict):
             self.total_duration += data_entry.data.get("duration", 0)
         self.number_of_entries += 1
 
@@ -31,5 +44,4 @@ def __init__(self, processors_cfgs: List[Dict]):
         self.processors_cfgs = processors_cfgs
 
     def get_resolvable_link(*args):
-        return f"${{{'.' + '.'.join(list(map(str, args)))}}}"
-
+        return f"${{{'.' 
+ '.'.join(list(map(str, args)))}}}"
\ No newline at end of file
diff --git a/sdp/data_units/manifest.py b/sdp/data_units/manifest.py
index 3c28f3ca..6218d478 100644
--- a/sdp/data_units/manifest.py
+++ b/sdp/data_units/manifest.py
@@ -7,48 +7,60 @@
 from sdp.data_units.abc_unit import DataSource, DataSetter
 from sdp.data_units.cache import CacheDir, CACHE_DIR
 
+
 class Manifest(DataSource):
     def __init__(self, filepath: str = None, cache_dir: CacheDir = CACHE_DIR):
         self.write_mode = "w"
+        self.encoding = 'utf8'
+        self.file = None
 
         if not filepath:
             filepath = cache_dir.make_tmp_filepath()
 
         super().__init__(filepath)
+        os.makedirs(os.path.dirname(self.source), exist_ok=True)
 
-    def read(self, in_memory_chunksize: int = None):
-        manifest_chunk = []
-        for idx, data_entry in enumerate(self._read_manifest(), 1):
-            if not in_memory_chunksize:
-                yield data_entry
-            
-            manifest_chunk.append(data_entry)
-            if in_memory_chunksize and idx % in_memory_chunksize == 0:
-                yield manifest_chunk
-                manifest_chunk = []
-        if in_memory_chunksize and manifest_chunk:
-            yield manifest_chunk
-
-    def _read_manifest(self):
+    def read_entry(self):
         if self.source is None:
             raise NotImplementedError("Override this method if the processor creates initial manifest")
 
-        with open(self.source, 'r', encoding = 'utf8') as input_file:
-            for line in input_file:
+        with open(file=self.source, mode='r', encoding = 'utf8') as file:
+            for line in file:
                 yield json.loads(line)
     
-    def write(self, data: List[DataEntry]):
-        if self.write_mode == "w":
-            os.makedirs(os.path.dirname(self.source), exist_ok=True)
-        
-        with open(self.source, self.write_mode, encoding = 'utf8') as fout:
-            for data_entry in tqdm(data):
-                self._add_metrics(data_entry)
-                if data_entry.data:
-                    json.dump(data_entry.data, fout, ensure_ascii=False)
-                    fout.write("\n")
-
-        self.write_mode = 'a'
+    def read_entries(self, in_memory_chunksize = None):
+        manifest_chunk = []
+        for idx, data_entry in enumerate(self.read_entry(), 1):
+            if not in_memory_chunksize:
+                yield data_entry
+            else:
+                manifest_chunk.append(data_entry)
+                if idx % in_memory_chunksize == 0:
+                    yield manifest_chunk
+                    manifest_chunk = []
+        if manifest_chunk:
+            yield manifest_chunk
+
+    def write_entry(self, data_entry: DataEntry):
+        if not self.file:
+            self.file = open(file=self.source, mode=self.write_mode, encoding=self.encoding)
+            self.write_mode = "a"
+        
+        self.update_metrics(data_entry)
+        if data_entry.data:
+            json.dump(data_entry.data, self.file, ensure_ascii=False)
+            self.file.write("\n")
+    
+    def write_entries(self, data_entries):
+        for data_entry in tqdm(data_entries):
+            self.write_entry(data_entry)
+        
+        self.close()
+    
+    def close(self):
+        if self.file:
+            self.file.close()
+            self.file = None
 
 class ManifestsSetter(DataSetter):
     def __init__(self, processors_cfgs: List[Dict]):
@@ -89,5 +101,6 @@ def set_processor_manifests(self, processor_idx: int):
         processor_cfg["output"] = output_manifest
         
         self.processors_cfgs[processor_idx] = processor_cfg
-        return processor_cfg
+        print(processor_idx, processor_cfg)
+        
 
diff --git a/sdp/data_units/stream.py b/sdp/data_units/stream.py
index 67eb5b80..55ac7b84 100644
--- a/sdp/data_units/stream.py
+++ b/sdp/data_units/stream.py
@@ -1,14 +1,19 @@
 from io import BytesIO
 from typing import List
 import pickle
+import json
+from tqdm import tqdm
 
 from sdp.data_units.data_entry import DataEntry
 from sdp.data_units.abc_unit import DataSource, DataSetter
 from sdp.data_units.manifest import Manifest
 
+
+
 class Stream(DataSource):
     def __init__(self):
         self.rw_amount = 0
         self.rw_limit = 1
+        self.encoding = "utf8"
super().__init__(BytesIO()) @@ -18,25 +23,30 @@ def wrapper(self, *args, **kwargs): result = func(self, *args, **kwargs) self.rw_amount += 1 if self.rw_amount >= self.rw_limit: - self.reset() + self.source.truncate(0) return result return wrapper - @rw_control - def read(self, *args, **kwargs): - data = [pickle.load(self.source)] - return data + def read_entry(self): + for line in self.source: + yield json.loads(line.decode(self.encoding)) @rw_control - def write(self, data: List[DataEntry]): - data = list(data) - for entry in data: - self._add_metrics(entry) - - pickle.dump([entry.data for entry in data if entry.data], self.source) + def read_entries(self, in_memory_chunksize = None): + data_entries = [entry for entry in self.read_entry()] + if in_memory_chunksize: + data_entries = [data_entries] + return data_entries - def reset(self): - self.source.truncate(0) + def write_entry(self, data_entry): + self.update_metrics(data_entry) + if data_entry.data: + self.source.write((json.dumps(data_entry.data) + '\n').encode(self.encoding)) + + @rw_control + def write_entries(self, data_entries): + for data_entry in tqdm(data_entries): + self.write_entry(data_entry) class StreamsSetter(DataSetter): @@ -54,8 +64,6 @@ def resolve_stream(self, reference: str): for key in key_chain: if key.isdigit(): key = int(key) - - #TODO: replace io_stream fields to "input" / "output" if isinstance(key, str) and key.endswith("_stream"): key = key.replace("_stream", "") @@ -178,7 +186,7 @@ def is_stream_resolvable(self, processor_idx, dry_run: bool = False): raise ValueError() else: - if not(hasattr(self.processors_cfgs[processor_idx - 1], "output") and + if not("output" in self.processors_cfgs[processor_idx - 1] and isinstance(self.processors_cfgs[processor_idx - 1]["output"], Stream) ): if dry_run: @@ -206,10 +214,17 @@ def set_processor_streams(self, processor_idx: int): processor_cfg = self.processors_cfgs[processor_idx] processor_cfg = self.traverse_processor(processor_cfg) - processor_cfg["input"] = processor_cfg.pop("input_stream") + if "input_stream" in processor_cfg: + input_stream = processor_cfg.pop("input_stream") + else: + input_stream = self.processors_cfgs[processor_idx - 1]["output"] + self.processors_cfgs[processor_idx - 1]["output"].rw_limit += 1 + + processor_cfg["input"] = input_stream processor_cfg["output"] = processor_cfg.pop("output_stream", Stream()) self.processors_cfgs[processor_idx] = processor_cfg + print(processor_idx, processor_cfg) #return processor_cfg #raise ValueError("Expected a Stream object for 'input'") diff --git a/sdp/processors/base_processor.py b/sdp/processors/base_processor.py index 0c4bf7af..989334d5 100644 --- a/sdp/processors/base_processor.py +++ b/sdp/processors/base_processor.py @@ -69,7 +69,7 @@ def process(self): pass def test(self): - assert type(self.input) == type(self.output), f"Input ({type(self.input)}) and output ({type(self.output)}) types do not match." + #assert type(self.input) == type(self.output), f"Input ({type(self.input)}) and output ({type(self.output)}) types do not match." """This method can be used to perform "runtime" tests. 
This can be any kind of self-consistency tests, but are usually @@ -177,7 +177,7 @@ def process(self): """ self.prepare() - for manifest_chunk in self.input.read(self.in_memory_chunksize): + for manifest_chunk in self.input.read_entries(self.in_memory_chunksize): data = itertools.chain( *process_map( self.process_dataset_entry, @@ -187,7 +187,7 @@ def process(self): ) ) - self.output.write(data) + self.output.write_entries(data) self.number_of_entries = self.output.number_of_entries self.total_duration = self.output.total_duration diff --git a/sdp/processors/stream/adapters.py b/sdp/processors/stream/adapters.py index 4a52967a..cb18ef94 100644 --- a/sdp/processors/stream/adapters.py +++ b/sdp/processors/stream/adapters.py @@ -13,8 +13,8 @@ def __init__(self, input = input) def process(self): - data = [DataEntry(data_entry) for data_entry in self.input.read()] - self.output.write(data) + data = [DataEntry(data_entry) for data_entry in self.input.read_entry()] + self.output.write_entries(data) def test(self): assert type(self.input) is Manifest, "" @@ -30,9 +30,9 @@ def __init__(self, input = input) def process(self): - data = [DataEntry(data) for data in self.input.read()[-1]] - self.output.write(data) + data = [DataEntry(data) for data in self.input.read_entry()] + self.output.write_entries(data) def test(self): - assert type(self.input) is Stream, "" + assert type(self.input) is Stream, f"" assert type(self.output) is Manifest, "" \ No newline at end of file diff --git a/sdp/run_processors.py b/sdp/run_processors.py index 777f80c5..b68155c6 100644 --- a/sdp/run_processors.py +++ b/sdp/run_processors.py @@ -86,7 +86,7 @@ def set(self): else: if (self.is_manifest_to_stream(processor_idx, dry_run = True) or - self.is_manifest_to_stream(processor_idx, dry_run = True) or + self.is_stream_to_manifest(processor_idx, dry_run = True) or self.is_stream_resolvable(processor_idx, dry_run = True)): self.set_processor_streams(processor_idx) else: From fd9db30457ddfdda49c56952f6ef0a34c49003fd Mon Sep 17 00:00:00 2001 From: Sasha Meister Date: Sun, 3 Nov 2024 19:00:48 +0000 Subject: [PATCH 13/13] Added methods for CacheDir, removed comments from abc_unit. Signed-off-by: Sasha Meister --- sdp/data_units/abc_unit.py | 5 ----- sdp/data_units/cache.py | 4 ++++ 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/sdp/data_units/abc_unit.py b/sdp/data_units/abc_unit.py index c18fdc0a..a4cec293 100644 --- a/sdp/data_units/abc_unit.py +++ b/sdp/data_units/abc_unit.py @@ -26,11 +26,6 @@ def write_entry(self, data_entry: DataEntry): @abstractmethod def write_entries(self, data_entries: List[DataEntry]): pass - - #@abstractmethod - #def write(self, data_entries: Iterable): - # for data_entry in data_entries: - # self._add_metrics(data_entry) def update_metrics(self, data_entry: DataEntry): if data_entry.metrics is not None: diff --git a/sdp/data_units/cache.py b/sdp/data_units/cache.py index 030c60fa..138d605d 100644 --- a/sdp/data_units/cache.py +++ b/sdp/data_units/cache.py @@ -10,6 +10,10 @@ def __init__(self, cache_dirpath: str = None, prefix: str = None, suffix: str = def make_tmp_filepath(self): return os.path.join(self.cache_dir.name, str(uuid4())) + + def makedir(self, **kwargs): + tmp_dir = CacheDir(cache_dirpath = self.cache_dir.name, **kwargs) + return tmp_dir def cleanup(self): self.cache_dir.cleanup()
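
For reference, a minimal usage sketch of the CacheDir helpers as of PATCH 13. The variable names and the "chunks_" prefix are illustrative; everything else follows cache.py above:

    from sdp.data_units.cache import CacheDir

    # CacheDir wraps a tempfile.TemporaryDirectory; manifest.py relies on the
    # module-level CACHE_DIR instance, and SDPRunner.run() calls
    # CACHE_DIR.cleanup() in its finally block.
    cache = CacheDir()

    # A fresh uuid4-based path for an intermediate manifest. No file is
    # created here; Manifest.__init__ takes care of that.
    tmp_manifest_path = cache.make_tmp_filepath()
    print(tmp_manifest_path)

    # makedir() (added in PATCH 13) nests another CacheDir inside this one.
    nested = cache.makedir(prefix="chunks_")

    nested.cleanup()  # remove the nested directory first
    cache.cleanup()   # then the parent tree

Since every unspecified output manifest is materialized under CACHE_DIR, the single cleanup() call at the end of SDPRunner.run() is what removes all intermediate manifests at once.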
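And a round-trip sketch of the entry-level Manifest API that the series converges on. The path and the manifest fields are illustrative; the behavior follows manifest.py and DataSource.update_metrics as patched above:

    from sdp.data_units.data_entry import DataEntry
    from sdp.data_units.manifest import Manifest

    # The parent directory is created by Manifest.__init__.
    manifest = Manifest("/tmp/sdp_example/manifest.json")

    entries = [
        DataEntry(data={"audio_filepath": "a.wav", "duration": 1.5}),
        DataEntry(data=None),  # dropped on write, but still counted in metrics
        DataEntry(data={"audio_filepath": "b.wav", "duration": 2.0}),
    ]

    # write_entries() opens the file lazily, updates metrics per entry,
    # skips entries whose data is None, and closes the file afterwards.
    manifest.write_entries(entries)

    print(manifest.number_of_entries)  # 3 (every emitted entry is counted)
    print(manifest.total_duration)     # 3.5 (durations of dict entries only)

    # read_entries() yields plain dicts one by one, or lists of up to
    # in_memory_chunksize entries when a chunk size is given.
    for chunk in manifest.read_entries(in_memory_chunksize=2):
        print(chunk)  # one chunk holding the two entries that were written

Note that entries with data=None still count toward number_of_entries, so the metric reflects how many entries a processor emitted, written or not.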