Streaming Event Handling

The pipeline processes multiple Server-Sent Event (SSE) types to provide real-time updates to the UI. This document details each event type and how it's handled.

Event Types Overview

Event Handlers Detail

1. product-metadata

Location: pipeline.ts:420-640

Purpose: Delivers search diagnostics, extracted context, search parameters, and pool metrics for the current query

Data Structure:

{
  diagnostics?: {
    predictedIntent?: string;
    confidence?: number;
  };
  followUpSuggestions?: string[];
  contextData?: {
    occasion?: string;
    recipient?: string;
    ageGroup?: string;
    budget?: { min?: number; max?: number };
    language?: 'et' | 'en';
    categoryHints?: string[];
    productType?: string;
    isPopular?: boolean;
  };
  searchParams?: SearchParams;
  poolMetrics?: {
    totalAvailable: number;
    afterExclusions: number;
    finalCandidates: number;
  };
  searchDurationMs?: number;
}

Actions:

  1. Parse and validate diagnostics
  2. Extract predicted intent for logging
  3. Process follow-up suggestions
  4. Update context data for context pills
  5. Store search params for "show more" functionality
  6. Capture pool metrics (product availability)
  7. Set smart suggestions in UI
  8. Update search duration metric

State Updates:

  • contextData ← extracted context
  • lastSearchParams ← search configuration
  • smartSuggestions ← suggestion buttons
  • searchDurationMs ← timing metric
  • poolMetrics ← availability info
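
The state updates above can be sketched as a single pure function. This is a minimal illustration, not the code in pipeline.ts: the names `MetadataState` and `applyProductMetadata` are hypothetical, `contextData` and `searchParams` are typed loosely, and keeping the previous value when a field is absent is an assumption.

```typescript
// Hypothetical shapes; only the field names come from the data structure above.
interface PoolMetrics {
  totalAvailable: number;
  afterExclusions: number;
  finalCandidates: number;
}

interface ProductMetadataEvent {
  diagnostics?: { predictedIntent?: string; confidence?: number };
  followUpSuggestions?: string[];
  contextData?: Record<string, unknown>;
  searchParams?: Record<string, unknown>;
  poolMetrics?: PoolMetrics;
  searchDurationMs?: number;
}

interface MetadataState {
  contextData: Record<string, unknown> | null;
  lastSearchParams: Record<string, unknown> | null;
  smartSuggestions: string[];
  searchDurationMs: number | null;
  poolMetrics: PoolMetrics | null;
}

// Returns a new state object. Assumption: fields missing from the event
// keep their previous value instead of being reset.
function applyProductMetadata(
  prev: MetadataState,
  event: ProductMetadataEvent
): MetadataState {
  return {
    contextData: event.contextData ?? prev.contextData,
    lastSearchParams: event.searchParams ?? prev.lastSearchParams,
    // Drop empty strings so the UI never renders a blank suggestion button.
    smartSuggestions: Array.isArray(event.followUpSuggestions)
      ? event.followUpSuggestions.filter((s) => typeof s === "string" && s.trim() !== "")
      : prev.smartSuggestions,
    searchDurationMs:
      typeof event.searchDurationMs === "number"
        ? event.searchDurationMs
        : prev.searchDurationMs,
    poolMetrics: event.poolMetrics ?? prev.poolMetrics,
  };
}
```

Building the whole next state in one expression means a single setter call can apply it, which fits the all-or-nothing update style described under Error Handling.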

2. smart-suggestions

Location: pipeline.ts:643-655

Purpose: Quick suggestions for the user's follow-up actions

Data Structure:

{
  suggestions: string[];
}

Actions:

  1. Update suggestions state immediately
  2. Append to assistant message for persistence
  3. Trigger UI button render

State Updates:

  • smartSuggestions ← new suggestion array
  • Assistant message updated with suggestions

Example Suggestions:

  • "Show more products"
  • "Cheaper alternatives"
  • "Different category"
  • "Popular gifts"
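
As a rough sketch of the "append to assistant message" step, the helper below attaches suggestions to the most recent assistant message without mutating the input. The `ChatMessage` shape and the `attachSuggestions` name are assumptions made for illustration; the real message type may differ.

```typescript
// Hypothetical message shape, used only for this example.
interface ChatMessage {
  role: "user" | "assistant";
  content: string;
  suggestions?: string[];
}

// Returns a copy of `messages` with `suggestions` attached to the most
// recent assistant message. The input array is returned as-is when no
// assistant message exists yet.
function attachSuggestions(
  messages: ChatMessage[],
  suggestions: string[]
): ChatMessage[] {
  let lastAssistant = -1;
  for (let i = messages.length - 1; i >= 0; i--) {
    if (messages[i]?.role === "assistant") {
      lastAssistant = i;
      break;
    }
  }
  if (lastAssistant === -1) return messages;
  return messages.map((m, i) =>
    i === lastAssistant ? { ...m, suggestions: [...suggestions] } : m
  );
}
```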

3. tool-output

Location: pipeline.ts:658-674

Purpose: Deliver product cards to display

Data Structure:

{
  toolName: string;
  products: ProductCard[];
  operation?: 'replace' | 'append';
}

Actions:

  1. Validate product array
  2. Replace or append to product list
  3. Mark tools as expected (enables skeleton)
  4. Sync UI skeleton state
  5. Update assistant message with products

State Updates:

  • productCards ← products array
  • toolsExpected ← true
  • showSkeleton ← false (products arrived)
  • Assistant message gets product attachments

Product Card Structure:

{
  id: string;
  name: string;
  price: number;
  imageUrl?: string;
  category: string;
  description?: string;
}
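
The validate-then-replace-or-append steps could look roughly like this. It is a sketch under stated assumptions: `isProductCard` and `mergeProducts` are hypothetical names, defaulting to 'replace' when `operation` is missing is a guess, and skipping already-displayed ids on append is a design choice rather than documented behavior.

```typescript
interface ProductCard {
  id: string;
  name: string;
  price: number;
  imageUrl?: string;
  category: string;
  description?: string;
}

type ToolOperation = "replace" | "append";

// Runtime check for the required ProductCard fields.
function isProductCard(value: unknown): value is ProductCard {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v["id"] === "string" &&
    typeof v["name"] === "string" &&
    typeof v["price"] === "number" &&
    typeof v["category"] === "string"
  );
}

// Assumption: a missing `operation` means 'replace'.
function mergeProducts(
  current: ProductCard[],
  incoming: unknown,
  operation: ToolOperation = "replace"
): ProductCard[] {
  // A payload that is not an array leaves the displayed list untouched.
  if (!Array.isArray(incoming)) return current;
  const valid: ProductCard[] = incoming.filter(isProductCard);
  if (operation === "replace") return valid;
  // On append, skip cards whose id is already displayed.
  const seen = new Set(current.map((p) => p.id));
  return [...current, ...valid.filter((p) => !seen.has(p.id))];
}
```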

4. perf

Location: pipeline.ts:676-713

Purpose: Backend pipeline performance metrics

Data Structure:

{
  contextExtractionMs?: number;
  multiSearchMs?: number;
  funnelMs?: number;
  rerankMs?: number;
  diversityMs?: number;
  ttfcHintMs?: number;
}

Actions:

  1. Record each timing metric
  2. Update metric pill displays
  3. Calculate derived metrics
  4. Log performance data

State Updates:

  • contextExtractionTimeMs ← context timing
  • searchDurationMs ← search timing
  • rerankTimeMs ← rerank timing
  • diversityTimeMs ← diversity timing
  • ttfcMs ← TTFC hint (if provided)

Display: Metric pills show these timings to developers
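
One way to picture the mapping from the perf payload to state fields is the sketch below. The `applyPerf` and `isTiming` names are hypothetical, and mapping `multiSearchMs` to `searchDurationMs` is an assumption based on the "search timing" entry above. `funnelMs` is left out because no state field is listed for it.

```typescript
interface PerfEvent {
  contextExtractionMs?: number;
  multiSearchMs?: number;
  funnelMs?: number;
  rerankMs?: number;
  diversityMs?: number;
  ttfcHintMs?: number;
}

interface PerfState {
  contextExtractionTimeMs?: number;
  searchDurationMs?: number;
  rerankTimeMs?: number;
  diversityTimeMs?: number;
  ttfcMs?: number;
}

// Accepts only finite, non-negative numbers as timings.
function isTiming(v: unknown): v is number {
  return typeof v === "number" && Number.isFinite(v) && v >= 0;
}

// Copies each valid timing into its state field; invalid or missing
// values leave the previous state untouched.
function applyPerf(prev: PerfState, e: PerfEvent): PerfState {
  const next: PerfState = { ...prev };
  if (isTiming(e.contextExtractionMs)) next.contextExtractionTimeMs = e.contextExtractionMs;
  if (isTiming(e.multiSearchMs)) next.searchDurationMs = e.multiSearchMs; // assumed mapping
  if (isTiming(e.rerankMs)) next.rerankTimeMs = e.rerankMs;
  if (isTiming(e.diversityMs)) next.diversityTimeMs = e.diversityMs;
  if (isTiming(e.ttfcHintMs)) next.ttfcMs = e.ttfcHintMs;
  return next;
}
```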

5. text

Location: pipeline.ts:715-790

Purpose: Stream assistant response text in real-time

Data Structure:

{
  content: string; // Incremental text chunk
}

Actions:

  1. Append to accumulated text buffer
  2. Capture first event time (TTFC calculation)
  3. Stream to UI without batching
  4. Check skeleton thresholds:
    • 160 characters accumulated
    • OR 800ms elapsed since first text
  5. Trigger product skeleton display if threshold met
  6. Update assistant message incrementally

State Updates:

  • accumulatedText ← buffer grows
  • firstEventTime ← timestamp (once)
  • ttfcMs ← calculated on first event
  • showSkeleton ← true (if thresholds met)
  • Assistant message updated with new text

Configuration:

const MESSAGE_STREAMING_CONFIG = {
  SKELETON_CHAR_THRESHOLD: 160,
  SKELETON_TIME_THRESHOLD_MS: 800
};
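
The skeleton check described above (160 characters OR 800ms since the first text chunk) can be expressed as a small predicate. This is an illustrative sketch: `shouldShowSkeleton` is a hypothetical name, and treating the thresholds as inclusive (`>=`) is an assumption.

```typescript
const MESSAGE_STREAMING_CONFIG = {
  SKELETON_CHAR_THRESHOLD: 160,
  SKELETON_TIME_THRESHOLD_MS: 800,
} as const;

// True once either threshold is reached. Before any text has arrived
// (firstTextAtMs === null) the skeleton is never triggered.
function shouldShowSkeleton(
  accumulatedChars: number,
  firstTextAtMs: number | null,
  nowMs: number,
  config = MESSAGE_STREAMING_CONFIG
): boolean {
  if (firstTextAtMs === null) return false;
  const elapsedMs = nowMs - firstTextAtMs;
  return (
    accumulatedChars >= config.SKELETON_CHAR_THRESHOLD ||
    elapsedMs >= config.SKELETON_TIME_THRESHOLD_MS
  );
}
```

Passing `nowMs` in, rather than reading the clock inside the function, keeps the check deterministic and easy to unit test.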

6. done / EOF

Location: pipeline.ts:793-889

Purpose: Signal end of stream and finalize

Actions:

  1. Process any leftover text buffer
  2. Stamp streamingComplete: true
  3. Clear all timeouts
  4. Calculate final metrics
  5. Return aggregated result

State Updates:

  • streamingComplete ← true
  • isLoading ← false
  • All timers stopped
  • Assistant message marked complete

Return Value:

{
  content: string;
  products: ProductCard[];
  contextData: ContextData | null;
  smartSuggestions: string[];
  metrics: {
    ttfcMs: number;
    searchDurationMs: number;
    // ... other timings
  };
  diagnostics: object;
}
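
A simplified sketch of the finalization steps follows. It covers only the buffer flush, timer cleanup, and TTFC calculation; `StreamAccumulator` and `finalizeStream` are hypothetical names, and computing TTFC as the first event time minus the request start time is an assumption about how the metric is defined.

```typescript
// Hypothetical accumulator for in-flight stream state.
interface StreamAccumulator {
  accumulatedText: string;
  pendingText: string; // leftover chunk not yet appended
  requestStartMs: number;
  firstEventTimeMs: number | null;
  timers: ReturnType<typeof setTimeout>[];
}

function finalizeStream(acc: StreamAccumulator) {
  // Stop every pending timer (for example, the skeleton time threshold).
  acc.timers.forEach((t) => clearTimeout(t));
  acc.timers.length = 0;

  // Flush any leftover text into the final content.
  const content = acc.accumulatedText + acc.pendingText;

  // Assumed definition: delay between the request and the first event.
  const ttfcMs =
    acc.firstEventTimeMs === null
      ? null
      : acc.firstEventTimeMs - acc.requestStartMs;

  return {
    content,
    streamingComplete: true as const,
    isLoading: false as const,
    metrics: { ttfcMs },
  };
}
```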

Event Parsing Flow

Error Handling

Each event handler includes error guards:

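try {
  const data = JSON.parse(event.data);
  // ... process event
} catch (error) {
  // In strict TypeScript the caught value is `unknown`, so narrow it before reading `.message`
  console.error('Event parse error:', {
    event: event.type,
    error: error instanceof Error ? error.message : String(error)
  });
  // Continue processing other events
}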

Resilience Strategy:

  • Malformed events are logged but don't crash the stream
  • Missing fields use safe defaults
  • State updates are atomic (all-or-nothing)
  • UI stays responsive even with partial data
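
The "log but don't crash" and "safe defaults" points can be combined in one helper, sketched below. `safeParseEvent` is a hypothetical name, not a function from the codebase. It merges only top-level fields over the defaults and does not validate field types.

```typescript
// Parses an SSE data payload. On malformed JSON or a non-object payload
// it logs (for parse errors) and returns `fallback`; otherwise it returns
// the parsed fields layered over the defaults in `fallback`.
function safeParseEvent<T extends object>(
  raw: string,
  eventType: string,
  fallback: T
): T {
  try {
    const parsed: unknown = JSON.parse(raw);
    if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
      return fallback;
    }
    return { ...fallback, ...(parsed as Partial<T>) };
  } catch (error) {
    console.error("Event parse error:", {
      event: eventType,
      error: error instanceof Error ? error.message : String(error),
    });
    return fallback;
  }
}
```

Because the helper returns a complete object before any setter runs, a handler can parse first and then apply one state update, so a bad payload never leaves state half-written.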

Event Sequence Example

Real conversation flow:

t=0ms    → Request sent
t=800ms  → [perf] context extraction: 800ms
t=850ms  → [product-metadata] contextData + searchParams
t=900ms  → [text] "Siin on head kingitused"
t=950ms  → [text] " sinu sõbrale..."
t=1000ms → [text] "..." (160 chars → skeleton!)
t=1200ms → [perf] search: 200ms, rerank: 150ms
t=1400ms → [tool-output] 3 products
t=1450ms → [smart-suggestions] ["Show more", "Cheaper"]
t=2000ms → [text] "...soovitan neid."
t=2100ms → [done]

Adding New Event Types

To extend with new events:

  1. Add to parser (parser.ts):

    export type SSEEventType =
      | 'text'
      | 'product-metadata'
      | 'your-new-event';
  2. Add handler (pipeline.ts):

    case 'your-new-event': {
      // Braces scope `data` to this case so other cases can declare their own
      const data = JSON.parse(event.data);
      // Process and update state
      break;
    }
  3. Update types (types.ts):

    interface YourNewEventData {
      // Define structure
    }
  4. Wire state setters in useMessageStreaming:

    const [yourState, setYourState] = useState<YourNewEventData>();