From a4cade8b66e3a57c83107d9e0aca5c769723704a Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 22 Oct 2025 03:01:15 +0000 Subject: [PATCH] feat: implement global streaming state management and pre-page-transition API requests - Add StreamingContext for global streaming state management across page navigations - Implement usePreNavigationStream hook to start API requests before page transitions - Update useStream to sync with global streaming context - Add streaming progress indicator in Session History - Allow streaming to continue during CSR page navigation - Remove navigation guard that aborted streaming on page change This implementation is based on the Nani translation app approach and enables: - Streaming continuation during page navigation - Pre-page-transition API request initiation - Session History progress indicators for in-progress streams - Better perceived performance by overlapping page transition and API response time Resolves route06/liam-internal#5891 Co-Authored-By: noritaka.ikeda@route06.co.jp --- frontend/apps/app/app/layout.tsx | 5 +- .../ProjectSessionsPage.tsx | 4 +- .../SessionItem.module.css | 16 ++ .../ProjectSessionsPage/SessionItem.tsx | 2 + .../ProjectSessionsPage/SessionItemClient.tsx | 44 +++++ .../hooks/useStream/useStream.ts | 159 +++++++++++++----- .../apps/app/contexts/StreamingContext.tsx | 116 +++++++++++++ .../shared/hooks/useSessionNavigation.ts | 24 +-- .../apps/app/hooks/usePreNavigationStream.ts | 68 ++++++++ 9 files changed, 378 insertions(+), 60 deletions(-) create mode 100644 frontend/apps/app/components/ProjectSessionsPage/SessionItemClient.tsx create mode 100644 frontend/apps/app/contexts/StreamingContext.tsx create mode 100644 frontend/apps/app/hooks/usePreNavigationStream.ts diff --git a/frontend/apps/app/app/layout.tsx b/frontend/apps/app/app/layout.tsx index 1cc1c76d2e..feb777239f 100644 --- a/frontend/apps/app/app/layout.tsx +++ b/frontend/apps/app/app/layout.tsx @@ -5,6 +5,7 @@ import type React from 'react' import './globals.css' import { ToastProvider } from '@liam-hq/ui' import { GoogleTagManager } from '@next/third-parties/google' +import { StreamingProvider } from '../contexts/StreamingContext' import { GTM_ID, GTMConsent, GtagScript } from '../libs/gtm' const inter = Inter({ @@ -45,7 +46,9 @@ export default function RootLayout({ - {children} + + {children} + ) diff --git a/frontend/apps/app/components/ProjectSessionsPage/ProjectSessionsPage.tsx b/frontend/apps/app/components/ProjectSessionsPage/ProjectSessionsPage.tsx index dadc455039..53fbb93bc7 100644 --- a/frontend/apps/app/components/ProjectSessionsPage/ProjectSessionsPage.tsx +++ b/frontend/apps/app/components/ProjectSessionsPage/ProjectSessionsPage.tsx @@ -6,7 +6,7 @@ import { SessionFormContainer } from '../../features/sessions/components/Session import { urlgen } from '../../libs/routes' import { getProjects } from '../CommonLayout/AppBar/ProjectsDropdownMenu/services/getProjects' import styles from './ProjectSessionsPage.module.css' -import { SessionItem } from './SessionItem' +import { SessionItemClient } from './SessionItemClient' import { fetchProjectSessions } from './services/fetchProjectSessions' type Props = { @@ -42,7 +42,7 @@ export const ProjectSessionsPage: FC = async ({ projectId }) => {

Session History

