Streaming Event Handling
The pipeline processes multiple Server-Sent Event (SSE) types to provide real-time updates to the UI. This document details each event type and how it's handled.
Event Types Overview
Event Handlers Detail
1. product-metadata
Location: pipeline.ts:420-640
Purpose: Delivers comprehensive search and context information
Data Structure:
{
diagnostics?: {
predictedIntent?: string;
confidence?: number;
};
followUpSuggestions?: string[];
contextData?: {
occasion?: string;
recipient?: string;
ageGroup?: string;
budget?: { min?: number; max?: number };
language?: 'et' | 'en';
categoryHints?: string[];
productType?: string;
isPopular?: boolean;
};
searchParams?: SearchParams;
poolMetrics?: {
totalAvailable: number;
afterExclusions: number;
finalCandidates: number;
};
searchDurationMs?: number;
}
Actions:
- Parse and validate diagnostics
- Extract predicted intent for logging
- Process followup suggestions
- Update context data for context pills
- Store search params for "show more" functionality
- Capture pool metrics (product availability)
- Set smart suggestions in UI
- Update search duration metric
State Updates:
contextData← extracted contextlastSearchParams← search configurationsmartSuggestions← suggestion buttonssearchDurationMs← timing metricpoolMetrics← availability info
2. smart-suggestions
Location: pipeline.ts:643-655
Purpose: Quick suggestions for user followup actions
Data Structure:
{
suggestions: string[];
}
Actions:
- Update suggestions state immediately
- Append to assistant message for persistence
- Trigger UI button render
State Updates:
smartSuggestions← new suggestion array- Assistant message updated with suggestions
Example Suggestions:
- "Show more products"
- "Cheaper alternatives"
- "Different category"
- "Popular gifts"
3. tool-output
Location: pipeline.ts:658-674
Purpose: Deliver product cards to display
Data Structure:
{
toolName: string;
products: ProductCard[];
operation?: 'replace' | 'append';
}
Actions:
- Validate product array
- Replace or append to product list
- Mark tools as expected (enables skeleton)
- Sync UI skeleton state
- Update assistant message with products
State Updates:
productCards← products arraytoolsExpected← trueshowSkeleton← false (products arrived)- Assistant message gets product attachments
Product Card Structure:
{
id: string;
name: string;
price: number;
imageUrl?: string;
category: string;
description?: string;
}
4. perf
Location: pipeline.ts:676-713
Purpose: Backend pipeline performance metrics
Data Structure:
{
contextExtractionMs?: number;
multiSearchMs?: number;
funnelMs?: number;
rerankMs?: number;
diversityMs?: number;
ttfcHintMs?: number;
}
Actions:
- Record each timing metric
- Update metric pill displays
- Calculate derived metrics
- Log performance data
State Updates:
contextExtractionTimeMs← context timingsearchDurationMs← search timingrerankTimeMs← rerank timingdiversityTimeMs← diversity timingttfcMs← TTFC hint (if provided)
Display: Metric pills show these timings to developers
5. text
Location: pipeline.ts:715-790
Purpose: Stream assistant response text in real-time
Data Structure:
{
content: string; // Incremental text chunk
}
Actions:
- Append to accumulated text buffer
- Capture first event time (TTFC calculation)
- Stream to UI without batching
- Check skeleton thresholds:
- 160 characters accumulated
- OR 800ms elapsed since first text
- Trigger product skeleton display if threshold met
- Update assistant message incrementally
State Updates:
accumulatedText← buffer growsfirstEventTime← timestamp (once)ttfcMs← calculated on first eventshowSkeleton← true (if thresholds met)- Assistant message updated with new text
Configuration:
MESSAGE_STREAMING_CONFIG = {
SKELETON_CHAR_THRESHOLD: 160,
SKELETON_TIME_THRESHOLD_MS: 800
}
6. done / EOF
Location: pipeline.ts:793-889
Purpose: Signal end of stream and finalize
Actions:
- Process any leftover text buffer
- Stamp
streamingComplete: true - Clear all timeouts
- Calculate final metrics
- Return aggregated result
State Updates:
streamingComplete← trueisLoading← false- All timers stopped
- Assistant message marked complete
Return Value:
{
content: string;
products: ProductCard[];
contextData: ContextData | null;
smartSuggestions: string[];
metrics: {
ttfcMs: number;
searchDurationMs: number;
// ... other timings
};
diagnostics: object;
}
Event Parsing Flow
Error Handling
Each event handler includes error guards:
try {
const data = JSON.parse(event.data);
// ... process event
} catch (error) {
console.error('Event parse error:', {
event: event.type,
error: error.message
});
// Continue processing other events
}
Resilience Strategy:
- Malformed events are logged but don't crash the stream
- Missing fields use safe defaults
- State updates are atomic (all-or-nothing)
- UI stays responsive even with partial data
Event Sequence Example
Real conversation flow:
t=0ms → Request sent
t=800ms → [perf] context extraction: 900ms
t=850ms → [product-metadata] contextData + searchParams
t=900ms → [text] "Siin on head kingitused"
t=950ms → [text] " sinu sõbrale..."
t=1000ms → [text] "..." (160 chars → skeleton!)
t=1200ms → [perf] search: 200ms, rerank: 150ms
t=1400ms → [tool-output] 3 products
t=1450ms → [smart-suggestions] ["Show more", "Cheaper"]
t=2000ms → [text] "...soovitan neid."
t=2100ms → [done]
Adding New Event Types
To extend with new events:
-
Add to parser (
parser.ts):export type SSEEventType =
| 'text'
| 'product-metadata'
| 'your-new-event'; -
Add handler (
pipeline.ts):case 'your-new-event':
const data = JSON.parse(event.data);
// Process and update state
break; -
Update types (
types.ts):interface YourNewEventData {
// Define structure
} -
Wire state setters in
useMessageStreaming:const [yourState, setYourState] = useState<T>();
Related Documentation
- Lifecycle & Flow - When events arrive
- Resilience - Timeout and error handling