Pipeline Extension Guide
This guide explains how to safely add new features to the message streaming pipeline while maintaining reliability and performance.
Adding New SSE Event Types
Step 1: Define Event Type
File: app/chat/hooks/useMessageStreaming/parser.ts
// Add to the union type
export type SSEEventType =
| 'text'
| 'product-metadata'
| 'smart-suggestions'
| 'tool-output'
| 'perf'
| 'done'
| 'your-new-event'; // ← Add here
// Define data structure
export interface YourNewEventData {
field1: string;
field2: number;
optional?: boolean;
}
Step 2: Add Event Handler
File: app/chat/hooks/useMessageStreaming/pipeline.ts
// In handleStreamingEvent function
case 'your-new-event':
  try {
    // JSON.parse returns untyped data; keep it as unknown until it is validated.
    const parsed: unknown = JSON.parse(event.data);
    const candidate = parsed as Partial<YourNewEventData> | null;
    // Validate required fields by type, so malformed payloads (null, wrong
    // field types, empty field1) are rejected before they reach state.
    if (
      typeof candidate !== 'object' ||
      candidate === null ||
      typeof candidate.field1 !== 'string' ||
      candidate.field1 === '' ||
      typeof candidate.field2 !== 'number'
    ) {
      console.warn('Invalid your-new-event data:', parsed);
      break;
    }
    const data = candidate as YourNewEventData;
    // Update state
    setYourState(data);
    // Update message if needed
    onNewMessage({
      ...currentMessage,
      yourField: data
    });
  } catch (error) {
    // Reached when event.data is not valid JSON (or a handler above throws).
    console.error('your-new-event parse error:', error);
  }
  break;
Step 3: Wire State Management
File: app/chat/hooks/useMessageStreaming/index.ts
// Sketch of the hook wiring: holds the latest your-new-event payload in React
// state, hands the setter to the streaming pipeline, and exposes the value.
export function useMessageStreaming() {
// Add state
const [yourState, setYourState] = useState<YourNewEventData | null>(null);
// Pass setter to pipeline
// NOTE(review): `await` is only valid inside an async function, and a hook body
// cannot be async. Presumably this call lives inside the hook's existing async
// send/submit callback — confirm against index.ts before copying verbatim.
const result = await streamResponse({
// ... existing params
setYourState // ← Add setter
});
// Return from hook
return {
// ... existing returns
yourState // ← Expose state
};
}
Step 4: Update UI Component
File: Your component
function ChatInterface() {
const { yourState } = useMessageStreaming();
return (
<>
{yourState && (
<YourNewComponent data={yourState} />
)}
</>
);
}
Adding New Metrics
Step 1: Define Metric Type
// Add to metrics interface
interface PipelineMetrics {
  // ... existing metrics
  /**
   * Duration of your operation, in milliseconds.
   * The key is `yourMetric` because that is the name finalization writes
   * (`yourMetric: yourMetricMs`) and the metric pill reads (`metrics.yourMetric`).
   */
  yourMetric?: number;
}
Step 2: Capture Timing
// In appropriate pipeline stage
const startTime = Date.now();
// ... your operation
const yourMetricMs = Date.now() - startTime;
setYourMetricMs(yourMetricMs);
Step 3: Include in Finalization
// In finalizeAssistantMessage
const finalMetrics = {
// ... existing metrics
yourMetric: yourMetricMs
};
finalMessage.metrics = finalMetrics;
Step 4: Display in UI
// Metric pill component
// Compare against null instead of relying on truthiness: with `&&`, a metric of
// 0 ms short-circuits to the number 0, which React renders as a literal "0".
{metrics.yourMetric != null && (
  <MetricPill
    label="Your Metric"
    value={`${metrics.yourMetric}ms`}
    color="blue"
  />
)}
Adding New Context Data
Step 1: Extend Context Type
interface ContextData {
// ... existing fields
yourNewField?: string;
yourNewArray?: string[];
}
Step 2: Extract from Metadata
// In product-metadata handler
const contextData: ContextData = {
  // ... existing fields
  yourNewField: metadata.yourNewField,
  // `??` applies the default only when the field is null or undefined, so any
  // value the backend actually sent is passed through untouched.
  yourNewArray: metadata.yourNewArray ?? []
};
setContextData(contextData);
Step 3: Preserve Through Lifecycle
// Ensure it flows through:
// 1. Metadata event → setContextData
// 2. Streaming updates → include in message
// 3. Finalization → attach to final message
// 4. Persistence → save with message
Step 4: Display Context Pill
{contextData.yourNewField && (
<ContextPill
label="Your Field"
value={contextData.yourNewField}
icon={<YourIcon />}
/>
)}
Adding New Smart Suggestions
Step 1: Define Suggestion Type
/** A suggestion emitted in the `smart-suggestions` SSE event. */
type SmartSuggestion = {
  /** Suggestion text. */
  text: string;
  /** Discriminates which click behavior to run. */
  action: 'show-more' | 'cheaper' | 'your-new-action';
  /**
   * Action-specific parameters. Values are `unknown` rather than `any` so that
   * click handlers must narrow them before use.
   */
  metadata?: Record<string, unknown>;
};
Step 2: Generate on Backend
// In response orchestrator
const suggestions: SmartSuggestion[] = [
{
text: 'Try your new action',
action: 'your-new-action',
metadata: { param: 'value' }
}
];
// Emit in SSE
yield {
event: 'smart-suggestions',
data: JSON.stringify({ suggestions })
};
Step 3: Handle Click
function handleSuggestionClick(suggestion: SmartSuggestion) {
if (suggestion.action === 'your-new-action') {
// Your custom logic
performYourAction(suggestion.metadata);
}
}
Adding New Product Fields
Step 1: Extend Product Type
/** Product payload shape, extended with the new optional fields. */
interface ProductCard {
  // ... existing fields
  /** New optional field; absent on products emitted by older backends. */
  yourNewField?: string;
  /** New optional key/value pair attached to the product. */
  yourNewMetadata?: {
    key: string;
    // `unknown` instead of `any`: consumers must narrow before using the value.
    value: unknown;
  };
}
Step 2: Include in Tool Output
// Backend emits extended products
yield {
event: 'tool-output',
data: JSON.stringify({
products: products.map(p => ({
...p,
yourNewField: calculateYourField(p),
yourNewMetadata: { /* ... */ }
}))
})
};
Step 3: Display in Product Card
function ProductCard({ product }: { product: ProductCard }) {
return (
<>
{/* ... existing fields */}
{product.yourNewField && (
<div className="your-field">
{product.yourNewField}
</div>
)}
</>
);
}
Safety Checklist
When adding new features, ensure:
Error Handling
- Try-catch around event parsing
- Validation of required fields
- Graceful degradation if data missing
- Logging for debugging
Performance
- No blocking operations in event handlers
- State updates are atomic
- No unnecessary re-renders
- Metrics captured for new operations
Timing Preservation
- First-event timing not affected
- TTFC calculation remains accurate
- Timeout logic still works
- Cleanup happens properly
Context Preservation
- Context data flows through lifecycle
- "Show more" functionality unaffected
- Conversation history maintains integrity
- Persistence includes new data
Backwards Compatibility
- Old clients tolerate new events (unknown event types are ignored without errors)
- Optional fields use safe defaults
- No breaking changes to existing events
- Versioning considered if needed
Testing New Features
Unit Tests
describe('YourNewEvent', () => {
it('parses valid data', () => {
const event = {
type: 'your-new-event',
data: JSON.stringify({ field1: 'test', field2: 42 })
};
const result = parseEvent(event);
expect(result.field1).toBe('test');
expect(result.field2).toBe(42);
});
it('handles missing fields gracefully', () => {
const event = {
type: 'your-new-event',
data: JSON.stringify({ field1: 'test' }) // missing field2
};
expect(() => parseEvent(event)).not.toThrow();
});
});
Integration Tests
describe('Pipeline with YourNewEvent', () => {
it('processes event and updates UI', async () => {
const mockEvents = [
{ type: 'text', data: '"Hello"' },
{ type: 'your-new-event', data: '{"field1":"test","field2":42}' },
{ type: 'done', data: '{}' }
];
const { result } = await streamMessage(input, mockEvents);
expect(result.yourState).toEqual({ field1: 'test', field2: 42 });
});
});
E2E Tests
describe('User flow with new feature', () => {
it('displays new UI element when event arrives', async () => {
await userTypes('show products');
await waitFor(() => {
expect(screen.getByTestId('your-new-component')).toBeInTheDocument();
});
expect(screen.getByText('test')).toBeInTheDocument();
});
});
Common Pitfalls
Don't: Modify Existing Event Handlers
// BAD: Changes behavior of existing events
case 'text':
// Adding unrelated logic here
doSomethingUnrelated();
break;
Do: Create New Event Types
// GOOD: New event for new functionality
case 'your-new-event':
handleYourNewFeature(data);
break;
Don't: Block Event Loop
// BAD: Synchronous heavy operation
// The handler does not return until the calculation finishes, so nothing else
// on the event loop can run in the meantime.
case 'your-new-event':
  const result = expensiveSyncCalculation();
  break;
