Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a46cd16
[Confluence] Improve error handling in Confluence pagination
parth-elastic Jun 3, 2024
d9bc3d6
Introduce threshold constants for error triggering
parth-elastic Jun 5, 2024
55074f8
Addressed comments
parth-elastic Jun 10, 2024
8d16015
Modify Logger and Fix pipeline
parth-elastic Jun 10, 2024
615f5d9
MIN_API_CALL for error threshold handling
parth-elastic Jun 13, 2024
2841c50
Merge branch 'main' of github.com:elastic/connectors into confluence-…
parth-elastic Jun 18, 2024
77957b4
Merge branch 'main' of github.com:elastic/connectors into confluence-…
parth-elastic Jul 15, 2024
32bb417
expanded error handling
parth-elastic Jul 15, 2024
475157d
resolved conflict
parth-elastic Jul 18, 2024
aa48872
revert changes
parth-elastic Jul 18, 2024
4bdd8a3
revert change
parth-elastic Jul 18, 2024
2298450
resolve conflicts
parth-elastic Aug 8, 2024
e896eef
resloved conflicts
parth-elastic Aug 8, 2024
f2e2e36
revert changes
parth-elastic Aug 8, 2024
1dda55f
revert deleted files
parth-elastic Aug 8, 2024
c41b86e
Merge branch 'main' of github.com:elastic/connectors into confluence-…
parth-elastic Aug 13, 2024
1e63d05
increase queue size
parth-elastic Aug 13, 2024
b2eb195
Delete lib64
artem-shelkovnikov Sep 9, 2024
f4f0ee5
Merge branch 'main' of github.com:elastic/connectors into confluence-…
parth-elastic Sep 18, 2024
d57fee3
Remove redundant Exception
parth-elastic Sep 18, 2024
dababfd
Merge branch 'confluence-pagination-swallow-error' of github.com:elas…
parth-elastic Sep 18, 2024
9c837c1
fix lint
parth-elastic Sep 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 141 additions & 38 deletions connectors/sources/confluence.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
from urllib.parse import urljoin

import aiohttp
from aiohttp.client_exceptions import ClientResponseError, ServerDisconnectedError
from aiohttp.client_exceptions import (
ClientPayloadError,
ClientResponseError,
ServerDisconnectedError,
)

from connectors.access_control import ACCESS_CONTROL
from connectors.logger import logger
Expand Down Expand Up @@ -102,6 +106,10 @@ class NotFound(Exception):
pass


class SyncFailure(Exception):
pass


class ConfluenceClient:
"""Confluence client to handle API calls made to Confluence"""

Expand All @@ -115,6 +123,8 @@ def __init__(self, configuration):
self.certificate = self.configuration["ssl_ca"]
self.retry_count = self.configuration["retry_count"]
self.index_labels = self.configuration["index_labels"]
self.api_total_count = 0
self.api_failed_count = 0
if self.data_source_type == CONFLUENCE_CLOUD:
self.host_url = os.path.join(self.host_url, "wiki")

