Agent Mesh Observer for Communication Interception and Tracking
Objective
Build a mesh observer that intercepts all agent-to-agent communications, tracks agent relationships, and streams activity data to a centralized activity stream service.
Dependencies
- OSSA Issue #21: Activity streaming schema extensions
- Requires: gRPC streaming, Phoenix/Arize integration
- Activity stream service running on port 50052
Scope
- Communication Interceptor - Intercept all mesh communications (HTTP, gRPC, MCP)
- Relationship Tracker - Build real-time agent relationship graph
- Activity Stream Publisher - Publish events to activity stream service
- Latency Tracking - Measure inter-agent communication latency
- Context Propagation - Pass conversation_id, trace_id through mesh
Architecture
┌─────────────────────────────────────────────────┐
│               Agent Mesh Observer               │
├─────────────────────────────────────────────────┤
│                                                 │
│  ┌──────────────┐      ┌──────────────┐         │
│  │ Interceptor  │──────│ Relationship │         │
│  │ Middleware   │      │ Tracker      │         │
│  └──────┬───────┘      └──────┬───────┘         │
│         │                     │                 │
│         │  ┌──────────────────▼──────┐          │
│         └─▶│   Activity Publisher    │          │
│            │      (gRPC Stream)      │          │
│            └──────────┬──────────────┘          │
└───────────────────────┼─────────────────────────┘
                        │
                        ▼ gRPC Stream (port 50052)
         ┌─────────────────────────────┐
         │   Activity Stream Service   │
         └─────────────────────────────┘
Implementation
1. Interceptor Middleware
// src/observer/interceptor.ts
import { Context, Next } from 'koa';
import { ActivityStreamClient } from './activity-stream-client';
/**
 * Koa interceptor that observes agent-to-agent requests.
 *
 * For every request passing through {@link MeshInterceptor.middleware} it
 * measures the downstream handling time, records the source -> target
 * relationship (with that latency) in the relationship tracker, and publishes
 * a `capability_invocation` event through the activity stream client.
 *
 * Observation is best-effort: a failure to publish telemetry is logged and
 * never changes the outcome of the intercepted request.
 */
export class MeshInterceptor {
  private activityClient: ActivityStreamClient;
  private relationshipTracker: RelationshipTracker;

  /**
   * @param activityStreamEndpoint Address passed to the activity stream client.
   * @param options Accepted for interface compatibility; not read by this class.
   */
  constructor(
    activityStreamEndpoint: string,
    options?: InterceptorOptions
  ) {
    this.activityClient = new ActivityStreamClient(activityStreamEndpoint);
    this.relationshipTracker = new RelationshipTracker();
  }

  /**
   * Koa middleware to intercept all requests.
   *
   * @returns An async middleware that awaits `next()`, then records and
   *          publishes the outcome. Errors thrown downstream are re-thrown
   *          unchanged after an `error` event has been published.
   */
  middleware() {
    return async (ctx: Context, next: Next) => {
      const startTime = Date.now();

      // Extract metadata
      const metadata = {
        source_agent_id: ctx.get('X-Agent-ID'),
        // `params` only exists when a router populated it; do not crash without one.
        target_agent_id: ctx.params?.agentId,
        conversation_id: ctx.get('X-Conversation-ID'),
        trace_id: ctx.get('X-Trace-ID'),
        capability: ctx.path.split('/').pop() ?? '',
        method: ctx.method,
        timestamp: new Date().toISOString()
      };

      try {
        await next();
      } catch (error: unknown) {
        const duration = Date.now() - startTime;
        this.trackRelationship(metadata, duration);
        const failureEvent = {
          type: 'capability_invocation' as const,
          status: 'error' as const,
          duration_ms: duration,
          metadata,
          error: MeshInterceptor.describeError(error)
        };
        await this.publishSafely(failureEvent);
        // Always surface the ORIGINAL downstream error to the caller.
        throw error;
      }

      // Publishing happens outside the try block so that a telemetry failure
      // can never be mistaken for (or turned into) a request failure.
      const duration = Date.now() - startTime;
      this.trackRelationship(metadata, duration);
      const successEvent = {
        type: 'capability_invocation' as const,
        status: 'success' as const,
        duration_ms: duration,
        metadata,
        response_status: ctx.status
      };
      await this.publishSafely(successEvent);
    };
  }

  /**
   * Get current relationship graph
   */
  getRelationshipGraph(): AgentGraph {
    return this.relationshipTracker.getGraph();
  }

  /** Records one communication, including its measured latency, in the tracker. */
  private trackRelationship(
    metadata: { source_agent_id: string; target_agent_id: string; capability: string },
    durationMs: number
  ): void {
    this.relationshipTracker.recordCommunication(
      metadata.source_agent_id,
      metadata.target_agent_id,
      metadata.capability,
      durationMs
    );
  }

  /** Publishes an event; publish failures are logged and swallowed on purpose. */
  private async publishSafely(
    event: Parameters<ActivityStreamClient['publishEvent']>[0]
  ): Promise<void> {
    try {
      await this.activityClient.publishEvent(event);
    } catch (publishError: unknown) {
      console.error('MeshInterceptor: failed to publish activity event', publishError);
    }
  }

  /** Converts any thrown value (not only `Error` instances) into an event error payload. */
  private static describeError(error: unknown): { message: string; stack?: string } {
    if (error instanceof Error) {
      return { message: error.message, stack: error.stack };
    }
    return { message: String(error) };
  }
}
2. Relationship Tracker
// src/observer/relationship-tracker.ts
/**
 * A vertex of the agent relationship graph: one agent that has taken part in
 * at least one recorded communication.
 */
export interface AgentNode {
/** Identifier of the agent; also the key under which the node is stored. */
agent_id: string;
/** Capability names attributed to this agent (NOTE(review): confirm how the tracker populates this list). */
capabilities_called: string[];
/** Number of recorded communications this agent took part in, as source or as target. */
call_count: number;
/** Time of the most recent recorded communication involving this agent. */
last_seen: Date;
}
/**
 * A directed edge of the agent relationship graph, aggregating every recorded
 * communication from `source` to `target`.
 */
export interface AgentEdge {
/** Agent id of the caller. */
source: string;
/** Agent id of the callee. */
target: string;
weight: number; // Number of communications
/** Distinct capability names seen on this edge, in first-seen order. */
capabilities: string[];
/** Mean of the retained latency samples for this edge, in milliseconds; 0 until a latency is recorded. */
avg_latency_ms: number;
}
/**
 * The agent relationship graph as two lookup tables.
 */
export interface AgentGraph {
/** Nodes keyed by agent id. */
nodes: Map<string, AgentNode>;
/** Edges keyed by the string `<source>-><target>`. */
edges: Map<string, AgentEdge>;
}
/**
 * In-memory tracker for the directed graph of agent-to-agent communications.
 *
 * Each call to {@link RelationshipTracker.recordCommunication} updates the
 * source and target nodes and the `source->target` edge. A sliding window of
 * recent latency samples is kept per edge to compute `avg_latency_ms`.
 */
export class RelationshipTracker {
  /** Maximum number of latency samples retained per edge. */
  private static readonly MAX_LATENCY_SAMPLES = 100;
  /** Node id used when a caller supplies an empty or missing agent id. */
  private static readonly UNKNOWN_AGENT = 'unknown';

  private graph: AgentGraph;
  private latencyTracker: Map<string, number[]>;

  constructor() {
    this.graph = {
      nodes: new Map(),
      edges: new Map()
    };
    this.latencyTracker = new Map();
  }

  /**
   * Records one communication from `sourceId` to `targetId`.
   *
   * @param sourceId Calling agent; empty/missing ids are recorded as `unknown`.
   * @param targetId Called agent; empty/missing ids are recorded as `unknown`.
   * @param capability Capability that was invoked; added to the edge and to
   *                   the source node's `capabilities_called` if not yet present.
   * @param latencyMs Optional latency sample; non-finite values are ignored so
   *                  they cannot turn the running average into `NaN`.
   */
  recordCommunication(
    sourceId: string,
    targetId: string,
    capability: string,
    latencyMs?: number
  ): void {
    // Missing headers/params would otherwise create junk keys such as "->undefined".
    const source = sourceId || RelationshipTracker.UNKNOWN_AGENT;
    const target = targetId || RelationshipTracker.UNKNOWN_AGENT;

    // Update nodes: only the caller "called" the capability.
    this.updateNode(source, capability);
    this.updateNode(target);

    // Update edge
    const edgeKey = `${source}->${target}`;
    const edge: AgentEdge = this.graph.edges.get(edgeKey) ?? {
      source,
      target,
      weight: 0,
      capabilities: [],
      avg_latency_ms: 0
    };
    edge.weight++;
    if (!edge.capabilities.includes(capability)) {
      edge.capabilities.push(capability);
    }

    // Update latency over a sliding window of the most recent samples.
    if (latencyMs !== undefined && Number.isFinite(latencyMs)) {
      const latencies = this.latencyTracker.get(edgeKey) ?? [];
      latencies.push(latencyMs);
      if (latencies.length > RelationshipTracker.MAX_LATENCY_SAMPLES) {
        latencies.shift();
      }
      edge.avg_latency_ms =
        latencies.reduce((sum, sample) => sum + sample, 0) / latencies.length;
      this.latencyTracker.set(edgeKey, latencies);
    }

    this.graph.edges.set(edgeKey, edge);
  }

  /**
   * Creates or refreshes the node for `agentId`, bumping its call count and
   * `last_seen`. When `capability` is given it is added to the node's
   * `capabilities_called` list (deduplicated).
   */
  private updateNode(agentId: string, capability?: string): void {
    const node: AgentNode = this.graph.nodes.get(agentId) ?? {
      agent_id: agentId,
      capabilities_called: [],
      call_count: 0,
      last_seen: new Date()
    };
    node.call_count++;
    node.last_seen = new Date();
    if (capability !== undefined && !node.capabilities_called.includes(capability)) {
      node.capabilities_called.push(capability);
    }
    this.graph.nodes.set(agentId, node);
  }

  /**
   * Returns a snapshot of the graph.
   *
   * Nodes, edges and their arrays are copied so that callers cannot mutate the
   * tracker's internal state through the returned value.
   */
  getGraph(): AgentGraph {
    const nodes = new Map<string, AgentNode>();
    for (const [id, node] of this.graph.nodes) {
      nodes.set(id, {
        ...node,
        capabilities_called: [...node.capabilities_called],
        last_seen: new Date(node.last_seen.getTime())
      });
    }

    const edges = new Map<string, AgentEdge>();
    for (const [key, edge] of this.graph.edges) {
      edges.set(key, { ...edge, capabilities: [...edge.capabilities] });
    }

    return { nodes, edges };
  }

  /**
   * Export graph in DOT format for visualization.
   *
   * Agent ids are escaped before being placed inside quoted DOT strings,
   * because they can originate from request headers.
   */
  toDOT(): string {
    const lines = ['digraph AgentMesh {'];

    // Nodes
    for (const [id, node] of this.graph.nodes) {
      const safeId = RelationshipTracker.escapeDot(id);
      lines.push(` "${safeId}" [label="${safeId}\\n${node.call_count} calls"];`);
    }

    // Edges
    for (const edge of this.graph.edges.values()) {
      const from = RelationshipTracker.escapeDot(edge.source);
      const to = RelationshipTracker.escapeDot(edge.target);
      lines.push(` "${from}" -> "${to}" [label="${edge.weight}\\n${edge.avg_latency_ms.toFixed(0)}ms"];`);
    }

    lines.push('}');
    return lines.join('\n');
  }

  /** Escapes backslashes, double quotes and line breaks for use inside a quoted DOT string. */
  private static escapeDot(value: string): string {
    return value
      .replace(/\\/g, '\\\\')
      .replace(/"/g, '\\"')
      .replace(/\r?\n/g, '\\n');
  }
}
3. Activity Stream Client (gRPC)
// src/observer/activity-stream-client.ts
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
/**
 * One activity record handed to `ActivityStreamClient.publishEvent`.
 */
export interface ActivityEvent {
  /** Category of the activity being reported. */
  type: 'capability_invocation' | 'conversation' | 'state_change' | 'error';
  /** Outcome of the activity. */
  status: 'success' | 'error';
  /** Measured duration of the activity in milliseconds. */
  duration_ms: number;
  /** Free-form context for the event (for example agent ids, trace id, capability). */
  metadata: Record<string, any>;
  /**
   * Response status of the intercepted request, when there is one.
   * The interceptor sets this on successful invocations.
   * NOTE(review): the `ActivityEvent` proto message has no matching field yet.
   */
  response_status?: number;
  /** Failure details; expected when `status` is `'error'`. */
  error?: {
    message: string;
    stack?: string;
  };
}
/**
 * Buffered gRPC client for the activity stream service.
 *
 * Events passed to {@link ActivityStreamClient.publishEvent} are queued in
 * memory and written to a single long-lived `StreamActivities` call, either
 * when the queue reaches `bufferSize` or on a periodic timer.
 *
 * Stream errors are logged rather than left unhandled (an unhandled `'error'`
 * event would terminate the process). No reconnection is attempted.
 */
export class ActivityStreamClient {
  private static readonly DEFAULT_BUFFER_SIZE = 1000;
  private static readonly DEFAULT_FLUSH_INTERVAL_MS = 100;

