3 changes: 2 additions & 1 deletion .circleci/integration/target-config.json
@@ -1,4 +1,5 @@
{
"postgres_database": "target_postgres_test",
"postgres_username": "postgres"
"postgres_username": "postgres",
"logging_level": "DEBUG"
}
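The new `logging_level` entry is what surfaces the DEBUG statements added below. The code that reads this option sits outside this diff, so the following is only a minimal sketch, assuming the level name is applied to the standard singer logger (the `apply_logging_level` helper is hypothetical):

import logging

import singer

LOGGER = singer.get_logger()


def apply_logging_level(config):
    # Hypothetical helper: translate the `logging_level` config value
    # (e.g. "DEBUG") into a numeric level and apply it to the logger so
    # the LOGGER.debug(...) calls introduced in this change become visible.
    level_name = config.get('logging_level')
    if level_name:
        LOGGER.setLevel(logging.getLevelName(level_name))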
74 changes: 67 additions & 7 deletions target_postgres/singer_stream.py
@@ -1,14 +1,18 @@
from copy import deepcopy
import math
import uuid

import arrow
from jsonschema import Draft4Validator, FormatChecker
from jsonschema.exceptions import ValidationError
import singer

from target_postgres import json_schema
from target_postgres.exceptions import SingerStreamError
from target_postgres.pysize import get_size

LOGGER = singer.get_logger()


SINGER_RECEIVED_AT = '_sdc_received_at'
SINGER_BATCHED_AT = '_sdc_batched_at'
@@ -19,6 +23,9 @@
SINGER_LEVEL = '_sdc_level_{}_id'
SINGER_VALUE = '_sdc_value'

DEFAULT__MAX_ROWS = 200000
DEFAULT__MAX_BUFFER_SIZE = 104857600 # 100MB


class BufferedSingerStream():
def __init__(self,
@@ -28,22 +35,25 @@ def __init__(self,
*args,
invalid_records_detect=None,
invalid_records_threshold=None,
max_rows=200000,
max_buffer_size=104857600, # 100MB
max_rows=DEFAULT__MAX_ROWS,
max_buffer_size=DEFAULT__MAX_BUFFER_SIZE,
**kwargs):
"""
:param invalid_records_detect: Defaults to True when value is None
:param invalid_records_threshold: Defaults to 0 when value is None
:param max_rows: Defaults to 200000 when value is Falsey
:param max_buffer_size: Defaults to 100MB when value is Falsey
"""

self.schema = None
self.key_properties = None
self.validator = None
self.update_schema(schema, key_properties)

self.stream = stream
self.invalid_records = []
self.max_rows = max_rows
self.max_buffer_size = max_buffer_size
self.max_rows = max_rows or DEFAULT__MAX_ROWS
self.max_buffer_size = max_buffer_size or DEFAULT__MAX_BUFFER_SIZE

self.invalid_records_detect = invalid_records_detect
self.invalid_records_threshold = invalid_records_threshold
@@ -58,6 +68,14 @@ def __init__(self,
self.__size = 0
self.__lifetime_max_version = None

self.__debug_reporting_interval = math.ceil(self.max_rows / 10.0)

LOGGER.debug('Stream `{}` created. `max_rows`: {} `max_buffer_size`: {}'.format(
self.stream,
self.max_rows,
self.max_buffer_size
))

def update_schema(self, schema, key_properties):
# In order to determine whether a value _is in_ properties _or not_ we need to flatten `$ref`s etc.
self.schema = json_schema.simplify(schema)
@@ -104,10 +122,22 @@ def count(self):
@property
def buffer_full(self):
if self.__count >= self.max_rows:
LOGGER.debug('Stream `{}` cutting batch due to row count being {:.2%} {}/{}'.format(
self.stream,
self.__count / self.max_rows,
self.__count,
self.max_rows
))
return True

if self.__count > 0:
if self.__size >= self.max_buffer_size:
LOGGER.debug('Stream `{}` cutting batch due to bytes being {:.2%} {}/{}'.format(
self.stream,
self.__size / self.max_buffer_size,
self.__size,
self.max_buffer_size
))
return True

return False
@@ -120,17 +150,43 @@ def __update_version(self, version):
if version is None or (self.__lifetime_max_version is not None and self.__lifetime_max_version >= version):
return None

## TODO: log warning about earlier records detected
if self.__count:
LOGGER.debug('WARNING: Stream `{}` dropping {} records due to version being updated from: `{}` to: `{}`'.format(
self.stream,
self.__count,
self.__lifetime_max_version,
version
))

self.flush_buffer()
self.__lifetime_max_version = version

def _debug_report_on_buffer_sizes(self):
if self.__count % self.__debug_reporting_interval == 0:
LOGGER.debug('Stream `{}` has {:.2%} {}/{} rows filled'.format(
self.stream,
self.__count / self.max_rows,
self.__count,
self.max_rows
))
LOGGER.debug('Stream `{}` has {:.2%} {}/{} bytes filled'.format(
self.stream,
self.__size / self.max_buffer_size,
self.__size,
self.max_buffer_size
))
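For a sense of the reporting cadence (not part of the diff): with the default row limit, the interval computed in __init__ works out to one progress report every 20,000 buffered rows.

import math

DEFAULT__MAX_ROWS = 200000

# Mirrors the __init__ computation above: one buffer-fill report every
# ceil(200000 / 10) = 20000 rows added to the stream's buffer.
debug_reporting_interval = math.ceil(DEFAULT__MAX_ROWS / 10.0)
assert debug_reporting_interval == 20000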

def add_record_message(self, record_message):
add_record = True

self.__update_version(record_message.get('version'))

if self.__lifetime_max_version != record_message.get('version'):
LOGGER.debug('WARNING: Stream `{}` dropping record due to version mismatch. Expected: `{}`, Got: `{}`'.format(
self.stream,
self.__lifetime_max_version,
record_message.get('version')
))
return None

try:
@@ -150,6 +206,8 @@ def add_record_message(self, record_message):
self.invalid_records_threshold),
self.invalid_records)

self._debug_report_on_buffer_sizes()

def peek_buffer(self):
return self.__buffer

@@ -181,11 +239,13 @@ def get_batch(self):
return records

def flush_buffer(self):
_buffer = self.__buffer
LOGGER.debug('Stream `{}` flushing buffer...'.format(
self.stream
))

self.__buffer = []
self.__size = 0
self.__count = 0
return _buffer

def peek_invalid_records(self):
return self.invalid_records
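As context for the new defaults, a minimal usage sketch (not part of the diff): the positional arguments mirror the BufferedSingerStream(stream, schema, key_properties, ...) call in target_tools.py below, and the schema is illustrative.

from target_postgres.singer_stream import BufferedSingerStream

schema = {'type': 'object',
          'properties': {'id': {'type': 'integer'}}}

# Falsy limits now fall back to the module-level defaults rather than
# being stored as-is.
stream_buffer = BufferedSingerStream('my_table',
                                     schema,
                                     ['id'],
                                     max_rows=None,
                                     max_buffer_size=0)

assert stream_buffer.max_rows == 200000             # DEFAULT__MAX_ROWS
assert stream_buffer.max_buffer_size == 104857600   # DEFAULT__MAX_BUFFER_SIZE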
58 changes: 44 additions & 14 deletions target_postgres/target_tools.py
@@ -42,6 +42,8 @@ def stream_to_target(stream, target, config={}):
state_tracker = StreamTracker(target, state_support)
_run_sql_hook('before_run_sql', config, target)

line_stats = {} # dict of <line_type: count>

try:
if not config.get('disable_collection', False):
_async_send_usage_stats()
@@ -52,20 +54,39 @@
max_batch_size = config.get('max_batch_size', 104857600) # 100MB
batch_detection_threshold = config.get('batch_detection_threshold', max(max_batch_rows / 40, 50))

LOGGER.info('Streaming to target with the following configuration: {}'.format(
{
'state_support': state_support,
'invalid_records_detect': invalid_records_detect,
'invalid_records_threshold': invalid_records_threshold,
'max_batch_rows': max_batch_rows,
'max_batch_size': max_batch_size,
'batch_detection_threshold': batch_detection_threshold
}))

line_count = 0
for line in stream:
_line_handler(state_tracker,
_line_handler(line_stats,
state_tracker,
target,
invalid_records_detect,
invalid_records_threshold,
max_batch_rows,
max_batch_size,
line
)
if line_count > 0 and line_count % batch_detection_threshold == 0:
state_tracker.flush_streams()

line_count += 1

if line_count % batch_detection_threshold == 0:
LOGGER.debug('Attempting to flush streams at `line_count` {}, with records distribution of: {}'.format(
line_count,
_records_distribution(line_count, line_stats)
))
state_tracker.flush_streams()

LOGGER.debug('Forcing flush of streams. Input depleted.')

state_tracker.flush_streams(force=True)
_run_sql_hook('after_run_sql', config, target)

@@ -78,6 +99,13 @@ def stream_to_target(stream, target, config={}):
_report_invalid_records(state_tracker.streams)


def _records_distribution(count, stats):
return {k: '{:.2%} ({})'.format(
v / count,
v
) for k, v in stats.items()}
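For illustration (not part of the diff), with invented counts the distribution rendered into the flush debug line looks like this:

line_stats = {'RECORD': 950, 'STATE': 45, 'SCHEMA': 5}
line_count = 1000

distribution = {k: '{:.2%} ({})'.format(v / line_count, v)
                for k, v in line_stats.items()}

# -> {'RECORD': '95.00% (950)', 'STATE': '4.50% (45)', 'SCHEMA': '0.50% (5)'}
print(distribution)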


def _report_invalid_records(streams):
for stream_buffer in streams.values():
if stream_buffer.peek_invalid_records():
@@ -87,7 +115,7 @@ def _report_invalid_records(streams):
))


def _line_handler(state_tracker, target, invalid_records_detect, invalid_records_threshold, max_batch_rows,
def _line_handler(line_stats, state_tracker, target, invalid_records_detect, invalid_records_threshold, max_batch_rows,
max_batch_size, line):
try:
line_data = json.loads(line)
@@ -98,7 +126,9 @@ def _line_handler(state_tracker, target, invalid_records_detect, invalid_records
if 'type' not in line_data:
raise TargetError('`type` is a required key: {}'.format(line))

if line_data['type'] == 'SCHEMA':
line_type = line_data['type']

if line_type == 'SCHEMA':
if 'stream' not in line_data:
raise TargetError('`stream` is a required key: {}'.format(line))

@@ -123,21 +153,19 @@ def _line_handler(state_tracker, target, invalid_records_detect, invalid_records
schema,
key_properties,
invalid_records_detect=invalid_records_detect,
invalid_records_threshold=invalid_records_threshold)
if max_batch_rows:
buffered_stream.max_rows = max_batch_rows
if max_batch_size:
buffered_stream.max_buffer_size = max_batch_size
invalid_records_threshold=invalid_records_threshold,
max_rows=max_batch_rows,
max_buffer_size=max_batch_size)

state_tracker.register_stream(stream, buffered_stream)
else:
state_tracker.streams[stream].update_schema(schema, key_properties)
elif line_data['type'] == 'RECORD':
elif line_type == 'RECORD':
if 'stream' not in line_data:
raise TargetError('`stream` is a required key: {}'.format(line))

state_tracker.handle_record_message(line_data['stream'], line_data)
elif line_data['type'] == 'ACTIVATE_VERSION':
elif line_type == 'ACTIVATE_VERSION':
if 'stream' not in line_data:
raise TargetError('`stream` is a required key: {}'.format(line))
if 'version' not in line_data:
@@ -149,13 +177,15 @@ def _line_handler(state_tracker, target, invalid_records_detect, invalid_records
stream_buffer = state_tracker.streams[line_data['stream']]
state_tracker.flush_stream(line_data['stream'])
target.activate_version(stream_buffer, line_data['version'])
elif line_data['type'] == 'STATE':
elif line_type == 'STATE':
state_tracker.handle_state_message(line_data)
else:
raise TargetError('Unknown message type {} in message {}'.format(
line_data['type'],
line_type,
line))

line_stats[line_type] = line_stats.get(line_type, 0) + 1


def _send_usage_stats():
try: