61 changes: 48 additions & 13 deletions src/utils/transcripts.py
@@ -5,29 +5,28 @@
"""

from datetime import UTC, datetime
import fcntl
import json
import logging
import os
from pathlib import Path

from configuration import configuration
from models.requests import Attachment, QueryRequest
from utils.suid import get_suid
from utils.types import TurnSummary

logger = logging.getLogger("utils.transcripts")


def construct_transcripts_path(user_id: str, conversation_id: str) -> Path:
def construct_transcripts_path(user_id: str) -> Path:
"""Construct path to transcripts."""
# these two normalizations are required by Snyk as it detects
# this Path sanitization pattern
uid = os.path.normpath("/" + user_id).lstrip("/")
cid = os.path.normpath("/" + conversation_id).lstrip("/")
file_path = (
configuration.user_data_collection_configuration.transcripts_storage or ""
)
return Path(file_path, uid, cid)
return Path(file_path, uid)


def store_transcript( # pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals
@@ -45,9 +44,14 @@ def store_transcript( # pylint: disable=too-many-arguments,too-many-positional-
) -> None:
"""Store transcript in the local filesystem.

All turns for a single conversation are stored in the same file,
named after the conversation_id.

Args:
user_id: The user ID (UUID).
conversation_id: The conversation ID (UUID).
model_id: The model ID.
provider_id: The provider ID.
query_is_valid: The result of the query validation.
query: The query (without attachments).
query_request: The request containing a query.
@@ -56,17 +60,18 @@ def store_transcript( # pylint: disable=too-many-arguments,too-many-positional-
truncated: The flag indicating if the history was truncated.
attachments: The list of `Attachment` objects.
"""
transcripts_path = construct_transcripts_path(user_id, conversation_id)
transcripts_path = construct_transcripts_path(user_id)
transcripts_path.mkdir(parents=True, exist_ok=True)

data_to_store = {
# Use conversation_id as filename instead of random UUID
transcript_file_path = transcripts_path / f"{conversation_id}.json"
# Prepare turn data
turn_data = {
"metadata": {
"provider": provider_id,
"model": model_id,
"query_provider": query_request.provider,
"query_model": query_request.model,
Comment on lines +66 to 74
⚠️ Potential issue

Path traversal via conversation_id filename — sanitize or validate before using

conversation_id is interpolated directly into the filename. Inputs like "../x" or "foo/../../bar" would escape the user directory and write elsewhere. Sanitize like user_id or enforce a strict format (UUID/SUID).

Apply:

@@
-    # Use conversation_id as filename instead of random UUID
-    transcript_file_path = transcripts_path / f"{conversation_id}.json"
+    # Use conversation_id as filename instead of random UUID (sanitize to prevent traversal)
+    cid = os.path.normpath("/" + conversation_id).lstrip("/")
+    # reject if normalization changed it or if it contains any separators/specials
+    if cid != conversation_id or os.path.sep in cid or (os.path.altsep and os.path.altsep in cid) or cid in ("", ".", ".."):
+        raise ValueError(f"Invalid conversation_id '{conversation_id}'")
+    transcript_file_path = transcripts_path / f"{cid}.json"

Optionally, if a SUID/UUID utility exists, prefer a strict validator instead of path heuristics.
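A minimal sketch of such a strict validator, assuming conversation IDs are canonical UUID strings (the helper name validate_conversation_id is illustrative, not an existing utility in this repo):

import uuid


def validate_conversation_id(conversation_id: str) -> str:
    """Return conversation_id unchanged if it is a canonical UUID string, else raise ValueError."""
    try:
        parsed = uuid.UUID(conversation_id)
    except ValueError as exc:
        raise ValueError(f"Invalid conversation_id {conversation_id!r}") from exc
    # Require the canonical form to round-trip, so values that merely embed a UUID
    # (braces, urn: prefixes, stray path characters) are rejected as well.
    if str(parsed) != conversation_id.lower():
        raise ValueError(f"Invalid conversation_id {conversation_id!r}")
    return conversation_id

A value accepted here contains only hex digits and hyphens, so interpolating it into the .json filename cannot escape the user directory.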

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/utils/transcripts.py around lines 66-74, conversation_id is directly
interpolated into a filename, which allows path traversal (e.g. "../x" or
"foo/../../bar"); validate or sanitize it before use by enforcing a strict
format (preferably UUID/SUID) or using an existing UUID/SUID validator utility
if available, otherwise reject any value containing path separators or
suspicious characters and/or reduce it with os.path.basename; if validation
fails, raise/return an error or fall back to generating a safe UUID filename,
and ensure the sanitized/validated id is used to build transcripts_path /
f"{safe_conversation_id}.json".

"user_id": user_id,
"conversation_id": conversation_id,
"timestamp": datetime.now(UTC).isoformat(),
},
"redacted_query": query,
@@ -78,9 +83,39 @@ def store_transcript( # pylint: disable=too-many-arguments,too-many-positional-
"tool_calls": [tc.model_dump() for tc in summary.tool_calls],
}

# stores feedback in a file under unique uuid
transcript_file_path = transcripts_path / f"{get_suid()}.json"
with open(transcript_file_path, "w", encoding="utf-8") as transcript_file:
json.dump(data_to_store, transcript_file)
# Use file locking to handle concurrent writes safely
with open(transcript_file_path, "a+", encoding="utf-8") as transcript_file:
fcntl.flock(transcript_file.fileno(), fcntl.LOCK_EX)
try:
# Move to beginning to read existing content
transcript_file.seek(0)
file_content = transcript_file.read()
if file_content.strip():
# File has existing content, load it
transcript_file.seek(0)
conversation_data = json.load(transcript_file)
else:
# First turn for this conversation
conversation_data = {
"conversation_metadata": {
"conversation_id": conversation_id,
"user_id": user_id,
"created_at": datetime.now(UTC).isoformat(),
"last_updated": datetime.now(UTC).isoformat(),
},
"turns": [],
}
# Add new turn
conversation_data["turns"].append(turn_data)
conversation_data["conversation_metadata"]["last_updated"] = datetime.now(
UTC
).isoformat()

# Write updated data back to file
transcript_file.seek(0)
transcript_file.truncate()
json.dump(conversation_data, transcript_file, indent=2)
finally:
fcntl.flock(transcript_file.fileno(), fcntl.LOCK_UN)

logger.info("Transcript successfully stored at: %s", transcript_file_path)
logger.info("Transcript turn successfully stored at: %s", transcript_file_path)
Comment on lines +86 to +121
🛠️ Refactor suggestion

Harden against corrupted/partial JSON and improve crash-safety

  • If file_content is non-empty but corrupted (e.g., previous crash mid-write), json.load will raise and the write path fails. Catch JSONDecodeError and recover.
  • Current write truncates the file in-place. A crash between truncate() and dump() can leave an empty or partial file. Prefer atomic write (tempfile + os.replace) while holding a lock.

Apply minimal recovery + atomic write:

@@
-    with open(transcript_file_path, "a+", encoding="utf-8") as transcript_file:
+    with open(transcript_file_path, "a+", encoding="utf-8") as transcript_file:
         fcntl.flock(transcript_file.fileno(), fcntl.LOCK_EX)
         try:
             # Move to beginning to read existing content
             transcript_file.seek(0)
             file_content = transcript_file.read()
-            if file_content.strip():
-                # File has existing content, load it
-                transcript_file.seek(0)
-                conversation_data = json.load(transcript_file)
+            if file_content.strip():
+                # File has existing content, load it (recover if corrupted)
+                transcript_file.seek(0)
+                try:
+                    conversation_data = json.load(transcript_file)
+                except json.JSONDecodeError:
+                    logger.warning("Corrupted transcript detected at %s; backing up and reinitializing.", transcript_file_path)
+                    backup_path = transcript_file_path.with_suffix(
+                        transcript_file_path.suffix + f".corrupt-{datetime.now(UTC).strftime('%Y%m%dT%H%M%S%fZ')}"
+                    )
+                    try:
+                        # Use the content we already read for backup
+                        with open(backup_path, "w", encoding="utf-8") as backup:
+                            backup.write(file_content)
+                    except Exception:  # best-effort
+                        logger.exception("Failed to write backup for corrupted transcript: %s", backup_path)
+                    conversation_data = {
+                        "conversation_metadata": {
+                            "conversation_id": conversation_id,
+                            "user_id": user_id,
+                            "created_at": datetime.now(UTC).isoformat(),
+                            "last_updated": datetime.now(UTC).isoformat(),
+                        },
+                        "turns": [],
+                    }
             else:
                 # First turn for this conversation
                 conversation_data = {
@@
-            # Write updated data back to file
-            transcript_file.seek(0)
-            transcript_file.truncate()
-            json.dump(conversation_data, transcript_file, indent=2)
+            # Write updated data back to file atomically
+            import tempfile
+            with tempfile.NamedTemporaryFile(
+                "w", dir=transcripts_path, prefix=f".{cid}.", suffix=".tmp", delete=False, encoding="utf-8"
+            ) as tmp:
+                json.dump(conversation_data, tmp, indent=2, ensure_ascii=False)
+                tmp.flush()
+                os.fsync(tmp.fileno())
+                tmp_path = Path(tmp.name)
+            os.replace(tmp_path, transcript_file_path)
         finally:
             fcntl.flock(transcript_file.fileno(), fcntl.LOCK_UN)

Note: We keep the lock on the original file descriptor during the prepare/replace window; if readers exist, consider enforcing shared locks for reads or a separate lock file for stricter semantics. If you don’t want atomic replace, at minimum keep the JSONDecodeError recovery.
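If stricter semantics are wanted, a sketch of the separate-lock-file variant could look like the following (POSIX-only via fcntl; the .lock sidecar name and the transcript_lock helper are illustrative, not part of this PR):

import fcntl
import os
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def transcript_lock(transcript_file_path: Path):
    """Hold an exclusive lock on a sidecar .lock file for the whole read-modify-replace cycle."""
    lock_path = transcript_file_path.with_name(transcript_file_path.name + ".lock")
    # The sidecar survives os.replace() of the data file, so writers still
    # serialize even though the data file's inode changes on each atomic write.
    lock_fd = os.open(lock_path, os.O_CREAT | os.O_RDWR, 0o644)
    try:
        fcntl.flock(lock_fd, fcntl.LOCK_EX)
        yield
    finally:
        fcntl.flock(lock_fd, fcntl.LOCK_UN)
        os.close(lock_fd)

Readers would take fcntl.LOCK_SH on the same sidecar file before opening the transcript, which gives consistent reads without blocking other readers.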

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/utils/transcripts.py around lines 86 to 121, the current logic assumes
the existing JSON is valid and truncates the file in-place before writing, so a
crash mid-write can leave an empty or corrupted file; modify it to (1) catch
json.JSONDecodeError when loading existing content and treat it as an empty/new
conversation (log a warning and recreate conversation_data), and (2) perform an
atomic write: serialize conversation_data to a temporary file in the same
directory (use tempfile.NamedTemporaryFile(delete=False) or similar), fsync the
temp file, then os.replace the temp file over the original while still holding
the fcntl lock on the original file descriptor; ensure you remove the temp file
on errors and release the lock in a finally block.
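Taken on its own, the atomic-write step this prompt describes is roughly the following standard-library-only helper (the name atomic_write_json is illustrative):

import json
import os
import tempfile
from pathlib import Path
from typing import Any


def atomic_write_json(target_path: Path, data: Any) -> None:
    """Write data as JSON to target_path via a temp file plus os.replace."""
    tmp_path = None
    try:
        # The temp file must live in the same directory so os.replace() stays
        # on one filesystem and remains atomic.
        with tempfile.NamedTemporaryFile(
            "w", dir=target_path.parent, suffix=".tmp", delete=False, encoding="utf-8"
        ) as tmp:
            tmp_path = Path(tmp.name)
            json.dump(data, tmp, indent=2, ensure_ascii=False)
            tmp.flush()
            os.fsync(tmp.fileno())
        # Readers observe either the old complete file or the new complete file.
        os.replace(tmp_path, target_path)
    except Exception:
        if tmp_path is not None:
            tmp_path.unlink(missing_ok=True)
        raise

In store_transcript this would replace the seek/truncate/dump sequence while the fcntl lock is still held, as the suggested diff above does inline.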

132 changes: 81 additions & 51 deletions tests/unit/utils/test_transcripts.py
@@ -1,5 +1,7 @@
"""Unit tests for functions defined in utils.transcripts module."""

import json

from configuration import AppConfig
from models.requests import QueryRequest

@@ -38,34 +40,55 @@ def test_construct_transcripts_path(mocker):
mocker.patch("utils.transcripts.configuration", cfg)

user_id = "user123"
conversation_id = "123e4567-e89b-12d3-a456-426614174000"

path = construct_transcripts_path(user_id, conversation_id)
path = construct_transcripts_path(user_id)

assert (
str(path) == "/tmp/transcripts/user123/123e4567-e89b-12d3-a456-426614174000"
str(path) == "/tmp/transcripts/user123"
), "Path should be constructed correctly"


def test_store_transcript(mocker):
"""Test the store_transcript function."""

mocker.patch("builtins.open", mocker.mock_open())
# Mock file operations for new behavior
mock_file = mocker.mock_open(read_data="")
mocker.patch("builtins.open", mock_file)
mocker.patch(
"utils.transcripts.construct_transcripts_path",
return_value=mocker.MagicMock(),
)

# Mock fcntl for file locking
mock_fcntl = mocker.patch("utils.transcripts.fcntl")

# Mock the JSON to assert the data is stored correctly
mock_json = mocker.patch("utils.transcripts.json")
mock_json.load.side_effect = json.JSONDecodeError("No JSON object", "", 0)

# Mock parameters
user_id = "user123"
conversation_id = "123e4567-e89b-12d3-a456-426614174000"
query = "What is OpenStack?"
model = "fake-model"
provider = "fake-provider"
query_request = QueryRequest(query=query, model=model, provider=provider)
test_data = {
"user_id": "user123",
"conversation_id": "123e4567-e89b-12d3-a456-426614174000",
"query": "What is OpenStack?",
"model": "fake-model",
"provider": "fake-provider",
"query_is_valid": True,
"rag_chunks": [],
"truncated": False,
"attachments": [],
}

query_request = QueryRequest(
query=test_data["query"],
model=test_data["model"],
provider=test_data["provider"],
conversation_id=test_data["conversation_id"],
system_prompt=None,
attachments=None,
no_tools=False,
media_type=None,
)
summary = TurnSummary(
llm_response="LLM answer",
tool_calls=[
@@ -77,51 +100,58 @@ def test_store_transcript(mocker):
)
],
)
query_is_valid = True
rag_chunks = []
truncated = False
attachments = []

store_transcript(
user_id,
conversation_id,
model,
provider,
query_is_valid,
query,
test_data["user_id"],
test_data["conversation_id"],
test_data["model"],
test_data["provider"],
test_data["query_is_valid"],
test_data["query"],
query_request,
summary,
rag_chunks,
truncated,
attachments,
test_data["rag_chunks"],
test_data["truncated"],
test_data["attachments"],
)

# Assert that the transcript was stored correctly
mock_json.dump.assert_called_once_with(
{
"metadata": {
"provider": "fake-provider",
"model": "fake-model",
"query_provider": query_request.provider,
"query_model": query_request.model,
"user_id": user_id,
"conversation_id": conversation_id,
"timestamp": mocker.ANY,
},
"redacted_query": query,
"query_is_valid": query_is_valid,
"llm_response": summary.llm_response,
"rag_chunks": rag_chunks,
"truncated": truncated,
"attachments": attachments,
"tool_calls": [
{
"id": "123",
"name": "test-tool",
"args": "testing",
"response": "tool response",
}
],
# Assert file locking was used
mock_fcntl.flock.assert_any_call(mocker.ANY, mock_fcntl.LOCK_EX)
mock_fcntl.flock.assert_any_call(mocker.ANY, mock_fcntl.LOCK_UN)

# Assert that the transcript was stored correctly with new structure
expected_data = {
"conversation_metadata": {
"conversation_id": test_data["conversation_id"],
"user_id": test_data["user_id"],
"created_at": mocker.ANY,
"last_updated": mocker.ANY,
},
mocker.ANY,
)
"turns": [
{
"metadata": {
"provider": test_data["provider"],
"model": test_data["model"],
"query_provider": query_request.provider,
"query_model": query_request.model,
"timestamp": mocker.ANY,
},
"redacted_query": test_data["query"],
"query_is_valid": test_data["query_is_valid"],
"llm_response": summary.llm_response,
"rag_chunks": test_data["rag_chunks"],
"truncated": test_data["truncated"],
"attachments": test_data["attachments"],
"tool_calls": [
{
"id": "123",
"name": "test-tool",
"args": "testing",
"response": "tool response",
}
],
}
],
}

mock_json.dump.assert_called_once_with(expected_data, mocker.ANY, indent=2)