  private client: InstanceType<grpc.ServiceClientConstructor>;
  private stream: grpc.ClientDuplexStream<ActivityEvent, unknown>;
  private buffer: ActivityEvent[] = [];
  private flushInterval: ReturnType<typeof setInterval>;
  private readonly bufferSize: number;
  private closed = false;

  /**
   * @param endpoint Address of the activity stream service.
   * @param options.bufferSize Queue length that triggers an immediate flush
   *        (default 1000; non-positive values fall back to the default).
   * @param options.flushIntervalMs Period of the background flush timer
   *        (default 100; non-positive values fall back to the default).
   * @param options.compression Accepted for interface compatibility; not read here.
   * @throws Error when the loaded proto does not expose `ActivityStreamService`.
   */
  constructor(
    endpoint: string,
    options: {
      bufferSize?: number;
      flushIntervalMs?: number;
      compression?: boolean;
    } = {}
  ) {
    const packageDefinition = protoLoader.loadSync(
      'proto/activity_stream.proto',
      {
        keepCase: true,
        longs: String,
        enums: String,
        defaults: true,
        oneofs: true
      }
    );
    const proto = grpc.loadPackageDefinition(packageDefinition);
    const ServiceClient = ActivityStreamClient.resolveService(proto);
    this.client = new ServiceClient(
      endpoint,
      grpc.credentials.createInsecure()
    );

    this.bufferSize =
      options.bufferSize !== undefined && options.bufferSize > 0
        ? options.bufferSize
        : ActivityStreamClient.DEFAULT_BUFFER_SIZE;

    // Create bidirectional stream
    this.stream = this.client.streamActivities();
    this.stream.on('error', (streamError: unknown) => {
      console.error('ActivityStreamClient: activity stream error', streamError);
    });
    // Acks are not used; consume them so the response side never backs up.
    this.stream.on('data', () => undefined);

    // Auto-flush buffer
    const flushIntervalMs =
      options.flushIntervalMs !== undefined && options.flushIntervalMs > 0
        ? options.flushIntervalMs
        : ActivityStreamClient.DEFAULT_FLUSH_INTERVAL_MS;
    this.flushInterval = setInterval(() => {
      this.flush().catch((flushError: unknown) => {
        console.error('ActivityStreamClient: periodic flush failed', flushError);
      });
    }, flushIntervalMs);
  }

  /**
   * Queues an event and flushes immediately once the queue is full.
   * Events published after {@link ActivityStreamClient.close} are discarded,
   * because the underlying stream has already been ended.
   */
  async publishEvent(event: ActivityEvent): Promise<void> {
    if (this.closed) return;
    this.buffer.push(event);

    // Flush if buffer full
    if (this.buffer.length >= this.bufferSize) {
      await this.flush();
    }
  }

  /** Writes every queued event to the stream, in order, and empties the queue. */
  private async flush(): Promise<void> {
    this.drainBuffer();
  }

  /** Synchronous core of `flush`, shared with `close` so no write can follow `end()`. */
  private drainBuffer(): void {
    if (this.closed || this.buffer.length === 0) return;
    const events = this.buffer.splice(0, this.buffer.length);
    for (const event of events) {
      this.stream.write(event);
    }
  }

  /**
   * Stops the flush timer, writes any queued events and ends the stream.
   * Safe to call more than once.
   */
  close(): void {
    if (this.closed) return;
    clearInterval(this.flushInterval);
    this.drainBuffer();
    this.closed = true;
    this.stream.end();
  }

