Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
d19a95e
test: add integration test for PAS setup and attendance
cyrillkuettel Sep 3, 2025
0c9fe4d
refactor: Extract rate set creation into a helper function
cyrillkuettel Sep 3, 2025
d3e490d
refactor: Remove unused legislative period setup from tests
cyrillkuettel Sep 3, 2025
1b8829d
chore: add run.sh script with experient mypy parallel
cyrillkuettel Sep 3, 2025
b5d5b34
update
cyrillkuettel Sep 3, 2025
778c593
update run.sh
cyrillkuettel Sep 4, 2025
5419e51
Work in progress
cyrillkuettel Sep 16, 2025
78547a6
update
cyrillkuettel Sep 17, 2025
f7444c0
update
cyrillkuettel Sep 18, 2025
82aa71e
Pas: Fixes president query.
cyrillkuettel Sep 16, 2025
a284363
refactor: extract attendance filtering logic to collections
cyrillkuettel Sep 18, 2025
7e33ac4
refactor: extract helper functions to reduce test duplication
cyrillkuettel Sep 22, 2025
7880f2a
Retire '/pas', use '/pas-settings'
cyrillkuettel Sep 22, 2025
ff7e6a4
🐛 fix: allow parliamentarians to access attendance records
cyrillkuettel Sep 22, 2025
e566ce6
Update various
cyrillkuettel Sep 22, 2025
8215802
🔒️ feat: enhance parliamentarian permissions and add translations
cyrillkuettel Sep 22, 2025
13d5bee
🩹 fix: add type annotations to resolve mypy issues
cyrillkuettel Sep 22, 2025
810a223
📝 docs: update test comment for clarity
cyrillkuettel Sep 22, 2025
c240728
update
cyrillkuettel Sep 22, 2025
25aa858
add comment [skip ci]
cyrillkuettel Sep 22, 2025
ed9fede
Merge branch 'master' into ogc-2573-berechtigungssystem-parlamentarier
cyrillkuettel Sep 23, 2025
93a359c
fix linting
cyrillkuettel Sep 23, 2025
26621fc
Fixes linting
cyrillkuettel Sep 23, 2025
07e6ff2
fixes some tests
cyrillkuettel Sep 23, 2025
8be0118
try fix for pas-settings view
cyrillkuettel Sep 23, 2025
7854571
🔒️ fix: apply role-based filtering to parliamentarian attendance view
cyrillkuettel Sep 23, 2025
2869c74
update
cyrillkuettel Sep 23, 2025
03bb1ab
update
cyrillkuettel Sep 23, 2025
13c7224
Implement the security fix for the parliamentarian attendance form.
cyrillkuettel Sep 23, 2025
523036a
Fixes test. It now finds 2 search results because we have relationssh…
cyrillkuettel Sep 24, 2025
464175f
Fixes more tests, clean up.
cyrillkuettel Sep 24, 2025
0029ae1
fix linting
cyrillkuettel Sep 24, 2025
fcbf1c4
Revert "try fix for pas-settings view"
cyrillkuettel Sep 24, 2025
592424c
simplify
cyrillkuettel Sep 24, 2025
ea10753
Adds some more shortcuts
cyrillkuettel Sep 24, 2025
f64542d
Use type ignore instead of the other solution
cyrillkuettel Sep 24, 2025
64e4a1c
Add comments
cyrillkuettel Sep 24, 2025
7970a8a
Simplify
cyrillkuettel Sep 24, 2025
77c8bc8
Move permission test files to directory for overview
cyrillkuettel Sep 24, 2025
7de66b7
Fix abstraction leakage
cyrillkuettel Sep 24, 2025
265eb02
fix test
cyrillkuettel Sep 24, 2025
c403243
Merge branch 'master' into ogc-2573-berechtigungssystem-parlamentarier
cyrillkuettel Sep 24, 2025
106a157
Changed TownRequest parameter to PasRequest
cyrillkuettel Sep 24, 2025
ab1649e
Adds proper redirect after login [skip ci]
cyrillkuettel Sep 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/onegov/pas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
log.addHandler(logging.NullHandler())

from onegov.pas.i18n import _

from onegov.pas.app import PasApp

# Import cronjobs to register the decorators
Expand All @@ -15,5 +14,6 @@
__all__ = (
'_',
'log',
'PasApp', 'cronjobs'
'PasApp',
'cronjobs'
)
24 changes: 20 additions & 4 deletions src/onegov/pas/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,24 @@
from onegov.pas.content import create_new_organisation
from onegov.pas.custom import get_global_tools
from onegov.pas.custom import get_top_navigation
from onegov.pas.request import PasRequest
from onegov.pas.theme import PasTheme
from onegov.town6 import TownApp
from onegov.town6.app import get_i18n_localedirs as get_i18n_localedirs_base
from purl import URL
from onegov.org.models import Organisation