Do: Keep Handlers Fast
// GOOD: Async or deferred
// Schedule the work as a separate task so the handler returns immediately.
// A promise callback (`Promise.resolve().then(...)`) is a microtask and still
// runs before control returns to the event loop, so it would not unblock
// anything. For CPU-heavy work, prefer moving it to a Web Worker.
case 'your-new-event':
  setTimeout(() => expensiveCalculation(), 0);
  break;
Don't: Skip Validation
// BAD: Assumes data is valid
const { requiredField } = JSON.parse(event.data);
doSomething(requiredField);
Do: Validate Everything
// GOOD: Check before using
// JSON.parse throws on malformed input, so guard it as well as validating the
// parsed fields (see "Try-catch around event parsing" in the Safety Checklist).
try {
  const data = JSON.parse(event.data);
  if (isValid(data.requiredField)) {
    doSomething(data.requiredField);
  }
} catch (error) {
  console.error('your-new-event parse error:', error);
}
Getting Help
Debug Logging
Enable verbose logging:
const DEBUG = true;
if (DEBUG) {
console.log('[Pipeline] Event:', event.type, event.data);
console.log('[Pipeline] State before:', currentState);
// ... process event
console.log('[Pipeline] State after:', newState);
}
Performance Profiling
// Record named timestamps from performance.now() and log the elapsed time.
const marks = new Map<string, number>();
marks.set('start', performance.now());
// ... your code
marks.set('end', performance.now());
const startMark = marks.get('start');
const endMark = marks.get('end');
// Map#get returns `number | undefined`; check explicitly instead of asserting
// with `!`, so a missing mark is skipped rather than logged as NaN.
if (startMark !== undefined && endMark !== undefined) {
  console.log('Duration:', endMark - startMark);
}
State Inspection
Use React DevTools to inspect:
- Component state
- Hook values
- Re-render counts
- Props flow
Related Documentation
- Overview - Pipeline architecture
- Event Handling - Existing event types
- Resilience - Error handling patterns