diff --git a/tests/unit/searchcommands/test_internals_v2.py b/tests/unit/searchcommands/test_internals_v2.py index 0a935b9e..97dfefd3 100755 --- a/tests/unit/searchcommands/test_internals_v2.py +++ b/tests/unit/searchcommands/test_internals_v2.py @@ -15,34 +15,26 @@ # License for the specific language governing permissions and limitations # under the License. -import gzip -import io import json import os import random import sys import pytest -from functools import wraps -from itertools import chain from sys import float_info -from tempfile import mktemp from time import time -from types import MethodType from unittest import main, TestCase from collections import OrderedDict -from collections import namedtuple, deque +from collections import namedtuple from splunklib.searchcommands.internals import ( MetadataDecoder, MetadataEncoder, - Recorder, RecordWriterV2, ) from splunklib.searchcommands import SearchMetric from io import BytesIO -import pickle # region Functions for producing random apps @@ -299,154 +291,5 @@ def _load_chunks(self, ifile): _package_path = os.path.dirname(os.path.abspath(__file__)) -class TestRecorder: - def __init__(self, test_case): - self._test_case = test_case - self._output = None - self._recording = None - self._recording_part = None - - def _not_implemented(self): - raise NotImplementedError( - "class {} is not in playback or record mode".format( - self.__class__.__name__ - ) - ) - - self.get = self.next_part = self.stop = MethodType( - _not_implemented, self, self.__class__ - ) - - @property - def output(self): - return self._output - - def playback(self, path): - with open(path, "rb") as f: - test_data = pickle.load(f) - - self._output = BytesIO() - self._recording = test_data["inputs"] - self._recording_part = self._recording.popleft() - - def get(self, method, *args, **kwargs): - return self._recording_part[method.__name__].popleft() - - self.get = MethodType(get, self, self.__class__) - - def next_part(self): - self._recording_part = self._recording.popleft() - - self.next_part = MethodType(next_part, self, self.__class__) - - def stop(self): - self._test_case.assertEqual(test_data["results"], self._output.getvalue()) - - self.stop = MethodType(stop, self, self.__class__) - - def record(self, path): - self._output = BytesIO() - self._recording = deque() - self._recording_part = OrderedDict() - self._recording.append(self._recording_part) - - def get(self, method, *args, **kwargs): - result = method(*args, **kwargs) - part = self._recording_part - key = method.__name__ - try: - results = part[key] - except KeyError: - part[key] = results = deque() - results.append(result) - return result - - self.get = MethodType(get, self, self.__class__) - - def next_part(self): - part = OrderedDict() - self._recording_part = part - self._recording.append(part) - - self.next_part = MethodType(next_part, self, self.__class__) - - def stop(self): - with io.open(path, "wb") as f: - test = OrderedDict( - (("inputs", self._recording), ("results", self._output.getvalue())) - ) - pickle.dump(test, f) - - self.stop = MethodType(stop, self, self.__class__) - - -def recorded(method): - @wraps(method) - def _record(*args, **kwargs): - return args[0].recorder.get(method, *args, **kwargs) - - return _record - - -class Test: - def __init__(self, fieldnames, data_generators): - TestCase.__init__(self) - self._data_generators = list( - chain((lambda: self._serial_number, time), data_generators) - ) - self._fieldnames = list(chain(("_serial", "_time"), fieldnames)) - self._recorder = TestRecorder(self) - self._serial_number = None - - @property - @recorded - def fieldnames(self): - return self._fieldnames - - @property - @recorded - def row(self): - return [data_generator.__call__() for data_generator in self._data_generators] - - @property - def recorder(self): - return self._recorder - - @property - def serial_number(self): - return self._serial_number - - def playback(self): - self.recorder.playback( - os.path.join(TestInternals._package_path, "TestRecorder.recording") - ) - self._run() - self.recorder.stop() - - def record(self): - self.recorder.record( - os.path.join(TestInternals._package_path, "TestRecorder.recording") - ) - self._run() - self.recorder.stop() - - def runTest(self): - pass # We'll adopt the new test recording mechanism a little later - - def _run(self): - writer = RecordWriterV2(self.recorder.output, maxresultrows=10) - write_record = writer.write_record - names = self.fieldnames - - for self._serial_number in range(0, 31): - record = OrderedDict(list(zip(names, self.row))) - write_record(record) - - -# test = Test(['random_bytes', 'random_unicode'], [random_bytes, random_unicode]) -# test.record() -# test.playback() - - if __name__ == "__main__": main()