Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ pyyaml
httpx
trio
python-dateutil
pycrowdsec
pycrowdsec
tenacity
4 changes: 3 additions & 1 deletion src/fastly_bouncer/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import yaml

from fastly_bouncer.fastly_api import FastlyAPI
from fastly_bouncer.utils import are_filled_validator, VERSION
from fastly_bouncer.utils import are_filled_validator, VERSION, SUPPORTED_ACTIONS


@dataclass
Expand All @@ -29,6 +29,7 @@ class FastlyServiceConfig:
activate: bool = False
max_items: int = 20000
captcha_cookie_expiry_duration: str = "1800"
supported_actions: List = field(default_factory=list)

def __post_init__(self):
are_filled_validator(**{key: getattr(self, key) for key in asdict(self).keys()})
Expand Down Expand Up @@ -171,6 +172,7 @@ async def generate_config_for_service(api: FastlyAPI, service_id: str, sender_ch
activate=False,
clone_reference_version=True,
reference_version=ref_version,
supported_actions=SUPPORTED_ACTIONS,
)
)

Expand Down
29 changes: 15 additions & 14 deletions src/fastly_bouncer/fastly_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
logger: logging.Logger = logging.getLogger("")


ACL_CAPACITY = 100

ACL_CAPACITY = 1000 # as of 2024-05
ACL_BATCH_SIZE = 1000 # as of 2024-05

@dataclass
class ACL:
Expand Down Expand Up @@ -47,6 +47,8 @@ def as_jsonable_dict(self) -> Dict:
"created": self.created,
}

def as_set(self) -> Set[str]:
return set([ip_subnet for ip_subnet, _ in self.entries.items()])

@dataclass
class VCL:
Expand Down Expand Up @@ -78,7 +80,9 @@ def to_dict(self):


async def raise_on_4xx_5xx(response):
response.raise_for_status()
if response.is_error:
await response.aread()
response.raise_for_status()


class FastlyAPI:
Expand All @@ -97,7 +101,7 @@ def __init__(self, token):
async def get_version_to_clone(self, service_id: str) -> str:
"""
Gets the version to clone from. If service has active version, then the active version will be cloned.
Else the the version which was last updated would be cloned
Else the version which was last updated would be cloned
"""

