Pipeline Extension Guide

This guide explains how to safely add new features to the message streaming pipeline while maintaining reliability and performance.

Adding New SSE Event Types

Step 1: Define Event Type

File: app/chat/hooks/useMessageStreaming/parser.ts

// Add to the union type
export type SSEEventType =
  | 'text'
  | 'product-metadata'
  | 'smart-suggestions'
  | 'tool-output'
  | 'perf'
  | 'done'
  | 'your-new-event'; // ← Add here

// Define data structure
export interface YourNewEventData {
  field1: string;
  field2: number;
  optional?: boolean;
}
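
A TypeScript interface only exists at compile time, so an SSE payload still needs a runtime check before it is treated as YourNewEventData. The following is a minimal sketch of such a check; isYourNewEventData is a hypothetical helper name, and the interface is repeated only so the snippet is self-contained.

```typescript
// Repeated from above so the sketch compiles on its own.
interface YourNewEventData {
  field1: string;
  field2: number;
  optional?: boolean;
}

// Hypothetical type guard: narrows an unknown parsed payload to YourNewEventData.
function isYourNewEventData(value: unknown): value is YourNewEventData {
  if (typeof value !== 'object' || value === null) return false;
  const record = value as Record<string, unknown>;
  const field1 = record['field1'];
  const field2 = record['field2'];
  const optional = record['optional'];
  return (
    typeof field1 === 'string' &&
    typeof field2 === 'number' &&
    Number.isFinite(field2) &&
    (optional === undefined || typeof optional === 'boolean')
  );
}
```

A guard like this can be reused by both the event handler and the unit tests, so the validation rule is defined in one place.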

Step 2: Add Event Handler

File: app/chat/hooks/useMessageStreaming/pipeline.ts

// In handleStreamingEvent function
case 'your-new-event':
  try {
    const data: YourNewEventData = JSON.parse(event.data);

    // Validate required fields
    if (!data.field1 || typeof data.field2 !== 'number') {
      console.warn('Invalid your-new-event data:', data);
      break;
    }

    // Update state
    setYourState(data);

    // Update message if needed
    onNewMessage({
      ...currentMessage,
      yourField: data
    });
  } catch (error) {
    console.error('your-new-event parse error:', error);
  }
  break;

Step 3: Wire State Management

File: app/chat/hooks/useMessageStreaming/index.ts

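export function useMessageStreaming() {
  // Add state
  const [yourState, setYourState] = useState<YourNewEventData | null>(null);

  // Pass setter to pipeline. `await` is only valid inside an async function,
  // so the call lives in an async callback (sendMessage is an illustrative name).
  const sendMessage = async () => {
    const result = await streamResponse({
      // ... existing params
      setYourState // ← Add setter
    });
    // ... existing handling of result
  };

  // Return from hook
  return {
    // ... existing returns
    yourState // ← Expose state
  };
}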

Step 4: Update UI Component

File: Your component

function ChatInterface() {
  const { yourState } = useMessageStreaming();

  return (
    <>
      {yourState && (
        <YourNewComponent data={yourState} />
      )}
    </>
  );
}

Adding New Metrics

Step 1: Define Metric Type

// Add to metrics interface
interface PipelineMetrics {
  // ... existing metrics
  yourMetric?: number; // duration in milliseconds
}

Step 2: Capture Timing

// In appropriate pipeline stage
const startTime = Date.now();

// ... your operation

const yourMetricMs = Date.now() - startTime;
setYourMetricMs(yourMetricMs);
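
The start/stop pattern above can be wrapped in a small helper so each new metric does not repeat the subtraction. This is a sketch only: startTimer and the injectable clock are hypothetical names, and the clock parameter exists so tests can control time.

```typescript
// Hypothetical helper; not part of the pipeline's existing API.
type Clock = () => number;

// Starts timing immediately and returns a function that reports elapsed ms.
function startTimer(now: Clock = Date.now): () => number {
  const startedAt = now();
  // Clamp at 0 in case the wall clock is adjusted backwards mid-operation.
  return () => Math.max(0, now() - startedAt);
}

// Usage with a fake clock so the result is deterministic.
let fakeNow = 1_000;
const stopTimer = startTimer(() => fakeNow);
fakeNow = 1_250; // pretend the operation took 250 ms
const elapsedMs = stopTimer();
```

In the pipeline the default clock would be used, and the returned value passed to the metric setter.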

Step 3: Include in Finalization

// In finalizeAssistantMessage
const finalMetrics = {
  // ... existing metrics
  yourMetric: yourMetricMs
};

finalMessage.metrics = finalMetrics;

Step 4: Display in UI

// Metric pill component
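{/* Compare against null so a valid 0 ms metric is shown rather than rendered as a bare "0" */}
{metrics.yourMetric != null && (
  <MetricPill
    label="Your Metric"
    value={`${metrics.yourMetric}ms`}
    color="blue"
  />
)}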
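
A duration of 0 ms is a valid measurement, so the display logic should distinguish "zero" from "missing". One way to keep that rule out of the JSX is a small formatter; formatMetricMs is a hypothetical helper, shown here as a sketch.

```typescript
// Hypothetical formatter: returns display text, or null when there is nothing to show.
function formatMetricMs(value: number | null | undefined): string | null {
  if (value == null || !Number.isFinite(value) || value < 0) return null;
  return `${Math.round(value)}ms`;
}
```

The component can then render the pill only when the formatter returns a string.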

Adding New Context Data

Step 1: Extend Context Type

interface ContextData {
  // ... existing fields
  yourNewField?: string;
  yourNewArray?: string[];
}

Step 2: Extract from Metadata

// In product-metadata handler
const contextData: ContextData = {
  // ... existing fields
  yourNewField: metadata.yourNewField,
  yourNewArray: metadata.yourNewArray || []
};

setContextData(contextData);

Step 3: Preserve Through Lifecycle

// Ensure it flows through:
// 1. Metadata event → setContextData
// 2. Streaming updates → include in message
// 3. Finalization → attach to final message
// 4. Persistence → save with message
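
The four stages listed above can be sketched as pure functions that each return a new message with the context still attached. The ChatMessage shape and all function names here are assumptions made for illustration; the real message type has more fields.

```typescript
interface ContextData {
  yourNewField?: string;
  yourNewArray?: string[];
}

// Assumed minimal message shape, for illustration only.
interface ChatMessage {
  id: string;
  content: string;
  context?: ContextData;
}

// 1. Metadata event: merge new context into whatever is already there.
function applyContext(msg: ChatMessage, ctx: ContextData): ChatMessage {
  return { ...msg, context: { ...msg.context, ...ctx } };
}

// 2. Streaming update: spread the message so context is carried along.
function applyTextChunk(msg: ChatMessage, chunk: string): ChatMessage {
  return { ...msg, content: msg.content + chunk };
}

// 3. Finalization: fall back to the latest known context if the message has none.
function finalizeMessage(msg: ChatMessage, latestContext: ContextData | null): ChatMessage {
  const context = msg.context ?? latestContext;
  return context ? { ...msg, context } : msg;
}

// 4. Persistence: serialize the whole message, context included.
function serializeForPersistence(msg: ChatMessage): string {
  return JSON.stringify(msg);
}

let draftMessage: ChatMessage = { id: 'm1', content: '' };
draftMessage = applyContext(draftMessage, { yourNewField: 'x', yourNewArray: ['a'] });
draftMessage = applyTextChunk(draftMessage, 'Hello');
const savedJson = serializeForPersistence(finalizeMessage(draftMessage, null));
```

The common failure mode this guards against is a stage that rebuilds the message from a fixed list of fields and silently drops the new one.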

Step 4: Display Context Pill

{contextData.yourNewField && (
  <ContextPill
    label="Your Field"
    value={contextData.yourNewField}
    icon={<YourIcon />}
  />
)}

Adding New Smart Suggestions

Step 1: Define Suggestion Type

type SmartSuggestion = {
  text: string;
  action: 'show-more' | 'cheaper' | 'your-new-action';
  metadata?: Record<string, any>;
};

Step 2: Generate on Backend

// In response orchestrator
const suggestions: SmartSuggestion[] = [
  {
    text: 'Try your new action',
    action: 'your-new-action',
    metadata: { param: 'value' }
  }
];

// Emit in SSE
yield {
  event: 'smart-suggestions',
  data: JSON.stringify({ suggestions })
};

Step 3: Handle Click

function handleSuggestionClick(suggestion: SmartSuggestion) {
  if (suggestion.action === 'your-new-action') {
    // Your custom logic
    performYourAction(suggestion.metadata);
  }
}
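
As the action union grows, a switch with a never-typed fallback makes the compiler flag any action that has no branch. The sketch below assumes the three actions shown in Step 1; the returned strings and the function names are placeholders, not real pipeline operations.

```typescript
type SuggestionAction = 'show-more' | 'cheaper' | 'your-new-action';

interface SmartSuggestion {
  text: string;
  action: SuggestionAction;
  metadata?: Record<string, unknown>;
}

// Only callable with `never`, so adding a union member without a case
// becomes a compile error at the call site below.
function ignoreUnknownAction(_action: never): string {
  return 'ignored';
}

// Hypothetical router: maps each action to the name of the operation to run.
function routeSuggestion(suggestion: SmartSuggestion): string {
  switch (suggestion.action) {
    case 'show-more':
      return 'load-next-page';
    case 'cheaper':
      return 'refine-by-price';
    case 'your-new-action':
      return 'run-your-action';
    default:
      // Reached at runtime only if the server sends an action this client predates.
      return ignoreUnknownAction(suggestion.action);
  }
}
```

Returning a neutral value in the fallback, rather than throwing, keeps an older client usable when the backend starts emitting a newer action.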

Adding New Product Fields

Step 1: Extend Product Type

interface ProductCard {
  // ... existing fields
  yourNewField?: string;
  yourNewMetadata?: {
    key: string;
    value: any;
  };
}

Step 2: Include in Tool Output

// Backend emits extended products
yield {
  event: 'tool-output',
  data: JSON.stringify({
    products: products.map(p => ({
      ...p,
      yourNewField: calculateYourField(p),
      yourNewMetadata: { /* ... */ }
    }))
  })
};

Step 3: Display in Product Card

function ProductCard({ product }: { product: ProductCard }) {
  return (
    <>
      {/* ... existing fields */}
      {product.yourNewField && (
        <div className="your-field">
          {product.yourNewField}
        </div>
      )}
    </>
  );
}
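
Because the new product fields are optional, a malformed value for one of them should cost only that field, not the whole card. A minimal sketch of that idea follows; the id and title fields stand in for the elided existing fields and are assumptions, as is the normalizeProduct name.

```typescript
// Assumed minimal product shape; `id` and `title` are placeholders for existing fields.
interface ProductCardData {
  id: string;
  title: string;
  yourNewField?: string;
}

// Hypothetical normalizer: rejects products missing required fields,
// but silently drops an optional field that has the wrong type.
function normalizeProduct(raw: unknown): ProductCardData | null {
  if (typeof raw !== 'object' || raw === null) return null;
  const record = raw as Record<string, unknown>;
  const id = record['id'];
  const title = record['title'];
  if (typeof id !== 'string' || typeof title !== 'string') return null;

  const product: ProductCardData = { id, title };
  const extra = record['yourNewField'];
  if (typeof extra === 'string' && extra.length > 0) {
    product.yourNewField = extra;
  }
  return product;
}
```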

Safety Checklist

When adding new features, ensure:

Error Handling

  • Try-catch around event parsing
  • Validation of required fields
  • Graceful degradation if data missing
  • Logging for debugging
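
The first three items can be combined in one helper that never throws and reports why a payload was rejected. This is a sketch under stated assumptions: safeParseEvent, ParseResult, and isStringArray are hypothetical names, and the caller decides how to log the reason.

```typescript
type ParseResult<T> =
  | { ok: true; value: T }
  | { ok: false; reason: string };

// Parses raw SSE data and validates it with the supplied guard; never throws.
function safeParseEvent<T>(
  raw: string,
  isValid: (value: unknown) => value is T,
): ParseResult<T> {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch (error) {
    const detail = error instanceof Error ? error.message : String(error);
    return { ok: false, reason: `invalid JSON: ${detail}` };
  }
  return isValid(parsed)
    ? { ok: true, value: parsed }
    : { ok: false, reason: 'failed validation' };
}

// Example guard used only to exercise the helper.
const isStringArray = (value: unknown): value is string[] =>
  Array.isArray(value) && value.every((item) => typeof item === 'string');

const goodResult = safeParseEvent('["a","b"]', isStringArray);
const badJsonResult = safeParseEvent('{not json', isStringArray);
const wrongShapeResult = safeParseEvent('[1, 2]', isStringArray);
```

A handler can switch on `ok`, log `reason` when it is false, and leave existing state untouched in that case.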

Performance

  • No blocking operations in event handlers
  • State updates are atomic
  • No unnecessary re-renders
  • Metrics captured for new operations

Timing Preservation

  • First-event timing not affected
  • TTFC calculation remains accurate
  • Timeout logic still works
  • Cleanup happens properly
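
One way to keep first-event timing stable when new event types are added is to record the mark once, for any event type, and never overwrite it. The sketch below assumes that "first-event timing" means the delay between request start and the first SSE event received; the type and function names are hypothetical.

```typescript
interface StreamTimings {
  startedAt: number;
  firstEventAt: number | null;
}

// Call for every incoming event, whatever its type; only the first call has an effect.
function markEvent(timings: StreamTimings, now: number): StreamTimings {
  return timings.firstEventAt === null ? { ...timings, firstEventAt: now } : timings;
}

// Returns null until at least one event has been seen.
function timeToFirstEventMs(timings: StreamTimings): number | null {
  return timings.firstEventAt === null ? null : timings.firstEventAt - timings.startedAt;
}
```

If the mark is instead set inside individual case branches, a new event type that arrives first can be missed, which inflates the reported delay.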

Context Preservation

  • Context data flows through lifecycle
  • "Show more" functionality unaffected
  • Conversation history maintains integrity
  • Persistence includes new data

Backwards Compatibility

  • Old clients can handle new events
  • Optional fields use safe defaults
  • No breaking changes to existing events
  • Versioning considered if needed
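
For the first item, an older client stays compatible if its dispatcher skips event types it has no handler for instead of throwing. The following is a minimal sketch; createDispatcher and SseHandler are hypothetical names and do not describe the existing handleStreamingEvent implementation.

```typescript
type SseHandler = (data: string) => void;

// Builds a dispatch function that reports whether the event was handled.
function createDispatcher(
  handlers: Map<string, SseHandler>,
  onUnknown: (type: string) => void = () => {},
): (type: string, data: string) => boolean {
  return (type, data) => {
    const handler = handlers.get(type);
    if (handler === undefined) {
      onUnknown(type); // e.g. log once, then carry on with the stream
      return false;
    }
    handler(data);
    return true;
  };
}

// An "old" client that only knows about text events.
const receivedText: string[] = [];
const skippedTypes: string[] = [];
const dispatchSse = createDispatcher(
  new Map<string, SseHandler>([
    ['text', (data: string) => { receivedText.push(data); }],
  ]),
  (type: string) => { skippedTypes.push(type); },
);

const handledText = dispatchSse('text', 'Hello');
const handledNew = dispatchSse('your-new-event', '{"field1":"test","field2":42}');
```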

Testing New Features

Unit Tests

describe('YourNewEvent', () => {
  it('parses valid data', () => {
    const event = {
      type: 'your-new-event',
      data: JSON.stringify({ field1: 'test', field2: 42 })
    };

    const result = parseEvent(event);

    expect(result.field1).toBe('test');
    expect(result.field2).toBe(42);
  });

  it('handles missing fields gracefully', () => {
    const event = {
      type: 'your-new-event',
      data: JSON.stringify({ field1: 'test' }) // missing field2
    };

    expect(() => parseEvent(event)).not.toThrow();
  });
});

Integration Tests

describe('Pipeline with YourNewEvent', () => {
  it('processes event and updates UI', async () => {
    const mockEvents = [
      { type: 'text', data: '"Hello"' },
      { type: 'your-new-event', data: '{"field1":"test","field2":42}' },
      { type: 'done', data: '{}' }
    ];

    const { result } = await streamMessage(input, mockEvents);

    expect(result.yourState).toEqual({ field1: 'test', field2: 42 });
  });
});

E2E Tests

describe('User flow with new feature', () => {
  it('displays new UI element when event arrives', async () => {
    await userTypes('show products');

    await waitFor(() => {
      expect(screen.getByTestId('your-new-component')).toBeInTheDocument();
    });

    expect(screen.getByText('test')).toBeInTheDocument();
  });
});

Common Pitfalls

Don't: Modify Existing Event Handlers

// BAD: Changes behavior of existing events
case 'text':
  // Adding unrelated logic here
  doSomethingUnrelated();
  break;

Do: Create New Event Types

// GOOD: New event for new functionality
case 'your-new-event':
  handleYourNewFeature(data);
  break;

Don't: Block Event Loop

// BAD: Synchronous heavy operation
case 'your-new-event': {
  const result = expensiveSyncCalculation();
  break;
}

Do: Keep Handlers Fast

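// GOOD: Deferred to a later task so the handler returns immediately
case 'your-new-event':
  // A promise microtask would still run before the event loop yields,
  // so schedule the work as a macrotask instead.
  setTimeout(() => expensiveCalculation(), 0);
  break;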
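
How much a deferral helps depends on the scheduler: microtasks run before the event loop moves on, while a timer-based task lets the current handler and any pending rendering finish first. The sketch below keeps the scheduler injectable so it can be swapped and tested; createDeferredQueue is a hypothetical helper, and in a browser the scheduler could be `(run) => setTimeout(run, 0)`.

```typescript
type Task = () => void;
type Scheduler = (run: () => void) => void;

// Collects tasks during event handling and runs them together in one scheduled flush.
function createDeferredQueue(schedule: Scheduler): { enqueue(task: Task): void } {
  const pending: Task[] = [];
  let flushScheduled = false;

  const flush = (): void => {
    flushScheduled = false;
    // Take a snapshot so tasks enqueued during the flush wait for the next one.
    const batch = pending.splice(0, pending.length);
    for (const task of batch) task();
  };

  return {
    enqueue(task: Task): void {
      pending.push(task);
      if (!flushScheduled) {
        flushScheduled = true;
        schedule(flush);
      }
    },
  };
}

// Manual scheduler for demonstration: flushes run only when we say so.
const scheduledRuns: Array<() => void> = [];
const executionLog: string[] = [];
const deferredQueue = createDeferredQueue((run) => { scheduledRuns.push(run); });

deferredQueue.enqueue(() => { executionLog.push('expensive work'); });
executionLog.push('handler returned');
for (const run of scheduledRuns) run();
```

Deferring does not make the work cheaper; a calculation that is heavy enough to cause visible jank would still need to be chunked or moved off the main thread.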

Don't: Skip Validation

// BAD: Assumes data is valid
const { requiredField } = JSON.parse(event.data);
doSomething(requiredField);

Do: Validate Everything

// GOOD: Check before using
const data = JSON.parse(event.data);
if (isValid(data.requiredField)) {
  doSomething(data.requiredField);
}

Getting Help

Debug Logging

Enable verbose logging:

const DEBUG = true;

if (DEBUG) {
  console.log('[Pipeline] Event:', event.type, event.data);
  console.log('[Pipeline] State before:', currentState);
  // ... process event
  console.log('[Pipeline] State after:', newState);
}
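
To avoid scattering `if (DEBUG)` checks through the handlers, the flag can be captured once in a logger factory. This is a sketch only; createPipelineLogger and the sink parameter are hypothetical, and in the app the sink would typically be console.log.

```typescript
type LogSink = (line: string) => void;

// Returns a logger that is a no-op when debugging is disabled.
function createPipelineLogger(
  enabled: boolean,
  sink: LogSink,
): (stage: string, detail: unknown) => void {
  return (stage, detail) => {
    if (!enabled) return;
    sink(`[Pipeline] ${stage}: ${JSON.stringify(detail)}`);
  };
}

// Capture output in arrays instead of the console so it can be inspected.
const debugLines: string[] = [];
const logDebug = createPipelineLogger(true, (line) => { debugLines.push(line); });
logDebug('Event', { type: 'text' });

const silentLines: string[] = [];
const logSilent = createPipelineLogger(false, (line) => { silentLines.push(line); });
logSilent('Event', { type: 'text' });
```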

Performance Profiling

const marks = new Map<string, number>();

marks.set('start', performance.now());
// ... your code
marks.set('end', performance.now());

console.log('Duration:', marks.get('end')! - marks.get('start')!);

State Inspection

Use React DevTools to inspect:

  • Component state
  • Hook values
  • Re-render counts
  • Props flow