Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
version: 2.1

jobs:
build:
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester

steps:
- checkout

- run:
name: "Setup virtual env and install tap-listrak"
command: |
python3 -m venv /usr/local/share/virtualenvs/tap-listrak
source /usr/local/share/virtualenvs/tap-listrak/bin/activate
pip install -U "pip==22.2.2" "setuptools==65.3.0"
pip install -e .[dev]

- run:
name: "JSON Validator"
command: |
stitch-validate-json tap_listrak/schemas/*.json

- run:
name: "Run Pylint"
command: |
source /usr/local/share/virtualenvs/tap-listrak/bin/activate
pip install pylint
pylint tap_listrak -d C,R,W

- run:
name: "Run Unit Tests with Pytest"
command: |
source /usr/local/share/virtualenvs/tap-listrak/bin/activate
pip install pytest coverage pytest-cov
pip install opentelemetry-api opentelemetry-sdk
mkdir -p test_output
pytest --cov=tap_listrak --cov-report html:htmlcov --junitxml=test_output/report.xml tests/unittests/

- store_test_results:
path: test_output

- store_artifacts:
path: htmlcov

workflows:
version: 2

commit:
jobs:
- build:
context: circleci-user

build_daily:
triggers:
- schedule:
cron: "0 0 * * *"
filters:
branches:
only:
- master
jobs:
- build:
context: circleci-user
16 changes: 8 additions & 8 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python
from setuptools import setup
from setuptools import setup, find_packages

setup(
name="tap-listrak",
Expand All @@ -10,19 +10,19 @@
classifiers=["Programming Language :: Python :: 3 :: Only"],
py_modules=["tap_listrak"],
install_requires=[
"singer-python==5.8.1",
"requests==2.20.0",
"zeep",
'backoff==1.8.0',
'pendulum==1.2.0',
"singer-python==6.1.1",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make below changes,

    packages=find_packages(),
    package_data = {
        "tap_listrak/schemas": ["*.json"]
    },

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done.

"requests==2.32.4",
"zeep==4.3.1",
'backoff==2.2.1',
'pendulum==3.1.0'
],
entry_points="""
[console_scripts]
tap-listrak=tap_listrak:main
""",
packages=["tap_listrak"],
packages=find_packages(),
package_data = {
"schemas": ["tap_listrak/schemas/*.json"]
"tap_listrak/schemas": ["*.json"]
},
include_package_data=True,
)
17 changes: 14 additions & 3 deletions tap_listrak/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,21 @@ def discover(ctx):


def sync(ctx):
"""
Sync function updated:
Dynamically finds and calls corresponding stream sync functions
instead of only calling hardcoded sync_lists(ctx)
"""
for tap_stream_id in ctx.selected_stream_ids:
schemas.load_and_write_schema(tap_stream_id)
streams_.sync_lists(ctx)

if hasattr(streams_, f"sync_{tap_stream_id}"):
sync_fn = getattr(streams_, f"sync_{tap_stream_id}")
LOGGER.info(f"Syncing stream: {tap_stream_id}")
sync_fn(ctx)
else:
LOGGER.warning(f"No sync function found for stream: {tap_stream_id}")

ctx.write_state()


Expand All @@ -60,8 +72,7 @@ def main_impl():
ctx = Context(args.config, args.state)
if args.discover:
discover(ctx).dump()
print()
else:
elif args.catalog:
ctx.catalog = Catalog.from_dict(args.properties) \
if args.properties else discover(ctx)
sync(ctx)
Expand Down
50 changes: 31 additions & 19 deletions tap_listrak/http.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import zeep
import sys
import singer
from singer import metrics
from zeep.exceptions import Fault
from zeep.exceptions import Fault, TransportError, XMLSyntaxError
import backoff

LOGGER = singer.get_logger()
Expand All @@ -17,23 +16,36 @@ def get_client(config):
return client

def log_retry_attempt(details):
_, exception, _ = sys.exc_info()
LOGGER.info(exception)
LOGGER.info('Caught retryable error after %s tries. Message: %s. Waiting %s more seconds then retrying...',
details["tries"],
exception.message,
details["wait"])
"""Log details about a backoff retry attempt."""
exception = details.get("exception")
LOGGER.warning(
"Retry attempt %s due to error: %s. Waiting %s more seconds before retrying...",
details["tries"],
str(exception),
details["wait"]
)

def is_non_retriable_exception(exc):
"""Avoid retrying on InvalidLogonAttempt errors."""
return isinstance(exc, Fault) and "InvalidLogonAttempt" in str(exc)

@backoff.on_exception(
backoff.expo,
(XMLSyntaxError, TransportError, Fault),
max_tries=5,
jitter=None,
on_backoff=log_retry_attempt,
giveup=is_non_retriable_exception
)
def request(tap_stream_id, service_fn, **kwargs):
"""Make SOAP API request with retry, metrics, and centralized error logging."""
with metrics.http_request_timer(tap_stream_id) as timer:
try:
response = service_fn(**kwargs)
timer.tags[metrics.Tag.http_status_code] = 200
LOGGER.info("Making request for message %s page %s with start date: %s",
kwargs.get('MsgID'), kwargs.get('Page'), kwargs.get('StartDate'))
return response
except Fault as e:
if "404" in str(e.detail):
LOGGER.info("Encountered a 404 for message: %s", kwargs['MsgID'])
return None
raise
response = service_fn(**kwargs)
timer.tags[metrics.Tag.http_status_code] = 200
LOGGER.info(
"Request successful for stream: %s | Page: %s | Start: %s",
tap_stream_id,
kwargs.get('Page', 'N/A'),
kwargs.get('StartDate', 'N/A')
)
return response
109 changes: 109 additions & 0 deletions tests/unittests/test_http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import pytest
from unittest.mock import MagicMock, patch
from zeep.exceptions import XMLSyntaxError, Fault, TransportError
from tap_listrak.http import request

MAX_RETRIES = 5


@pytest.fixture
def mock_http_timer():
"""Patch the http_request_timer context manager."""
with patch("tap_listrak.http.metrics.http_request_timer") as mock_timer:
mock_context = MagicMock()
mock_timer.return_value.__enter__.return_value = mock_context
yield mock_timer


def test_successful_request(mock_http_timer):
"""Test a successful SOAP request returns the expected result."""
def service_fn(**kwargs):
return "Success"

result = request("test_stream", service_fn)
assert result == "Success"
assert mock_http_timer.call_count == 1


def test_xml_syntax_error_retry(mock_http_timer):
"""Test that XMLSyntaxError triggers retries up to MAX_RETRIES."""
failing_mock = MagicMock(side_effect=XMLSyntaxError("Simulated XML error"))

with pytest.raises(XMLSyntaxError):
request("test_stream", failing_mock)

assert failing_mock.call_count == MAX_RETRIES
assert mock_http_timer.call_count == MAX_RETRIES


def test_fault_error_retry(mock_http_timer):
"""Test that Fault exception triggers retries up to MAX_RETRIES."""
failing_mock = MagicMock(side_effect=Fault("Simulated Fault"))

with pytest.raises(Fault):
request("test_stream", failing_mock)

assert failing_mock.call_count == MAX_RETRIES
assert mock_http_timer.call_count == MAX_RETRIES


def test_transport_error_retry(mock_http_timer):
"""Test that TransportError triggers retries up to MAX_RETRIES."""
failing_mock = MagicMock(side_effect=TransportError(502, "Bad Gateway"))

with pytest.raises(TransportError):
request("test_stream", failing_mock)

assert failing_mock.call_count == MAX_RETRIES
assert mock_http_timer.call_count == MAX_RETRIES


def test_retry_recovers_before_max_attempts(mock_http_timer):
"""Test that a request recovers after a few retries before reaching max."""
service_mock = MagicMock()
service_mock.side_effect = [XMLSyntaxError("fail"), "Recovered"]

result = request("test_stream", service_mock)

assert result == "Recovered"
assert service_mock.call_count == 2
assert mock_http_timer.call_count == 2


def test_unhandled_exception_not_retried(mock_http_timer):
"""Test that unexpected exceptions are not retried."""
service_mock = MagicMock(side_effect=ValueError("unexpected"))

with pytest.raises(ValueError):
request("test_stream", service_mock)

assert service_mock.call_count == 1
assert mock_http_timer.call_count == 1


def test_none_as_service_fn(mock_http_timer):
"""Test that passing None instead of a function raises TypeError."""
with pytest.raises(TypeError):
request("test_stream", None)


def test_service_fn_returns_none(mock_http_timer):
"""Test that a service function returning None is handled properly."""
def service_fn(**kwargs):
return None

result = request("test_stream", service_fn)
assert result is None
assert mock_http_timer.call_count == 1


def test_invalid_logon_attempt_not_retried(mock_http_timer):
"""Ensure InvalidLogonAttempt Fault is not retried and raised immediately."""
failing_mock = MagicMock(side_effect=Fault("InvalidLogonAttempt"))

with pytest.raises(Fault) as exc_info:
request("test_stream", failing_mock)

assert "InvalidLogonAttempt" in str(exc_info.value)
assert failing_mock.call_count == 1
assert mock_http_timer.call_count == 1