30 changes: 28 additions & 2 deletions .github/workflows/tests.yml
@@ -13,13 +13,16 @@ concurrency:
cancel-in-progress: true

jobs:
tests:
tests-full:
name: Full tests on Python ${{ matrix.python-version }}, ${{ matrix.os }}
strategy:
matrix:
# I tried running stuff on macOS but it was too slow and unreliable.
# I also tried windows runners but couldn't get Docker to work there, so I gave up.
os: [ubuntu-latest]
python-version: ['3.10', '3.11', '3.12']
env:
UV_SYSTEM_PYTHON: true
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
@@ -52,7 +55,7 @@ jobs:
- name: install uv
uses: astral-sh/setup-uv@v3
- name: Install pip dependencies
run: make deps-ci
run: make deps-full
- name: run tests (macOS)
if: matrix.os == 'macos-13'
run: make test-ci
@@ -64,3 +67,26 @@
run: make test-ci
- name: check the formatting
run: make lint-ci

tests-minimal:
name: Minimal tests on Python ${{ matrix.python-version }}, ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest]
python-version: ['3.10', '3.11', '3.12']
runs-on: ${{ matrix.os }}
env:
UV_SYSTEM_PYTHON: true
steps:
- name: Acquire sources
uses: actions/checkout@v4
- name: Install Python
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install uv
uses: astral-sh/setup-uv@v6
- name: Install project
run: make deps-minimal
- name: Invoke tests
run: make test-ci
8 changes: 5 additions & 3 deletions Makefile
@@ -15,11 +15,13 @@ lock-deps:
@uv pip compile requirements.in --quiet -o requirements.txt
@uv pip compile requirements.in --quiet -o requirements_arm64.txt --python-platform aarch64-unknown-linux-gnu

deps: lock-deps
deps: deps-full

deps-minimal: lock-deps
uv pip install -r requirements-dev.txt

deps-ci:
uv pip install --system -r requirements-dev.txt
deps-full: lock-deps
uv pip install -r requirements-dev-full.txt

test-ci:
set -a; source test.env; set +a; TESTCONTAINERS_RYUK_DISABLED=true pytest -n auto -x -rP -vv --tb=short --durations=10 --cov=ingestr --no-cov-on-fail
14 changes: 9 additions & 5 deletions ingestr/main_test.py
@@ -1,6 +1,7 @@
import base64
import csv
import gzip
import importlib
import io
import json
import logging
@@ -545,16 +546,16 @@ def stop_fully(self):
mysqlDocker = DockerImage(
"mysql", lambda: MySqlContainer(MYSQL8_IMAGE, username="root").start()
)
msSqlServerDocker = DockerImage(
"sqlserver",
lambda: SqlServerContainer(MSSQL22_IMAGE, dialect="mssql").start(),
"?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=Yes",
)

SOURCES = {
"postgres": pgDocker,
"duckdb": EphemeralDuckDb(),
"mysql8": mysqlDocker,
"sqlserver": DockerImage(
"sqlserver",
lambda: SqlServerContainer(MSSQL22_IMAGE, dialect="mssql").start(),
"?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=Yes",
),
}

DESTINATIONS = {
@@ -563,6 +564,9 @@ def stop_fully(self):
"clickhouse+native": clickHouseDocker,
}

if importlib.util.find_spec("pyodbc"):
SOURCES["sqlserver"] = msSqlServerDocker


@pytest.fixture(scope="session", autouse=True)
def manage_containers(request):
94 changes: 2 additions & 92 deletions ingestr/src/destinations.py
@@ -1,31 +1,19 @@
import abc
import base64
import csv
import datetime
import json
import os
import shutil
import struct
import tempfile
from urllib.parse import parse_qs, quote, urlparse

import dlt
import dlt.destinations.impl.filesystem.filesystem
from dlt.common.configuration.specs import AwsCredentials
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.common.schema import Schema
from dlt.common.storages.configuration import FileSystemCredentials
from dlt.destinations.impl.clickhouse.configuration import (
ClickHouseCredentials,
)
from dlt.destinations.impl.mssql.configuration import MsSqlClientConfiguration
from dlt.destinations.impl.mssql.mssql import (
HINT_TO_MSSQL_ATTR,
MsSqlJobClient,
)
from dlt.destinations.impl.mssql.sql_client import (
PyOdbcMsSqlClient,
)

from ingestr.src.errors import MissingValueError
from ingestr.src.loader import load_dlt_file
@@ -155,88 +143,10 @@ def dlt_dest(self, uri: str, **kwargs):
return dlt.destinations.duckdb(uri, **kwargs)


def handle_datetimeoffset(dto_value: bytes) -> datetime.datetime:
# ref: https://github.com/mkleehammer/pyodbc/issues/134#issuecomment-281739794
tup = struct.unpack(
"<6hI2h", dto_value
) # e.g., (2017, 3, 16, 10, 35, 18, 500000000, -6, 0)
return datetime.datetime(
tup[0],
tup[1],
tup[2],
tup[3],
tup[4],
tup[5],
tup[6] // 1000,
datetime.timezone(datetime.timedelta(hours=tup[7], minutes=tup[8])),
)


class OdbcMsSqlClient(PyOdbcMsSqlClient):
SQL_COPT_SS_ACCESS_TOKEN = 1256
SKIP_CREDENTIALS = {"PWD", "AUTHENTICATION", "UID"}

def open_connection(self):
cfg = self.credentials._get_odbc_dsn_dict()
if (
cfg.get("AUTHENTICATION", "").strip().lower()
!= "activedirectoryaccesstoken"
):
return super().open_connection()

import pyodbc # type: ignore

dsn = ";".join(
[f"{k}={v}" for k, v in cfg.items() if k not in self.SKIP_CREDENTIALS]
)

self._conn = pyodbc.connect(
dsn,
timeout=self.credentials.connect_timeout,
attrs_before={
self.SQL_COPT_SS_ACCESS_TOKEN: self.serialize_token(cfg["PWD"]),
},
)

# https://github.com/mkleehammer/pyodbc/wiki/Using-an-Output-Converter-function
self._conn.add_output_converter(-155, handle_datetimeoffset)
self._conn.autocommit = True
return self._conn

def serialize_token(self, token):
# https://github.com/mkleehammer/pyodbc/issues/228#issuecomment-494773723
encoded = token.encode("utf_16_le")
return struct.pack("<i", len(encoded)) + encoded


class MsSqlClient(MsSqlJobClient):
def __init__(
self,
schema: Schema,
config: MsSqlClientConfiguration,
capabilities: DestinationCapabilitiesContext,
) -> None:
sql_client = OdbcMsSqlClient(
config.normalize_dataset_name(schema),
config.normalize_staging_dataset_name(schema),
config.credentials,
capabilities,
)
super(MsSqlJobClient, self).__init__(schema, config, sql_client)
self.config: MsSqlClientConfiguration = config
self.sql_client = sql_client
self.active_hints = HINT_TO_MSSQL_ATTR if self.config.create_indexes else {}
self.type_mapper = capabilities.get_type_mapper()


class MsSqlDestImpl(dlt.destinations.mssql):
@property
def client_class(self):
return MsSqlClient


class MsSQLDestination(GenericSqlDestination):
def dlt_dest(self, uri: str, **kwargs):
from ingestr.src.mssql import MsSqlDestImpl

return MsSqlDestImpl(credentials=uri, **kwargs)


9 changes: 6 additions & 3 deletions ingestr/src/destinations_test.py
@@ -1,3 +1,4 @@
import importlib
import json
import os
import unittest
@@ -103,9 +104,11 @@ class DuckDBDestinationTest(unittest.TestCase, GenericSqlDestinationFixture):
expected_class = dlt.destinations.duckdb


class MsSQLDestinationTest(unittest.TestCase, GenericSqlDestinationFixture):
destination = MsSQLDestination()
expected_class = dlt.destinations.mssql
if importlib.util.find_spec("pyodbc"):

class MsSQLDestinationTest(unittest.TestCase, GenericSqlDestinationFixture):
destination = MsSQLDestination()
expected_class = dlt.destinations.mssql


class DatabricksDestinationTest(unittest.TestCase, GenericSqlDestinationFixture):
9 changes: 9 additions & 0 deletions ingestr/src/mssql/__init__.py
@@ -0,0 +1,9 @@
import dlt

from ingestr.src.mssql.client import MsSqlClient


class MsSqlDestImpl(dlt.destinations.mssql):
@property
def client_class(self):
return MsSqlClient
70 changes: 70 additions & 0 deletions ingestr/src/mssql/client.py
@@ -0,0 +1,70 @@
import struct

from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.common.schema import Schema
from dlt.destinations.impl.mssql.configuration import MsSqlClientConfiguration
from dlt.destinations.impl.mssql.mssql import (
HINT_TO_MSSQL_ATTR,
MsSqlJobClient,
)
from dlt.destinations.impl.mssql.sql_client import (
PyOdbcMsSqlClient,
handle_datetimeoffset,
)


class OdbcMsSqlClient(PyOdbcMsSqlClient):
SQL_COPT_SS_ACCESS_TOKEN = 1256
SKIP_CREDENTIALS = {"PWD", "AUTHENTICATION", "UID"}

def open_connection(self):
cfg = self.credentials._get_odbc_dsn_dict()
if (
cfg.get("AUTHENTICATION", "").strip().lower()
!= "activedirectoryaccesstoken"
):
return super().open_connection()

import pyodbc # type: ignore

dsn = ";".join(
[f"{k}={v}" for k, v in cfg.items() if k not in self.SKIP_CREDENTIALS]
)

self._conn = pyodbc.connect(
dsn,
timeout=self.credentials.connect_timeout,
attrs_before={
self.SQL_COPT_SS_ACCESS_TOKEN: self.serialize_token(cfg["PWD"]),
},
)

# https://github.com/mkleehammer/pyodbc/wiki/Using-an-Output-Converter-function
self._conn.add_output_converter(-155, handle_datetimeoffset)
self._conn.autocommit = True
return self._conn

def serialize_token(self, token):
# https://github.com/mkleehammer/pyodbc/issues/228#issuecomment-494773723
encoded = token.encode("utf_16_le")
return struct.pack("<i", len(encoded)) + encoded


class MsSqlClient(MsSqlJobClient):
def __init__(
self,
schema: Schema,
config: MsSqlClientConfiguration,
capabilities: DestinationCapabilitiesContext,
) -> None:
sql_client = OdbcMsSqlClient(
config.normalize_dataset_name(schema),
config.normalize_staging_dataset_name(schema),
config.credentials,
capabilities,
)
super(MsSqlJobClient, self).__init__(schema, config, sql_client)
self.config: MsSqlClientConfiguration = config
self.sql_client = sql_client
self.active_hints = HINT_TO_MSSQL_ATTR if self.config.create_indexes else {}
self.type_mapper = capabilities.get_type_mapper()
2 changes: 1 addition & 1 deletion ingestr/src/sources.py
@@ -267,7 +267,7 @@ def table_rows(
import pyodbc # type: ignore
from sqlalchemy import create_engine

from ingestr.src.destinations import (
from ingestr.src.mssql.client import (
OdbcMsSqlClient,
handle_datetimeoffset,
)
5 changes: 4 additions & 1 deletion pyproject.toml
@@ -148,11 +148,14 @@ classifiers = [
]

[project.optional-dependencies]
full = [
"ingestr[oracle,odbc]",
]
oracle = [
"cx_Oracle==8.3.0",
]
odbc = [
"pyodbc==5.1.0",
"pyodbc==5.2.0",
]
Comment on lines 157 to 159, by @amotl (Contributor Author), Jul 7, 2025:

requirements-dev.txt said 5.2.0 would be fine, so we used it without much ado.
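
For reference, the extras touched here can be installed selectively during local development; a hedged sketch (the exact commands are assumed, not part of this change):

uv pip install -e '.[odbc]'   # just the ODBC driver pin, pyodbc==5.2.0
uv pip install -e '.[full]'   # the new meta-extra, pulling in the oracle and odbc extras together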


[project.urls]
2 changes: 2 additions & 0 deletions requirements-dev-full.txt
@@ -0,0 +1,2 @@
-r requirements-dev.txt
.[full]
1 change: 0 additions & 1 deletion requirements-dev.txt
@@ -6,7 +6,6 @@ pytest==8.3.3
ruff==0.11.4
hatchling==1.27.0
build==1.2.1
pyodbc==5.2.0
twine==6.0.1
testcontainers[postgres,mysql]==4.8.2
pytest-xdist[psutil]==3.6.1
Comment on lines 6 to 11, by @amotl (Contributor Author), Jul 8, 2025:

Removing pyodbc here is crucial: if it stayed installed in the default dev environment, its presence would mask any gaps in making the code base work without it.
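
To confirm locally that the minimal environment really runs without pyodbc, a rough sketch (assumes a fresh virtualenv; commands are illustrative, not part of this change):

uv pip install -r requirements-dev.txt   # minimal dev dependencies, no pyodbc anymore
python -c "import importlib.util, sys; sys.exit(importlib.util.find_spec('pyodbc') is not None)"   # exits 0 only when pyodbc is absent
make test-ci                             # SQL Server source/destination tests are gated off when pyodbc is missing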
