Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@
import sys

import hydra
from omegaconf import OmegaConf

from sdp.run_processors import run_processors
from sdp.run_processors import SDPRunner

# Custom OmegaConf resolvers available to all SDP config files:
#   ${subfield:node,field} -> node[field] lookup on a config node
#   ${not:x}               -> logical negation of x
#   ${equal:field,value}   -> True if field == value
OmegaConf.register_new_resolver("subfield", lambda node, field: node[field])
OmegaConf.register_new_resolver("not", lambda x: not x)
OmegaConf.register_new_resolver("equal", lambda field, value: field == value)


@hydra.main(version_base=None)
def main(cfg):
    """Hydra entry point: build the SDP pipeline from ``cfg`` and run it.

    The old ``run_processors(cfg)`` call was replaced in this change by the
    ``SDPRunner`` class API (see the import switch at the top of the diff).
    """
    sdp = SDPRunner(cfg)
    sdp.run()


if __name__ == "__main__":
# hacking the arguments to always disable hydra's output
Expand Down
42 changes: 42 additions & 0 deletions sdp/data_units/abc_unit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Iterable
from sdp.data_units.data_entry import DataEntry


class DataSource(ABC):
    """Abstract reader/writer of data entries that accumulates write metrics."""

    def __init__(self, source: Any):
        self.source = source          # backing location (e.g. a manifest filepath)
        self.number_of_entries = 0    # count of entries passed through write path
        self.total_duration = 0.0     # summed "duration" field of written entries
        self.metrics = []             # per-entry metrics collected on write

    @abstractmethod
    def read_entry(self) -> Dict:
        """Yield or return a single data entry."""

    @abstractmethod
    def read_entries(self, in_memory_chunksize: int = None) -> List[Dict]:
        """Read entries, optionally batched into chunks of the given size."""

    @abstractmethod
    def write_entry(self, data_entry: DataEntry):
        """Write a single entry to the underlying source."""

    @abstractmethod
    def write_entries(self, data_entries: List[DataEntry]):
        """Write a batch of entries to the underlying source."""

    def update_metrics(self, data_entry: DataEntry):
        """Fold one entry's metrics and duration into the running totals."""
        if data_entry.metrics is not None:
            self.metrics.append(data_entry.metrics)
        # Bug fix: `data_entry.data is dict` compared identity against the
        # `dict` type itself and was always False, so total_duration never
        # accumulated; isinstance() is the intended check.
        if isinstance(data_entry.data, dict):
            self.total_duration += data_entry.data.get("duration", 0)
        self.number_of_entries += 1

class DataSetter(ABC):
    """Base class for objects that wire data sources into processor configs."""

    def __init__(self, processors_cfgs: List[Dict]):
        self.processors_cfgs = processors_cfgs

    def get_resolvable_link(*args):
        # Build an OmegaConf relative-interpolation string, e.g. ``${.a.0.b}``.
        # NOTE(review): defined without ``self``/``@staticmethod`` — callers
        # appear to invoke it unbound via the class; an instance call would
        # leak the instance into the path. Confirm against call sites.
        dotted_path = ".".join(str(part) for part in args)
        return "${." + dotted_path + "}"
22 changes: 22 additions & 0 deletions sdp/data_units/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import os
from tempfile import TemporaryDirectory
from uuid import uuid4

class CacheDir:
    """A temporary directory holding intermediate artifacts.

    Wraps :class:`tempfile.TemporaryDirectory`; contents are removed on
    :meth:`cleanup` (or interpreter exit, per ``TemporaryDirectory``).
    """

    def __init__(self, cache_dirpath: str = None, prefix: str = None, suffix: str = None):
        # Make sure the requested parent exists before placing the tmp dir in it.
        if cache_dirpath:
            os.makedirs(cache_dirpath, exist_ok=True)
        self.cache_dir = TemporaryDirectory(dir=cache_dirpath, prefix=prefix, suffix=suffix)

    def make_tmp_filepath(self):
        """Return a fresh, collision-free file path inside this cache dir (not created)."""
        return os.path.join(self.cache_dir.name, str(uuid4()))

    def makedir(self, **kwargs):
        """Create and return a nested ``CacheDir`` rooted inside this one."""
        return CacheDir(cache_dirpath=self.cache_dir.name, **kwargs)

    def cleanup(self):
        """Delete the temporary directory and everything inside it."""
        self.cache_dir.cleanup()


# Process-wide default cache dir shared by all data units.
CACHE_DIR = CacheDir()
9 changes: 9 additions & 0 deletions sdp/data_units/data_entry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class DataEntry:
    """One data entry together with any metrics produced while processing it."""

    # Payload for a single manifest line; ``None`` marks the entry as dropped.
    data: Optional[Dict]
    # Arbitrary per-entry metrics attached by a processor (may stay ``None``).
    metrics: Any = None
106 changes: 106 additions & 0 deletions sdp/data_units/manifest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
from typing import List, Dict
import json
import os
from tqdm import tqdm

