
Add functionality for export of latency logs via telemetry #608


Open

wants to merge 72 commits into base: telemetry

Commits
65a75f4
added functionality for export of failure logs
saishreeeee Jun 10, 2025
5305308
changed logger.error to logger.debug in exc.py
saishreeeee Jun 11, 2025
ba83c33
Fix telemetry loss during Python shutdown
saishreeeee Jun 11, 2025
131db92
unit tests for export_failure_log
saishreeeee Jun 12, 2025
3abc40d
try-catch blocks to make telemetry failures non-blocking for connecto…
saishreeeee Jun 12, 2025
ffa4787
removed redundant try/catch blocks, added try/catch block to initiali…
saishreeeee Jun 12, 2025
cc077f3
skip null fields in telemetry request
saishreeeee Jun 12, 2025
2c6fd44
removed dup import, renamed func, changed a filter_null_values to lamda
saishreeeee Jun 12, 2025
89540a1
removed unnecassary class variable and a redundant try/except block
saishreeeee Jun 12, 2025
52a1152
public functions defined at interface level
saishreeeee Jun 12, 2025
3dcdcfa
changed export_event and flush to private functions
saishreeeee Jun 13, 2025
b2714c9
formatting
saishreeeee Jun 13, 2025
377a87b
changed connection_uuid to thread local in thrift backend
saishreeeee Jun 13, 2025
c9376b8
made errors more specific
saishreeeee Jun 13, 2025
bbfadf2
revert change to connection_uuid
saishreeeee Jun 13, 2025
9bce26b
reverting change in close in telemetry client
saishreeeee Jun 13, 2025
ef4514d
JsonSerializableMixin
saishreeeee Jun 13, 2025
8924835
isdataclass check in JsonSerializableMixin
saishreeeee Jun 13, 2025
65361e7
convert TelemetryClientFactory to module-level functions, replace Noo…
saishreeeee Jun 16, 2025
1722a77
renamed connection_uuid as session_id_hex
saishreeeee Jun 16, 2025
e841434
added NotImplementedError to abstract class, added unit tests
saishreeeee Jun 16, 2025
2f89266
formatting
saishreeeee Jun 16, 2025
5564bbb
added PEP-249 link, changed NoopTelemetryClient implementation
saishreeeee Jun 17, 2025
1e4e8cf
removed unused import
saishreeeee Jun 17, 2025
55b29bc
made telemetry client close a module-level function
saishreeeee Jun 17, 2025
93bf170
unit tests verbose
saishreeeee Jun 17, 2025
45f5ccf
debug logs in unit tests
saishreeeee Jun 17, 2025
8ff1c1f
debug logs in unit tests
saishreeeee Jun 17, 2025
8bdd324
removed ABC from mixin, added try/catch block around executor shutdown
saishreeeee Jun 17, 2025
f99f7ea
checking stuff
saishreeeee Jun 17, 2025
b972c8a
finding out
saishreeeee Jun 17, 2025
7ca3636
finding out more
saishreeeee Jun 17, 2025
0ac8ed2
more more finding out more nice
saishreeeee Jun 17, 2025
c457a09
locks are useless anyways
saishreeeee Jun 17, 2025
5f07a84
haha
saishreeeee Jun 17, 2025
1115e25
normal
saishreeeee Jun 17, 2025
de1ed87
:= looks like walrus horizontally
saishreeeee Jun 17, 2025
554aeaf
one more
saishreeeee Jun 17, 2025
fffac5f
walrus again
saishreeeee Jun 17, 2025
b77208a
old stuff without walrus seems to fail
saishreeeee Jun 17, 2025
733c288
manually do the walrussing
saishreeeee Jun 17, 2025
ca8b958
change 3.13t, v2
saishreeeee Jun 17, 2025
3eabac9
formatting, added walrus
saishreeeee Jun 17, 2025
fb9ef43
formatting
saishreeeee Jun 17, 2025
1e795aa
removed walrus, removed test before stalling test
saishreeeee Jun 17, 2025
2c293a5
changed order of stalling test
saishreeeee Jun 18, 2025
d237255
removed debugging, added TelemetryClientFactory
saishreeeee Jun 18, 2025
f101b19
remove more debugging
saishreeeee Jun 18, 2025
a094659
latency logs funcitionality
saishreeeee Jun 19, 2025
695a07d
merge
saishreeeee Jun 19, 2025
fc918d6
fixed type of return value in get_session_id_hex() in thrift backend
saishreeeee Jun 19, 2025
d7c75d7
debug on TelemetryClientFactory lock
saishreeeee Jun 19, 2025
b6b0f89
formatting
saishreeeee Jun 19, 2025
50a1206
type notation for _waiters
saishreeeee Jun 19, 2025
39a0530
called connection.close() in test_arraysize_buffer_size_passthrough
saishreeeee Jun 19, 2025
413427f
run all unit tests
saishreeeee Jun 19, 2025
6b1d1b8
more debugging
saishreeeee Jun 19, 2025
8f5e5ba
removed the connection.close() from that test, put debug statement be…
saishreeeee Jun 19, 2025
2dc00ba
more debug
saishreeeee Jun 19, 2025
1ff03d4
more more more
saishreeeee Jun 19, 2025
6ff07c8
why
saishreeeee Jun 19, 2025
395049a
whywhy
saishreeeee Jun 19, 2025
4466821
thread name
saishreeeee Jun 19, 2025
34b63e4
added teardown to all tests except finalizer test (gc collect)
saishreeeee Jun 20, 2025
49082fb
added the get_attribute functions to the classes
saishreeeee Jun 20, 2025
ed1db9d
removed tearDown, added connection.close() to first test
saishreeeee Jun 20, 2025
9fa5a89
finally
saishreeeee Jun 21, 2025
14433c4
remove debugging
saishreeeee Jun 22, 2025
ef4ca13
added test for export_latency_log, made mock of thrift backend with r…
saishreeeee Jun 23, 2025
152e0da
Merge branch 'telemetry' into PECOBLR-554
saishreeeee Jun 23, 2025
b5bf165
added multi threaded tests
saishreeeee Jun 23, 2025
307a8cc
formatting
saishreeeee Jun 23, 2025
81 changes: 80 additions & 1 deletion src/databricks/sql/client.py
@@ -31,6 +31,8 @@
transform_paramstyle,
ColumnTable,
ColumnQueue,
ArrowQueue,
CloudFetchQueue,
)
from databricks.sql.parameters.native import (
DbsqlParameterBase,
@@ -61,7 +63,8 @@
DriverConnectionParameters,
HostDetails,
)

from databricks.sql.telemetry.latency_logger import log_latency
from databricks.sql.telemetry.models.enums import ExecutionResultFormat, StatementType

logger = logging.getLogger(__name__)

@@ -758,6 +761,7 @@ def _handle_staging_operation(
session_id_hex=self.connection.get_session_id_hex(),
)

@log_latency()
def _handle_staging_put(
self, presigned_url: str, local_file: str, headers: Optional[dict] = None
):
@@ -797,6 +801,7 @@ def _handle_staging_put(
+ "but not yet applied on the server. It's possible this command may fail later."
)

@log_latency()
def _handle_staging_get(
self, local_file: str, presigned_url: str, headers: Optional[dict] = None
):
@@ -824,6 +829,7 @@ def _handle_staging_get(
with open(local_file, "wb") as fp:
fp.write(r.content)

@log_latency()
def _handle_staging_remove(
self, presigned_url: str, headers: Optional[dict] = None
):
@@ -837,6 +843,7 @@
session_id_hex=self.connection.get_session_id_hex(),
)

@log_latency()
def execute(
self,
operation: str,
@@ -927,6 +934,7 @@ def execute(

return self

@log_latency()
def execute_async(
self,
operation: str,
@@ -1052,6 +1060,7 @@ def executemany(self, operation, seq_of_parameters):
self.execute(operation, parameters)
return self

@log_latency()
def catalogs(self) -> "Cursor":
"""
Get all available catalogs.
@@ -1075,6 +1084,7 @@ def catalogs(self) -> "Cursor":
)
return self

@log_latency()
def schemas(
self, catalog_name: Optional[str] = None, schema_name: Optional[str] = None
) -> "Cursor":
@@ -1103,6 +1113,7 @@ def schemas(
)
return self

@log_latency()
def tables(
self,
catalog_name: Optional[str] = None,
@@ -1138,6 +1149,7 @@ def tables(
)
return self

@log_latency()
def columns(
self,
catalog_name: Optional[str] = None,
@@ -1342,6 +1354,39 @@ def setoutputsize(self, size, column=None):
"""Does nothing by default"""
pass

def get_statement_id(self) -> Optional[str]:
return self.query_id

def get_session_id_hex(self) -> Optional[str]:
return self.connection.get_session_id_hex()

def get_is_compressed(self) -> bool:
return self.connection.lz4_compression

def get_execution_result(self) -> ExecutionResultFormat:
if self.active_result_set is None:
return ExecutionResultFormat.FORMAT_UNSPECIFIED

if isinstance(self.active_result_set.results, ColumnQueue):
return ExecutionResultFormat.COLUMNAR_INLINE
elif isinstance(self.active_result_set.results, CloudFetchQueue):
return ExecutionResultFormat.EXTERNAL_LINKS
elif isinstance(self.active_result_set.results, ArrowQueue):
return ExecutionResultFormat.INLINE_ARROW
return ExecutionResultFormat.FORMAT_UNSPECIFIED

def get_retry_count(self) -> int:
if (
hasattr(self.thrift_backend, "retry_policy")
and self.thrift_backend.retry_policy
):
return len(self.thrift_backend.retry_policy.history)
return 0

def get_statement_type(self, func_name: str) -> StatementType:
# TODO: Implement this
return StatementType.SQL


class ResultSet:
def __init__(
@@ -1406,6 +1451,7 @@ def _fill_results_buffer(self):
self.results = results
self.has_more_rows = has_more_rows

@log_latency()
def _convert_columnar_table(self, table):
column_names = [c[0] for c in self.description]
ResultRow = Row(*column_names)
@@ -1418,6 +1464,7 @@ def _convert_columnar_table(self, table):

return result

@log_latency()
def _convert_arrow_table(self, table):
column_names = [c[0] for c in self.description]
ResultRow = Row(*column_names)
@@ -1639,3 +1686,35 @@ def map_col_type(type_):
(column.name, map_col_type(column.datatype), None, None, None, None, None)
for column in table_schema_message.columns
]

def get_statement_id(self) -> Optional[str]:
if self.command_id:
return str(UUID(bytes=self.command_id.operationId.guid))
return None

def get_session_id_hex(self) -> Optional[str]:
return self.connection.get_session_id_hex()

def get_is_compressed(self) -> bool:
return self.lz4_compressed

def get_execution_result(self) -> ExecutionResultFormat:
if isinstance(self.results, ColumnQueue):
return ExecutionResultFormat.COLUMNAR_INLINE
elif isinstance(self.results, CloudFetchQueue):
return ExecutionResultFormat.EXTERNAL_LINKS
elif isinstance(self.results, ArrowQueue):
return ExecutionResultFormat.INLINE_ARROW
return ExecutionResultFormat.FORMAT_UNSPECIFIED

def get_statement_type(self, func_name: str) -> StatementType:
# TODO: Implement this
return StatementType.SQL

def get_retry_count(self) -> int:
if (
hasattr(self.thrift_backend, "retry_policy")
and self.thrift_backend.retry_policy
):
return len(self.thrift_backend.retry_policy.history)
return 0
43 changes: 43 additions & 0 deletions src/databricks/sql/telemetry/latency_logger.py
@@ -0,0 +1,43 @@
import time
import functools
from databricks.sql.telemetry.telemetry_client import TelemetryClientFactory
from databricks.sql.telemetry.models.event import (
SqlExecutionEvent,
)


def log_latency():
def decorator(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
Contributor:

what happens if something fails in this decorator? we should be handling that gracefully

Collaborator (Author):

both the functions: get_telemetry_client and export_latency_log have try/catch blocks in them.

start_time = time.perf_counter()
result = None
try:
result = func(self, *args, **kwargs)
return result
finally:
end_time = time.perf_counter()
duration_ms = int((end_time - start_time) * 1000)

session_id_hex = self.get_session_id_hex()
statement_id = self.get_statement_id()

sql_exec_event = SqlExecutionEvent(
statement_type=self.get_statement_type(func.__name__),
is_compressed=self.get_is_compressed(),
execution_result=self.get_execution_result(),
retry_count=self.get_retry_count(),
)

telemetry_client = TelemetryClientFactory.get_telemetry_client(
session_id_hex
)
telemetry_client.export_latency_log(
latency_ms=duration_ms,
sql_execution_event=sql_exec_event,
sql_statement_id=statement_id,
)

return wrapper

return decorator
47 changes: 43 additions & 4 deletions src/databricks/sql/telemetry/telemetry_client.py
@@ -4,7 +4,7 @@
import requests
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Optional
from typing import Dict, Optional, List
from databricks.sql.telemetry.models.event import (
TelemetryEvent,
DriverSystemConfiguration,
@@ -112,6 +112,10 @@ def export_initial_telemetry_log(self, driver_connection_params, user_agent):
def export_failure_log(self, error_name, error_message):
raise NotImplementedError("Subclasses must implement export_failure_log")

@abstractmethod
def export_latency_log(self, latency_ms, sql_execution_event, sql_statement_id):
raise NotImplementedError("Subclasses must implement export_latency_log")

@abstractmethod
def close(self):
raise NotImplementedError("Subclasses must implement close")
@@ -136,6 +140,11 @@ def export_initial_telemetry_log(self, driver_connection_params, user_agent):
def export_failure_log(self, error_name, error_message):
pass

def export_latency_log(
self, latency_ms, sql_execution_event, sql_statement_id=None
):
pass

def close(self):
pass

@@ -149,6 +158,7 @@ class TelemetryClient(BaseTelemetryClient):
# Telemetry endpoint paths
TELEMETRY_AUTHENTICATED_PATH = "/telemetry-ext"
TELEMETRY_UNAUTHENTICATED_PATH = "/telemetry-unauth"
DEFAULT_BATCH_SIZE = 10

def __init__(
self,
@@ -160,7 +170,7 @@
):
logger.debug("Initializing TelemetryClient for connection: %s", session_id_hex)
self._telemetry_enabled = telemetry_enabled
self._batch_size = 10 # TODO: Decide on batch size
self._batch_size = self.DEFAULT_BATCH_SIZE # TODO: Decide on batch size
self._session_id_hex = session_id_hex
self._auth_provider = auth_provider
self._user_agent = None
@@ -299,6 +309,32 @@ def export_failure_log(self, error_name, error_message):
except Exception as e:
logger.debug("Failed to export failure log: %s", e)

def export_latency_log(self, latency_ms, sql_execution_event, sql_statement_id):
logger.debug("Exporting latency log for connection %s", self._session_id_hex)
try:
telemetry_frontend_log = TelemetryFrontendLog(
frontend_log_event_id=str(uuid.uuid4()),
context=FrontendLogContext(
client_context=TelemetryClientContext(
timestamp_millis=int(time.time() * 1000),
user_agent=self._user_agent,
)
),
entry=FrontendLogEntry(
sql_driver_log=TelemetryEvent(
session_id=self._session_id_hex,
system_configuration=TelemetryHelper.get_driver_system_configuration(),
driver_connection_params=self._driver_connection_params,
sql_statement_id=sql_statement_id,
sql_operation=sql_execution_event,
operation_latency_ms=latency_ms,
)
),
)
self._export_event(telemetry_frontend_log)
except Exception as e:
logger.debug("Failed to export latency log: %s", e)

def close(self):
"""Flush remaining events before closing"""
logger.debug("Closing TelemetryClient for connection %s", self._session_id_hex)
@@ -366,8 +402,8 @@ def initialize_telemetry_client(
host_url,
):
"""Initialize a telemetry client for a specific connection if telemetry is enabled"""
try:

try:
with TelemetryClientFactory._lock:
TelemetryClientFactory._initialize()

@@ -431,6 +467,9 @@ def close(session_id_hex):
logger.debug(
"No more telemetry clients, shutting down thread pool executor"
)
TelemetryClientFactory._executor.shutdown(wait=True)
try:
TelemetryClientFactory._executor.shutdown(wait=True)
except Exception as e:
logger.debug("Failed to shutdown thread pool executor: %s", e)
TelemetryClientFactory._executor = None
TelemetryClientFactory._initialized = False
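The guarded `shutdown(wait=True)` above follows the PR's rule that telemetry teardown must never raise into the caller's connection-close path. A minimal sketch of the pattern (`shutdown_executor` is a hypothetical helper name, not part of the PR):

```python
import logging
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger(__name__)


def shutdown_executor(executor):
    """Shut an executor down without letting failures escape the caller.

    Telemetry is best-effort, so any teardown error is only logged
    at debug level, mirroring TelemetryClientFactory.close.
    """
    try:
        executor.shutdown(wait=True)
    except Exception as e:  # telemetry must stay non-blocking
        logger.debug("Failed to shutdown thread pool executor: %s", e)


ex = ThreadPoolExecutor(max_workers=1)
shutdown_executor(ex)
shutdown_executor(ex)  # a second shutdown is a harmless no-op
```

Resetting `_executor = None` and `_initialized = False` afterwards, as the diff does, lets a later connection lazily re-create the pool.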
22 changes: 18 additions & 4 deletions tests/unit/test_client.py
@@ -39,6 +39,11 @@ def new(cls):
cls.apply_property_to_mock(ThriftBackendMock, staging_allowed_local_path=None)
MockTExecuteStatementResp = MagicMock(spec=TExecuteStatementResp())

# Mock retry_policy with history attribute
mock_retry_policy = Mock()
mock_retry_policy.history = []
cls.apply_property_to_mock(ThriftBackendMock, retry_policy=mock_retry_policy)

cls.apply_property_to_mock(
MockTExecuteStatementResp,
description=None,
@@ -70,6 +75,15 @@ def apply_property_to_mock(self, mock_obj, **kwargs):
prop = PropertyMock(**kwargs)
setattr(type(mock_obj), key, prop)

@classmethod
def mock_thrift_backend_with_retry_policy(cls): # Required for log_latency() decorator
"""Create a simple thrift_backend mock with retry_policy for basic tests."""
mock_thrift_backend = Mock()
mock_retry_policy = Mock()
mock_retry_policy.history = []
mock_thrift_backend.retry_policy = mock_retry_policy
return mock_thrift_backend


class ClientTestSuite(unittest.TestCase):
"""
@@ -319,7 +333,7 @@ def test_executing_multiple_commands_uses_the_most_recent_command(
mock_result_sets[1].fetchall.assert_called_once_with()

def test_closed_cursor_doesnt_allow_operations(self):
cursor = client.Cursor(Mock(), Mock())
cursor = client.Cursor(Mock(), ThriftBackendMockFactory.mock_thrift_backend_with_retry_policy())
cursor.close()

with self.assertRaises(Error) as e:
@@ -399,7 +413,7 @@ def test_get_schemas_parameters_passed_to_thrift_backend(self, mock_thrift_backe
for req_args in req_args_combinations:
req_args = {k: v for k, v in req_args.items() if v != "NOT_SET"}
with self.subTest(req_args=req_args):
mock_thrift_backend = Mock()
mock_thrift_backend = ThriftBackendMockFactory.mock_thrift_backend_with_retry_policy()

cursor = client.Cursor(Mock(), mock_thrift_backend)
cursor.schemas(**req_args)
@@ -422,7 +436,7 @@ def test_get_tables_parameters_passed_to_thrift_backend(self, mock_thrift_backen
for req_args in req_args_combinations:
req_args = {k: v for k, v in req_args.items() if v != "NOT_SET"}
with self.subTest(req_args=req_args):
mock_thrift_backend = Mock()
mock_thrift_backend = ThriftBackendMockFactory.mock_thrift_backend_with_retry_policy()

cursor = client.Cursor(Mock(), mock_thrift_backend)
cursor.tables(**req_args)
Expand All @@ -445,7 +459,7 @@ def test_get_columns_parameters_passed_to_thrift_backend(self, mock_thrift_backe
for req_args in req_args_combinations:
req_args = {k: v for k, v in req_args.items() if v != "NOT_SET"}
with self.subTest(req_args=req_args):
mock_thrift_backend = Mock()
mock_thrift_backend = ThriftBackendMockFactory.mock_thrift_backend_with_retry_policy()

cursor = client.Cursor(Mock(), mock_thrift_backend)
cursor.columns(**req_args)
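The test changes above swap bare `Mock()` backends for the new factory helper because a plain Mock breaks the `log_latency` decorator: `get_retry_count` calls `len(retry_policy.history)`, and `len()` of a Mock raises `TypeError`. A sketch of why the explicit history list is needed:

```python
from unittest.mock import Mock


def mock_thrift_backend_with_retry_policy():
    # Mirrors the ThriftBackendMockFactory helper in the PR: the only
    # non-Mock attribute is a real (empty) retry history list.
    backend = Mock()
    backend.retry_policy = Mock()
    backend.retry_policy.history = []
    return backend


# A plain Mock fails inside get_retry_count(): len() of a Mock raises.
try:
    len(Mock().retry_policy.history)
    plain_mock_ok = True
except TypeError:
    plain_mock_ok = False

print(plain_mock_ok)  # False
print(len(mock_thrift_backend_with_retry_policy().retry_policy.history))  # 0
```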