Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 73 additions & 15 deletions test_tools/log_parser/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
import json
import struct
import datetime
import argparse


PREAMBLE_LENGTH = 8
PRETTY_PRINT_JSON=False

print_short_message=False
print_file_summary=False

def fmt_payload(payload):
payload = bytearray(payload).decode('utf-8')
Expand Down Expand Up @@ -129,9 +133,10 @@ def dump_event_batch(buf):
meta = batch.Metadata()
enc = meta.ContentEncoding().decode('utf-8')
print(f'event-batch evt-count:{batch.EventsLength()} enc:{enc}')
is_dedup = b'DEDUP' == meta.ContentEncoding()
for i in range(0, batch.EventsLength()):
dump_event(batch.Events(i).PayloadAsNumpy(), i)
if not print_short_message:
is_dedup = b'DEDUP' == meta.ContentEncoding()
for i in range(0, batch.EventsLength()):
dump_event(batch.Events(i).PayloadAsNumpy(), i)
print("----\n")


Expand All @@ -141,6 +146,7 @@ def dump_preamble_file(file_name, buf):
dump_event_batch(buf[PREAMBLE_LENGTH : PREAMBLE_LENGTH + preamble["msg_size"]])

MSG_TYPE_HEADER = 0x55555555
MSG_TYPE_CHECKPOINT = 0x11111111
MSG_TYPE_REGULAR = 0xFFFFFFFF
MSG_TYPE_EOF = 0xAAAAAAAA

Expand Down Expand Up @@ -172,6 +178,7 @@ def read_message(self):
kind = struct.unpack('I', self.read(4))[0]
length = struct.unpack('I', self.read(4))[0]
payload = self.read(length)

#discard padding
self.read(length % 8)
return (kind, payload)
Expand All @@ -187,31 +194,72 @@ def read_file_header(self):
self.headers[p.Key().decode('utf-8')] = p.Value().decode('utf-8')

def messages(self):
while True:
while self.offset < len(self.buf):
msg = self.read_message()
if msg[0] == MSG_TYPE_EOF:
print('got eof')
break
yield JoinedPayload.GetRootAsJoinedPayload(msg[1], 0)
yield (msg[0], msg[1])


def print_checkpoint(msg, msg_count, msg_size):
if print_file_summary:
print(f'checkpoint len:{len(msg)} messages before:{msg_count} data before:{msg_size} bytes')
return

if print_short_message:
print(f'checkpoint len: {len(msg)}')
return

info = RewardFunctionInfo.GetRootAsRewardFunctionInfo(msg, 0)
print(f'checkpoint agg-type:{info.Type()} default-reward: {info.DefaultReward()}')

def print_regular_msg(msg, reader):
if print_file_summary:
return
if print_short_message:
print(f'regular-msg len: {len(msg)} offset {reader.offset}')
return

payload = JoinedPayload.GetRootAsJoinedPayload(msg, 0)
print(f'joined-batch events: {payload.EventsLength()}')
for i in range(payload.EventsLength()):
joined_event = payload.Events(i)
dump_event(joined_event.EventAsNumpy(), i, joined_event.Timestamp())

def dump_joined_log_file(file_name, buf):
reader = JoinedLogStreamReader(buf)
print(f'parsing joined log:{file_name} header:')
for k in reader.headers:
print(f'\t{k} = {reader.headers[k]}')

for msg in reader.messages():
print(f'joined-batch events: {msg.EventsLength()}')
for i in range(msg.EventsLength()):
joined_event = msg.Events(i)
dump_event(joined_event.EventAsNumpy(), i, joined_event.Timestamp())
msg_count = 0
msg_size = 0

def dump_file(f):
buf = bytearray(open(f, 'rb').read())
for msg in reader.messages():
if msg[0] == MSG_TYPE_HEADER:
print('Header message found after first message. THIS IS A BUG')
elif msg[0] == MSG_TYPE_CHECKPOINT:
print_checkpoint(msg[1], msg_count, msg_size)
msg_count = 0
msg_size = 0
elif msg[0] == MSG_TYPE_REGULAR:
msg_count += 1
msg_size += len(msg[1])
print_regular_msg(msg[1], reader)
else:
print(f'invalid message type {msg[0]:X}')
if print_file_summary:
print(f'finished reading remaining messages: {msg_count} size: {msg_size} bytes')
print(f'offset is {reader.offset}')

def dump_file(file_name):
buf = bytearray(open(file_name, 'rb').read())

if buf[0:4] == b'VWFB':
dump_joined_log_file(f, buf)
dump_joined_log_file(file_name, buf)
else:
dump_preamble_file(f, buf)
dump_preamble_file(file_name, buf)


# Generate FB serializers if they are not available
Expand Down Expand Up @@ -249,5 +297,15 @@ def dump_file(f):
from reinforcement_learning.messages.flatbuff.v2.JoinedPayload import *
from reinforcement_learning.messages.flatbuff.v2.RewardFunctionInfo import *

for input_file in sys.argv[1:]:

parser = argparse.ArgumentParser()
parser.add_argument('--short', help='Print a short description of each message', action="store_true")
parser.add_argument('--summary', help='Print a Summary of the file', action="store_true")
parser.add_argument('files', metavar='N', type=str, nargs='+', help='Files to parse')
vm = parser.parse_args()

print_short_message=vm.short
print_file_summary=vm.summary

for input_file in vm.files:
dump_file(input_file)