  /**
   * Locates the service constructor in the loaded definition. The proto
   * declares `package activity`, so the service is looked up under that
   * package first; the root is tried as a fallback for package-less protos.
   */
  private static resolveService(root: grpc.GrpcObject): grpc.ServiceClientConstructor {
    const pkg = root['activity'];
    const fromPackage =
      pkg !== undefined && pkg !== null && typeof pkg === 'object'
        ? (pkg as grpc.GrpcObject)['ActivityStreamService']
        : undefined;
    const candidate = fromPackage ?? root['ActivityStreamService'];
    if (typeof candidate !== 'function') {
      throw new Error(
        "ActivityStreamService not found in loaded proto definition (expected package 'activity')"
      );
    }
    return candidate as grpc.ServiceClientConstructor;
  }
}
4. Proto Definition
// proto/activity_stream.proto
syntax = "proto3";
package activity;
// Activity ingestion service.
service ActivityStreamService {
// Bidirectional streaming RPC: the client sends a stream of ActivityEvent
// messages and the server responds with a stream of ActivityAck messages.
rpc StreamActivities(stream ActivityEvent) returns (stream ActivityAck);
}
// A single activity record sent over StreamActivities.
message ActivityEvent {
string type = 1; // capability_invocation, conversation, state_change, error
string status = 2; // success, error
// Duration of the activity in milliseconds.
int64 duration_ms = 3;
// String-to-string context map; non-string values must be stringified by the sender.
map<string, string> metadata = 4;
// Failure details; explicit presence via `optional`.
optional ErrorDetails error = 5;
// Event time as a string. NOTE(review): format is not specified here — confirm (ISO-8601?).
string timestamp = 6;
}
// Description of a failure attached to an ActivityEvent.
message ErrorDetails {
// Human-readable error message.
string message = 1;
// Stack trace text, when the sender has one.
optional string stack = 2;
}
// Server response message streamed back from StreamActivities.
message ActivityAck {
// Identifier of the acknowledged event. NOTE(review): ActivityEvent carries no id field — presumably server-assigned; confirm.
string event_id = 1;
// Acknowledgement status. NOTE(review): allowed values are not defined in this file.
string status = 2;
}
Configuration (OSSA Manifest)
# OSSA manifest for the mesh observer agent.
# NOTE(review): nesting indentation appears to have been lost in this excerpt;
# restore the intended hierarchy before using this as a real manifest.
ossaVersion: "1.0"
agent:
id: agent-mesh-observer
name: Agent Mesh Observer
version: 1.0.0
role: monitoring
runtime:
type: k8s
image: ossa/agent-mesh-observer:1.0.0
resources:
cpu: 500m
memory: 1Gi
capabilities:
- name: intercept_communications
description: Intercept all mesh communications
monitoring:
traces: true
metrics: true
logs: true
activity_streaming:
enabled: true
protocol: grpc
# Port 50052 matches the activity stream service listed under Dependencies.
endpoint: activity-stream-service:50052
stream_types:
- capability_invocations
# Same values as the ActivityStreamClient defaults (1000 events / 100 ms).
buffer_size: 1000
flush_interval_ms: 100
mesh_observability:
enabled: true
intercept_communications: true
track_relationships: true
latency_tracking: true
Acceptance Criteria
- [ ] Interceptor middleware for all mesh protocols (HTTP, gRPC)
- [ ] Relationship tracker with graph data structure
- [ ] Activity stream gRPC client with buffering
- [ ] Latency tracking for all inter-agent communications
- [ ] Context propagation (conversation_id, trace_id)
- [ ] DOT format export for graph visualization
- [ ] Prometheus metrics for mesh observability
- [ ] Unit tests for interceptor, tracker, client
- [ ] Integration tests with activity stream service
- [ ] OSSA manifest with mesh_observability config
Files to Create
- src/observer/interceptor.ts
- src/observer/relationship-tracker.ts
- src/observer/activity-stream-client.ts
- proto/activity_stream.proto
- tests/observer/interceptor.test.ts
- .ossa.yml - OSSA manifest
Performance Requirements
- Interceptor overhead: <1ms per request
- Buffer flush: <100ms latency
- Memory usage: <50MB for 1000 active edges
- Throughput: Handle 1000 req/sec mesh traffic
Related Issues
- OSSA #21: Activity streaming schema extensions
- agent-brain #TBD: Activity stream service (port 50052)
- agent-studio #TBD: Live graph visualization