from sdp.data_units.data_entry import DataEntry
from sdp.data_units.abc_unit import DataSource, DataSetter
from sdp.data_units.cache import CacheDir, CACHE_DIR


class Manifest(DataSource):
    """A line-delimited JSON manifest stored on disk.

    Entries are read lazily (one JSON object per line) and written through
    :meth:`write_entry`. When no ``filepath`` is given, a unique temporary
    file inside ``cache_dir`` is used.
    """

    def __init__(self, filepath: str = None, cache_dir: CacheDir = CACHE_DIR):
        self.write_mode = "w"    # first open truncates; flipped to "a" so reopens append
        self.encoding = 'utf8'
        self.file = None

        if not filepath:
            filepath = cache_dir.make_tmp_filepath()

        super().__init__(filepath)
        # Bug fix: for a bare filename os.path.dirname() is "" and
        # os.makedirs("") raises FileNotFoundError — only create a parent
        # directory when there actually is one.
        parent_dir = os.path.dirname(self.source)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

    def read_entry(self):
        """Yield manifest entries one by one, parsed from JSON lines."""
        if self.source is None:
            raise NotImplementedError("Override this method if the processor creates initial manifest")

        # Consistency: honor self.encoding instead of a second hard-coded 'utf8'.
        with open(file=self.source, mode='r', encoding=self.encoding) as file:
            for line in file:
                yield json.loads(line)

    def read_entries(self, in_memory_chunksize=None):
        """Yield entries one at a time, or as lists of ``in_memory_chunksize``."""
        manifest_chunk = []
        for idx, data_entry in enumerate(self.read_entry(), 1):
            if not in_memory_chunksize:
                yield data_entry
            else:
                manifest_chunk.append(data_entry)
                if idx % in_memory_chunksize == 0:
                    yield manifest_chunk
                    manifest_chunk = []
        if manifest_chunk:
            # Flush the final, partially-filled chunk.
            yield manifest_chunk

    def write_entry(self, data_entry: DataEntry):
        """Write one entry (dropped entries excluded) and update metrics."""
        if not self.file:
            # Bug fix: the mode was hard-coded to "w", so every reopen after
            # close() truncated the manifest and the "a" flip below was dead
            # code; open with self.write_mode so later reopens append.
            self.file = open(file=self.source, mode=self.write_mode, encoding=self.encoding)
            self.write_mode = "a"

        self.update_metrics(data_entry)
        if data_entry.data:
            json.dump(data_entry.data, self.file, ensure_ascii=False)
            self.file.write("\n")

    def write_entries(self, data_entries):
        """Write a batch of entries, then close the underlying file."""
        for data_entry in tqdm(data_entries):
            self.write_entry(data_entry)

        self.close()

    def close(self):
        """Flush and release the file handle, if one is open."""
        if self.file:
            self.file.close()
            self.file = None

class ManifestsSetter(DataSetter):
    """Resolves the input/output ``Manifest`` objects for each processor config."""

    def __init__(self, processors_cfgs: List[Dict]):
        super().__init__(processors_cfgs)

    def is_manifest_resolvable(self, processor_idx: int):
        """Raise ``ValueError`` if the processor's input manifest cannot be resolved.

        A processor without an explicit ``input_manifest_file`` must either be
        the first one (which may create its own manifest) or follow a processor
        whose ``output`` is already a ``Manifest``.
        """
        processor_cfg = self.processors_cfgs[processor_idx]

        if "input_manifest_file" not in processor_cfg:
            if processor_idx == 0:  # ToDo: first processor may create the initial manifest
                return

            prev_cfg = self.processors_cfgs[processor_idx - 1]
            if not ("output" in prev_cfg and isinstance(prev_cfg["output"], Manifest)):
                # Bug fix: was a bare `raise ValueError()` with no message.
                raise ValueError(
                    f"Cannot resolve input manifest for processor {processor_idx}: "
                    "no 'input_manifest_file' is set and the previous processor "
                    "has no Manifest output."
                )

    def set_processor_manifests(self, processor_idx: int):
        """Populate ``input``/``output`` Manifest objects on the processor config."""
        self.is_manifest_resolvable(processor_idx)

        processor_cfg = self.processors_cfgs[processor_idx]

        if "input_manifest_file" in processor_cfg:
            input_manifest = Manifest(processor_cfg.pop("input_manifest_file"))
        elif processor_idx == 0:
            # 1st processor: creates the initial manifest itself.  ##ToDo
            input_manifest = None
        else:
            # Chain to the previous processor's output manifest.
            input_manifest = self.processors_cfgs[processor_idx - 1]["output"]

        processor_cfg["input"] = input_manifest

        if "output_manifest_file" in processor_cfg:
            output_manifest = Manifest(processor_cfg.pop("output_manifest_file"))
        else:
            # Anonymous manifest placed in the shared cache dir.
            output_manifest = Manifest()

        processor_cfg["output"] = output_manifest

        self.processors_cfgs[processor_idx] = processor_cfg
        # Bug fix: removed a leftover debug `print(processor_idx, processor_cfg)`.


Loading
Loading