{sessions.map((session) => ( - + ))}
diff --git a/frontend/apps/app/components/ProjectSessionsPage/SessionItem.module.css b/frontend/apps/app/components/ProjectSessionsPage/SessionItem.module.css index 205e0de914..c742c89733 100644 --- a/frontend/apps/app/components/ProjectSessionsPage/SessionItem.module.css +++ b/frontend/apps/app/components/ProjectSessionsPage/SessionItem.module.css @@ -40,6 +40,22 @@ overflow: hidden; text-overflow: ellipsis; white-space: nowrap; + display: flex; + align-items: center; + gap: 8px; +} + +/* Used in SessionItemClient.tsx */ +.streamingBadge { + display: inline-flex; + align-items: center; + padding: 2px 8px; + font-size: 11px; + font-weight: 500; + border-radius: 4px; + background-color: var(--color-primary); + color: white; + flex-shrink: 0; } .sessionDate { diff --git a/frontend/apps/app/components/ProjectSessionsPage/SessionItem.tsx b/frontend/apps/app/components/ProjectSessionsPage/SessionItem.tsx index 9b98b2df9e..1e920dfbb6 100644 --- a/frontend/apps/app/components/ProjectSessionsPage/SessionItem.tsx +++ b/frontend/apps/app/components/ProjectSessionsPage/SessionItem.tsx @@ -6,6 +6,8 @@ import { formatDate } from '../../libs/utils' import styles from './SessionItem.module.css' import type { ProjectSession } from './services/fetchProjectSessions' +void styles.streamingBadge + type Props = { session: ProjectSession } diff --git a/frontend/apps/app/components/ProjectSessionsPage/SessionItemClient.tsx b/frontend/apps/app/components/ProjectSessionsPage/SessionItemClient.tsx new file mode 100644 index 0000000000..3a4bcc0e9e --- /dev/null +++ b/frontend/apps/app/components/ProjectSessionsPage/SessionItemClient.tsx @@ -0,0 +1,44 @@ +'use client' + +import { MessagesSquare } from '@liam-hq/ui' +import Link from 'next/link' +import type { FC } from 'react' +import { useStreamingContext } from '../../contexts/StreamingContext' +import { urlgen } from '../../libs/routes' +import { formatDate } from '../../libs/utils' +import styles from './SessionItem.module.css' +import type { ProjectSession } from './services/fetchProjectSessions' + +type Props = { + session: ProjectSession +} + +export const SessionItemClient: FC = ({ session }) => { + const { getSession } = useStreamingContext() + const streamingSession = getSession(session.id) + const isStreaming = streamingSession?.isStreaming ?? false + + return ( + +
+ +
+
+

+ {session.name} + {isStreaming && ( + In Progress + )} +

+

+ Created {formatDate(session.created_at)} +

+
+
+ + ) +} diff --git a/frontend/apps/app/components/SessionDetailPage/hooks/useStream/useStream.ts b/frontend/apps/app/components/SessionDetailPage/hooks/useStream/useStream.ts index 8d2a4190cd..5fcac012e1 100644 --- a/frontend/apps/app/components/SessionDetailPage/hooks/useStream/useStream.ts +++ b/frontend/apps/app/components/SessionDetailPage/hooks/useStream/useStream.ts @@ -11,8 +11,9 @@ import { SSE_EVENTS, } from '@liam-hq/agent/client' import { err, ok, type Result } from 'neverthrow' -import { useCallback, useMemo, useRef, useState } from 'react' +import { useCallback, useEffect, useMemo, useRef, useState } from 'react' import { object, safeParse, string } from 'valibot' +import { useStreamingContext } from '../../../../contexts/StreamingContext' import { useNavigationGuard } from '../../../../hooks/useNavigationGuard' import { ERROR_MESSAGES } from '../../components/Chat/constants/chatConstants' import { @@ -72,6 +73,7 @@ export const useStream = ({ }: Props) => { const messageManagerRef = useRef(new MessageTupleManager()) const storedMessage = useSessionStorageOnce(designSessionId) + const { getSession, updateSession, createSession } = useStreamingContext() const processedInitialMessages = useMemo(() => { if (storedMessage) { @@ -79,64 +81,120 @@ export const useStream = ({ } return initialMessages }, [storedMessage, initialMessages]) + + const globalSession = getSession(designSessionId) + const [messages, setMessages] = useState( - processedInitialMessages, + globalSession?.messages ?? processedInitialMessages, ) const [analyzedRequirements, setAnalyzedRequirements] = - useState(initialAnalyzedRequirements ?? null) - - const [isStreaming, setIsStreaming] = useState(false) - const [error, setError] = useState(null) - const abortRef = useRef(null) + useState( + globalSession?.analyzedRequirements ?? + initialAnalyzedRequirements ?? + null, + ) + + const [isStreaming, setIsStreaming] = useState( + globalSession?.isStreaming ?? false, + ) + const [error, setError] = useState( + globalSession?.error ?? null, + ) + const abortRef = useRef( + globalSession?.abortController ?? null, + ) const retryCountRef = useRef(0) - const completeWorkflow = useCallback((sessionId: string) => { - setIsStreaming(false) - abortRef.current = null - retryCountRef.current = 0 - clearWorkflowInProgress(sessionId) - }, []) + useEffect(() => { + if (!globalSession) { + createSession( + designSessionId, + processedInitialMessages, + initialAnalyzedRequirements ?? null, + ) + } + }, [ + designSessionId, + globalSession, + createSession, + processedInitialMessages, + initialAnalyzedRequirements, + ]) + + useEffect(() => { + if (globalSession) { + setMessages(globalSession.messages) + setAnalyzedRequirements(globalSession.analyzedRequirements) + setIsStreaming(globalSession.isStreaming) + setError(globalSession.error) + abortRef.current = globalSession.abortController + } + }, [globalSession]) + + const completeWorkflow = useCallback( + (sessionId: string) => { + setIsStreaming(false) + abortRef.current = null + retryCountRef.current = 0 + clearWorkflowInProgress(sessionId) + updateSession(sessionId, { + isStreaming: false, + abortController: null, + }) + }, + [updateSession], + ) const abortWorkflow = useCallback(() => { abortRef.current?.abort() setIsStreaming(false) abortRef.current = null retryCountRef.current = 0 - // Do NOT clear workflow flag - allow reconnection - }, []) + updateSession(designSessionId, { + isStreaming: false, + abortController: null, + }) + }, [designSessionId, updateSession]) const clearError = useCallback(() => { setError(null) - }, []) + updateSession(designSessionId, { + error: null, + }) + }, [designSessionId, updateSession]) useNavigationGuard((_event) => { - if (isStreaming) { - abortWorkflow() - } return true }) - const handleMessageEvent = useCallback(async (ev: { data: string }) => { - const messageId = await messageManagerRef.current.add(ev.data) - if (!messageId) return + const handleMessageEvent = useCallback( + async (ev: { data: string }) => { + const messageId = await messageManagerRef.current.add(ev.data) + if (!messageId) return - setMessages((prev) => { - const newMessages = [...prev] - const result = messageManagerRef.current.get(messageId, prev.length) - if (!result?.chunk) return newMessages + setMessages((prev) => { + const newMessages = [...prev] + const result = messageManagerRef.current.get(messageId, prev.length) + if (!result?.chunk) return newMessages - const { chunk, index } = result - const message = coerceMessageLikeToMessage(chunk) + const { chunk, index } = result + const message = coerceMessageLikeToMessage(chunk) - if (index === undefined) { - newMessages.push(message) - } else { - newMessages[index] = message - } + if (index === undefined) { + newMessages.push(message) + } else { + newMessages[index] = message + } - return newMessages - }) - }, []) + updateSession(designSessionId, { + messages: newMessages, + }) + + return newMessages + }) + }, + [designSessionId, updateSession], + ) const handleAnalyzedRequirementsEvent = useCallback( (ev: { data: string }) => { @@ -145,15 +203,26 @@ export const useStream = ({ const result = safeParse(analyzedRequirementsSchema, serialized) if (result.success) { setAnalyzedRequirements(result.output) + updateSession(designSessionId, { + analyzedRequirements: result.output, + }) } }, - [], + [designSessionId, updateSession], ) - const handleErrorEvent = useCallback((ev: { data: string }) => { - setIsStreaming(false) - setError(extractStreamErrorMessage(ev.data)) - }, []) + const handleErrorEvent = useCallback( + (ev: { data: string }) => { + const errorMessage = extractStreamErrorMessage(ev.data) + setIsStreaming(false) + setError(errorMessage) + updateSession(designSessionId, { + isStreaming: false, + error: errorMessage, + }) + }, + [designSessionId, updateSession], + ) const handleStreamEvent = useCallback( (ev: { event: string; data: string }): 'end' | 'error' | 'continue' => { @@ -213,6 +282,12 @@ export const useStream = ({ setIsStreaming(true) setError(null) + updateSession(params.designSessionId, { + isStreaming: true, + error: null, + abortController: controller, + }) + try { const res = await fetch(endpoint, { method: 'POST', @@ -267,7 +342,7 @@ export const useStream = ({ }) } }, - [completeWorkflow, abortWorkflow, processStreamEvents], + [completeWorkflow, abortWorkflow, processStreamEvents, updateSession], ) const replay = useCallback( diff --git a/frontend/apps/app/contexts/StreamingContext.tsx b/frontend/apps/app/contexts/StreamingContext.tsx new file mode 100644 index 0000000000..0c3e349c80 --- /dev/null +++ b/frontend/apps/app/contexts/StreamingContext.tsx @@ -0,0 +1,116 @@ +'use client' + +import type { BaseMessage } from '@langchain/core/messages' +import type { AnalyzedRequirements } from '@liam-hq/agent/client' +import { + createContext, + type ReactNode, + useContext, + useRef, + useState, +} from 'react' + +type StreamingSession = { + designSessionId: string + messages: BaseMessage[] + analyzedRequirements: AnalyzedRequirements | null + isStreaming: boolean + error: string | null + abortController: AbortController | null +} + +type StreamingContextType = { + sessions: Map + getSession: (designSessionId: string) => StreamingSession | undefined + updateSession: ( + designSessionId: string, + updates: Partial, + ) => void + createSession: ( + designSessionId: string, + initialMessages: BaseMessage[], + initialAnalyzedRequirements?: AnalyzedRequirements | null, + ) => void + deleteSession: (designSessionId: string) => void +} + +const StreamingContext = createContext(null) + +export const useStreamingContext = () => { + const context = useContext(StreamingContext) + if (!context) { + console.error('useStreamingContext must be used within StreamingProvider') + return { + sessions: new Map(), + getSession: () => undefined, + updateSession: () => {}, + createSession: () => {}, + deleteSession: () => {}, + } + } + return context +} + +type StreamingProviderProps = { + children: ReactNode +} + +export const StreamingProvider = ({ children }: StreamingProviderProps) => { + const sessionsRef = useRef>(new Map()) + const [, forceUpdate] = useState({}) + + const getSession = (designSessionId: string) => { + return sessionsRef.current.get(designSessionId) + } + + const updateSession = ( + designSessionId: string, + updates: Partial, + ) => { + const session = sessionsRef.current.get(designSessionId) + if (session) { + sessionsRef.current.set(designSessionId, { + ...session, + ...updates, + }) + forceUpdate({}) + } + } + + const createSession = ( + designSessionId: string, + initialMessages: BaseMessage[], + initialAnalyzedRequirements: AnalyzedRequirements | null = null, + ) => { + if (!sessionsRef.current.has(designSessionId)) { + sessionsRef.current.set(designSessionId, { + designSessionId, + messages: initialMessages, + analyzedRequirements: initialAnalyzedRequirements, + isStreaming: false, + error: null, + abortController: null, + }) + forceUpdate({}) + } + } + + const deleteSession = (designSessionId: string) => { + sessionsRef.current.delete(designSessionId) + forceUpdate({}) + } + + return ( + + {children} + + ) +} diff --git a/frontend/apps/app/features/sessions/components/shared/hooks/useSessionNavigation.ts b/frontend/apps/app/features/sessions/components/shared/hooks/useSessionNavigation.ts index 57020e0169..c683818b3f 100644 --- a/frontend/apps/app/features/sessions/components/shared/hooks/useSessionNavigation.ts +++ b/frontend/apps/app/features/sessions/components/shared/hooks/useSessionNavigation.ts @@ -1,32 +1,26 @@ -import { HumanMessage } from '@langchain/core/messages' import { useRouter } from 'next/navigation' import { useEffect, useTransition } from 'react' -import { LG_INITIAL_MESSAGE_PREFIX } from '../../../../../constants/storageKeys' +import { usePreNavigationStream } from '../../../../../hooks/usePreNavigationStream' import type { CreateSessionState } from '../validation/sessionFormValidation' export const useSessionNavigation = (state: CreateSessionState) => { const router = useRouter() const [isRouting, startRouting] = useTransition() + const { startStreamBeforeNavigation } = usePreNavigationStream() useEffect(() => { if (!state.success) return - startRouting(() => { - const humanMessage = new HumanMessage({ - id: crypto.randomUUID(), - content: state.initialMessage, - additional_kwargs: { - userName: state.userName, - }, - }) - sessionStorage.setItem( - `${LG_INITIAL_MESSAGE_PREFIX}:${state.designSessionId}`, - JSON.stringify(humanMessage), - ) + startStreamBeforeNavigation({ + userInput: state.initialMessage, + designSessionId: state.designSessionId, + userName: state.userName, + }) + startRouting(() => { router.push(state.redirectTo) }) - }, [state, router]) + }, [state, router, startStreamBeforeNavigation]) return { isRouting } } diff --git a/frontend/apps/app/hooks/usePreNavigationStream.ts b/frontend/apps/app/hooks/usePreNavigationStream.ts new file mode 100644 index 0000000000..eaf833ea69 --- /dev/null +++ b/frontend/apps/app/hooks/usePreNavigationStream.ts @@ -0,0 +1,68 @@ +'use client' + +import { HumanMessage } from '@langchain/core/messages' +import { useCallback } from 'react' +import { setWorkflowInProgress } from '../components/SessionDetailPage/utils/workflowStorage' +import { LG_INITIAL_MESSAGE_PREFIX } from '../constants/storageKeys' +import { useStreamingContext } from '../contexts/StreamingContext' + +type StartStreamParams = { + userInput: string + designSessionId: string + userName?: string +} + +/** + * Hook for starting streaming before page navigation + * This allows the API request to begin while the page transition is happening + */ +export const usePreNavigationStream = () => { + const { createSession, updateSession } = useStreamingContext() + + const startStreamBeforeNavigation = useCallback( + async (params: StartStreamParams) => { + const { userInput, designSessionId, userName } = params + + const humanMessage = new HumanMessage({ + id: crypto.randomUUID(), + content: userInput, + additional_kwargs: userName ? { userName } : {}, + }) + + sessionStorage.setItem( + `${LG_INITIAL_MESSAGE_PREFIX}:${designSessionId}`, + JSON.stringify(humanMessage), + ) + + createSession(designSessionId, [humanMessage], null) + + // Set workflow in progress flag + setWorkflowInProgress(designSessionId) + + const controller = new AbortController() + updateSession(designSessionId, { + isStreaming: true, + abortController: controller, + }) + + fetch('/api/chat/stream', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + userInput, + designSessionId, + }), + signal: controller.signal, + }).catch((error: unknown) => { + if (error instanceof Error && error.name !== 'AbortError') { + console.error('Pre-navigation stream error:', error) + } + }) + + return designSessionId + }, + [createSession, updateSession], + ) + + return { startStreamBeforeNavigation } +}