Skip to content
Draft
58 changes: 55 additions & 3 deletions pydatalab/src/pydatalab/permissions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from functools import wraps
from hashlib import sha512
from typing import Any

from bson import ObjectId
Expand All @@ -9,19 +10,32 @@
from pydatalab.logger import LOGGER
from pydatalab.login import UserRole
from pydatalab.models.people import AccountStatus
from pydatalab.mongo import get_database
from pydatalab.mongo import flask_mongo, get_database

PUBLIC_USER_ID = ObjectId(24 * "0")


def active_users_or_get_only(func):
"""Decorator to ensure that only active user accounts can access the route,
unless it is a GET-route, in which case deactivated accounts can also access it.

Now also allows access with valid access tokens.
"""

@wraps(func)
def wrapped_route(*args, **kwargs):
access_token = request.args.get("at")
refcode = kwargs.get("refcode")

if not refcode and access_token:
path_parts = request.path.strip("/").split("/")
if len(path_parts) >= 2 and path_parts[0] == "items":
refcode = path_parts[1]

if access_token and refcode:
token_valid = check_access_token(refcode, access_token)
if token_valid:
return func(*args, **kwargs)

if (
(
current_user.is_authenticated
Expand Down Expand Up @@ -60,7 +74,37 @@ def wrapped_route(*args, **kwargs):
return wrapped_route


def get_default_permissions(user_only: bool = True, deleting: bool = False) -> dict[str, Any]:
def check_access_token(refcode: str, token: str | None = None) -> bool:
"""Check whether the provided access token exists in the get_database
and corresponds to the relevant refcode.

Returns:
Whether or not the token can read the item.

"""

if not token or not refcode:
return False

try:
if len(refcode.split(":")) != 2:
refcode = f"{CONFIG.IDENTIFIER_PREFIX}:{refcode}"

token_hash = sha512(token.encode("utf-8")).hexdigest()

access_token_doc = flask_mongo.db.api_keys.find_one(
{"token": token_hash, "refcode": refcode, "active": True, "type": "access_token"}
)

return bool(access_token_doc)

except Exception:
return False


def get_default_permissions(
user_only: bool = True, deleting: bool = False, elevate_permissions: bool = False
) -> dict[str, Any]:
"""Return the MongoDB query terms corresponding to the current user.

Will return open permissions if a) the `CONFIG.TESTING` parameter is `True`,
Expand All @@ -70,6 +114,8 @@ def get_default_permissions(user_only: bool = True, deleting: bool = False) -> d
user_only: Whether to exclude items that also have no attached user (`False`),
i.e., public items. This should be set to `False` when reading (and wanting
to return public items), but left as `True` when modifying or removing items.
elevate_permissions: Whether to elevate this query's permissions, i.e., in the case
that an item-specific access token has been provided elsewhere.

"""

Expand All @@ -84,6 +130,12 @@ def get_default_permissions(user_only: bool = True, deleting: bool = False) -> d
):
return {}

if elevate_permissions:
LOGGER.warning(
"Permissions check with elevated permissions, likely due to access token usage"
)
return {}

null_perm = {
"$or": [
{"creator_ids": {"$size": 0}},
Expand Down
83 changes: 83 additions & 0 deletions pydatalab/src/pydatalab/routes/v0_1/admin.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import datetime

from bson import ObjectId
from flask import Blueprint, jsonify, request
from flask_login import current_user
Expand Down Expand Up @@ -93,3 +95,84 @@ def save_role(user_id):
)

return (jsonify({"status": "success"}), 200)


@ADMIN.route("/items/<refcode>/invalidate-access-token", methods=["POST"])
def invalidate_access_token(refcode: str):
if len(refcode.split(":")) != 2:
refcode = f"{CONFIG.IDENTIFIER_PREFIX}:{refcode}"

query = {"refcode": refcode, "active": True, "type": "access_token"}

response = flask_mongo.db.api_keys.update_one(
query,
{
"$set": {
"active": False,
"invalidated_at": datetime.datetime.now(tz=datetime.timezone.utc),
"invalidated_by": ObjectId(current_user.id),
}
},
)

if response.modified_count == 1:
return jsonify({"status": "success"}), 200
else:
return jsonify({"status": "error", "detail": "Token not found or already invalidated"}), 404


@ADMIN.route("/access-tokens", methods=["GET"])
def list_access_tokens():
"""List all access tokens with their status and metadata."""

pipeline = [
{"$match": {"type": "access_token"}},
{
"$lookup": {
"from": "items",
"localField": "refcode",
"foreignField": "refcode",
"as": "item_info",
}
},
{
"$lookup": {
"from": "users",
"localField": "user",
"foreignField": "_id",
"as": "user_info",
}
},
{
"$project": {
"_id": 1,
"refcode": 1,
"active": 1,
"created_at": 1,
"invalidated_at": 1,
"token": "$token",
"item_name": {
"$cond": {
"if": {"$gt": [{"$size": "$item_info"}, 0]},
"then": {"$arrayElemAt": ["$item_info.name", 0]},
"else": None,
}
},
"item_id": {"$arrayElemAt": ["$item_info.item_id", 0]},
"item_type": {
"$cond": {
"if": {"$gt": [{"$size": "$item_info"}, 0]},
"then": {"$arrayElemAt": ["$item_info.type", 0]},
"else": "deleted",
}
},
"created_by": {"$arrayElemAt": ["$user_info.display_name", 0]},
"created_by_info": {"$arrayElemAt": ["$user_info", 0]},
}
},
{"$sort": {"created_at": -1}},
]

tokens = list(flask_mongo.db.api_keys.aggregate(pipeline))

return jsonify({"status": "success", "tokens": tokens}), 200
Loading
Loading