Skip to content

Commit 2db4c06

Browse files
authored
Bring recent diffs for GitHub connector into develop branch (#3780)
PR to bring the recent changes from #3708 into the `develop` branch. This PR is a prerequisite to splitting apart the GitHub connector
1 parent 1a238fd commit 2db4c06

File tree

2 files changed

+336
-100
lines changed

2 files changed

+336
-100
lines changed

app/connectors_service/connectors/sources/github.py

Lines changed: 209 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import json
99
import time
10+
from collections import defaultdict
1011
from enum import Enum
1112
from functools import cached_property, partial
1213

@@ -19,9 +20,7 @@
1920
)
2021
from connectors_sdk.logger import logger
2122
from connectors_sdk.source import BaseDataSource, ConfigurableFieldValueError
22-
from connectors_sdk.utils import (
23-
nested_get_from_dict,
24-
)
23+
from connectors_sdk.utils import nested_get_from_dict
2524
from gidgethub import QueryError, sansio
2625
from gidgethub.abc import (
2726
BadGraphQLRequest,
@@ -199,6 +198,11 @@ class GithubQuery(Enum):
199198
}
200199
}
201200
"""
201+
BATCH_REPO_QUERY_TEMPLATE = """
202+
query ({batch_queries}) {{
203+
{query_body}
204+
}}
205+
"""
202206
PULL_REQUEST_QUERY = f"""
203207
query ($owner: String!, $name: String!, $cursor: String) {{
204208
repository(owner: $owner, name: $name) {{
@@ -770,12 +774,18 @@ def _get_client(self):
770774
strategy=RetryStrategy.EXPONENTIAL_BACKOFF,
771775
skipped_exceptions=UnauthorizedException,
772776
)
773-
async def graphql(self, query, variables=None):
777+
async def graphql(
778+
self,
779+
query: str,
780+
variables: dict | None = None,
781+
ignore_errors: list[str] | None = None,
782+
) -> dict:
774783
"""Invoke GraphQL request to fetch repositories, pull requests, and issues.
775784
776785
Args:
777786
query: Dictionary comprising of query for the GraphQL request.
778787
variables: Dictionary comprising of query for the GraphQL request.
788+
ignore_errors: List of error types to ignore and return null for instead of raising exceptions.
779789
780790
Raises:
781791
UnauthorizedException: Unauthorized exception
@@ -784,6 +794,9 @@ async def graphql(self, query, variables=None):
784794
Yields:
785795
dictionary: Client response
786796
"""
797+
798+
ignore_errors = ignore_errors or []
799+
787800
url = f"{self.base_url}/graphql"
788801
self._logger.debug(
789802
f"Sending POST to {url} with query: '{json.dumps(query)}' and variables: '{json.dumps(variables)}'"
@@ -809,13 +822,24 @@ async def graphql(self, query, variables=None):
809822
else:
810823
raise
811824
except QueryError as exception:
812-
for error in exception.response.get("errors"):
825+
errors = exception.response.get("errors", [])
826+
for error in errors:
813827
if (
814-
error.get("type").lower() == "rate_limited"
828+
error.get("type", "").lower() == "rate_limited"
815829
and "api rate limit exceeded" in error.get("message").lower()
816830
):
817831
await self._put_to_sleep(resource_type="graphql")
818-
msg = f"Error while executing query. Exception: {exception.response.get('errors')}"
832+
833+
if all(error.get("type") in ignore_errors for error in errors):
834+
# All errors are ignored, return just the data part without errors
835+
return exception.response.get("data", {})
836+
837+
# report only non-ignored errors
838+
non_ignored_errors = [
839+
error for error in errors if error.get("type") not in ignore_errors
840+
]
841+
842+
msg = f"Error while executing query. Exception: {non_ignored_errors}"
819843
raise Exception(msg) from exception
820844
except Exception as e:
821845
self._logger.debug(
@@ -1004,6 +1028,93 @@ async def get_foreign_repo(self, repo_name):
10041028
)
10051029
return data.get(REPOSITORY_OBJECT)
10061030

1031+
async def get_repos_by_fully_qualified_name_batch(
1032+
self, repo_names: list[str], batch_size: int = 15
1033+
) -> dict[str, dict | None]:
1034+
"""Batch validate multiple repositories by fully qualified name in fewer GraphQL requests.
1035+
1036+
Args:
1037+
repo_names (list): List of fully qualified repository names (owner/repo format) to validate
1038+
batch_size (int): Number of repos to validate per request (default 15)
1039+
1040+
Returns:
1041+
dict: Dictionary mapping repo_name to repository object (or None if invalid)
1042+
"""
1043+
results = {}
1044+
1045+
# Process repositories in batches to avoid hitting GraphQL complexity limits
1046+
for i in range(0, len(repo_names), batch_size):
1047+
batch = repo_names[i : i + batch_size]
1048+
batch_results = await self._fetch_repos_batch(batch)
1049+
results.update(batch_results)
1050+
1051+
return results
1052+
1053+
async def _fetch_repos_batch(self, repo_names: list[str]) -> dict[str, dict | None]:
1054+
"""Fetch a batch of repositories in a single GraphQL query using aliases.
1055+
1056+
Note: This method will NOT raise exceptions for non-existent repositories.
1057+
Invalid/non-existent repositories will be returned as None values in the result.
1058+
"""
1059+
if not repo_names:
1060+
return {}
1061+
1062+
query_parts = []
1063+
variables = {}
1064+
1065+
for i, repo_name in enumerate(repo_names):
1066+
owner, repo = self.get_repo_details(repo_name=repo_name)
1067+
alias = f"repo{i}"
1068+
1069+
variables[f"{alias}_owner"] = owner
1070+
variables[f"{alias}_name"] = repo
1071+
1072+
query_part = f"""
1073+
{alias}: repository(owner: ${alias}_owner, name: ${alias}_name) {{
1074+
id
1075+
updatedAt
1076+
name
1077+
nameWithOwner
1078+
url
1079+
description
1080+
visibility
1081+
primaryLanguage {{
1082+
name
1083+
}}
1084+
defaultBranchRef {{
1085+
name
1086+
}}
1087+
isFork
1088+
stargazerCount
1089+
watchers {{
1090+
totalCount
1091+
}}
1092+
forkCount
1093+
createdAt
1094+
isArchived
1095+
}}"""
1096+
query_parts.append(query_part)
1097+
1098+
variable_declarations = [
1099+
f"${var_name}: String!" for var_name in variables.keys()
1100+
]
1101+
1102+
query = GithubQuery.BATCH_REPO_QUERY_TEMPLATE.value.format(
1103+
batch_queries=", ".join(variable_declarations),
1104+
query_body=" ".join(query_parts),
1105+
)
1106+
1107+
data = await self.graphql(
1108+
query=query, variables=variables, ignore_errors=["NOT_FOUND"]
1109+
)
1110+
1111+
# Map results back to repo names
1112+
results = {
1113+
repo_name: data.get(f"repo{i}") for i, repo_name in enumerate(repo_names)
1114+
}
1115+
1116+
return results
1117+
10071118
async def _fetch_all_members(self, org_name):
10081119
org_variables = {
10091120
"orgName": org_name,
@@ -1318,7 +1429,7 @@ async def get_invalid_repos(self):
13181429
else:
13191430
return await self._get_invalid_repos_for_personal_access_token()
13201431

1321-
async def _get_invalid_repos_for_github_app(self):
1432+
async def _get_invalid_repos_for_github_app(self) -> list[str]:
13221433
# A github app can be installed on multiple orgs/personal accounts,
13231434
# so the repo must be configured in the format of 'OWNER/REPO', any other format will be rejected
13241435
invalid_repos = set(
@@ -1329,18 +1440,62 @@ async def _get_invalid_repos_for_github_app(self):
13291440
self.configured_repos,
13301441
)
13311442
)
1332-
for full_repo_name in self.configured_repos:
1333-
if full_repo_name in invalid_repos:
1334-
continue
1443+
1444+
# Group valid repos by owner for batch validation
1445+
valid_repos = [
1446+
repo for repo in self.configured_repos if repo not in invalid_repos
1447+
]
1448+
repos_by_owner = defaultdict(list)
1449+
1450+
await self._fetch_installations()
1451+
1452+
for full_repo_name in valid_repos:
13351453
owner, repo_name = self.github_client.get_repo_details(
13361454
repo_name=full_repo_name
13371455
)
1338-
if await self._get_repo_object_for_github_app(owner, repo_name) is None:
1456+
1457+
# Check if GitHub App is installed on this owner
1458+
if owner not in self._installations:
1459+
self._logger.debug(
1460+
f"Invalid repo {full_repo_name} as the github app is not installed on {owner}"
1461+
)
13391462
invalid_repos.add(full_repo_name)
1463+
continue
1464+
1465+
repos_by_owner[owner].append(full_repo_name)
1466+
1467+
# Batch validate repos for each owner
1468+
for owner, owner_repos in repos_by_owner.items():
1469+
await self.github_client.update_installation_id(self._installations[owner])
1470+
1471+
batch_results = (
1472+
await self.github_client.get_repos_by_fully_qualified_name_batch(
1473+
owner_repos
1474+
)
1475+
)
1476+
1477+
for repo_name, repo_data in batch_results.items():
1478+
if repo_data:
1479+
if self.configuration["repo_type"] == "organization":
1480+
if owner not in self.org_repos:
1481+
self.org_repos[owner] = {}
1482+
self.org_repos[owner][repo_name] = repo_data
1483+
else:
1484+
if owner not in self.user_repos:
1485+
self.user_repos[owner] = {}
1486+
self.user_repos[owner][repo_name] = repo_data
1487+
else:
1488+
self._logger.debug(f"Detected invalid repository: {repo_name}.")
1489+
invalid_repos.add(repo_name)
13401490

13411491
return list(invalid_repos)
13421492

1343-
async def _get_repo_object_for_github_app(self, owner, repo_name):
1493+
async def _get_repo_object_for_github_app(
1494+
self, owner: str, repo_name: str
1495+
) -> dict | None:
1496+
# Note: this method fetches potentially all user or org repos and caches them,
1497+
# so it should be used sparingly as it could consume a lot of API rate limit
1498+
# due to possibly multiple pages of repos
13441499
await self._fetch_installations()
13451500
full_repo_name = f"{owner}/{repo_name}"
13461501
if owner not in self._installations:
@@ -1373,50 +1528,48 @@ async def _get_repo_object_for_github_app(self, owner, repo_name):
13731528

13741529
return cached_repo[owner][full_repo_name]
13751530

1376-
async def _get_invalid_repos_for_personal_access_token(self):
1531+
async def _get_invalid_repos_for_personal_access_token(self) -> list[str]:
13771532
try:
1533+
all_repos: list[str] = []
1534+
# Combine all repos for unified batch validation
13781535
if self.configuration["repo_type"] == "other":
13791536
logged_in_user = await self._logged_in_user()
13801537
foreign_repos, configured_repos = self.github_client.bifurcate_repos(
13811538
repos=self.configured_repos,
13821539
owner=logged_in_user,
13831540
)
1384-
if logged_in_user not in self.user_repos:
1385-
self.user_repos[logged_in_user] = {}
1386-
async for repo in self.github_client.get_user_repos(logged_in_user):
1387-
self.user_repos[logged_in_user][repo["nameWithOwner"]] = repo
1388-
invalid_repos = list(
1389-
set(configured_repos) - set(self.user_repos[logged_in_user].keys())
1390-
)
1391-
1392-
for repo_name in foreign_repos:
1393-
try:
1394-
self.foreign_repos[
1395-
repo_name
1396-
] = await self.github_client.get_foreign_repo(
1397-
repo_name=repo_name
1398-
)
1399-
except Exception:
1400-
self._logger.debug(f"Detected invalid repository: {repo_name}.")
1401-
invalid_repos.append(repo_name)
1541+
all_repos = configured_repos + foreign_repos
14021542
else:
14031543
foreign_repos, configured_repos = self.github_client.bifurcate_repos(
14041544
repos=self.configured_repos,
14051545
owner=self.configuration["org_name"],
14061546
)
1407-
configured_repos.extend(foreign_repos)
1408-
if self.configuration["org_name"] not in self.org_repos:
1409-
self.org_repos[self.configuration["org_name"]] = {}
1410-
async for repo in self.github_client.get_org_repos(
1411-
self.configuration["org_name"]
1412-
):
1413-
self.org_repos[self.configuration["org_name"]][
1414-
repo["nameWithOwner"]
1415-
] = repo
1416-
invalid_repos = list(
1417-
set(configured_repos)
1418-
- set(self.org_repos[self.configuration["org_name"]].keys())
1547+
all_repos = configured_repos + foreign_repos
1548+
1549+
invalid_repos = []
1550+
batch_results = (
1551+
await self.github_client.get_repos_by_fully_qualified_name_batch(
1552+
all_repos
14191553
)
1554+
)
1555+
for repo_name, repo_data in batch_results.items():
1556+
if not repo_data:
1557+
self._logger.debug(f"Detected invalid repository: {repo_name}.")
1558+
invalid_repos.append(repo_name)
1559+
continue
1560+
# Store valid repos for potential later use
1561+
if repo_name in foreign_repos:
1562+
self.foreign_repos[repo_name] = repo_data
1563+
else:
1564+
# Store configured repos in the appropriate cache
1565+
if self.configuration["repo_type"] == "other":
1566+
logged_in_user = await self._logged_in_user()
1567+
self.user_repos.setdefault(logged_in_user, {})[repo_name] = (
1568+
repo_data
1569+
)
1570+
else:
1571+
org_name = self.configuration["org_name"]
1572+
self.org_repos.setdefault(org_name, {})[repo_name] = repo_data
14201573
return invalid_repos
14211574
except Exception as exception:
14221575
self._logger.exception(
@@ -1588,6 +1741,9 @@ async def _fetch_installations(self):
15881741
return self._installations
15891742

15901743
async def _get_personal_repos(self, user):
1744+
# Note: this method fetches potentially all user repos and caches them,
1745+
# so it should be used sparingly as it could consume a lot of API rate limit
1746+
# due to possibly multiple pages of repos
15911747
self._logger.info(f"Fetching personal repos {user}")
15921748
if user in self.user_repos:
15931749
for repo_object in self.user_repos[user]:
@@ -1599,6 +1755,9 @@ async def _get_personal_repos(self, user):
15991755
yield repo_object
16001756

16011757
async def _get_org_repos(self, org_name):
1758+
# Note: this method fetches potentially all org repos and caches them,
1759+
# so it should be used sparingly as it could consume a lot of API rate limit
1760+
# due to possibly multiple pages of repos
16021761
self._logger.info(f"Fetching org repos for {org_name}")
16031762
if org_name in self.org_repos:
16041763
for repo_object in self.org_repos[org_name]:
@@ -1610,6 +1769,8 @@ async def _get_org_repos(self, org_name):
16101769
yield repo_object
16111770

16121771
async def _get_configured_repos(self, configured_repos):
1772+
# Note: this method calls _get_repo_object_for_github_app - see comments there
1773+
# for potential performance implications
16131774
self._logger.info(f"Fetching configured repos: '{configured_repos}'")
16141775
for repo_name in configured_repos:
16151776
self._logger.info(f"Fetching repo: '{repo_name}'")
@@ -1649,6 +1810,10 @@ async def _get_configured_repos(self, configured_repos):
16491810
yield self._convert_repo_object_to_doc(repo_object)
16501811

16511812
async def _fetch_repos(self):
1813+
# Note: this method calls _get_configured_repos
1814+
# which in turn calls _get_repo_object_for_github_app
1815+
# or _get_personal_repos/_get_org_repos, see comments there
1816+
# for potential performance/rate limit implications
16521817
self._logger.info("Fetching repos")
16531818
try:
16541819
if (

0 commit comments

Comments
 (0)