diff --git a/test_tools/log_parser/parser.py b/test_tools/log_parser/parser.py index bf90e8d0d..430417157 100755 --- a/test_tools/log_parser/parser.py +++ b/test_tools/log_parser/parser.py @@ -6,10 +6,14 @@ import json import struct import datetime +import argparse + PREAMBLE_LENGTH = 8 PRETTY_PRINT_JSON=False +print_short_message=False +print_file_summary=False def fmt_payload(payload): payload = bytearray(payload).decode('utf-8') @@ -129,9 +133,10 @@ def dump_event_batch(buf): meta = batch.Metadata() enc = meta.ContentEncoding().decode('utf-8') print(f'event-batch evt-count:{batch.EventsLength()} enc:{enc}') - is_dedup = b'DEDUP' == meta.ContentEncoding() - for i in range(0, batch.EventsLength()): - dump_event(batch.Events(i).PayloadAsNumpy(), i) + if not print_short_message: + is_dedup = b'DEDUP' == meta.ContentEncoding() + for i in range(0, batch.EventsLength()): + dump_event(batch.Events(i).PayloadAsNumpy(), i) print("----\n") @@ -141,6 +146,7 @@ def dump_preamble_file(file_name, buf): dump_event_batch(buf[PREAMBLE_LENGTH : PREAMBLE_LENGTH + preamble["msg_size"]]) MSG_TYPE_HEADER = 0x55555555 +MSG_TYPE_CHECKPOINT = 0x11111111 MSG_TYPE_REGULAR = 0xFFFFFFFF MSG_TYPE_EOF = 0xAAAAAAAA @@ -172,6 +178,7 @@ def read_message(self): kind = struct.unpack('I', self.read(4))[0] length = struct.unpack('I', self.read(4))[0] payload = self.read(length) + #discard padding self.read(length % 8) return (kind, payload) @@ -187,11 +194,38 @@ def read_file_header(self): self.headers[p.Key().decode('utf-8')] = p.Value().decode('utf-8') def messages(self): - while True: + while self.offset < len(self.buf): msg = self.read_message() if msg[0] == MSG_TYPE_EOF: + print('got eof') break - yield JoinedPayload.GetRootAsJoinedPayload(msg[1], 0) + yield (msg[0], msg[1]) + + +def print_checkpoint(msg, msg_count, msg_size): + if print_file_summary: + print(f'checkpoint len:{len(msg)} messages before:{msg_count} data before:{msg_size} bytes') + return + + if print_short_message: + print(f'checkpoint len: {len(msg)}') + return + + info = RewardFunctionInfo.GetRootAsRewardFunctionInfo(msg, 0) + print(f'checkpoint agg-type:{info.Type()} default-reward: {info.DefaultReward()}') + +def print_regular_msg(msg, reader): + if print_file_summary: + return + if print_short_message: + print(f'regular-msg len: {len(msg)} offset {reader.offset}') + return + + payload = JoinedPayload.GetRootAsJoinedPayload(msg, 0) + print(f'joined-batch events: {payload.EventsLength()}') + for i in range(payload.EventsLength()): + joined_event = payload.Events(i) + dump_event(joined_event.EventAsNumpy(), i, joined_event.Timestamp()) def dump_joined_log_file(file_name, buf): reader = JoinedLogStreamReader(buf) @@ -199,19 +233,33 @@ def dump_joined_log_file(file_name, buf): for k in reader.headers: print(f'\t{k} = {reader.headers[k]}') - for msg in reader.messages(): - print(f'joined-batch events: {msg.EventsLength()}') - for i in range(msg.EventsLength()): - joined_event = msg.Events(i) - dump_event(joined_event.EventAsNumpy(), i, joined_event.Timestamp()) + msg_count = 0 + msg_size = 0 -def dump_file(f): - buf = bytearray(open(f, 'rb').read()) + for msg in reader.messages(): + if msg[0] == MSG_TYPE_HEADER: + print('Header message found after first message. THIS IS A BUG') + elif msg[0] == MSG_TYPE_CHECKPOINT: + print_checkpoint(msg[1], msg_count, msg_size) + msg_count = 0 + msg_size = 0 + elif msg[0] == MSG_TYPE_REGULAR: + msg_count += 1 + msg_size += len(msg[1]) + print_regular_msg(msg[1], reader) + else: + print(f'invalid message type {msg[0]:X}') + if print_file_summary: + print(f'finished reading remaining messages: {msg_count} size: {msg_size} bytes') + print(f'offset is {reader.offset}') + +def dump_file(file_name): + buf = bytearray(open(file_name, 'rb').read()) if buf[0:4] == b'VWFB': - dump_joined_log_file(f, buf) + dump_joined_log_file(file_name, buf) else: - dump_preamble_file(f, buf) + dump_preamble_file(file_name, buf) # Generate FB serializers if they are not available @@ -249,5 +297,15 @@ def dump_file(f): from reinforcement_learning.messages.flatbuff.v2.JoinedPayload import * from reinforcement_learning.messages.flatbuff.v2.RewardFunctionInfo import * -for input_file in sys.argv[1:]: + +parser = argparse.ArgumentParser() +parser.add_argument('--short', help='Print a short description of each message', action="store_true") +parser.add_argument('--summary', help='Print a Summary of the file', action="store_true") +parser.add_argument('files', metavar='N', type=str, nargs='+', help='Files to parse') +vm = parser.parse_args() + +print_short_message=vm.short +print_file_summary=vm.summary + +for input_file in vm.files: dump_file(input_file)