Skip to content

Remove unused Test and TestRecorder classes from test_internals_v2.py #640

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 20, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 1 addition & 158 deletions tests/unit/searchcommands/test_internals_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,26 @@
# License for the specific language governing permissions and limitations
# under the License.

import gzip
import io
import json
import os
import random
import sys

import pytest
from functools import wraps
from itertools import chain
from sys import float_info
from tempfile import mktemp
from time import time
from types import MethodType
from unittest import main, TestCase

from collections import OrderedDict
from collections import namedtuple, deque
from collections import namedtuple

from splunklib.searchcommands.internals import (
MetadataDecoder,
MetadataEncoder,
Recorder,
RecordWriterV2,
)
from splunklib.searchcommands import SearchMetric
from io import BytesIO
import pickle


# region Functions for producing random apps
Expand Down Expand Up @@ -299,154 +291,5 @@ def _load_chunks(self, ifile):
_package_path = os.path.dirname(os.path.abspath(__file__))


class TestRecorder:
def __init__(self, test_case):
self._test_case = test_case
self._output = None
self._recording = None
self._recording_part = None

def _not_implemented(self):
raise NotImplementedError(
"class {} is not in playback or record mode".format(
self.__class__.__name__
)
)

self.get = self.next_part = self.stop = MethodType(
_not_implemented, self, self.__class__
)

@property
def output(self):
return self._output

def playback(self, path):
with open(path, "rb") as f:
test_data = pickle.load(f)

self._output = BytesIO()
self._recording = test_data["inputs"]
self._recording_part = self._recording.popleft()

def get(self, method, *args, **kwargs):
return self._recording_part[method.__name__].popleft()

self.get = MethodType(get, self, self.__class__)

def next_part(self):
self._recording_part = self._recording.popleft()

self.next_part = MethodType(next_part, self, self.__class__)

def stop(self):
self._test_case.assertEqual(test_data["results"], self._output.getvalue())

self.stop = MethodType(stop, self, self.__class__)

def record(self, path):
self._output = BytesIO()
self._recording = deque()
self._recording_part = OrderedDict()
self._recording.append(self._recording_part)

def get(self, method, *args, **kwargs):
result = method(*args, **kwargs)
part = self._recording_part
key = method.__name__
try:
results = part[key]
except KeyError:
part[key] = results = deque()
results.append(result)
return result

self.get = MethodType(get, self, self.__class__)

def next_part(self):
part = OrderedDict()
self._recording_part = part
self._recording.append(part)

self.next_part = MethodType(next_part, self, self.__class__)

def stop(self):
with io.open(path, "wb") as f:
test = OrderedDict(
(("inputs", self._recording), ("results", self._output.getvalue()))
)
pickle.dump(test, f)

self.stop = MethodType(stop, self, self.__class__)


def recorded(method):
@wraps(method)
def _record(*args, **kwargs):
return args[0].recorder.get(method, *args, **kwargs)

return _record


class Test:
def __init__(self, fieldnames, data_generators):
TestCase.__init__(self)
self._data_generators = list(
chain((lambda: self._serial_number, time), data_generators)
)
self._fieldnames = list(chain(("_serial", "_time"), fieldnames))
self._recorder = TestRecorder(self)
self._serial_number = None

@property
@recorded
def fieldnames(self):
return self._fieldnames

@property
@recorded
def row(self):
return [data_generator.__call__() for data_generator in self._data_generators]

@property
def recorder(self):
return self._recorder

@property
def serial_number(self):
return self._serial_number

def playback(self):
self.recorder.playback(
os.path.join(TestInternals._package_path, "TestRecorder.recording")
)
self._run()
self.recorder.stop()

def record(self):
self.recorder.record(
os.path.join(TestInternals._package_path, "TestRecorder.recording")
)
self._run()
self.recorder.stop()

def runTest(self):
pass # We'll adopt the new test recording mechanism a little later

def _run(self):
writer = RecordWriterV2(self.recorder.output, maxresultrows=10)
write_record = writer.write_record
names = self.fieldnames

for self._serial_number in range(0, 31):
record = OrderedDict(list(zip(names, self.row)))
write_record(record)


# test = Test(['random_bytes', 'random_unicode'], [random_bytes, random_unicode])
# test.record()
# test.playback()


if __name__ == "__main__":
main()