service_versions_resp = await self.session.get(
Expand Down Expand Up @@ -272,19 +276,18 @@ async def update_dynamic_vcl(self, vcl: VCL):
resp.json()
return vcl

async def refresh_acl_entries(self, acl: ACL) -> Dict[str, str]:
async def refresh_acl_entries(self, acl: ACL) -> None:
resp = await self.session.get(
self.api_url(f"/service/{acl.service_id}/acl/{acl.id}/entries?per_page=100")
self.api_url(f"/service/{acl.service_id}/acl/{acl.id}/entries?per_page={ACL_BATCH_SIZE}")
)
resp = resp.json()
acl.entries = {}
for entry in resp:
acl.entries[f"{entry['ip']}/{entry['subnet']}"] = entry["id"]
return acl

async def process_acl(self, acl: ACL):
logger.debug(with_suffix(f"entries to delete {acl.entries_to_delete}", acl_id=acl.id))
logger.debug(with_suffix(f"entries to add {acl.entries_to_add}", acl_id=acl.id))
logger.debug(with_suffix(f"entries to delete %s", acl_id=acl.id), acl.entries_to_delete)
logger.debug(with_suffix(f"entries to add %s", acl_id=acl.id), acl.entries_to_add)
update_entries = []
for entry_to_add in acl.entries_to_add:
if entry_to_add in acl.entries:
Expand All @@ -304,19 +307,17 @@ async def process_acl(self, acl: ACL):
if not update_entries:
return

# Only 100 operations per request can be done on an acl.
# Only ACL_BATCH_SIZE operations per request can be done on an acl.
async with trio.open_nursery() as n:
for i in range(0, len(update_entries), 100):
update_entries_batch = update_entries[i : i + 100]
for i in range(0, len(update_entries), ACL_BATCH_SIZE):
update_entries_batch = update_entries[i : i + ACL_BATCH_SIZE]
request_body = {"entries": update_entries_batch}
f = partial(self.session.patch, json=request_body)
n.start_soon(
f,
self.api_url(f"/service/{acl.service_id}/acl/{acl.id}/entries"),
)

acl = await self.refresh_acl_entries(acl)

@staticmethod
def api_url(endpoint: str) -> str:
return urljoin(FastlyAPI.base_url, endpoint)
Expand Down
36 changes: 31 additions & 5 deletions src/fastly_bouncer/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
logger: logging.Logger = get_default_logger()

exiting = False
reload_acls = "start up"


def sigterm_signal_handler(signum, frame):
Expand All @@ -44,6 +45,14 @@ def sigterm_signal_handler(signum, frame):
signal.signal(signal.SIGINT, sigterm_signal_handler)


def sighup_signal_handler(signum, frame):
global reload_acls
reload_acls = "signal"


signal.signal(signal.SIGHUP, sighup_signal_handler)


async def setup_action_for_service(
fastly_api: FastlyAPI,
action: str,
Expand Down Expand Up @@ -135,7 +144,17 @@ async def setup_service(
)

acl_collection_by_action = {}
for action in SUPPORTED_ACTIONS:

if service_cfg.supported_actions:
if any(map(lambda act: act not in SUPPORTED_ACTIONS, service_cfg.supported_actions)):
logger.error(f"Ignoring supported_actions of {service_cfg.supported_actions}: includes invalid actions")
supported_actions = SUPPORTED_ACTIONS
else:
supported_actions = service_cfg.supported_actions
else:
supported_actions = SUPPORTED_ACTIONS

for action in supported_actions:
sender, receiver = trio.open_memory_channel(0)
async with trio.open_nursery() as n:
async with sender:
Expand All @@ -162,6 +181,7 @@ async def setup_service(
version=version,
activate=service_cfg.activate,
captcha_expiry_duration=service_cfg.captcha_cookie_expiry_duration,
supported_actions=service_cfg.supported_actions,
)
await s.create_static_vcls()
await sender_chan.send(s)
Expand Down Expand Up @@ -195,7 +215,7 @@ async def setup_fastly_infra(config: Config, cleanup_mode):
else:
cache = json.loads(s)
services = list(map(Service.from_jsonable_dict, cache["service_states"]))
logger.info(f"loaded exisitng infra using cache")
logger.info(f"loaded existing infra using cache")
if not cleanup_mode:
return services
else:
Expand Down Expand Up @@ -236,7 +256,7 @@ def set_logger(config: Config):


async def run(config: Config, services: List[Service]):
global VERSION
global VERSION, reload_acls
crowdsec_client = StreamClient(
lapi_url=config.crowdsec_config.lapi_url,
api_key=config.crowdsec_config.lapi_key,
Expand All @@ -251,17 +271,23 @@ async def run(config: Config, services: List[Service]):
while True and not exiting:
new_state = crowdsec_client.get_current_decisions()

if reload_acls:
logger.info(f"Reload of ACLS triggered by {reload_acls}")
reload_acls = False
for s in services:
await s.reload_acls()

async with trio.open_nursery() as n:
for s in services:
n.start_soon(s.transform_state, new_state)

new_states = list(map(lambda service: service.as_jsonable_dict(), services))
if new_states != previous_states:
logger.debug("updating cache")
logger.debug("writing updated cache of fastly state")
new_cache = {"service_states": new_states, "bouncer_version": VERSION}
async with await trio.open_file(config.cache_path, "w") as f:
await f.write(json.dumps(new_cache, indent=4))
logger.debug("done updating cache")
logger.debug("done writing updated cache of fastly state")
previous_states = new_states

if exiting:
Expand Down
Loading