from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Callable, Iterator
from onegov.core.types import RenderData
from onegov.org.models import Organisation
from onegov.town6.request import TownRequest
from morepath.authentication import NoIdentity
from morepath.authentication import Identity


class PasApp(TownApp):
request_class = PasRequest

def configure_kub_api(
self,
Expand All @@ -33,21 +38,32 @@ def configure_kub_api(
self.kub_api_token = kub_api_token
self.kub_base_url = kub_base_url

def redirect_after_login(
self,
identity: Identity | NoIdentity,
request: PasRequest, # type:ignore[override]
default: str
) -> str | None:

if default != '/' and '/auth/login' not in str(default):
return None
return URL(request.class_link(Organisation)).path()


@PasApp.setting(section='org', name='create_new_organisation')
def get_create_new_organisation_factory(
) -> Callable[[TownApp, str], Organisation]:
return create_new_organisation


# NOTE: Feriennet doesn't need a citizen login
# NOTE: PAS doesn't need a citizen login
@PasApp.setting(section='org', name='citizen_login_enabled')
def get_citizen_login_enabled() -> bool:
return False


@PasApp.template_variables()
def get_template_variables(request: TownRequest) -> RenderData:
def get_template_variables(request: PasRequest) -> RenderData:
return {
'global_tools': tuple(get_global_tools(request)),
'top_navigation': tuple(get_top_navigation(request)),
Expand Down
41 changes: 35 additions & 6 deletions src/onegov/pas/cli.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from __future__ import annotations

import click
import transaction
import logging
import requests
import urllib3
import warnings
from onegov.core.cli import command_group
from onegov.pas.collections.parliamentarian import PASParliamentarianCollection
from onegov.pas.excel_header_constants import (
commission_expected_headers_variant_1,
commission_expected_headers_variant_2,
Expand All @@ -22,10 +24,10 @@
if TYPE_CHECKING:
from collections.abc import Callable
from onegov.pas.app import PasApp
from onegov.pas.app import TownRequest
from onegov.pas.request import PasRequest
from typing import TypeAlias

Processor: TypeAlias = Callable[[TownRequest, PasApp], None]
Processor: TypeAlias = Callable[[PasRequest, PasApp], None]


log = logging.getLogger('onegov.org.cli')
Expand All @@ -45,7 +47,10 @@
def import_commission_data(
excel_file: str,
) -> Processor:
"""Import commission data from an Excel or csv file.
"""
Note: This is deprecated, not really used, as we have the JSON import.

Import commission data from an Excel or csv file.

Assumes that the name of the commission is the filename.

Expand All @@ -57,7 +62,7 @@ def import_commission_data(
"Kommission_Gesundheit_und_Soziales.xlsx"
"""

def import_data(request: TownRequest, app: PasApp) -> None:
def import_data(request: PasRequest, app: PasApp) -> None:

try:
import_commissions_excel(
Expand All @@ -80,6 +85,30 @@ def import_data(request: TownRequest, app: PasApp) -> None:
return import_data


@cli.command(name='update-accounts', context_settings={'singular': True})
@click.option('--dry-run/-no-dry-run', default=False)
def update_accounts_cli(dry_run: bool) -> Processor:
""" Updates user accounts for parliamentarians. """

def do_update_accounts(request: PasRequest, app: PasApp) -> None:

parliamentarians = PASParliamentarianCollection(app)
for parliamentarian in parliamentarians.query():
if not parliamentarian.email_primary:
click.echo(
f'Skipping {parliamentarian.title}, no primary email.'
)
continue
parliamentarians.update_user(
parliamentarian, parliamentarian.email_primary
)

if dry_run:
transaction.abort()

return do_update_accounts


@cli.command('check-api')
@click.option('--url', default='',
help='API endpoint to check')
Expand Down Expand Up @@ -147,7 +176,7 @@ def import_kub_data(
--token "your-token-here" --max-workers 5
"""

def cli_wrapper(request: TownRequest, app: PasApp) -> None:
def cli_wrapper(request: PasRequest, app: PasApp) -> None:
"""CLI wrapper that calls the orchestrator."""
# Create composite output handler for both CLI and database
click_handler = ClickOutputHandler()
Expand Down Expand Up @@ -221,7 +250,7 @@ def update_custom_data(
--max-workers 5
"""

def update_data(request: TownRequest, app: PasApp) -> None:
def update_data(request: PasRequest, app: PasApp) -> None:
# Create composite output handler for both CLI and database
click_handler = ClickOutputHandler()
db_handler = DatabaseOutputHandler()
Expand Down
67 changes: 67 additions & 0 deletions src/onegov/pas/collections/attendence.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
if TYPE_CHECKING:
from datetime import date
from sqlalchemy.orm import Query, Session
from onegov.pas.request import PasRequest


class AttendenceCollection(GenericCollection[Attendence]):
Expand Down Expand Up @@ -138,3 +139,69 @@ def by_party(
commission_id=self.commission_id,
party_id=party_id,
)

def for_parliamentarian(self, parliamentarian_id: str) -> Self:
"""Returns attendances for a specific parliamentarian only."""
return self.for_filter(parliamentarian_id=parliamentarian_id)

def for_commission_president(
self,
parliamentarian_id: str,
active_commission_ids: list[str]
) -> Query[Attendence]:
"""
Returns attendances for a commission president:
- Their own attendances
- Attendances of members in commissions they preside over
"""
query = self.query()
return query.filter(
(Attendence.parliamentarian_id == parliamentarian_id) |
(Attendence.commission_id.in_(active_commission_ids))
)

def view_for_parliamentarian(
self, request: PasRequest
) -> list[Attendence]:
"""
Returns filtered attendances based on user role and permissions.
This encapsulates the filtering logic previously in the view.
"""
user = request.current_user

if not request.is_parliamentarian:
# Admins see all attendances
return self.query().all()

if not (user and hasattr(user, 'parliamentarian') and
user.parliamentarian):
return []

parliamentarian = user.parliamentarian

if user.role == 'commission_president':
from datetime import date
# Commission presidents see own + commission members' attendances
# We have to check all but usually president of just one
active_presidencies = [
cm.commission_id
for cm in parliamentarian.commission_memberships
if (cm.role == 'president' and
(cm.end is None or cm.end >= date.today()))
]

if active_presidencies:
return self.for_commission_president(
str(parliamentarian.id),
active_presidencies
).all()
else:
# Fallback to own attendances only
return self.for_parliamentarian(
str(parliamentarian.id)
).query().all()
else:
# Regular parliamentarians see only their own attendances
return self.for_parliamentarian(
str(parliamentarian.id)
).query().all()
124 changes: 124 additions & 0 deletions src/onegov/pas/collections/parliamentarian.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,137 @@
from __future__ import annotations

import logging
from onegov.core.utils import toggle
from onegov.core.crypto import random_password
from onegov.parliament.collections import ParliamentarianCollection
from onegov.pas.models import PASParliamentarian
from onegov.user import UserCollection

log = logging.getLogger('onegov.pas.collections.parliamentarian')


from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any, Self
from onegov.core import Framework


class PASParliamentarianCollection(
ParliamentarianCollection[PASParliamentarian]
):

def __init__(self, app: Framework, **kwargs: Any) -> None:
super().__init__(app.session(), **kwargs)
self.app = app

def for_filter(
self,
active: bool | None = None,
party: str | None = None,
) -> Self:
active_ = toggle(self.active, active)
party_ = toggle(self.party, party)

return self.__class__(
self.app,
active=active_,
party=party_
)

@property
def model_class(self) -> type[PASParliamentarian]:
return PASParliamentarian

def add(self, **kwargs: Any) -> PASParliamentarian:
item = super().add(**kwargs)
if not item.email_primary:
log.warning(
f'Creating parliamentarian {item.title} without'
'email_primary. This will prevent user account'
'creation and may cause permission-related failures.'
)
self.update_user(item, item.email_primary)
self.session.flush()
return item

def delete(self, item: PASParliamentarian) -> None:
self.update_user(item, None)
self.session.delete(item)
self.session.flush()

def update_user(
self,
item: PASParliamentarian,
new_email: str | None
) -> None:
""" Keep the parliamentarian and its user account in sync.

* Creates a new user account if an email address is set (if not already
existing).
* Disable user accounts if an email has been deleted.
* Change usernames if an email has changed.
* Make sure used user accounts have the right role.
* Make sure used user accounts are activated.
* Make sure the password is changed if activated or disabled.

"""

old_email = item.email_primary
users = UserCollection(self.session)
old_user = users.by_username(old_email) if old_email else None
new_user = users.by_username(new_email) if new_email else None
create = False
enable = None
disable = []

if not new_email:
# email has been unset: disable obsolete users
disable.extend([old_user, new_user])
else:
if new_email == old_email:
# email has not changed, old_user == new_user
if not old_user:
create = True
else:
enable = old_user
else:
# email has changed: ensure user exist
if old_user and new_user:
disable.append(old_user)
enable = new_user
elif not old_user and not new_user:
create = True
else:
enable = old_user if old_user else new_user

if create:
assert new_email is not None
log.info(f'Creating user {new_email}')
users.add(
new_email, random_password(16), role='parliamentarian',
realname=item.title
)

if enable:
corrections = {
'username': new_email,
'role': 'parliamentarian',
'active': True,
'source': None,
'source_id': None
}
corrections = {
attribute: value for attribute, value in corrections.items()
if getattr(enable, attribute) != value
}
if corrections:
log.info(f'Correcting user {enable.username} to {corrections}')
for attribute, value in corrections.items():
setattr(enable, attribute, value)
enable.logout_all_sessions(self.app)

for user in disable:
if user:
log.info(f'Deactivating user {user.username}')
user.active = False
user.logout_all_sessions(self.app)
Loading