update store transcript function #447
Conversation
Signed-off-by: Stephanie <[email protected]>
Walkthrough

Per-conversation transcript storage replaces the previous per-turn files. The storage path constructor now takes only user_id. A single JSON file per conversation is updated with new turns under a file lock via fcntl. The data schema adds conversation_metadata and detailed turn records. Tests were updated to reflect the API/signature changes, the new schema, and the locking behavior.
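For orientation, the per-conversation file shape this implies looks roughly like the following (a sketch; field names are taken from the PR diff, all values are illustrative):

```python
# Illustrative contents of "<conversation_id>.json" after two turns
# (field names from the PR diff; values made up).
conversation_data = {
    "conversation_metadata": {
        "conversation_id": "c5260aec-4d82-4370-9fdf-05cf908b3f16",
        "user_id": "user123",
        "created_at": "2025-01-01T00:00:00+00:00",
        "last_updated": "2025-01-01T00:05:00+00:00",
    },
    "turns": [
        {"metadata": {"provider": "p1", "model": "m1", "timestamp": "2025-01-01T00:00:00+00:00"}},
        {"metadata": {"provider": "p1", "model": "m1", "timestamp": "2025-01-01T00:05:00+00:00"}},
    ],
}
```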
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Caller as store_transcript
    participant FS as FileSystem
    participant Lock as fcntl
    Caller->>FS: ensure transcripts dir exists (per user)
    Caller->>FS: open "<conversation_id>.json" (a+)
    Caller->>Lock: flock(fd, LOCK_EX)
    Note over Caller,FS: Critical section (serialized updates)
    Caller->>FS: read existing JSON (if any)
    alt file empty/invalid JSON
        Caller->>Caller: init conversation_data
    else valid JSON
        Caller->>Caller: load and append new turn
    end
    Caller->>FS: seek(0), write updated JSON (indent=2), truncate remainder
    Caller->>Lock: flock(fd, LOCK_UN)
    FS-->>Caller: close file
    Note right of Caller: last_updated refreshed
```
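In code, the critical section in the diagram reduces to roughly the following (a minimal sketch assuming POSIX fcntl; append_turn is a hypothetical name, not the PR's actual API):

```python
import fcntl
import json
from datetime import UTC, datetime
from pathlib import Path


def append_turn(path: Path, turn_data: dict, init_metadata: dict) -> None:
    """Sketch of the locked read-modify-write cycle shown in the diagram."""
    with open(path, "a+", encoding="utf-8") as f:
        fcntl.flock(f.fileno(), fcntl.LOCK_EX)  # serialize concurrent writers
        try:
            f.seek(0)
            content = f.read()
            if content.strip():
                data = json.loads(content)  # existing conversation
            else:
                data = {"conversation_metadata": init_metadata, "turns": []}
            data["turns"].append(turn_data)
            data["conversation_metadata"]["last_updated"] = datetime.now(UTC).isoformat()
            f.seek(0)
            f.truncate()  # drop old contents, then rewrite in full
            json.dump(data, f, indent=2)
        finally:
            fcntl.flock(f.fileno(), fcntl.LOCK_UN)
```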
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Actionable comments posted: 2
🧹 Nitpick comments (7)
src/utils/transcripts.py (4)
21-30: Ensure base storage path is absolute and has a sane default

If transcripts_storage is unset, Path(file_path, uid) will write relative to the CWD. Provide a safe absolute default (e.g., tempfile.gettempdir()/transcripts) to avoid surprises in different runtimes.
Apply:
```diff
@@
-from configuration import configuration
+from configuration import configuration
+import tempfile
@@ def construct_transcripts_path(user_id: str) -> Path:
-    file_path = (
-        configuration.user_data_collection_configuration.transcripts_storage or ""
-    )
-    return Path(file_path, uid)
+    configured = configuration.user_data_collection_configuration.transcripts_storage
+    base_dir = Path(configured) if configured else Path(tempfile.gettempdir()) / "transcripts"
+    return base_dir / uid
```
59-61: Docstring/type mismatch for rag_chunks

The docstring says “list of RagChunk objects” but the type is list[str] and the code serializes strings. Align the docstring (or change the type if RagChunk is the intended structure).
Apply:
```diff
-        rag_chunks: The list of `RagChunk` objects.
+        rag_chunks: List of strings containing retrieved chunk excerpts (RAG).
```
69-85: Minor: use a single timestamp to keep created_at/last_updated/turn timestamp coherent

Using three datetime.now calls can produce microsecond drifts. Capture the timestamp once and reuse it for the first turn.
Example:
```diff
-    turn_data = {
+    now_iso = datetime.now(UTC).isoformat()
+    turn_data = {
@@
-            "timestamp": datetime.now(UTC).isoformat(),
+            "timestamp": now_iso,
         },
@@
-            "created_at": datetime.now(UTC).isoformat(),
-            "last_updated": datetime.now(UTC).isoformat(),
+            "created_at": now_iso,
+            "last_updated": now_iso,
```
8-8: fcntl is POSIX-only: confirm platform constraints or add a fallback

fcntl.flock won’t work on Windows. If Windows support isn’t required, document this; otherwise add a conditional import and fallback (e.g., msvcrt or portalocker).
Sketch:
```diff
-import fcntl
+try:
+    import fcntl  # POSIX
+except Exception:  # pragma: no cover
+    fcntl = None
```

Then guard calls or use a third-party cross-platform locker if acceptable.
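A fuller cross-platform sketch along those lines, assuming msvcrt region locks are an acceptable flock substitute on Windows (lock_file/unlock_file are hypothetical helper names):

```python
import os

if os.name == "nt":  # Windows: emulate flock with an msvcrt region lock
    import msvcrt

    def lock_file(f):
        f.seek(0)
        msvcrt.locking(f.fileno(), msvcrt.LK_LOCK, 1)  # lock the first byte

    def unlock_file(f):
        f.seek(0)
        msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK, 1)
else:  # POSIX
    import fcntl

    def lock_file(f):
        fcntl.flock(f.fileno(), fcntl.LOCK_EX)

    def unlock_file(f):
        fcntl.flock(f.fileno(), fcntl.LOCK_UN)
```

Note that msvcrt.locking retries for roughly ten seconds before raising OSError, so the semantics are close to, but not identical to, a blocking flock.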
tests/unit/utils/test_transcripts.py (3)
54-57: Mocking open with empty content is sufficient; no need to force JSONDecodeError for the first write

Given read_data="", the code path won’t call json.load. The JSONDecodeError side_effect below is unused and can be removed to keep the test intention crisp.
Apply:
```diff
-    mock_json = mocker.patch("utils.transcripts.json")
-    mock_json.load.side_effect = json.JSONDecodeError("No JSON object", "", 0)
+    mock_json = mocker.patch("utils.transcripts.json")
+    # First write starts from an empty file; json.load is never reached,
+    # so forcing a JSONDecodeError side_effect is unnecessary.
```
123-158: Strengthen expectations with an append scenario and created_at stability

Add a test that writes two turns and verifies:
- turns length increments to 2
- created_at remains constant, last_updated changes
- second turn metadata.timestamp > first
Example test to add:
```python
def test_store_transcript_appends_turns_and_updates_last_updated(tmp_path, mocker):
    # Arrange: real path under tmp
    cfg = AppConfig()
    cfg.init_from_dict({
        "name": "test",
        "service": {
            "host": "localhost", "port": 8080, "auth_enabled": False,
            "workers": 1, "color_log": True, "access_log": True,
        },
        "llama_stack": {"api_key": "x", "url": "http://x", "use_as_library_client": False},
        "user_data_collection": {"transcripts_storage": str(tmp_path)},
    })
    mocker.patch("utils.transcripts.configuration", cfg)
    # Use real fcntl; mock only time to control timestamps
    user_id = "user123"
    conversation_id = "c5260aec-4d82-4370-9fdf-05cf908b3f16"
    path = construct_transcripts_path(user_id)
    q = QueryRequest(query="hi", model="m", provider="p", conversation_id=conversation_id)
    s1 = TurnSummary(llm_response="a1", tool_calls=[])
    s2 = TurnSummary(llm_response="a2", tool_calls=[])
    store_transcript(user_id, conversation_id, "m", "p", True, "q1", q, s1, [], False, [])
    store_transcript(user_id, conversation_id, "m", "p", True, "q2", q, s2, [], False, [])
    with open(path / f"{conversation_id}.json", "r", encoding="utf-8") as f:
        data = json.load(f)
    assert len(data["turns"]) == 2
    assert data["conversation_metadata"]["created_at"] <= data["conversation_metadata"]["last_updated"]
    assert data["turns"][0]["metadata"]["timestamp"] <= data["turns"][1]["metadata"]["timestamp"]
```
70-81: Consider adding a negative test for conversation_id traversal

Once the sanitize/validate fix is in, add a test asserting that store_transcript raises for a conversation_id like "../escape".
Example:
```python
import pytest


def test_store_transcript_rejects_traversal_cid(mocker):
    mocker.patch(
        "utils.transcripts.construct_transcripts_path",
        return_value=mocker.MagicMock(),
    )
    with pytest.raises(ValueError):
        store_transcript(
            "user123", "../evil", "m", "p", True, "q",
            QueryRequest(query="q", conversation_id="../evil"),
            TurnSummary(llm_response="", tool_calls=[]),
            [], False, [],
        )
```
📒 Files selected for processing (2)
- src/utils/transcripts.py (4 hunks)
- tests/unit/utils/test_transcripts.py (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
- tests/unit/utils/test_transcripts.py (2)
  - src/utils/transcripts.py (1)
    - construct_transcripts_path (21-29)
  - src/models/requests.py (1)
    - QueryRequest (71-221)
- src/utils/transcripts.py (3)
  - src/configuration.py (2)
    - configuration (55-60)
    - user_data_collection_configuration (79-84)
  - src/models/requests.py (2)
    - Attachment (15-68)
    - QueryRequest (71-221)
  - src/utils/types.py (1)
    - TurnSummary (59-78)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: build-pr
- GitHub Check: e2e_tests
🔇 Additional comments (3)
src/utils/transcripts.py (1)
121-121: LGTM: clear audit log on successful write

The info log with the final path is helpful for ops and debugging.
tests/unit/utils/test_transcripts.py (2)
44-48: LGTM: construct_transcripts_path expectation matches the new per-user directory layout
118-121: LGTM: verifies lock acquire/release ordering

Good assertion of LOCK_EX and LOCK_UN usage.
```python
# Use conversation_id as filename instead of random UUID
transcript_file_path = transcripts_path / f"{conversation_id}.json"

# Prepare turn data
turn_data = {
    "metadata": {
        "provider": provider_id,
        "model": model_id,
        "query_provider": query_request.provider,
        "query_model": query_request.model,
```
Path traversal via conversation_id filename — sanitize or validate before using
conversation_id is interpolated directly into the filename. Inputs like "../x" or "foo/../../bar" would escape the user directory and write elsewhere. Sanitize like user_id or enforce a strict format (UUID/SUID).
Apply:

```diff
@@
-        # Use conversation_id as filename instead of random UUID
-        transcript_file_path = transcripts_path / f"{conversation_id}.json"
+        # Use conversation_id as filename instead of random UUID (sanitize to prevent traversal)
+        cid = os.path.normpath("/" + conversation_id).lstrip("/")
+        # reject if normalization changed it or if it contains any separators/specials
+        if (
+            cid != conversation_id
+            or os.path.sep in cid
+            or (os.path.altsep and os.path.altsep in cid)
+            or cid in ("", ".", "..")
+        ):
+            raise ValueError(f"Invalid conversation_id '{conversation_id}'")
+        transcript_file_path = transcripts_path / f"{cid}.json"
```

Optionally, if a SUID/UUID utility exists, prefer a strict validator instead of path heuristics.
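A sketch of the stricter route, assuming conversation IDs are canonical UUIDs (an assumption; the PR does not pin down the ID format, and validated_conversation_id is a hypothetical helper):

```python
import uuid


def validated_conversation_id(conversation_id: str) -> str:
    """Accept only canonical UUID strings; reject everything else."""
    try:
        parsed = uuid.UUID(conversation_id)
    except (ValueError, AttributeError, TypeError) as exc:
        raise ValueError(f"Invalid conversation_id '{conversation_id}'") from exc
    # uuid.UUID also accepts braced and hyphenless forms; require the canonical
    # lowercase form so the validated id matches the filename exactly.
    if str(parsed) != conversation_id:
        raise ValueError(f"Invalid conversation_id '{conversation_id}'")
    return conversation_id
```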
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/utils/transcripts.py around lines 66-74, conversation_id is directly
interpolated into a filename which allows path traversal (e.g. "../x" or
"foo/../../bar"); validate or sanitize it before use by enforcing a strict
format (preferably UUID/SUID) or using an existing UUID/SUID validator utility
if available, otherwise reject any value containing path separators or
suspicious characters and/or reduce to os.path.basename, and if validation fails
raise/return an error or fallback to generating a safe UUID filename; ensure the
sanitized/validated id is used to build transcripts_path /
f"{safe_conversation_id}.json".
```python
# Use file locking to handle concurrent writes safely
with open(transcript_file_path, "a+", encoding="utf-8") as transcript_file:
    fcntl.flock(transcript_file.fileno(), fcntl.LOCK_EX)
    try:
        # Move to beginning to read existing content
        transcript_file.seek(0)
        file_content = transcript_file.read()
        if file_content.strip():
            # File has existing content, load it
            transcript_file.seek(0)
            conversation_data = json.load(transcript_file)
        else:
            # First turn for this conversation
            conversation_data = {
                "conversation_metadata": {
                    "conversation_id": conversation_id,
                    "user_id": user_id,
                    "created_at": datetime.now(UTC).isoformat(),
                    "last_updated": datetime.now(UTC).isoformat(),
                },
                "turns": [],
            }
        # Add new turn
        conversation_data["turns"].append(turn_data)
        conversation_data["conversation_metadata"]["last_updated"] = datetime.now(
            UTC
        ).isoformat()

        # Write updated data back to file
        transcript_file.seek(0)
        transcript_file.truncate()
        json.dump(conversation_data, transcript_file, indent=2)
    finally:
        fcntl.flock(transcript_file.fileno(), fcntl.LOCK_UN)

logger.info("Transcript turn successfully stored at: %s", transcript_file_path)
```
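For completeness, a minimal sketch of reading a stored conversation back out of this layout (load_conversation is a hypothetical helper, not part of the PR):

```python
import json
from pathlib import Path


def load_conversation(transcripts_path: Path, conversation_id: str) -> dict:
    """Load the full per-conversation transcript written by store_transcript."""
    with open(transcripts_path / f"{conversation_id}.json", encoding="utf-8") as f:
        return json.load(f)
```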
🛠️ Refactor suggestion
Harden against corrupted/partial JSON and improve crash-safety
- If file_content is non-empty but corrupted (e.g., previous crash mid-write), json.load will raise and the write path fails. Catch JSONDecodeError and recover.
- Current write truncates the file in-place. A crash between truncate() and dump() can leave an empty or partial file. Prefer atomic write (tempfile + os.replace) while holding a lock.
Apply minimal recovery + atomic write:
```diff
@@
-    with open(transcript_file_path, "a+", encoding="utf-8") as transcript_file:
+    with open(transcript_file_path, "a+", encoding="utf-8") as transcript_file:
         fcntl.flock(transcript_file.fileno(), fcntl.LOCK_EX)
         try:
             # Move to beginning to read existing content
             transcript_file.seek(0)
             file_content = transcript_file.read()
-            if file_content.strip():
-                # File has existing content, load it
-                transcript_file.seek(0)
-                conversation_data = json.load(transcript_file)
+            if file_content.strip():
+                # File has existing content, load it (recover if corrupted)
+                transcript_file.seek(0)
+                try:
+                    conversation_data = json.load(transcript_file)
+                except json.JSONDecodeError:
+                    logger.warning(
+                        "Corrupted transcript detected at %s; backing up and reinitializing.",
+                        transcript_file_path,
+                    )
+                    backup_path = transcript_file_path.with_suffix(
+                        transcript_file_path.suffix
+                        + f".corrupt-{datetime.now(UTC).strftime('%Y%m%dT%H%M%S%fZ')}"
+                    )
+                    try:
+                        # Use the content we already read for backup
+                        with open(backup_path, "w", encoding="utf-8") as backup:
+                            backup.write(file_content)
+                    except Exception:  # best-effort
+                        logger.exception(
+                            "Failed to write backup for corrupted transcript: %s", backup_path
+                        )
+                    conversation_data = {
+                        "conversation_metadata": {
+                            "conversation_id": conversation_id,
+                            "user_id": user_id,
+                            "created_at": datetime.now(UTC).isoformat(),
+                            "last_updated": datetime.now(UTC).isoformat(),
+                        },
+                        "turns": [],
+                    }
             else:
                 # First turn for this conversation
                 conversation_data = {
@@
-            # Write updated data back to file
-            transcript_file.seek(0)
-            transcript_file.truncate()
-            json.dump(conversation_data, transcript_file, indent=2)
+            # Write updated data back to file atomically
+            import tempfile
+            with tempfile.NamedTemporaryFile(
+                "w", dir=transcripts_path, prefix=f".{cid}.", suffix=".tmp",
+                delete=False, encoding="utf-8",
+            ) as tmp:
+                json.dump(conversation_data, tmp, indent=2, ensure_ascii=False)
+                tmp.flush()
+                os.fsync(tmp.fileno())
+                tmp_path = Path(tmp.name)
+            os.replace(tmp_path, transcript_file_path)
         finally:
             fcntl.flock(transcript_file.fileno(), fcntl.LOCK_UN)
```

Note: We keep the lock on the original file descriptor during the prepare/replace window; if readers exist, consider enforcing shared locks for reads or a separate lock file for stricter semantics. If you don’t want atomic replace, at minimum keep the JSONDecodeError recovery.
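Distilled from that diff, the bare tempfile-plus-os.replace pattern looks like this (a sketch; the temp file must live in the same directory as the target so the rename stays on one filesystem):

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_write_json(target: Path, data: dict) -> None:
    """Write JSON to a temp file next to the target, then atomically replace it."""
    with tempfile.NamedTemporaryFile(
        "w", dir=target.parent, suffix=".tmp", delete=False, encoding="utf-8"
    ) as tmp:
        json.dump(data, tmp, indent=2, ensure_ascii=False)
        tmp.flush()
        os.fsync(tmp.fileno())  # make sure bytes hit disk before the rename
        tmp_name = tmp.name
    os.replace(tmp_name, target)  # atomic on POSIX within one filesystem
```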
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/utils/transcripts.py around lines 86 to 121, the current logic assumes
existing JSON is valid and truncates the file in-place before writing, which can
lead to crashes producing empty/corrupted files; modify it to (1) catch
json.JSONDecodeError when loading existing content and treat as an empty/new
conversation (log a warning and recreate conversation_data), and (2) perform an
atomic write: serialize conversation_data to a temporary file in the same
directory (use tempfile.NamedTemporaryFile(delete=False) or similar), fsync the
temp file, then os.replace the temp file over the original while still holding
the fcntl lock on the original file descriptor; ensure you remove the temp file
on errors and release the lock in finally.
cc: @keitwb |
@onmete and I have been working for the last few weeks to get transcripts and feedback data into the dataverse, which would eliminate the usability concerns you have around how piecemeal the transcript data is. The fragmented nature of it is convenient for the exporter, as we don't have to wait for the end of a conversation or mutate files to dump transcript data. We don't have the dataverse set up end to end yet, but the export process that gets the transcript/feedback data to an s3 bucket through the API ingress service is working. The pipeline from the s3 buckets to the dataverse (snowflake) is not finished yet, but it is dependent upon the existing schema of transcript data. If you are trying to access transcript data in a regular deployment of lightspeed, the dataverse is the way you want to do it; @onmete and I can help you get onboarded. If it is just for local dev debugging work, I would suggest making a simple tool to aggregate the files.
tisnik left a comment
LGTM, but would like to have ack from @onmete too (as he is responsible for processing transcripts).
@keitwb @onmete The reason I made this change to transcripts is that the current llama-stack session data is missing model & provider information, which we need to reach full feature parity with road-core. And for Developer Hub Lightspeed, which is targeting a migration from RCS to LCS, we need that model & provider information in chat history. Given that until 0.3.0 LCS relies on llama-stack session data to get/store chat history, I was wondering whether it is feasible to temporarily get that information from the transcript. I'm just trying to understand the purpose of storing transcripts in the dataverse. This is probably a question for @tisnik as well: how is a transcript different from chat history? If LCORE-298 is on the plan, and #347 also implemented the db layer for conversations listing, can we expect a db layer for chat history as well? All metadata from a conversation could be stored as part of the chat history, so why do we want to keep another copy of the transcript in the dataverse?
I would also like to know, from the LCS perspective, what the current best practice is to fully reach feature parity and unblock our Developer Hub Lightspeed work, without affecting any of your plans for transcripts or chat history.
Closing this PR as discussed in https://redhat-internal.slack.com/archives/C07MC7G9T8A/p1756913828393219
Description
In the old approach, LCS stored each conversation turn in a separate JSON file named with a random UUID, which made it hard to determine turn order or to match turns against session data.

This PR updates the store transcript function so that all conversation turns for a single conversation_id are stored in the same file: tmp_storage/{user_id}/{conversation_id}.json. This makes it much easier to locate the transcript file and to extract particular information from the transcripts.

Type of change
Related Tickets & Documents
Checklist before requesting a review
Testing
Summary by CodeRabbit

- New Features
- Improvements
- Refactor
- Documentation