Protocol-Level Activity Streaming Integration for All Communication Types
Objective
Integrate activity streaming into agent-protocol layer to automatically publish activity events for all protocol types (HTTP, gRPC, MCP, WebSocket) without requiring agent-specific code changes.
Dependencies
- OSSA Issue #21: Activity streaming schema
- agent-brain Issue #10: Activity stream service (port 50052)
- Requires: Protocol interceptors, context propagation
Scope
- Protocol Interceptors - Automatic activity publishing for HTTP, gRPC, MCP, WebSocket
- Context Propagation - Pass conversation_id, trace_id across protocol boundaries
- Protocol Adapters - Unified activity event format from different protocols
- Error Handling - Publish protocol-level errors and retries
- Performance Overhead - Minimal latency impact (<1ms per call)
Architecture
┌──────────────────────────────────────────────────────┐
│ Agent Protocol Layer │
├──────────────────────────────────────────────────────┤
│ │
│ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ HTTP │ │ gRPC │ │ MCP │ │ WS │ │
│ │ Handler│ │ Handler│ │ Handler│ │ Handler│ │
│ └───┬────┘ └───┬────┘ └───┬────┘ └───┬────┘ │
│ │ │ │ │ │
│ └───────────┴────────────┴────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────┐ │
│ │ Protocol Interceptor │ │
│ │ (Activity Publisher) │ │
│ └───────────┬───────────┘ │
│ │ │
│ ▼ gRPC Stream (port 50052) │
│ ┌───────────────────────┐ │
│ │ Activity Stream Client │ │
│ └───────────────────────┘ │
└──────────────────────────────────────────────────────┘
Implementation
1. Protocol Interceptor Base Class
// src/interceptors/protocol-interceptor.ts
import { ActivityStreamClient } from './activity-stream-client';
export interface ProtocolContext {
protocol: 'http' | 'grpc' | 'mcp' | 'websocket';
agent_id: string;
target_agent_id?: string;
capability: string;
conversation_id?: string;
trace_id?: string;
session_id?: string;
metadata: Record<string, any>;
}
export interface ProtocolResult {
success: boolean;
status_code?: number;
error?: Error;
duration_ms: number;
response_size?: number;
}
export abstract class ProtocolInterceptor {
protected activityClient: ActivityStreamClient;
constructor(activityStreamEndpoint: string) {
this.activityClient = new ActivityStreamClient(activityStreamEndpoint);
}
/**
* Intercept protocol call and publish activity
*/
async intercept<T>(
context: ProtocolContext,
handler: () => Promise<T>
): Promise<T> {
const startTime = Date.now();
let result: ProtocolResult;
try {
const response = await handler();
result = {
success: true,
duration_ms: Date.now() - startTime,
response_size: this.estimateSize(response)
};
// Publish success event
await this.publishActivity(context, result);
return response;
} catch (error) {
result = {
success: false,
duration_ms: Date.now() - startTime,
error: error as Error
};
// Publish error event
await this.publishActivity(context, result);
throw error;
}
}
/**
* Publish activity event to stream service
*/
private async publishActivity(
context: ProtocolContext,
result: ProtocolResult
): Promise<void> {
await this.activityClient.publishEvent({
type: 'capability_invocation',
status: result.success ? 'success' : 'error',
duration_ms: result.duration_ms,
metadata: {
protocol: context.protocol,
agent_id: context.agent_id,
target_agent_id: context.target_agent_id,
capability: context.capability,
conversation_id: context.conversation_id,
trace_id: context.trace_id,
session_id: context.session_id,
status_code: result.status_code,
response_size: result.response_size,
...context.metadata
},
error: result.error ? {
message: result.error.message,
stack: result.error.stack
} : undefined,
timestamp: new Date().toISOString()
});
}
private estimateSize(obj: any): number {
try {
return new Blob([JSON.stringify(obj)]).size;
} catch {
return 0;
}
}
/**
* Extract context from request (protocol-specific)
*/
abstract extractContext(request: any): ProtocolContext;
}
2. HTTP Protocol Interceptor
// src/interceptors/http-interceptor.ts
import { ProtocolInterceptor, ProtocolContext } from './protocol-interceptor';
import { Request, Response, NextFunction } from 'express';
export class HttpProtocolInterceptor extends ProtocolInterceptor {
/**
* Express middleware for HTTP protocol
*/
middleware() {
return async (req: Request, res: Response, next: NextFunction) => {
const context = this.extractContext(req);
// Capture response
const originalSend = res.send;
let responseSize = 0;
res.send = function(data: any) {
responseSize = Buffer.byteLength(data);
return originalSend.call(this, data);
};
try {
await this.intercept(context, async () => {
await new Promise<void>((resolve, reject) => {
res.on('finish', () => resolve());
res.on('error', reject);
next();
});
});
} catch (error) {
next(error);
}
};
}
extractContext(req: Request): ProtocolContext {
return {
protocol: 'http',
agent_id: req.get('X-Agent-ID') || 'unknown',
target_agent_id: req.params.agentId,
capability: req.path.split('/').pop() || req.path,
conversation_id: req.get('X-Conversation-ID'),
trace_id: req.get('X-Trace-ID'),
session_id: req.get('X-Session-ID'),
metadata: {
method: req.method,
path: req.path,
query: req.query,
user_agent: req.get('User-Agent')
}
};
}
}
3. gRPC Protocol Interceptor
// src/interceptors/grpc-interceptor.ts
import { ProtocolInterceptor, ProtocolContext } from './protocol-interceptor';
import * as grpc from '@grpc/grpc-js';
export class GrpcProtocolInterceptor extends ProtocolInterceptor {
/**
* gRPC interceptor
*/
createInterceptor(): grpc.Interceptor {
return (options, nextCall) => {
return new grpc.InterceptingCall(nextCall(options), {
start: (metadata, listener, next) => {
const context = this.extractContextFromMetadata(metadata);
this.intercept(context, async () => {
// gRPC call execution
return new Promise((resolve, reject) => {
next(metadata, {
onReceiveMessage: (message, next) => {
resolve(message);
next(message);
},
onReceiveStatus: (status, next) => {
if (status.code !== grpc.status.OK) {
reject(new Error(status.details));
}
next(status);
}
});
});
});
}
});
};
}
extractContext(call: any): ProtocolContext {
return this.extractContextFromMetadata(call.metadata);
}
private extractContextFromMetadata(metadata: grpc.Metadata): ProtocolContext {
return {
protocol: 'grpc',
agent_id: metadata.get('x-agent-id')[0] as string || 'unknown',
target_agent_id: metadata.get('x-target-agent-id')[0] as string,
capability: metadata.get('x-capability')[0] as string || 'unknown',
conversation_id: metadata.get('x-conversation-id')[0] as string,
trace_id: metadata.get('x-trace-id')[0] as string,
session_id: metadata.get('x-session-id')[0] as string,
metadata: {}
};
}
}
4. MCP Protocol Interceptor
// src/interceptors/mcp-interceptor.ts
import { ProtocolInterceptor, ProtocolContext } from './protocol-interceptor';
export class McpProtocolInterceptor extends ProtocolInterceptor {
/**
* Intercept MCP message
*/
async interceptMcpMessage(message: any, handler: () => Promise<any>): Promise<any> {
const context = this.extractContext(message);
return this.intercept(context, handler);
}
extractContext(message: any): ProtocolContext {
return {
protocol: 'mcp',
agent_id: message.from || 'unknown',
target_agent_id: message.to,
capability: message.method || message.type,
conversation_id: message.conversation_id,
trace_id: message.trace_id,
session_id: message.session_id,
metadata: {
message_type: message.type,
version: message.version
}
};
}
}
5. Context Propagation Utility
// src/utils/context-propagation.ts
export interface PropagationContext {
conversation_id: string;
trace_id: string;
session_id?: string;
parent_span_id?: string;
}
export class ContextPropagation {
/**
* Generate new context or inherit from parent
*/
static createContext(parent?: PropagationContext): PropagationContext {
return {
conversation_id: parent?.conversation_id || this.generateId('conv'),
trace_id: parent?.trace_id || this.generateId('trace'),
session_id: parent?.session_id,
parent_span_id: parent?.trace_id
};
}
/**
* Inject context into HTTP headers
*/
static injectHttp(context: PropagationContext, headers: Record<string, string>): void {
headers['X-Conversation-ID'] = context.conversation_id;
headers['X-Trace-ID'] = context.trace_id;
if (context.session_id) {
headers['X-Session-ID'] = context.session_id;
}
if (context.parent_span_id) {
headers['X-Parent-Span-ID'] = context.parent_span_id;
}
}
/**
* Inject context into gRPC metadata
*/
static injectGrpc(context: PropagationContext, metadata: any): void {
metadata.set('x-conversation-id', context.conversation_id);
metadata.set('x-trace-id', context.trace_id);
if (context.session_id) {
metadata.set('x-session-id', context.session_id);
}
if (context.parent_span_id) {
metadata.set('x-parent-span-id', context.parent_span_id);
}
}
/**
* Extract context from HTTP headers
*/
static extractHttp(headers: Record<string, string>): PropagationContext | null {
const conversationId = headers['x-conversation-id'];
const traceId = headers['x-trace-id'];
if (!conversationId || !traceId) {
return null;
}
return {
conversation_id: conversationId,
trace_id: traceId,
session_id: headers['x-session-id'],
parent_span_id: headers['x-parent-span-id']
};
}
private static generateId(prefix: string): string {
return `${prefix}_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
}
Configuration (OSSA Manifest)
ossaVersion: "1.0"
agent:
id: protocol-interceptor
name: Protocol Activity Interceptor
version: 1.0.0
role: monitoring
integration:
protocol: http
endpoints:
health: /health
metrics: /metrics
monitoring:
traces: true
metrics: true
logs: true
activity_streaming:
enabled: true
protocol: grpc
endpoint: activity-stream-service:50052
stream_types:
- capability_invocations
buffer_size: 1000
flush_interval_ms: 100
Acceptance Criteria
-
HTTP protocol interceptor (Express middleware) -
gRPC protocol interceptor -
MCP protocol interceptor -
WebSocket protocol interceptor -
Context propagation utilities -
Automatic activity publishing for all protocols -
<1ms overhead per protocol call -
Unit tests for all interceptors -
Integration tests with activity stream service -
OSSA manifest
Performance Requirements
- Overhead: <1ms per protocol call
- Throughput: Handle 10,000+ calls/second
- Memory: <50MB for interceptor layer
- Reliability: 99.99% activity publishing success
Files to Create
src/interceptors/protocol-interceptor.ts
src/interceptors/http-interceptor.ts
src/interceptors/grpc-interceptor.ts
src/interceptors/mcp-interceptor.ts
src/interceptors/websocket-interceptor.ts
src/utils/context-propagation.ts
src/utils/activity-stream-client.ts
tests/interceptors/
.ossa.yml