Expand Down Expand Up @@ -221,17 +231,27 @@ async def api_call(self, url):
response: Client response
"""
self._logger.debug(f"Making a GET call for url: {url}")
try:
async with self._get_session().get(
url=url,
ssl=self.ssl_ctx,
) as response:
yield response
except ServerDisconnectedError:
await self.close_session()
raise
except ClientResponseError as exception:
await self._handle_client_errors(url=url, exception=exception)
if url != os.path.join(self.host_url, PING_URL):
self.api_total_count += 1
while True:
try:
async with self._get_session().get(
url=url,
ssl=self.ssl_ctx,
) as response:
yield response
break
except ServerDisconnectedError:
await self.close_session()
raise
except ClientResponseError as exception:
await self._handle_client_errors(url=url, exception=exception)
except ClientPayloadError:
self.api_failed_count += 1
raise
except Exception:
self.api_failed_count += 1
raise

async def paginated_api_call(self, url_name, **url_kwargs):
"""Make a paginated API call for Confluence objects using the passed url_name.
Expand All @@ -258,9 +278,18 @@ async def paginated_api_call(self, url_name, **url_kwargs):
self.host_url,
links.get("next")[1:],
)
except ServerDisconnectedError:
raise
except ClientResponseError:
raise
except ClientPayloadError as exception:
self._logger.error(
f"Payload error occurred while fetching {url_name} from {url}. Exception: {exception}."
)
break
except Exception as exception:
self._logger.warning(
f"Skipping data for type {url_name} from {url}. Exception: {exception}."
self._logger.exception(
f"Something went wrong while fetching {url_name} from {url}. Exception: {exception}."
)
break

Expand Down Expand Up @@ -289,9 +318,18 @@ async def paginated_api_call_for_datacenter_syncrule(self, url_name, **url_kwarg
url_kwargs["start"] = start
if len(json_response.get("results", [])) < LIMIT:
break
except ServerDisconnectedError:
raise
except ClientResponseError:
raise
except ClientPayloadError as exception:
self._logger.error(
f"Payload error occurred while fetching {url_name} from {url}. Exception: {exception}."
)
break
except Exception as exception:
self._logger.warning(
f"Skipping data for type {url_name} from {url}. Exception: {exception}."
self._logger.exception(
f"Something went wrong while fetching {url_name} from {url}. Exception: {exception}."
)
break

Expand Down Expand Up @@ -333,6 +371,16 @@ async def fetch_server_space_permission(self, url):
f"Something went wrong. Make sure you have installed Extender for running confluence datacenter/server DLS. Exception: {exception}."
)
return {}
except ServerDisconnectedError:
raise
except ClientPayloadError as exception:
self._logger.error(
f"Payload error occurred while fetching {url}. Exception: {exception}."
)
except Exception as exception:
self._logger.exception(
f"Something went wrong while fetching from {url}. Exception: {exception}."
)

async def fetch_page_blog_documents(self, api_query):
async for response in self.paginated_api_call(
Expand Down Expand Up @@ -380,20 +428,49 @@ async def fetch_confluence_server_users(self):
url = urljoin(self.host_url, URLS[USERS_FOR_SERVER])

while True:
url_ = url.format(start=start_at, limit=limit)
async for users in self.api_call(url=url_):
response = await users.json()
if len(response.get(key)) == 0:
return
yield response.get(key)
start_at += limit
try:
url_ = url.format(start=start_at, limit=limit)
async for users in self.api_call(url=url_):
response = await users.json()
if len(response.get(key)) == 0:
return
yield response.get(key)
start_at += limit
except ServerDisconnectedError:
raise
except ClientResponseError:
raise
except ClientPayloadError as exception:
self._logger.error(
f"Payload error occurred while fetching {url}. Exception: {exception}."
)
break
except Exception as exception:
self._logger.exception(
f"Something went wrong while fetching from {url}. Exception: {exception}."
)
break

async def fetch_label(self, label_id):
async for label_data in self.api_call(
url=os.path.join(self.host_url, URLS[LABEL].format(id=label_id))
):
labels = await label_data.json()
return [label.get("name") for label in labels["results"]]
url = os.path.join(self.host_url, URLS[LABEL].format(id=label_id))
try:
async for label_data in self.api_call(
url=url,
):
labels = await label_data.json()
return [label.get("name") for label in labels["results"]]
except ServerDisconnectedError:
raise
except ClientResponseError:
raise
except ClientPayloadError as exception:
self._logger.error(
f"Payload error occurred while fetching {url}. Exception: {exception}."
)
except Exception as exception:
self._logger.exception(
f"Something went wrong while fetching from {url}. Exception: {exception}."
)


class ConfluenceDataSource(BaseDataSource):
Expand Down Expand Up @@ -1010,18 +1087,31 @@ async def download_attachment(self, url, attachment, timestamp=None, doit=False)

self._logger.info(f"Downloading content for file: {filename}")
document = {"_id": attachment["_id"], "_timestamp": attachment["_timestamp"]}
return await self.download_and_extract_file(
document,
filename,
file_extension,
partial(
self.generic_chunked_download_func,
try:
return await self.download_and_extract_file(
document,
filename,
file_extension,
partial(
self.confluence_client.api_call,
url=os.path.join(self.confluence_client.host_url, url),
self.generic_chunked_download_func,
partial(
self.confluence_client.api_call,
url=os.path.join(self.confluence_client.host_url, url),
),
),
),
)
)
except ServerDisconnectedError:
raise
except ClientResponseError:
raise
except ClientPayloadError as exception:
self._logger.error(
f"Payload error occurred while fetching {url}. Exception: {exception}."
)
except Exception as exception:
self._logger.exception(
f"Something went wrong while fetching from {url}. Exception: {exception}."
)

async def _attachment_coro(self, document, access_control):
"""Coroutine to add attachments to Queue and download content
Expand Down Expand Up @@ -1178,6 +1268,16 @@ async def _consumer(self):
else:
yield item

def check_api_exceptions_and_raise(self):
failed_percentage = (
self.confluence_client.api_failed_count
/ self.confluence_client.api_total_count
) * 100
if failed_percentage >= 30:
msg = f"High percentage of API exceptions: {failed_percentage}%. Please review the logs for more information."
self._logger.error(msg)
raise SyncFailure(msg)

async def get_docs(self, filtering=None):
"""Executes the logic to fetch Confluence content in async manner.

Expand Down Expand Up @@ -1231,3 +1331,6 @@ async def get_docs(self, filtering=None):
async for item in self._consumer():
yield item
await self.fetchers.join()

if self.confluence_client.api_total_count > 0:
self.check_api_exceptions_and_raise()
Loading