diff --git a/requirements.txt b/requirements.txt index 3fd6b93..17e36b2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,5 @@ pyyaml httpx trio python-dateutil -pycrowdsec \ No newline at end of file +pycrowdsec +tenacity \ No newline at end of file diff --git a/src/fastly_bouncer/config.py b/src/fastly_bouncer/config.py index c74226b..9d41fb4 100644 --- a/src/fastly_bouncer/config.py +++ b/src/fastly_bouncer/config.py @@ -7,7 +7,7 @@ import yaml from fastly_bouncer.fastly_api import FastlyAPI -from fastly_bouncer.utils import are_filled_validator, VERSION +from fastly_bouncer.utils import are_filled_validator, VERSION, SUPPORTED_ACTIONS @dataclass @@ -29,6 +29,7 @@ class FastlyServiceConfig: activate: bool = False max_items: int = 20000 captcha_cookie_expiry_duration: str = "1800" + supported_actions: List = field(default_factory=list) def __post_init__(self): are_filled_validator(**{key: getattr(self, key) for key in asdict(self).keys()}) @@ -171,6 +172,7 @@ async def generate_config_for_service(api: FastlyAPI, service_id: str, sender_ch activate=False, clone_reference_version=True, reference_version=ref_version, + supported_actions=SUPPORTED_ACTIONS, ) ) diff --git a/src/fastly_bouncer/fastly_api.py b/src/fastly_bouncer/fastly_api.py index fbadf1b..2c1996f 100644 --- a/src/fastly_bouncer/fastly_api.py +++ b/src/fastly_bouncer/fastly_api.py @@ -15,8 +15,8 @@ logger: logging.Logger = logging.getLogger("") -ACL_CAPACITY = 100 - +ACL_CAPACITY = 1000 # as of 2024-05 +ACL_BATCH_SIZE = 1000 # as of 2024-05 @dataclass class ACL: @@ -47,6 +47,8 @@ def as_jsonable_dict(self) -> Dict: "created": self.created, } + def as_set(self) -> Set[str]: + return set([ip_subnet for ip_subnet, _ in self.entries.items()]) @dataclass class VCL: @@ -78,7 +80,9 @@ def to_dict(self): async def raise_on_4xx_5xx(response): - response.raise_for_status() + if response.is_error: + await response.aread() + response.raise_for_status() class FastlyAPI: @@ -97,7 +101,7 @@ def 
__init__(self, token): async def get_version_to_clone(self, service_id: str) -> str: """ Gets the version to clone from. If service has active version, then the active version will be cloned. - Else the the version which was last updated would be cloned + Else the version which was last updated would be cloned """ service_versions_resp = await self.session.get( @@ -272,19 +276,18 @@ async def update_dynamic_vcl(self, vcl: VCL): resp.json() return vcl - async def refresh_acl_entries(self, acl: ACL) -> Dict[str, str]: + async def refresh_acl_entries(self, acl: ACL) -> None: resp = await self.session.get( - self.api_url(f"/service/{acl.service_id}/acl/{acl.id}/entries?per_page=100") + self.api_url(f"/service/{acl.service_id}/acl/{acl.id}/entries?per_page={ACL_BATCH_SIZE}") ) resp = resp.json() acl.entries = {} for entry in resp: acl.entries[f"{entry['ip']}/{entry['subnet']}"] = entry["id"] - return acl async def process_acl(self, acl: ACL): - logger.debug(with_suffix(f"entries to delete {acl.entries_to_delete}", acl_id=acl.id)) - logger.debug(with_suffix(f"entries to add {acl.entries_to_add}", acl_id=acl.id)) + logger.debug(with_suffix(f"entries to delete %s", acl_id=acl.id), acl.entries_to_delete) + logger.debug(with_suffix(f"entries to add %s", acl_id=acl.id), acl.entries_to_add) update_entries = [] for entry_to_add in acl.entries_to_add: if entry_to_add in acl.entries: @@ -304,10 +307,10 @@ async def process_acl(self, acl: ACL): if not update_entries: return - # Only 100 operations per request can be done on an acl. + # Only ACL_BATCH_SIZE operations per request can be done on an acl. 
async with trio.open_nursery() as n: - for i in range(0, len(update_entries), 100): - update_entries_batch = update_entries[i : i + 100] + for i in range(0, len(update_entries), ACL_BATCH_SIZE): + update_entries_batch = update_entries[i : i + ACL_BATCH_SIZE] request_body = {"entries": update_entries_batch} f = partial(self.session.patch, json=request_body) n.start_soon( @@ -315,8 +318,6 @@ async def process_acl(self, acl: ACL): self.api_url(f"/service/{acl.service_id}/acl/{acl.id}/entries"), ) - acl = await self.refresh_acl_entries(acl) - @staticmethod def api_url(endpoint: str) -> str: return urljoin(FastlyAPI.base_url, endpoint) diff --git a/src/fastly_bouncer/main.py b/src/fastly_bouncer/main.py index 3408195..022c6d4 100644 --- a/src/fastly_bouncer/main.py +++ b/src/fastly_bouncer/main.py @@ -32,6 +32,7 @@ logger: logging.Logger = get_default_logger() exiting = False +reload_acls = "start up" def sigterm_signal_handler(signum, frame): @@ -44,6 +45,14 @@ def sigterm_signal_handler(signum, frame): signal.signal(signal.SIGINT, sigterm_signal_handler) +def sighup_signal_handler(signum, frame): + global reload_acls + reload_acls = "signal" + + +signal.signal(signal.SIGHUP, sighup_signal_handler) + + async def setup_action_for_service( fastly_api: FastlyAPI, action: str, @@ -135,7 +144,17 @@ async def setup_service( ) acl_collection_by_action = {} - for action in SUPPORTED_ACTIONS: + + if service_cfg.supported_actions: + if any(map(lambda act: act not in SUPPORTED_ACTIONS, service_cfg.supported_actions)): + logger.error(f"Ignoring supported_actions of {service_cfg.supported_actions}: includes invalid actions") + supported_actions = SUPPORTED_ACTIONS + else: + supported_actions = service_cfg.supported_actions + else: + supported_actions = SUPPORTED_ACTIONS + + for action in supported_actions: sender, receiver = trio.open_memory_channel(0) async with trio.open_nursery() as n: async with sender: @@ -162,6 +181,7 @@ async def setup_service( version=version, 
activate=service_cfg.activate, captcha_expiry_duration=service_cfg.captcha_cookie_expiry_duration, + supported_actions=service_cfg.supported_actions, ) await s.create_static_vcls() await sender_chan.send(s) @@ -195,7 +215,7 @@ async def setup_fastly_infra(config: Config, cleanup_mode): else: cache = json.loads(s) services = list(map(Service.from_jsonable_dict, cache["service_states"])) - logger.info(f"loaded exisitng infra using cache") + logger.info(f"loaded existing infra using cache") if not cleanup_mode: return services else: @@ -236,7 +256,7 @@ def set_logger(config: Config): async def run(config: Config, services: List[Service]): - global VERSION + global VERSION, reload_acls crowdsec_client = StreamClient( lapi_url=config.crowdsec_config.lapi_url, api_key=config.crowdsec_config.lapi_key, @@ -251,17 +271,23 @@ async def run(config: Config, services: List[Service]): while True and not exiting: new_state = crowdsec_client.get_current_decisions() + if reload_acls: + logger.info(f"Reload of ACLS triggered by {reload_acls}") + reload_acls = False + for s in services: + await s.reload_acls() + async with trio.open_nursery() as n: for s in services: n.start_soon(s.transform_state, new_state) new_states = list(map(lambda service: service.as_jsonable_dict(), services)) if new_states != previous_states: - logger.debug("updating cache") + logger.debug("writing updated cache of fastly state") new_cache = {"service_states": new_states, "bouncer_version": VERSION} async with await trio.open_file(config.cache_path, "w") as f: await f.write(json.dumps(new_cache, indent=4)) - logger.debug("done updating cache") + logger.debug("done writing updated cache of fastly state") previous_states = new_states if exiting: diff --git a/src/fastly_bouncer/service.py b/src/fastly_bouncer/service.py index 4bb422c..bbfe39b 100644 --- a/src/fastly_bouncer/service.py +++ b/src/fastly_bouncer/service.py @@ -4,10 +4,12 @@ from typing import Dict, Iterable, List, Set import trio +from httpx 
import HTTPError +from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_result from fastly_bouncer import vcl_templates from fastly_bouncer.fastly_api import ACL, VCL, FastlyAPI -from fastly_bouncer.utils import with_suffix +from fastly_bouncer.utils import with_suffix, transient_http_error logger: logging.Logger = logging.getLogger("") @@ -29,10 +31,10 @@ def __init__( ): self.acls: List[ACL] = acls self.api: FastlyAPI = api - self.service_id = service_id - self.version = version - self.action = action - self.state: Set = state + self.service_id = service_id # Fastly service ID + self.version = version # Fastly version ID, probably of first time it was added and doesn't change + self.action = action # "ban" "captcha" etc + self.state: Set = state # All the "ip/subnet" for the ACLCollection def as_jsonable_dict(self) -> Dict: return { @@ -70,33 +72,32 @@ async def create_acls(self, acl_count: int) -> None: acls.append(acl) return acls - def insert_item(self, item: str) -> bool: + def stage_insert_item(self, item: str) -> bool: """ - Returns True if the item was successfully allocated in an ACL + Returns True if the item was staged in an ACL, False if all the ACLs are full. """ # Check if item is already present in some ACL for acl in self.acls: if not acl.is_full(): acl.entries_to_add.add(item) acl.entry_count += 1 - self.state.add(item) return True return False - def remove_item(self, item: str) -> bool: + def stage_remove_item(self, item: str) -> bool: """ - Returns True if item is found, and removed. + Returns True if item is found, and staged for removal. 
""" for acl in self.acls: if item not in acl.entries: continue acl.entries_to_delete.add(item) - self.state.discard(item) acl.entry_count -= 1 return True return False def transform_to_state(self, new_state): + """Stages change alter `self` to be in `new_state` but does not send to fastly or change `self.state`.""" new_items = new_state - self.state expired_items = self.state - new_state if new_items: @@ -121,8 +122,8 @@ def transform_to_state(self, new_state): if any([new_item in acl.entries for acl in self.acls]): continue - if not self.insert_item(new_item): - logger.warn( + if not self.stage_insert_item(new_item): + logger.error( with_suffix( f"acl_collection for {self.action} is full. Ignoring remaining items.", service_id=self.service_id, @@ -131,17 +132,22 @@ def transform_to_state(self, new_state): break for expired_item in expired_items: - self.remove_item(expired_item) + if not self.stage_remove_item(expired_item): + logger.debug(with_suffix(f"{expired_item} not found in acl_collection. Ignoring.", + service_id=self.service_id)) + + # at this point changes are staged in the ACLs but not yet sent to Fastly or changed in `ACLCollection.state` async def commit(self) -> None: - acls_to_change = list( - filter(lambda acl: acl.entries_to_add or acl.entries_to_delete, self.acls) - ) + """Make changes go live. 
+ Send the changes stored in `self.acls.entries_to_add` and `self.acls.entries_to_delete` to Fastly and + update `self.state`.""" + acls_to_change = list(filter(lambda acl: acl.entries_to_add or acl.entries_to_delete, self.acls)) if len(acls_to_change): async with trio.open_nursery() as n: for acl in acls_to_change: - n.start_soon(self.update_acl, acl) + n.start_soon(self.commit_acl, acl) logger.info( with_suffix( f"acl collection for {self.action} updated", @@ -149,6 +155,11 @@ async def commit(self) -> None: ) ) + try: + await self.refresh_from_fastly(acls_to_change) + except HTTPError as exc: + logger.warning(f"Could not refresh ACL {acl.name} with {exc.request.url} - {exc}") + def generate_conditions(self) -> str: conditions = [] for acl in self.acls: @@ -156,25 +167,108 @@ def generate_conditions(self) -> str: return " || ".join(conditions) - async def update_acl(self, acl: ACL): + + @retry(wait=wait_exponential(multiplier=1, min=2, max=10), + stop=stop_after_attempt(10), + retry=retry_if_result(transient_http_error)) + async def _commit_acl_diff(self, acl: ACL): logger.debug( with_suffix( - f"commiting changes to acl {acl.name}", + f"commiting changes to acl {acl.name} to fastly", service_id=self.service_id, acl_collection=self.action, ) ) await self.api.process_acl(acl) + + self.state.difference_update(acl.entries_to_delete) + self.state.update(acl.entries_to_add) + acl.entries_to_add = set() + acl.entries_to_delete = set() + logger.debug( + with_suffix( + f"finished commiting changes to acl {acl.name} to fastly, state updated", + service_id=self.service_id, + acl_collection=self.action, + ) + ) + + @retry(wait=wait_exponential(multiplier=1, min=2, max=10), + stop=stop_after_attempt(10), + retry=retry_if_result(transient_http_error)) + async def _refresh_then_commit(self, acl: ACL): logger.debug( with_suffix( - f"commited changes to acl {acl.name}", + f"refreshing acl {acl.name} from fastly", service_id=self.service_id, acl_collection=self.action, ) ) + 
await self.api.refresh_acl_entries(acl) + current = acl.as_set() + + # if we have an add, and it exists at Fastly, we don't want to add it again at fastly, + # but we still need to put it in self.state. + state_add = acl.entries_to_add.copy() + acl.entries_to_add.difference_update(current) + + # if we have a delete, and it doesn't exist at Fastly, don't want to add it again or Fasty will 400 + # but we still need to remove it from self.state + state_delete = acl.entries_to_delete.copy() + acl.entries_to_delete = current & acl.entries_to_delete + + logger.info(with_suffix(f"{acl.name} local state refreshed, out of sync repaired", + service_id=self.service_id)) + + logger.debug( + with_suffix( + f"commiting changes to acl {acl.name} to fastly", + service_id=self.service_id, + acl_collection=self.action, + ) + ) + await self.api.process_acl(acl) + + self.state.difference_update(state_delete) + self.state.update(state_add) acl.entries_to_add = set() acl.entries_to_delete = set() + logger.debug( + with_suffix( + f"finished refresh of acl {acl.name} and changes sent to fastly", + service_id=self.service_id, + acl_collection=self.action, + ) + ) + + async def commit_acl(self, acl: ACL): + """Sends changes to Fastly and updates `self.state` for this single `acl`.""" + try: + await self._commit_acl_diff(acl) + except* HTTPError as exc: + httpexc = exc.split(HTTPError)[0].exceptions[0] + if httpexc.response.status_code == 400: + logger.info(with_suffix(f"{acl.name} local state out of sync with fastly," + f"msg: '{httpexc.response.text}'", service_id=self.service_id)) + await self._refresh_then_commit(acl) + else: + raise exc + + + async def refresh_from_fastly(self, acls: List[ACL]=[]) -> None: + """Get data for ACLs from fastly and rebuild state.""" + if not acls: + acls_to_refresh = self.acls + else: + acls_to_refresh = acls + + async with trio.open_nursery() as n: + for acl in acls_to_refresh: + n.start_soon(self.api.refresh_acl_entries, acl) + + self.state = 
set().union(*[acl.as_set() for acl in acls_to_refresh]) + @dataclass class Service: @@ -186,12 +280,12 @@ class Service: activate: bool captcha_expiry_duration: str = "1800" _first_time: bool = True - supported_actions: List = field(default_factory=list) + supported_actions: List = field(default_factory=list) # Ex ["ban", "captcha"] Action types from Crowdsec policies. vcl_by_action: Dict[str, VCL] = field(default_factory=dict) - static_vcls: List[VCL] = field(default_factory=list) - current_conditional_by_action: Dict[str, str] = field(default_factory=dict) - countries_by_action: Dict[str, Set[str]] = field(default_factory=dict) - autonomoussystems_by_action: Dict[str, Set[str]] = field(default_factory=dict) + static_vcls: List[VCL] = field(default_factory=list) # BDC Not in use, related to recaptcha + current_conditional_by_action: Dict[str, str] = field(default_factory=dict) # BDC Not in use + countries_by_action: Dict[str, Set[str]] = field(default_factory=dict) # BDC Not in use + autonomoussystems_by_action: Dict[str, Set[str]] = field(default_factory=dict) # BDC Not in use acl_collection_by_action: Dict[str, ACLCollection] = field(default_factory=dict) @classmethod @@ -472,3 +566,10 @@ def generate_conditional_for_action(self, action): ] ) return f"if ( {condition} )" + + + async def reload_acls(self): + async with trio.open_nursery() as n: + for action, acl_col in self.acl_collection_by_action.items(): + n.start_soon(acl_col.refresh_from_fastly) + logger.info(f"Done reloading ACLS for {self.service_id}") diff --git a/src/fastly_bouncer/utils.py b/src/fastly_bouncer/utils.py index ce29b37..2e54474 100644 --- a/src/fastly_bouncer/utils.py +++ b/src/fastly_bouncer/utils.py @@ -2,6 +2,8 @@ import sys from importlib.metadata import version +from httpx import HTTPError, RequestError, HTTPStatusError + SUPPORTED_ACTIONS = ["ban", "captcha"] VERSION = version("crowdsec-fastly-bouncer") @@ -39,3 +41,14 @@ def get_default_logger(): 
default_handler.setFormatter(default_formatter) logger.addHandler(default_handler) return logger
+
+
+def transient_http_error(value: Exception) -> bool:
+    """Return True when `value` is an httpx error worth retrying: any RequestError, or a 429 / above-500 status."""
+    if isinstance(value, RequestError):
+        return True
+    if isinstance(value, HTTPStatusError):
+        status = value.response.status_code  # the status lives on the response; httpx.Request has no status_code
+        return status == 429 or status > 500  # NOTE(review): exactly 500 is not retried - confirm that is intended
+    return False  # anything else (including a non-exception value) is never treated as transient
+