Real-time Communication Architecture

ProductFlo’s real-time communication system is built on a WebSocket architecture that supports both native WebSockets and the Socket.IO protocol. It enables a range of interactive features, including:
  • Real-time chat and collaboration
  • Live AI-powered assistance
  • Collaborative editing and annotation
  • Status updates and notifications
  • File change broadcasting
  • Streaming long-running task results

Architecture Overview

The real-time communication architecture is built on several key components:

WebSocket Manager

The WebSocketManager class (utils/websocket_manager.py) serves as the central component for all WebSocket operations:
  • Connection handling and authentication
  • Message routing and validation
  • Room-based communication
  • Broadcasting capabilities
  • Connection tracking and monitoring

Redis PubSub

Redis backs our real-time system, with Pub/Sub as the messaging layer between application instances:
  • Scalability: Enables horizontal scaling across multiple application instances
  • Persistence: Maintains connection state even across application restarts
  • Message Queuing: Stores messages for temporarily disconnected clients
  • Room Management: Efficiently tracks room memberships for broadcasting
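
The scaling role of Pub/Sub described above can be illustrated with a minimal sketch. Everything here is an assumption made for illustration: `InMemoryBus` stands in for a Redis channel so the example runs without a server, and `AppInstance` and the channel name `ws:broadcast` are hypothetical names, not part of ProductFlo's code. The idea is that each instance subscribes to a shared channel and forwards matching messages to its own local connections.

```javascript
// Hypothetical stand-in for a Redis Pub/Sub channel; a real deployment
// would use a Redis client's publish/subscribe calls instead.
class InMemoryBus {
  constructor() {
    this.subscribers = new Map(); // channel -> Set of callbacks
  }
  subscribe(channel, callback) {
    if (!this.subscribers.has(channel)) this.subscribers.set(channel, new Set());
    this.subscribers.get(channel).add(callback);
  }
  publish(channel, payload) {
    const callbacks = this.subscribers.get(channel) || new Set();
    for (const cb of callbacks) cb(payload);
    return callbacks.size; // number of subscribers reached
  }
}

// One application instance holding its own local client connections.
class AppInstance {
  constructor(name, bus) {
    this.name = name;
    this.bus = bus;
    this.localClients = new Map(); // clientId -> { roomId, inbox }
    bus.subscribe('ws:broadcast', (raw) => this.deliverLocally(JSON.parse(raw)));
  }
  connect(clientId, roomId) {
    this.localClients.set(clientId, { roomId, inbox: [] });
  }
  // Publish to the shared channel so every instance sees the message.
  broadcast(roomId, message) {
    this.bus.publish('ws:broadcast', JSON.stringify({ roomId, message }));
  }
  // Hand the message to local clients that are in the target room.
  deliverLocally({ roomId, message }) {
    for (const client of this.localClients.values()) {
      if (client.roomId === roomId) client.inbox.push(message);
    }
  }
}

const bus = new InMemoryBus();
const instanceA = new AppInstance('a', bus);
const instanceB = new AppInstance('b', bus);
instanceA.connect('user-1', 'product-123');
instanceB.connect('user-2', 'product-123');
instanceA.broadcast('product-123', { code: 'file_update' });
```

A broadcast issued on one instance reaches clients connected to the other because both listen on the same channel, which is what allows instances to be added horizontally.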

Process Manager Integration

For computationally intensive operations, our WebSocket system integrates with a Process Manager:
  • Asynchronous Processing: Offloads heavy operations to background tasks
  • Task Tracking: Provides real-time updates on long-running processes
  • Fault Tolerance: Ensures resilience through error handling and fallbacks
  • Multi-tenancy Support: Preserves tenant context in background tasks
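
As a rough sketch of this pattern, the example below runs a job asynchronously and reports progress and the final outcome through a callback. `TaskRunner`, `submit`, and the `task_id` field are hypothetical names chosen for illustration, not the Process Manager's actual interface; the `pending`/`success` status values follow the client examples later in this document, and `error` is an assumed third value.

```javascript
// Hypothetical task runner: runs an async job and reports progress and
// the final outcome through a notify callback (e.g. a WebSocket send).
class TaskRunner {
  constructor() {
    this.tasks = new Map(); // taskId -> { status, tenantId }
    this.nextId = 1;
  }

  // `job` receives a report(progress) function and the tenant context.
  submit(tenantId, job, notify) {
    const taskId = `task-${this.nextId++}`;
    const task = { status: 'pending', tenantId };
    this.tasks.set(taskId, task);

    const report = (progress) =>
      notify({ task_id: taskId, status: 'pending', data: { progress } });

    const run = async () => {
      let update;
      try {
        const result = await job(report, { tenantId });
        task.status = 'success';
        update = { task_id: taskId, status: 'success', data: result };
      } catch (err) {
        // Fault tolerance: a failed job becomes an error update, not a crash.
        task.status = 'error';
        update = { task_id: taskId, status: 'error', data: { message: err.message } };
      }
      notify(update);
    };

    return { taskId, done: run() };
  }
}

const updates = [];
const runner = new TaskRunner();
const { taskId, done } = runner.submit(
  'tenant-1',
  async (report) => {
    report(50); // intermediate progress update
    return { summary: 'ok' };
  },
  (update) => updates.push(update)
);
```

The tenant identifier travels with the task, so the job can scope its work to the right tenant even though it runs outside the original request.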

Message Flow

The typical flow for WebSocket messages in ProductFlo:
  1. Connection: Client establishes a WebSocket connection (with optional authentication)
  2. Message Receipt: Server receives a message from client
  3. Validation: Message format and permissions are validated using Pydantic models
  4. Processing:
    • For simple operations: handled directly by the WebSocket manager
    • For complex operations: delegated to Process Manager tasks
  5. Response: Results are sent back to the client using the BaseResponse model
  6. Broadcasting: If needed, messages are broadcast to relevant rooms
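
Steps 2 through 5 can be sketched as a small parse, validate, dispatch pipeline. This is an illustration only: the server performs validation with Pydantic models in Python, whereas the sketch uses plain JavaScript checks. The envelope fields (`code`, `user_id`, `room_id`, `data`) are taken from the client examples later in this document; the error codes and the `handleRaw` function are assumptions.

```javascript
// Handlers keyed by message code. Only 'join' appears in this document;
// its return value here is a placeholder.
const handlers = new Map([
  ['join', (msg) => ({ joined: msg.room_id })],
]);

// Step 3: check the envelope shape.
function validate(msg) {
  if (msg === null || typeof msg !== 'object' || Array.isArray(msg)) {
    return ['message must be an object'];
  }
  const errors = [];
  for (const field of ['code', 'user_id', 'room_id']) {
    if (typeof msg[field] !== 'string' || msg[field] === '') {
      errors.push(`${field} must be a non-empty string`);
    }
  }
  if (typeof msg.data !== 'object' || msg.data === null) {
    errors.push('data must be an object');
  }
  return errors;
}

// Steps 2-5: parse, validate, dispatch, and build a response envelope.
function handleRaw(raw) {
  let msg;
  try {
    msg = JSON.parse(raw);
  } catch {
    return { status: 'error', code: 'invalid_json', data: {} };
  }
  const errors = validate(msg);
  if (errors.length > 0) {
    return { status: 'error', code: 'validation_error', data: { errors } };
  }
  const handler = handlers.get(msg.code);
  if (!handler) {
    return { status: 'error', code: 'unknown_code', data: { code: msg.code } };
  }
  return { status: 'success', code: msg.code, data: handler(msg) };
}

const response = handleRaw(JSON.stringify({
  code: 'join',
  user_id: 'user-456',
  room_id: 'product-123',
  data: {},
}));
```

Rejecting malformed input before dispatch keeps handlers simple, since each one can assume a well-formed envelope.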

Room-based Communication

Our WebSocket system implements a room-based communication model. This approach allows for:
  • Targeted Communication: Messages only go to relevant clients
  • Scalability: Efficiently handle thousands of concurrent connections
  • Flexibility: Clients can join multiple rooms simultaneously
  • Tenant Isolation: Easy separation of data between tenants
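
A minimal in-memory sketch of the room model follows. `RoomRegistry` and its methods are hypothetical names for illustration and do not reflect the actual WebSocketManager interface; in production, membership would be tracked in Redis rather than in process memory.

```javascript
class RoomRegistry {
  constructor() {
    this.rooms = new Map(); // roomId -> Set of clientIds
  }
  join(roomId, clientId) {
    if (!this.rooms.has(roomId)) this.rooms.set(roomId, new Set());
    this.rooms.get(roomId).add(clientId);
  }
  leave(roomId, clientId) {
    const members = this.rooms.get(roomId);
    if (!members) return;
    members.delete(clientId);
    if (members.size === 0) this.rooms.delete(roomId); // drop empty rooms
  }
  // Client IDs that should receive a message sent to roomId,
  // optionally skipping the sender.
  recipients(roomId, senderId = null) {
    const members = this.rooms.get(roomId) || new Set();
    return [...members].filter((id) => id !== senderId);
  }
}

const registry = new RoomRegistry();
registry.join('product-123', 'user-456');
registry.join('product-123', 'user-789');
registry.join('design-123', 'user-456'); // one client, two rooms
```

Here a message sent to `product-123` by `user-456` is delivered only to `user-789`, and clients outside the room never see it.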

Multi-Tenant WebSocket Support

The WebSocket system fully supports multi-tenancy:
  • Tenant-Specific Rooms: Room names include tenant identifiers for isolation
  • Connection Context: Each connection maintains its tenant context
  • Permission Enforcement: Access control based on tenant membership
  • Cross-Tenant Collaboration: Controlled sharing between tenants when authorized
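
One way to picture tenant-specific rooms and permission enforcement is sketched below. The naming scheme `tenant:<tenantId>:<roomId>`, the `sharedRooms` set, and both function names are assumptions for illustration; the document does not specify the actual room name format.

```javascript
// Hypothetical naming scheme: "tenant:<tenantId>:<roomId>".
function tenantRoom(tenantId, roomId) {
  // Reject ":" so one tenant's prefix can never be forged by another ID.
  if (tenantId.includes(':')) throw new Error('tenantId must not contain ":"');
  return `tenant:${tenantId}:${roomId}`;
}

// A connection may join rooms under its own tenant prefix, plus any
// rooms explicitly shared with it (cross-tenant collaboration).
function canJoin(connection, roomName) {
  const ownPrefix = `tenant:${connection.tenantId}:`;
  return roomName.startsWith(ownPrefix) || connection.sharedRooms.has(roomName);
}

const connection = {
  tenantId: 'acme',
  sharedRooms: new Set([tenantRoom('globex', 'design-123')]),
};

const ownRoomAllowed = canJoin(connection, tenantRoom('acme', 'product-123'));
const foreignRoomAllowed = canJoin(connection, tenantRoom('globex', 'product-123'));
const sharedRoomAllowed = canJoin(connection, tenantRoom('globex', 'design-123'));
```

In this sketch the connection can join its own tenant's rooms and the one explicitly shared room, but no other room belonging to the second tenant.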

AI Integration

Our WebSocket system provides a streamlined interface for AI interactions. This enables:
  • Streaming Responses: Real-time AI responses with progressive rendering
  • Context Preservation: Maintains conversation context across multiple requests
  • Provider Flexibility: Supports multiple LLM providers (OpenAI, Anthropic, etc.)
  • Background Processing: Handles long-running AI tasks efficiently

Reliability Features

The WebSocket system includes several reliability mechanisms:
  • Automatic Reconnection: Clients can reconnect with state preservation
  • Message Queuing: Important messages are stored for disconnected clients
  • Heartbeat Monitoring: Connection health is continuously verified
  • Error Recovery: Graceful handling of various failure scenarios
  • Load Balancing: Distributes connections across multiple server instances
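
Two of these mechanisms, reconnection and heartbeat monitoring, can be sketched from the client side as follows. The function names, the 500 ms base delay, the 30 s cap, and the 45 s heartbeat timeout are illustrative assumptions, not values used by ProductFlo or its SDKs.

```javascript
// Exponential backoff with jitter for client reconnect attempts.
// baseMs, maxMs, and the jitter range are illustrative values.
function reconnectDelay(attempt, { baseMs = 500, maxMs = 30000, random = Math.random } = {}) {
  const capped = Math.min(maxMs, baseMs * 2 ** attempt);
  // Scale to between 50% and 100% of the capped delay so that many
  // clients do not all reconnect at the same instant.
  return Math.round(capped * (0.5 + 0.5 * random()));
}

// Heartbeat check: a connection is considered stale if no pong has
// arrived within timeoutMs.
function isStale(lastPongAt, now, timeoutMs = 45000) {
  return now - lastPongAt > timeoutMs;
}

// Delays for the first five attempts (randomized by the jitter).
const schedule = [0, 1, 2, 3, 4].map((attempt) => reconnectDelay(attempt));
```

The `random` parameter is injectable so the delay calculation can be tested deterministically; in normal use it defaults to `Math.random`.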

Security Considerations

Security is a priority in our WebSocket implementation:
  • JWT Authentication: All connections require valid JWT tokens
  • Message Validation: Strict schema validation for all messages
  • Rate Limiting: Prevents abuse through connection and message rate limits
  • Permission Checking: Access control at both connection and message level
  • Tenant Isolation: Strong boundaries between tenant data
  • Encryption: All communications are encrypted with TLS
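
Message rate limiting is commonly implemented with a token bucket, sketched below. This shows the general technique rather than ProductFlo's actual limiter; the `TokenBucket` class, its capacity, and its refill rate are assumptions for illustration.

```javascript
// Token bucket: each message costs one token; tokens refill over time
// up to a fixed capacity, which allows short bursts but caps the average rate.
class TokenBucket {
  constructor(capacity, refillPerSecond, now = Date.now()) {
    this.capacity = capacity;
    this.refillPerSecond = refillPerSecond;
    this.tokens = capacity; // start full
    this.lastRefill = now;  // timestamp in milliseconds
  }

  // Returns true if the message is allowed, false if it should be rejected.
  tryConsume(now = Date.now()) {
    if (now > this.lastRefill) {
      const elapsedSeconds = (now - this.lastRefill) / 1000;
      this.tokens = Math.min(
        this.capacity,
        this.tokens + elapsedSeconds * this.refillPerSecond
      );
      this.lastRefill = now;
    }
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }
}

// One bucket per connection: bursts of 2 messages, 1 message/second sustained.
const bucket = new TokenBucket(2, 1, 0);
const results = [
  bucket.tryConsume(0),    // allowed (burst)
  bucket.tryConsume(0),    // allowed (burst)
  bucket.tryConsume(0),    // rejected, bucket is empty
  bucket.tryConsume(1000), // allowed, one token refilled after 1 s
];
```

Keeping one bucket per connection (or per user) limits an abusive client without affecting others in the same room.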

Client SDKs

We provide client SDKs to simplify integration with our WebSocket system:
  • JavaScript Client: For web applications
  • Python Client: For backend integrations
  • React Hooks: For React-based frontends
  • Mobile SDKs: For iOS and Android applications
These SDKs handle connection management, authentication, reconnection logic, and message formatting automatically.

Implementation Examples

Joining a Room and Listening for Updates

// Connect to WebSocket with authentication
const socket = new WebSocket('wss://api.productflo.io/ws/secure?access_token=YOUR_JWT_TOKEN');

// Join a product room after connection
socket.onopen = () => {
  socket.send(JSON.stringify({
    code: 'join',
    user_id: 'user-456',
    room_id: 'product-123',
    data: {}
  }));
};

// Listen for product updates
socket.onmessage = (event) => {
  const data = JSON.parse(event.data);
  
  if (data.code === 'file_update' && data.room_id === 'product-123') {
    console.log('Product file updated:', data.data.file_id);
    // Reload or update UI
  }
};

Real-time AI Collaboration

// Request AI design feedback (assumes `socket` is the open connection from the previous example)
socket.send(JSON.stringify({
  code: 'haitch_chat',
  user_id: 'user-456',
  room_id: 'design-123',
  data: {
    query: 'Review this PCB design and suggest improvements',
    doc_type: 'engineering_analysis',
    source_urls: ['https://path.to/pcb-456-design.pdf']
  }
}));

// Process streaming response
let fullResponse = '';

// addEventListener keeps any handler already assigned to socket.onmessage
socket.addEventListener('message', (event) => {
  const data = JSON.parse(event.data);
  if (data.code !== 'chat_response') return;

  if (data.status === 'pending') {
    // Handle streaming chunks (pending status)
    fullResponse += data.data.message || '';
    updateUI(fullResponse); // Progressive rendering (updateUI is app-defined)
  } else if (data.status === 'success') {
    // Handle final response (success status)
    console.log('AI analysis complete:', data.data);
    finalizeUI(data.data.message); // finalizeUI is app-defined
  }
});