Closed

Changes from all commits (22 commits)
a46cd16
[Confluence] Improve error handling in Confluence pagination
parth-elastic Jun 3, 2024
d9bc3d6
Introduce threshold constants for error triggering
parth-elastic Jun 5, 2024
55074f8
Addressed comments
parth-elastic Jun 10, 2024
8d16015
Modify Logger and Fix pipeline
parth-elastic Jun 10, 2024
615f5d9
MIN_API_CALL for error threshold handling
parth-elastic Jun 13, 2024
2841c50
Merge branch 'main' of github.com:elastic/connectors into confluence-…
parth-elastic Jun 18, 2024
77957b4
Merge branch 'main' of github.com:elastic/connectors into confluence-…
parth-elastic Jul 15, 2024
32bb417
expanded error handling
parth-elastic Jul 15, 2024
475157d
resolved conflict
parth-elastic Jul 18, 2024
aa48872
revert changes
parth-elastic Jul 18, 2024
4bdd8a3
revert change
parth-elastic Jul 18, 2024
2298450
resolve conflicts
parth-elastic Aug 8, 2024
e896eef
resloved conflicts
parth-elastic Aug 8, 2024
f2e2e36
revert changes
parth-elastic Aug 8, 2024
1dda55f
revert deleted files
parth-elastic Aug 8, 2024
c41b86e
Merge branch 'main' of github.com:elastic/connectors into confluence-…
parth-elastic Aug 13, 2024
1e63d05
increase queue size
parth-elastic Aug 13, 2024
b2eb195
Delete lib64
artem-shelkovnikov Sep 9, 2024
f4f0ee5
Merge branch 'main' of github.com:elastic/connectors into confluence-…
parth-elastic Sep 18, 2024
d57fee3
Remove redundant Exception
parth-elastic Sep 18, 2024
dababfd
Merge branch 'confluence-pagination-swallow-error' of github.com:elas…
parth-elastic Sep 18, 2024
9c837c1
fix lint
parth-elastic Sep 18, 2024
143 changes: 123 additions & 20 deletions connectors/sources/confluence.py
@@ -12,7 +12,11 @@
from urllib.parse import urljoin

import aiohttp
from aiohttp.client_exceptions import ClientResponseError, ServerDisconnectedError
from aiohttp.client_exceptions import (
ClientPayloadError,
ClientResponseError,
ServerConnectionError,
)

from connectors.access_control import ACCESS_CONTROL
from connectors.logger import logger
@@ -42,6 +46,8 @@
RETRIES = 3
RETRY_INTERVAL = 2
DEFAULT_RETRY_SECONDS = 30
API_FAILURE_THRESHOLD = 0.10
MIN_API_CALL = 1000

LIMIT = 50
SPACE = "space"
@@ -104,6 +110,22 @@ class NotFound(Exception):
pass


class SyncFailure(Exception):
pass


class UnauthorizedException(Exception):
pass


class BadRequestError(Exception):
pass


class Forbidden(Exception):
pass


class ConfluenceClient:
"""Confluence client to handle API calls made to Confluence"""

@@ -117,6 +139,8 @@ def __init__(self, configuration):
self.certificate = self.configuration["ssl_ca"]
self.retry_count = self.configuration["retry_count"]
self.index_labels = self.configuration["index_labels"]
self.api_total_count = 0
self.api_failed_count = 0
if self.data_source_type == CONFLUENCE_CLOUD:
self.host_url = os.path.join(self.host_url, "wiki")

@@ -195,6 +219,17 @@ async def _handle_client_errors(self, url, exception):

await self._sleeps.sleep(retry_seconds)
raise ThrottledError
elif exception.status == 400:
self._logger.error(f"Bad Request Error (400). Exception: {exception}.")
raise BadRequestError
elif exception.status == 401:
self._logger.error(f"Authentication error (401). Exception: {exception}.")
raise UnauthorizedException
elif exception.status == 403:
self._logger.error(
f"Invalid credentials provided for the request to url: {url}. Exception: {exception}"
)
raise Forbidden
elif exception.status == 404:
self._logger.error(f"Getting Not Found Error for url: {url}")
raise NotFound
@@ -204,6 +239,39 @@ async def _handle_client_errors(self, url, exception):
else:
raise

async def _handle_client_payload_error(self, exception):
retry_seconds = DEFAULT_RETRY_SECONDS
response_headers = exception.headers or {}
if "Retry-After" in response_headers:
Member: What about client payload errors makes it special and the only thing that should look at a Retry-After header?

Contributor Author: Client payload errors often include a Retry-After header to manage rate limits and server load, indicating how long to wait before retrying, similar to how we handle it for OneDrive and SharePoint Online.

Member: But why not other error types too? 429 response codes should pretty much always include a Retry-After header.

Contributor Author: You're right, 429 responses should also include a Retry-After header. However, we already handle that in _handle_client_errors(), which checks the Retry-After header for 429 response codes.

(A sketch of the shared Retry-After parsing both replies describe follows this method.)

try:
retry_seconds = int(response_headers["Retry-After"])
except (TypeError, ValueError) as exception:
self._logger.error(
f"Error while reading value of retry-after header {exception}. Using default retry time: {DEFAULT_RETRY_SECONDS} seconds"
)
await self._sleeps.sleep(retry_seconds)
raise
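As the thread above notes, the Retry-After handling here duplicates what _handle_client_errors() does for 429 responses. A minimal sketch of factoring that parsing into one shared helper — the name _retry_after_seconds is illustrative and not part of this PR:

```python
DEFAULT_RETRY_SECONDS = 30  # same default the connector defines


def _retry_after_seconds(headers, default=DEFAULT_RETRY_SECONDS):
    """Parse a Retry-After header, falling back to `default` when the
    header is missing or malformed. (HTTP also allows an http-date form
    of Retry-After, which this sketch does not attempt to parse.)"""
    try:
        return int((headers or {}).get("Retry-After", default))
    except (TypeError, ValueError):
        return default
```

Both _handle_client_payload_error() and the 429 branch of _handle_client_errors() could then call this helper instead of repeating the try/except.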

async def _handle_api_call_error(self, url, exception):
known_errors = {
ServerConnectionError,
ClientResponseError,
ClientPayloadError,
Member: Why is this in the list if there's a _handle_client_payload_error() function?

Contributor Author: To re-raise the exception if it is not handled by _handle_client_payload_error().

(The sketch after this method illustrates that flow.)

Forbidden,
UnauthorizedException,
ThrottledError,
NotFound,
InternalServerError,
}

if isinstance(exception, tuple(known_errors)):
# Re-raising the error as it is not handled explicitly by retryable
raise
else:
self._logger.error(
f"Skipping data for {url} due to exception: {exception}; pagination ended prematurely."
)
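A condensed illustration of the dispatch discussed in the thread above, using a stand-in exception class rather than aiohttp's: once @retryable exhausts its attempts, a ClientPayloadError reaches _handle_api_call_error, which must re-raise it so the sync fails loudly instead of treating it as an unknown pagination error to swallow:

```python
class ClientPayloadError(Exception):
    """Stand-in for aiohttp.client_exceptions.ClientPayloadError."""


KNOWN_ERRORS = (ClientPayloadError,)


def handle_api_call_error(url, exc):
    if isinstance(exc, KNOWN_ERRORS):
        raise exc  # known errors propagate and fail the sync
    print(f"Skipping data for {url}. Exception: {exc}")  # unknown: end pagination quietly


try:
    handle_api_call_error("https://example.invalid/wiki/rest/api", ClientPayloadError("truncated body"))
except ClientPayloadError as exc:
    print(f"re-raised as expected: {exc}")
```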

@retryable(
retries=RETRIES,
interval=RETRY_INTERVAL,
@@ -223,17 +291,26 @@ async def api_call(self, url):
response: Client response
"""
self._logger.debug(f"Making a GET call for url: {url}")

if url != os.path.join(self.host_url, PING_URL):
self.api_total_count += 1
try:
return await self._get_session().get(
url=url,
ssl=self.ssl_ctx,
)
except ServerDisconnectedError:
except ServerConnectionError:
await self.close_session()
raise
except ClientResponseError as exception:
await self._handle_client_errors(url=url, exception=exception)
except ClientPayloadError as exception:
await self._handle_client_payload_error(exception)
except Exception as exception:
self.api_failed_count += 1
self._logger.debug(
f"Incrementing error count due to exception: {exception}. Failed API count: {self.api_failed_count}"
)
raise

async def paginated_api_call(self, url_name, **url_kwargs):
"""Make a paginated API call for Confluence objects using the passed url_name.
@@ -259,9 +336,7 @@ async def paginated_api_call(self, url_name, **url_kwargs):
links.get("next")[1:],
)
except Exception as exception:
self._logger.warning(
f"Skipping data for type {url_name} from {url}. Exception: {exception}."
)
await self._handle_api_call_error(url, exception)
break

async def paginated_api_call_for_datacenter_syncrule(self, url_name, **url_kwargs):
@@ -288,13 +363,14 @@ async def paginated_api_call_for_datacenter_syncrule(self, url_name, **url_kwargs):
if len(json_response.get("results", [])) < LIMIT:
break
except Exception as exception:
self._logger.warning(
f"Skipping data for type {url_name} from {url}. Exception: {exception}."
)
await self._handle_api_call_error(url, exception)
break

async def download_func(self, url):
yield await self.api_call(url)
try:
yield await self.api_call(url)
except Exception as exception:
await self._handle_api_call_error(url, exception)

async def search_by_query(self, query):
if self.data_source_type == CONFLUENCE_DATA_CENTER:
@@ -332,6 +408,8 @@ async def fetch_server_space_permission(self, url):
f"Something went wrong. Make sure you have installed Extender for running confluence datacenter/server DLS. Exception: {exception}."
)
return {}
except Exception as exception:
await self._handle_api_call_error(url, exception)

async def fetch_page_blog_documents(self, api_query):
async for response in self.paginated_api_call(
@@ -377,19 +455,27 @@ async def fetch_confluence_server_users(self):
url = urljoin(self.host_url, URLS[USERS_FOR_SERVER])

while True:
url_ = url.format(start=start_at, limit=limit)
users = await self.api_call(url=url_)
response = await users.json()
if len(response.get(key)) == 0:
return
yield response.get(key)
start_at += limit
try:
url_ = url.format(start=start_at, limit=limit)
users = await self.api_call(url=url_)
response = await users.json()
if len(response.get(key)) == 0:
return
yield response.get(key)
start_at += limit
except Exception as exception:
await self._handle_api_call_error(url, exception)
break

async def fetch_label(self, label_id):
url = os.path.join(self.host_url, URLS[LABEL].format(id=label_id))
label_data = await self.api_call(url=url)
labels = await label_data.json()
return [label.get("name") for label in labels["results"]]
try:
label_data = await self.api_call(url=url)
labels = await label_data.json()
return [label.get("name") for label in labels["results"]]

except Exception as exception:
await self._handle_api_call_error(url, exception)


class ConfluenceDataSource(BaseDataSource):
@@ -1224,6 +1310,20 @@ async def _consumer(self):
else:
yield item

def check_api_exceptions_and_raise(self):
failed_percentage = round(
(
self.confluence_client.api_failed_count
/ self.confluence_client.api_total_count
)
* 100,
2,
)
if failed_percentage >= API_FAILURE_THRESHOLD * 100:
msg = f"High percentage of API exceptions {failed_percentage}% is greater than or equal to the default threshold {API_FAILURE_THRESHOLD * 100}%. This calculation is done at the end of the synchronization process. Please review the logs for more details."
self._logger.error(msg)
raise SyncFailure(msg)

async def get_docs(self, filtering=None):
"""Executes the logic to fetch Confluence content in async manner.

@@ -1274,3 +1374,6 @@ async def get_docs(self, filtering=None):
async for item in self._consumer():
yield item
await self.fetchers.join()

if self.confluence_client.api_total_count > MIN_API_CALL:
self.check_api_exceptions_and_raise()
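To make the new failure gate concrete, here is a small worked example with invented numbers, returning a boolean as a stand-in for raising SyncFailure: the percentage is only evaluated once a sync has made more than MIN_API_CALL requests, so short syncs can never trip the gate, while a large sync fails at a 10% error rate:

```python
API_FAILURE_THRESHOLD = 0.10
MIN_API_CALL = 1000


def sync_should_fail(total_calls, failed_calls):
    # Mirrors check_api_exceptions_and_raise() plus its call-site guard.
    if total_calls <= MIN_API_CALL:
        return False  # too few calls for the ratio to be meaningful
    failed_percentage = round(failed_calls / total_calls * 100, 2)
    return failed_percentage >= API_FAILURE_THRESHOLD * 100


print(sync_should_fail(500, 60))    # False: 12% failed, but only 500 calls
print(sync_should_fail(1500, 180))  # True: 12.0% >= 10.0%
print(sync_should_fail(1500, 120))  # False: 8.0% < 10.0%
```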