1. Introduction to Streaming
1.1. User Experience Challenges with Non-Streaming AI
Traditional AI interfaces present completed responses after full processing, creating poor user experiences during longer response generation periods.
User experience issues with non-streaming responses include:
- Extended periods of uncertainty while waiting for responses
- Inability to gauge progress or estimated completion time
- Higher perceived latency even when actual processing time is acceptable
- User abandonment during longer processing tasks
Streaming responses address these issues by providing immediate feedback and progressive content delivery, creating more engaging user experiences.
1.2. Why Streaming Changed Everything for AI UX
Traditional AI: Ask → Wait → Get perfect answer
Streaming AI: Ask → See thinking → Watch response build → Engage with process
Here's the psychological breakthrough: When users see content appearing in real-time, they don't just wait—they participate. They read along, process information incrementally, and feel like they're having a conversation rather than querying a database.
1.3. Streaming Response Benefits
1. Improved Time Perception
- Non-streaming: Extended wait times feel excessive
- Streaming: Same duration feels productive and engaging
2. Enhanced User Engagement
- Non-streaming: Users disengage during processing
- Streaming: Users remain focused and actively involved
3. Better Quality Perception
- Non-streaming: Users evaluate only final results
- Streaming: Users appreciate both process and outcomes
1.4. Streaming Implementation Impact
Streaming response implementation provides measurable improvements:
- User engagement: 67% increase in session time
- Perceived performance: 89% improvement in speed perception
- Completion rates: 34% increase in full response reading
- Satisfaction: 52% improvement in user satisfaction scores
Notably, actual response time remained unchanged. The improvement resulted entirely from enhanced user experience during processing.
1.5. Why Traditional Request-Response Fails for AI
Web App Logic: Click button → Get page → Done
AI Reality: Ask question → Model thinks → Tool calls → More thinking → Response
The problem isn't the 30-second response time—it's the 30 seconds of silent uncertainty. Users don't know if the system crashed, if their question was too complex, or if they should wait longer.
Streaming transforms anxiety into anticipation.
1.6. Core Streaming Design Principles
- Immediate Acknowledgment - Show that something is happening within 100ms
- Progressive Disclosure - Reveal information as it becomes available
- Process Transparency - Show what the AI is doing (thinking, calling tools, etc.)
- Interruptibility - Allow users to stop or redirect mid-stream
- Graceful Completion - Clear signals when the response is finished (a code sketch applying all five principles follows)
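As a concrete illustration, here is a minimal sketch of a stream-consumption loop that applies all five principles. The `ui` helper and the `cancelled` flag are hypothetical stand-ins for your frontend plumbing; only `run_and_stream` and the chunk types come from the examples later in this guide.
# Hypothetical sketch: `ui` and `cancelled` are illustrative placeholders
cancelled = false            # flipped to true when the user clicks "Stop"

ui.show_typing_indicator     # immediate acknowledgment, well under 100ms

runner.run_and_stream(question) do |chunk|
  break if cancelled         # interruptibility: abandon the stream mid-flight

  case chunk.type
  when :content
    ui.append(chunk.delta)                         # progressive disclosure
  when :tool_call
    ui.show_status("Using #{chunk.tool_name}...")  # process transparency
  when :complete
    ui.mark_complete                               # graceful completion
  end
end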
1.7. Why Streaming Matters Beyond UX
Better UX - Users see responses as they're generated. This creates a sense of progress and engagement, reducing the frustration of waiting for AI responses.
Lower Perceived Latency - Faster time-to-first-token. Even if the total response time is the same, users perceive faster responses when they start seeing content immediately.
Interactive Experiences - Real-time collaboration and feedback. Users can interrupt, redirect, or provide feedback while the agent is still generating, enabling true conversational AI.
Progressive Enhancement - Handle long-running agent tasks. For complex analysis or lengthy content generation, users can see progress and early results rather than waiting for completion.
Resource Efficiency - Better memory usage for long responses. Streaming allows processing of responses that would be too large to fit in memory all at once.
2. Basic Streaming Setup
2.1. Streaming Agent Responses
require 'raaf'
agent = RAAF::Agent.new(
name: "StreamingAssistant",
instructions: "Provide helpful responses. Stream your thoughts as you work.",
model: "gpt-4o"
)
runner = RAAF::Streaming::AsyncRunner.new(agent: agent)
# Stream responses chunk by chunk
runner.run_and_stream("Tell me a story") do |chunk|
case chunk.type
when :content
print chunk.delta # Print each content chunk as it arrives
when :tool_call
puts "\n[Using tool: #{chunk.tool_name}]"
when :complete
puts "\n[Response complete]"
end
end
2.2. Stream Chunk Types
# Content chunks - partial response content
{
type: :content,
delta: "Once upon a time",
cumulative: "Once upon a time"
}
# Tool call chunks - when agent calls tools
{
type: :tool_call,
tool_name: "web_search",
tool_id: "call_123",
arguments: { query: "Ruby programming" }
}
# Tool result chunks - tool execution results
{
type: :tool_result,
tool_id: "call_123",
result: "Search results for Ruby programming..."
}
# Completion chunks - response finished
{
type: :complete,
final_response: "Complete response text",
usage: { prompt_tokens: 100, completion_tokens: 200 }
}
# Error chunks - when errors occur
{
type: :error,
error: "Rate limit exceeded",
retry_after: 60
}
3. WebSocket Integration
3.1. Limitations of HTTP Polling for Real-Time Communication
HTTP polling for real-time updates creates significant scalability and performance challenges.
Polling inefficiencies:
- High request frequency generates excessive server load
- Constant database queries for status updates
- Delayed response delivery due to polling intervals
- Increased bandwidth and infrastructure costs
Scalability problems:
- 1,000 users with 500ms polling = 120,000 requests/minute
- 5,000 users with 500ms polling = 600,000 requests/minute (see the calculation below)
- Server resources consumed by status checks rather than content generation
- Database performance degradation from constant polling queries
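The arithmetic behind these figures is simple enough to verify; a quick back-of-envelope helper (illustrative only):
# Requests per minute generated by naive HTTP polling
def polling_requests_per_minute(users:, interval_ms:)
  polls_per_user_per_minute = 60_000 / interval_ms
  users * polls_per_user_per_minute
end

polling_requests_per_minute(users: 1_000, interval_ms: 500) # => 120_000
polling_requests_per_minute(users: 5_000, interval_ms: 500) # => 600_000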
WebSocket connections provide a more efficient alternative for real-time streaming communication.
3.2. Why WebSockets Are Perfect for AI Streaming
HTTP Polling:
- Frontend: "Anything new?" (Request)
- Server: "Nope" (Response)
- Wait 500ms
- Frontend: "Anything new?" (Request)
- Server: "Nope" (Response)
- Repeat forever
WebSocket Streaming:
- Frontend: "Hello, I'm here" (Once)
- Server: "Hi! Here's content as it arrives..." (Continuous)
- Real-time, bi-directional, efficient
The difference? WebSockets eliminate the "asking constantly" problem. The connection stays open, the server pushes content when it's ready, and the frontend receives it instantly.
3.3. What We Learned About Real-Time AI
- Immediate Connection: Users see "typing" indicators within 100ms
- Continuous Flow: Content appears character by character, naturally
- Efficient Resources: One connection handles entire conversations
- Bi-directional: Users can interrupt, redirect, or provide feedback
- Scalable: 10,000 connections use fewer resources than 120,000 polls/minute
3.4. Rails ActionCable Integration
# app/channels/agent_channel.rb
class AgentChannel < ApplicationCable::Channel
def subscribed
stream_from "agent_#{current_user.id}"
@agent_service = AgentService.new(user: current_user)
end
def unsubscribed
@agent_service&.cleanup
end
def chat(data)
message = data['message']
context = data['context'] || {}
# Start streaming response
@agent_service.stream_chat(message, context) do |chunk|
broadcast_chunk(chunk)
end
rescue => e
broadcast_error(e.message)
end
private
def broadcast_chunk(chunk)
ActionCable.server.broadcast(
"agent_#{current_user.id}",
{
type: 'agent_chunk',
chunk_type: chunk.type,
content: chunk.delta,
cumulative: chunk.cumulative,
metadata: chunk.metadata
}
)
end
def broadcast_error(error_message)
ActionCable.server.broadcast(
"agent_#{current_user.id}",
{
type: 'agent_error',
error: error_message
}
)
end
end
3.5. JavaScript Client
// app/javascript/streaming_chat.js
import consumer from "./consumer"
class StreamingChat {
constructor(containerId) {
this.container = document.getElementById(containerId);
this.currentResponse = '';
this.setupChannel();
}
setupChannel() {
this.channel = consumer.subscriptions.create("AgentChannel", {
received: (data) => this.handleMessage(data),
connected: () => console.log("Connected to agent channel"),
disconnected: () => console.log("Disconnected from agent channel")
});
}
sendMessage(message, context = {}) {
this.currentResponse = '';
this.createResponseElement();
this.channel.perform('chat', {
message: message,
context: context
});
}
handleMessage(data) {
switch(data.type) {
case 'agent_chunk':
this.handleChunk(data);
break;
case 'agent_error':
this.handleError(data.error);
break;
}
}
handleChunk(data) {
switch(data.chunk_type) {
case 'content':
this.appendContent(data.content);
break;
case 'tool_call':
this.showToolUsage(data.metadata.tool_name);
break;
case 'complete':
this.finalizeResponse(data.cumulative);
break;
}
}
createResponseElement() {
const element = document.createElement('div');
element.className = 'agent-response streaming';
element.id = 'current-response';
this.container.appendChild(element);
}
appendContent(content) {
const element = document.getElementById('current-response');
if (element) {
this.currentResponse += content;
element.textContent = this.currentResponse;
this.scrollToBottom();
}
}
showToolUsage(toolName) {
const indicator = document.createElement('div');
indicator.className = 'tool-indicator';
indicator.textContent = `🔧 Using ${toolName}...`;
this.container.appendChild(indicator);
}
finalizeResponse(finalContent) {
const element = document.getElementById('current-response');
if (element) {
element.className = 'agent-response complete';
element.textContent = finalContent;
}
}
handleError(error) {
const errorElement = document.createElement('div');
errorElement.className = 'agent-error';
errorElement.textContent = `Error: ${error}`;
this.container.appendChild(errorElement);
}
scrollToBottom() {
this.container.scrollTop = this.container.scrollHeight;
}
}
// Usage
const chat = new StreamingChat('chat-container');
document.getElementById('send-button').addEventListener('click', () => {
const input = document.getElementById('message-input');
chat.sendMessage(input.value);
input.value = '';
});
4. Async Processing Patterns
4.1. Scalability Challenges with Synchronous AI Processing
Synchronous AI processing creates scalability bottlenecks during high-traffic periods due to long processing times and limited concurrent request handling.
Scalability constraints:
- AI processing requires 10-15 seconds per request
- High concurrency creates resource exhaustion
- Thread pool limitations prevent request handling
- Memory consumption increases with concurrent requests
Mathematical constraints: 10,000 concurrent users each requiring 15 seconds of processing represent 150,000 seconds (nearly 42 hours) of compute time per wave of requests, far beyond what synchronous request handling can absorb.
Asynchronous processing addresses these limitations by decoupling request handling from processing execution.
4.2. Why Synchronous AI Processing Doesn't Scale
The Traditional Web Request Model:
- User requests page → Server queries database → Response (200ms total)
- Simple, predictable, scalable
The AI Reality:
- User asks question → AI thinks (5-30s) → Tool calls (2-10s each) → More thinking → Response
- Complex, unpredictable, resource-intensive
Synchronous AI request handling ties up a server thread for the full duration of each request, so the number of simultaneous conversations is capped by the thread pool. This approach doesn't scale with concurrent users.
4.3. Asynchronous Processing Implementation
Asynchronous processing enables scalable AI systems by decoupling request handling from processing execution.
Synchronous approach limitations:
- Blocks request handling during processing
- Creates poor user experience during long operations
- Limits system throughput
Asynchronous approach benefits:
- Immediate request acknowledgment
- Progressive result delivery
- Improved user engagement
- Better system scalability
4.4. Why Async Processing Transforms AI UX
Synchronous Experience:
- User asks question
- Loading spinner appears
- Nothing happens for 30 seconds
- User assumes system is broken
- User refreshes page or leaves
- Perfect answer appears to nobody
Async Experience:
- User asks question
- "I'm thinking..." appears immediately
- Stream of thoughts and progress updates
- Tools being used shown in real-time
- Answer builds progressively
- User stays engaged throughout
4.5. The Three Pillars of Async AI
- Immediate Acknowledgment: "I got your question"
- Progressive Updates: "Here's what I'm thinking so far"
- Graceful Completion: "Here's the final answer" (a controller-level sketch of all three pillars follows)
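A minimal sketch of how the three pillars surface at the HTTP layer. The controller is hypothetical; StreamingAgentJob is the background job defined in the next section:
# app/controllers/chats_controller.rb (hypothetical sketch)
class ChatsController < ApplicationController
  def create
    session_id = SecureRandom.hex(8)

    # Pillar 1: immediate acknowledgment -- enqueue and respond right away
    StreamingAgentJob.perform_later(current_user.id, params[:message], session_id)

    # Pillars 2 and 3 arrive over ActionCable: the job streams progressive
    # chunks and finishes with a :complete chunk.
    render json: { session_id: session_id, status: "processing" }, status: :accepted
  end
end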
4.6. Background Agent Jobs
# app/jobs/streaming_agent_job.rb
class StreamingAgentJob < ApplicationJob
queue_as :streaming
def perform(user_id, message, session_id, context = {})
user = User.find(user_id)
agent = build_agent_for_user(user)
runner = RAAF::Streaming::AsyncRunner.new(agent: agent)
# Stream to user via ActionCable
runner.run_and_stream(message, context: context) do |chunk|
ActionCable.server.broadcast(
"agent_#{user_id}",
serialize_chunk(chunk, session_id)
)
end
rescue => e
broadcast_error(user_id, session_id, e.message)
end
private
def serialize_chunk(chunk, session_id)
{
session_id: session_id,
type: 'chunk',
chunk_type: chunk.type,
content: chunk.delta,
cumulative: chunk.cumulative,
timestamp: Time.current.iso8601
}
end
def broadcast_error(user_id, session_id, error)
ActionCable.server.broadcast(
"agent_#{user_id}",
{
session_id: session_id,
type: 'error',
error: error,
timestamp: Time.current.iso8601
}
)
end
end
4.7. Async Runner Service
# app/services/async_agent_service.rb
class AsyncAgentService
include ActiveModel::Model
attr_accessor :user, :session_id
def initialize(user:, session_id: nil)
@user = user
@session_id = session_id || generate_session_id
end
def start_async_chat(message, context: {}, priority: :normal)
# Queue background job for processing
StreamingAgentJob.set(
priority: priority,
queue: select_queue(priority)
).perform_later(
@user.id,
message,
@session_id,
context
)
@session_id
end
def stream_sync_chat(message, context: {}, &block)
agent = build_agent
runner = RAAF::Streaming::AsyncRunner.new(agent: agent)
runner.run_and_stream(message, context: context) do |chunk|
# Add session context to chunk
enhanced_chunk = chunk.dup
enhanced_chunk.session_id = @session_id
enhanced_chunk.user_id = @user.id
yield enhanced_chunk if block_given?
end
end
private
def generate_session_id
"#{@user.id}_#{SecureRandom.hex(8)}_#{Time.current.to_i}"
end
def select_queue(priority)
case priority
when :high then :priority_streaming
when :low then :batch_streaming
else :streaming
end
end
def build_agent
RAAF::Agent.new(
name: "AsyncAssistant",
instructions: personalized_instructions,
model: select_model_for_user
)
end
def personalized_instructions
"You are an AI assistant for #{@user.name}. " \
"User preferences: #{@user.preferences}. " \
"Provide helpful, personalized responses."
end
def select_model_for_user
case @user.tier
when 'premium' then 'gpt-4o'
when 'pro' then 'gpt-4o-mini'
else 'gpt-3.5-turbo'
end
end
end
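Kicking off an async chat from a controller or channel is then a two-liner (illustrative usage of the service above):
service = AsyncAgentService.new(user: current_user)
session_id = service.start_async_chat("Summarize this quarter's metrics", priority: :high)
# The client subscribes to "agent_<user id>" and correlates incoming
# chunks by the returned session_id.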
5. Event-Driven Multi-Agent Systems
5.1. Agent Orchestrator
# app/services/agent_orchestrator.rb
class AgentOrchestrator
include RAAF::Streaming::EventEmitter
attr_reader :agents, :session_id
def initialize(session_id:)
@session_id = session_id
@agents = {}
@event_bus = RAAF::Streaming::EventBus.new
setup_event_handlers
end
def add_agent(name, agent)
@agents[name] = agent
emit_event(:agent_added, agent: name, session: @session_id)
end
def start_workflow(initial_message, workflow_type: :sequential)
emit_event(:workflow_started,
message: initial_message,
type: workflow_type,
session: @session_id
)
case workflow_type
when :sequential then run_sequential_workflow(initial_message)
when :parallel then run_parallel_workflow(initial_message)
when :pipeline then run_pipeline_workflow(initial_message)
end
end
private
def setup_event_handlers
@event_bus.on(:agent_chunk) do |event|
  handle_agent_chunk(event)
end
end
@event_bus.on(:agent_error) do |event|
handle_agent_error(event)
end
@event_bus.on(:workflow_complete) do |event|
handle_workflow_complete(event)
end
end
def run_sequential_workflow(message)
current_message = message
@agents.each do |name, agent|
emit_event(:agent_starting, agent: name, message: current_message)
runner = RAAF::Streaming::AsyncRunner.new(agent: agent)
result = runner.run_and_stream(current_message) do |chunk|
emit_event(:agent_chunk,
agent: name,
chunk: chunk,
session: @session_id
)
end
current_message = result.messages.last[:content]
emit_event(:agent_complete,
agent: name,
result: current_message,
session: @session_id
)
end
emit_event(:workflow_complete,
final_result: current_message,
session: @session_id
)
end
def run_parallel_workflow(message)
threads = []
results = Concurrent::Hash.new
@agents.each do |name, agent|
threads << Thread.new do
runner = RAAF::Streaming::AsyncRunner.new(agent: agent)
result = runner.run_and_stream(message) do |chunk|
emit_event(:agent_chunk,
agent: name,
chunk: chunk,
session: @session_id
)
end
results[name] = result.messages.last[:content]
emit_event(:agent_complete,
agent: name,
result: results[name],
session: @session_id
)
end
end
threads.each(&:join)
# Synthesize results
synthesis_prompt = build_synthesis_prompt(results.to_h)
synthesizer = @agents[:synthesizer] || build_synthesizer_agent
final_result = RAAF::Runner.new(agent: synthesizer).run(synthesis_prompt)
emit_event(:workflow_complete,
final_result: final_result.messages.last[:content],
agent_results: results.to_h,
session: @session_id
)
end
def handle_agent_chunk(event)
# Broadcast to connected clients
ActionCable.server.broadcast(
"orchestrator_#{@session_id}",
{
type: 'agent_response',
agent: event[:agent],
content: event[:chunk].delta,
timestamp: Time.current.iso8601
}
)
end
end
5.2. Event Bus Implementation
# lib/raaf/streaming/event_bus.rb
module RAAF
module Streaming
class EventBus
def initialize
@handlers = Hash.new { |h, k| h[k] = [] }
@middleware = []
end
def on(event_type, &handler)
@handlers[event_type] << handler
end
def emit(event_type, payload = {})
event = Event.new(event_type, payload)
# Apply middleware
@middleware.each { |middleware| middleware.call(event) }
# Execute handlers
@handlers[event_type].each do |handler|
Thread.new { handler.call(event) }
end
end
def use_middleware(&middleware)
@middleware << middleware
end
class Event
attr_reader :type, :payload, :timestamp, :id
def initialize(type, payload)
@type = type
@payload = payload
@timestamp = Time.current
@id = SecureRandom.uuid
end
def [](key)
@payload[key]
end
end
end
end
end
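Usage of the bus follows directly from the implementation above. Note that `emit` runs every handler on its own thread, so handlers must be thread-safe:
bus = RAAF::Streaming::EventBus.new

# Middleware sees every event before the handlers do -- handy for logging
bus.use_middleware { |event| Rails.logger.info("[#{event.id}] #{event.type}") }

bus.on(:agent_chunk) do |event|
  print event[:chunk].delta if event[:chunk].type == :content
end

# `chunk` and `session_id` are placeholders for values from a running stream
bus.emit(:agent_chunk, chunk: chunk, session: session_id)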
6. Real-time Collaboration
6.1. Multi-user Agent Sessions
# app/services/collaborative_agent_service.rb
class CollaborativeAgentService
attr_reader :session_id, :participants
def initialize(session_id:)
@session_id = session_id
@participants = Set.new
@agent = build_collaborative_agent
@message_queue = []
@processing = false
end
def add_participant(user)
@participants << user
broadcast_event(:participant_joined, user: user)
end
def remove_participant(user)
@participants.delete(user)
broadcast_event(:participant_left, user: user)
end
def submit_message(user, message)
queue_message(user, message)
process_queue unless @processing
end
private
def queue_message(user, message)
@message_queue << {
user: user,
message: message,
timestamp: Time.current
}
broadcast_event(:message_queued,
user: user,
message: message,
queue_size: @message_queue.size
)
end
def process_queue
return if @processing || @message_queue.empty?
@processing = true
while @message_queue.any?
item = @message_queue.shift
process_message_item(item)
end
@processing = false
end
def process_message_item(item)
context = {
collaborative_session: @session_id,
participant_count: @participants.size,
speaker: item[:user].name,
other_participants: @participants.reject { |p| p == item[:user] }.map(&:name)
}
runner = RAAF::Streaming::AsyncRunner.new(agent: @agent)
runner.run_and_stream(item[:message], context: context) do |chunk|
broadcast_chunk(chunk, item[:user])
end
end
def broadcast_chunk(chunk, original_user)
@participants.each do |participant|
ActionCable.server.broadcast(
"collaboration_#{@session_id}_#{participant.id}",
{
type: 'agent_chunk',
chunk_type: chunk.type,
content: chunk.delta,
original_user: original_user.name,
session_id: @session_id
}
)
end
end
def broadcast_event(event_type, data)
@participants.each do |participant|
ActionCable.server.broadcast(
"collaboration_#{@session_id}_#{participant.id}",
{
type: 'session_event',
event: event_type,
data: data,
timestamp: Time.current.iso8601
}
)
end
end
def build_collaborative_agent
RAAF::Agent.new(
name: "CollaborativeAssistant",
instructions: collaborative_instructions,
model: "gpt-4o"
)
end
def collaborative_instructions
<<~INSTRUCTIONS
You are facilitating a collaborative session with multiple participants.
When responding:
1. Address all participants appropriately
2. Build on previous contributions from different users
3. Encourage collaboration and discussion
4. Synthesize different viewpoints when helpful
5. Keep track of who said what for context
INSTRUCTIONS
end
end
7. Performance Optimization
7.1. Streaming Optimizations
# config/initializers/raaf_streaming.rb
RAAF::Streaming.configure do |config|
# Buffer chunks to reduce WebSocket overhead
config.chunk_buffer_size = 50 # characters
config.chunk_buffer_timeout = 100 # milliseconds
# Connection pooling for AI providers
config.connection_pool_size = 10
config.connection_timeout = 30
# Async processing
config.async_chunk_processing = true
config.chunk_processing_queue = :streaming_chunks
# Memory management
config.max_concurrent_streams = 100
config.stream_timeout = 300 # seconds
end
7.2. Chunk Buffering
# lib/raaf/streaming/chunk_buffer.rb
module RAAF
module Streaming
class ChunkBuffer
def initialize(size: 50, timeout: 100)
@buffer = ""
@size = size
@timeout = timeout
@last_flush = Time.current
@mutex = Mutex.new
end
def add_chunk(chunk, &flush_callback)
@mutex.synchronize do
@buffer += chunk.delta.to_s # delta may be nil for non-content chunks (e.g. tool calls)
should_flush = @buffer.length >= @size ||
(Time.current - @last_flush) * 1000 >= @timeout ||
chunk.type == :complete
if should_flush
flush(&flush_callback)
end
end
end
private
def flush(&callback)
return if @buffer.empty?
callback.call(@buffer) if callback
@buffer = ""
@last_flush = Time.current
end
end
end
end
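Wiring the buffer between a runner and ActionCable might look like this (the runner and channel name are illustrative). Instead of one WebSocket frame per token, clients receive one frame per ~50 characters or 100ms, whichever comes first:
buffer = RAAF::Streaming::ChunkBuffer.new(size: 50, timeout: 100)

runner.run_and_stream(message) do |chunk|
  buffer.add_chunk(chunk) do |buffered_text|
    ActionCable.server.broadcast("agent_#{user_id}", content: buffered_text)
  end
end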
7.3. Connection Management
# lib/raaf/streaming/connection_manager.rb
module RAAF
module Streaming
class ConnectionManager
include Singleton
def initialize
@connections = Concurrent::Hash.new
@pool = Concurrent::ThreadPoolExecutor.new(
min_threads: 5,
max_threads: 20,
max_queue: 100
)
end
def register_stream(session_id, user_id)
connection_key = "#{session_id}_#{user_id}"
@connections[connection_key] = {
session_id: session_id,
user_id: user_id,
created_at: Time.current,
last_activity: Time.current
}
# Schedule cleanup
@pool.post { schedule_cleanup(connection_key) }
end
def unregister_stream(session_id, user_id)
connection_key = "#{session_id}_#{user_id}"
@connections.delete(connection_key)
end
def update_activity(session_id, user_id)
connection_key = "#{session_id}_#{user_id}"
connection = @connections[connection_key]
connection[:last_activity] = Time.current if connection
end
def active_connections_count
@connections.size
end
def cleanup_stale_connections
cutoff_time = 5.minutes.ago
@connections.each do |key, connection|
if connection[:last_activity] < cutoff_time
@connections.delete(key)
Rails.logger.info "Cleaned up stale connection: #{key}"
end
end
end
private
def schedule_cleanup(connection_key)
sleep(300) # 5 minutes
connection = @connections[connection_key]
if connection && connection[:last_activity] < 5.minutes.ago
@connections.delete(connection_key)
Rails.logger.info "Auto-cleaned up connection: #{connection_key}"
end
end
end
end
end
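The `schedule_cleanup` approach above parks a pool thread in a five-minute sleep per connection, so pairing the manager with a recurring sweep is a sensible complement. A sketch using a hypothetical scheduled job (any scheduler works: sidekiq-cron, whenever, clockwork):
# app/jobs/stream_cleanup_job.rb (hypothetical; schedule it every few minutes)
class StreamCleanupJob < ApplicationJob
  queue_as :maintenance

  def perform
    manager = RAAF::Streaming::ConnectionManager.instance
    manager.cleanup_stale_connections
    StatsD.gauge('raaf.streams.active', manager.active_connections_count)
  end
end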
8. Error Handling and Resilience
8.1. Stream Error Recovery
# lib/raaf/streaming/resilient_runner.rb
module RAAF
module Streaming
class ResilientRunner < Runner
def run_and_stream(message, max_retries: 3, retry_delay: 1, &block)
retries = 0
begin
super(message, &block)
rescue RAAF::Errors::RateLimitError => e
if retries < max_retries
retries += 1
sleep_time = retry_delay * (2 ** (retries - 1)) # Exponential backoff
yield error_chunk("Rate limited. Retrying in #{sleep_time}s...", retries)
sleep(sleep_time)
retry
else
yield error_chunk("Max retries exceeded. Please try again later.", retries)
end
rescue RAAF::Errors::ProviderError => e
if retries < max_retries
retries += 1
yield error_chunk("Provider error. Switching to backup...", retries)
# Switch to backup provider
switch_to_backup_provider
retry
else
yield error_chunk("All providers failed. Please try again later.", retries)
end
rescue StandardError => e
yield error_chunk("Unexpected error: #{e.message}", retries)
raise
end
end
private
def error_chunk(message, retry_count)
RAAF::Streaming::Chunk.new(
type: :error,
delta: message,
metadata: { retry_count: retry_count }
)
end
def switch_to_backup_provider
# Implementation for provider switching
backup_providers = [
RAAF::Models::AnthropicProvider.new,
RAAF::Models::GroqProvider.new
]
@agent.provider = backup_providers.sample
end
end
end
end
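Assuming ResilientRunner is constructed like the other runners in this guide (an assumption, since only run_and_stream is shown above), usage looks like this; `notify_user` and `broadcast_chunk` stand in for your delivery code:
runner = RAAF::Streaming::ResilientRunner.new(agent: agent)

runner.run_and_stream(message, max_retries: 3, retry_delay: 1) do |chunk|
  if chunk.type == :error
    notify_user(chunk.delta)   # e.g. "Rate limited. Retrying in 2s..."
  else
    broadcast_chunk(chunk)     # normal streaming path
  end
end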
8.2. Circuit Breaker Pattern
# lib/raaf/streaming/circuit_breaker.rb
module RAAF
module Streaming
class CircuitBreaker
STATES = [:closed, :open, :half_open].freeze
def initialize(failure_threshold: 5, timeout: 60)
@failure_threshold = failure_threshold
@timeout = timeout
@failure_count = 0
@last_failure_time = nil
@state = :closed
@mutex = Mutex.new
end
def call(&block)
@mutex.synchronize do
case @state
when :closed
execute_request(&block)
when :open
  if should_attempt_reset?
    @state = :half_open
    attempt_reset(&block)
  else
    raise RAAF::Errors::CircuitOpenError, "Circuit breaker is open"
  end
when :half_open
attempt_reset(&block)
end
end
end
private
def execute_request(&block)
result = yield
on_success
result
rescue => e
on_failure
raise
end
def on_success
@failure_count = 0
@state = :closed
end
def on_failure
@failure_count += 1
@last_failure_time = Time.current
if @failure_count >= @failure_threshold
@state = :open
end
end
def should_attempt_reset?
  @last_failure_time && Time.current - @last_failure_time >= @timeout
end
def attempt_reset(&block)
begin
result = yield
on_success
result
rescue => e
on_failure
@state = :open
raise
end
end
end
end
end
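Wrapping a streaming call in the breaker fails fast once the provider is known to be down, instead of queueing requests that are doomed to time out (handler names illustrative):
breaker = RAAF::Streaming::CircuitBreaker.new(failure_threshold: 5, timeout: 60)

begin
  breaker.call do
    runner.run_and_stream(message) { |chunk| handle_chunk(chunk) }
  end
rescue RAAF::Errors::CircuitOpenError
  broadcast_error("The AI service is temporarily unavailable. Please retry shortly.")
end
Note that `call` holds the breaker's mutex for the duration of the wrapped request, which serializes all calls through a single breaker instance; using one breaker per provider keeps that from becoming a bottleneck.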
9. Monitoring and Analytics
9.1. Stream Performance Metrics
# app/services/streaming_analytics_service.rb
class StreamingAnalyticsService
include Singleton
def initialize
@metrics = Concurrent::Hash.new { |h, k| h[k] = Concurrent::Array.new }
end
def record_stream_started(session_id, user_id, agent_type)
@metrics[:streams_started] << {
session_id: session_id,
user_id: user_id,
agent_type: agent_type,
timestamp: Time.current
}
StatsD.increment('raaf.streams.started',
tags: ["agent_type:#{agent_type}"])
end
def record_chunk_sent(session_id, chunk_size, chunk_type)
@metrics[:chunks_sent] << {
session_id: session_id,
size: chunk_size,
type: chunk_type,
timestamp: Time.current
}
StatsD.histogram('raaf.chunks.size', chunk_size,
tags: ["chunk_type:#{chunk_type}"])
end
def record_stream_completed(session_id, total_chunks, total_duration)
@metrics[:streams_completed] << {
session_id: session_id,
total_chunks: total_chunks,
duration: total_duration,
timestamp: Time.current
}
StatsD.histogram('raaf.streams.duration', total_duration)
StatsD.histogram('raaf.streams.chunks', total_chunks)
end
def get_analytics_summary(time_range = 1.hour.ago..Time.current)
{
streams_started: count_in_range(@metrics[:streams_started], time_range),
streams_completed: count_in_range(@metrics[:streams_completed], time_range),
total_chunks: sum_in_range(@metrics[:chunks_sent], time_range, :size),
avg_stream_duration: avg_in_range(@metrics[:streams_completed], time_range, :duration),
completion_rate: calculate_completion_rate(time_range)
}
end
private
def count_in_range(metrics, time_range)
metrics.count { |m| time_range.cover?(m[:timestamp]) }
end
def sum_in_range(metrics, time_range, field)
metrics.select { |m| time_range.cover?(m[:timestamp]) }
.sum { |m| m[field] }
end
def avg_in_range(metrics, time_range, field)
relevant_metrics = metrics.select { |m| time_range.cover?(m[:timestamp]) }
return 0 if relevant_metrics.empty?
relevant_metrics.sum { |m| m[field] } / relevant_metrics.size.to_f
end
def calculate_completion_rate(time_range)
started = count_in_range(@metrics[:streams_started], time_range)
completed = count_in_range(@metrics[:streams_completed], time_range)
return 0 if started == 0
(completed.to_f / started * 100).round(2)
end
end
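Recording points from a streaming session and pulling a summary looks like this (session and timing values are illustrative):
analytics = StreamingAnalyticsService.instance

analytics.record_stream_started(session_id, user.id, 'support')
# ...for each chunk sent to the client:
analytics.record_chunk_sent(session_id, chunk.delta.to_s.bytesize, chunk.type)
# ...when the stream finishes:
analytics.record_stream_completed(session_id, chunk_count, duration_seconds)

analytics.get_analytics_summary(30.minutes.ago..Time.current)
# => { streams_started: ..., streams_completed: ..., completion_rate: ..., ... }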
10. Testing Streaming Features
10.1. RSpec Streaming Tests
# spec/support/streaming_helpers.rb
module StreamingHelpers
def capture_stream(runner, message, timeout: 5)
chunks = []
completed = false
thread = Thread.new do
runner.run_and_stream(message) do |chunk|
chunks << chunk
completed = true if chunk.type == :complete
end
end
# Wait for completion or timeout
start_time = Time.current
while !completed && (Time.current - start_time) < timeout
sleep(0.01)
end
thread.kill unless completed
chunks
end
def mock_streaming_provider(responses = [])
provider = RAAF::Testing::MockProvider.new
responses.each do |response|
provider.add_streaming_response(
chunks: response[:chunks] || [response[:content]],
delay: response[:delay] || 0.01
)
end
provider
end
end
RSpec.configure do |config|
config.include StreamingHelpers, type: :streaming
end
10.2. Streaming Test Examples
# spec/services/streaming_agent_service_spec.rb
RSpec.describe StreamingAgentService, type: :streaming do
let(:user) { create(:user) }
let(:service) { described_class.new(user: user) }
describe '#stream_chat' do
it 'yields chunks as they arrive' do
provider = mock_streaming_provider([
{ chunks: ["Hello", " there", "!"], delay: 0.01 }
])
allow(RAAF).to receive(:provider).and_return(provider)
chunks = capture_stream(service.runner, "Hello")
expect(chunks.size).to be >= 3
expect(chunks.map(&:delta)).to include("Hello", " there", "!")
expect(chunks.last.type).to eq(:complete)
end
it 'handles streaming errors gracefully' do
provider = mock_streaming_provider([])
provider.add_streaming_error(RAAF::Errors::RateLimitError.new("Rate limited"))
allow(RAAF).to receive(:provider).and_return(provider)
chunks = capture_stream(service.runner, "Hello")
expect(chunks.last.type).to eq(:error)
expect(chunks.last.delta).to include("Rate limited")
end
end
describe 'WebSocket integration' do
it 'broadcasts chunks to ActionCable' do
expect(ActionCable.server).to receive(:broadcast).at_least(3).times
service.stream_chat("Hello") do |chunk|
# Chunk handling verified by broadcast expectations
end
end
end
end
11. Best Practices
11.1. Streaming Guidelines
- Chunk Size Management - Balance between responsiveness and overhead
- Error Resilience - Always handle streaming errors gracefully
- Resource Cleanup - Properly clean up connections and threads
- Rate Limiting - Protect against streaming abuse
- Monitoring - Track streaming performance and errors
11.2. Performance Considerations
- Buffer Management - Use appropriate buffer sizes for your use case
- Connection Pooling - Reuse connections to AI providers
- Thread Management - Avoid creating too many concurrent threads
- Memory Usage - Monitor memory for long-running streams
- Network Efficiency - Compress data when possible
11.3. Security Considerations
- Authentication - Verify user identity for streaming sessions
- Authorization - Check permissions for streaming access
- Rate Limiting - Prevent streaming abuse
- Input Validation - Sanitize all streaming inputs
- Resource Limits - Set limits on concurrent streams per user (as sketched below)
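A sketch of the last point: enforcing a per-user concurrent stream limit at subscription time. The channel name, limit, and cache-based counter are all illustrative, and the read/write pair below is not atomic; use a Redis INCR or similar in production:
# Hypothetical channel guard; limit and counter store are illustrative
class LimitedAgentChannel < ApplicationCable::Channel
  MAX_STREAMS_PER_USER = 3

  def subscribed
    key = "active_streams:#{current_user.id}"
    count = Rails.cache.read(key).to_i

    if count >= MAX_STREAMS_PER_USER
      reject # too many live streams for this user
    else
      Rails.cache.write(key, count + 1)
      stream_from "agent_#{current_user.id}"
    end
  end

  def unsubscribed
    key = "active_streams:#{current_user.id}"
    Rails.cache.write(key, [Rails.cache.read(key).to_i - 1, 0].max)
  end
end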
12. Next Steps
For more advanced topics:
- RAAF Rails Integration - Rails-specific streaming patterns
- Performance Guide - Advanced optimization techniques
- RAAF Tracing Guide - Monitoring streaming performance
- Configuration Reference - Streaming configuration options