Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion frontend/apps/app/app/layout.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type React from 'react'
import './globals.css'
import { ToastProvider } from '@liam-hq/ui'
import { GoogleTagManager } from '@next/third-parties/google'
import { StreamingProvider } from '../contexts/StreamingContext'
import { GTM_ID, GTMConsent, GtagScript } from '../libs/gtm'

const inter = Inter({
Expand Down Expand Up @@ -45,7 +46,9 @@ export default function RootLayout({
<GtagScript />
<GTMConsent />
<body className={clsx(inter.className, montserrat.variable)}>
<ToastProvider>{children}</ToastProvider>
<StreamingProvider>
<ToastProvider>{children}</ToastProvider>
</StreamingProvider>
</body>
</html>
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { SessionFormContainer } from '../../features/sessions/components/Session
import { urlgen } from '../../libs/routes'
import { getProjects } from '../CommonLayout/AppBar/ProjectsDropdownMenu/services/getProjects'
import styles from './ProjectSessionsPage.module.css'
import { SessionItem } from './SessionItem'
import { SessionItemClient } from './SessionItemClient'
import { fetchProjectSessions } from './services/fetchProjectSessions'

type Props = {
Expand Down Expand Up @@ -42,7 +42,7 @@ export const ProjectSessionsPage: FC<Props> = async ({ projectId }) => {
<h2 className={styles.recentsTitle}>Session History</h2>
<div className={styles.sessionsList}>
{sessions.map((session) => (
<SessionItem key={session.id} session={session} />
<SessionItemClient key={session.id} session={session} />
))}
</div>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,22 @@
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
display: flex;
align-items: center;
gap: 8px;
}

/* Used in SessionItemClient.tsx */
.streamingBadge {
display: inline-flex;
align-items: center;
padding: 2px 8px;
font-size: 11px;
font-weight: 500;
border-radius: 4px;
background-color: var(--color-primary);
color: white;
flex-shrink: 0;
}

.sessionDate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { formatDate } from '../../libs/utils'
import styles from './SessionItem.module.css'
import type { ProjectSession } from './services/fetchProjectSessions'

void styles.streamingBadge

type Props = {
session: ProjectSession
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
'use client'

import { MessagesSquare } from '@liam-hq/ui'
import Link from 'next/link'
import type { FC } from 'react'
import { useStreamingContext } from '../../contexts/StreamingContext'
import { urlgen } from '../../libs/routes'
import { formatDate } from '../../libs/utils'
import styles from './SessionItem.module.css'
import type { ProjectSession } from './services/fetchProjectSessions'

type Props = {
session: ProjectSession
}

export const SessionItemClient: FC<Props> = ({ session }) => {
const { getSession } = useStreamingContext()
const streamingSession = getSession(session.id)
const isStreaming = streamingSession?.isStreaming ?? false

return (
<Link
href={urlgen('design_sessions/[id]', { id: session.id })}
className={styles.sessionItem}
data-streaming={isStreaming}
>
<div className={styles.iconContainer}>
<MessagesSquare size={20} />
</div>
<div className={styles.content}>
<h4 className={styles.sessionName}>
{session.name}
{isStreaming && (
<span className={styles.streamingBadge}>In Progress</span>
)}
</h4>
<p className={styles.sessionDate}>
Created {formatDate(session.created_at)}
</p>
</div>
<div className={styles.arrow}>→</div>
</Link>
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import {
SSE_EVENTS,
} from '@liam-hq/agent/client'
import { err, ok, type Result } from 'neverthrow'
import { useCallback, useMemo, useRef, useState } from 'react'
import { useCallback, useEffect, useMemo, useRef, useState } from 'react'
import { object, safeParse, string } from 'valibot'
import { useStreamingContext } from '../../../../contexts/StreamingContext'
import { useNavigationGuard } from '../../../../hooks/useNavigationGuard'
import { ERROR_MESSAGES } from '../../components/Chat/constants/chatConstants'
import {
Expand Down Expand Up @@ -72,71 +73,128 @@ export const useStream = ({
}: Props) => {
const messageManagerRef = useRef(new MessageTupleManager())
const storedMessage = useSessionStorageOnce(designSessionId)
const { getSession, updateSession, createSession } = useStreamingContext()

const processedInitialMessages = useMemo(() => {
if (storedMessage) {
return [storedMessage, ...initialMessages]
}
return initialMessages
}, [storedMessage, initialMessages])

const globalSession = getSession(designSessionId)

const [messages, setMessages] = useState<BaseMessage[]>(
processedInitialMessages,
globalSession?.messages ?? processedInitialMessages,
)
const [analyzedRequirements, setAnalyzedRequirements] =
useState<AnalyzedRequirements | null>(initialAnalyzedRequirements ?? null)

const [isStreaming, setIsStreaming] = useState(false)
const [error, setError] = useState<string | null>(null)
const abortRef = useRef<AbortController | null>(null)
useState<AnalyzedRequirements | null>(
globalSession?.analyzedRequirements ??
initialAnalyzedRequirements ??
null,
)

const [isStreaming, setIsStreaming] = useState(
globalSession?.isStreaming ?? false,
)
const [error, setError] = useState<string | null>(
globalSession?.error ?? null,
)
const abortRef = useRef<AbortController | null>(
globalSession?.abortController ?? null,
)
const retryCountRef = useRef(0)

const completeWorkflow = useCallback((sessionId: string) => {
setIsStreaming(false)
abortRef.current = null
retryCountRef.current = 0
clearWorkflowInProgress(sessionId)
}, [])
useEffect(() => {
if (!globalSession) {
createSession(
designSessionId,
processedInitialMessages,
initialAnalyzedRequirements ?? null,
)
}
}, [
designSessionId,
globalSession,
createSession,
processedInitialMessages,
initialAnalyzedRequirements,
])

useEffect(() => {
if (globalSession) {
setMessages(globalSession.messages)
setAnalyzedRequirements(globalSession.analyzedRequirements)
setIsStreaming(globalSession.isStreaming)
setError(globalSession.error)
abortRef.current = globalSession.abortController
}
}, [globalSession])

const completeWorkflow = useCallback(
(sessionId: string) => {
setIsStreaming(false)
abortRef.current = null
retryCountRef.current = 0
clearWorkflowInProgress(sessionId)
updateSession(sessionId, {
isStreaming: false,
abortController: null,
})
},
[updateSession],
)

const abortWorkflow = useCallback(() => {
abortRef.current?.abort()
setIsStreaming(false)
abortRef.current = null
retryCountRef.current = 0
// Do NOT clear workflow flag - allow reconnection
}, [])
updateSession(designSessionId, {
isStreaming: false,
abortController: null,
})
}, [designSessionId, updateSession])

const clearError = useCallback(() => {
setError(null)
}, [])
updateSession(designSessionId, {
error: null,
})
}, [designSessionId, updateSession])

useNavigationGuard((_event) => {
if (isStreaming) {
abortWorkflow()
}
return true
})

const handleMessageEvent = useCallback(async (ev: { data: string }) => {
const messageId = await messageManagerRef.current.add(ev.data)
if (!messageId) return
const handleMessageEvent = useCallback(
async (ev: { data: string }) => {
const messageId = await messageManagerRef.current.add(ev.data)
if (!messageId) return

setMessages((prev) => {
const newMessages = [...prev]
const result = messageManagerRef.current.get(messageId, prev.length)
if (!result?.chunk) return newMessages
setMessages((prev) => {
const newMessages = [...prev]
const result = messageManagerRef.current.get(messageId, prev.length)
if (!result?.chunk) return newMessages

const { chunk, index } = result
const message = coerceMessageLikeToMessage(chunk)
const { chunk, index } = result
const message = coerceMessageLikeToMessage(chunk)

if (index === undefined) {
newMessages.push(message)
} else {
newMessages[index] = message
}
if (index === undefined) {
newMessages.push(message)
} else {
newMessages[index] = message
}

return newMessages
})
}, [])
updateSession(designSessionId, {
messages: newMessages,
})

return newMessages
})
},
[designSessionId, updateSession],
)

const handleAnalyzedRequirementsEvent = useCallback(
(ev: { data: string }) => {
Expand All @@ -145,15 +203,26 @@ export const useStream = ({
const result = safeParse(analyzedRequirementsSchema, serialized)
if (result.success) {
setAnalyzedRequirements(result.output)
updateSession(designSessionId, {
analyzedRequirements: result.output,
})
}
},
[],
[designSessionId, updateSession],
)

const handleErrorEvent = useCallback((ev: { data: string }) => {
setIsStreaming(false)
setError(extractStreamErrorMessage(ev.data))
}, [])
const handleErrorEvent = useCallback(
(ev: { data: string }) => {
const errorMessage = extractStreamErrorMessage(ev.data)
setIsStreaming(false)
setError(errorMessage)
updateSession(designSessionId, {
isStreaming: false,
error: errorMessage,
})
},
[designSessionId, updateSession],
)

const handleStreamEvent = useCallback(
(ev: { event: string; data: string }): 'end' | 'error' | 'continue' => {
Expand Down Expand Up @@ -213,6 +282,12 @@ export const useStream = ({
setIsStreaming(true)
setError(null)

updateSession(params.designSessionId, {
isStreaming: true,
error: null,
abortController: controller,
})

try {
const res = await fetch(endpoint, {
method: 'POST',
Expand Down Expand Up @@ -267,7 +342,7 @@ export const useStream = ({
})
}
},
[completeWorkflow, abortWorkflow, processStreamEvents],
[completeWorkflow, abortWorkflow, processStreamEvents, updateSession],
)

const replay = useCallback(
Expand Down
Loading
Loading