MCP Integration with OODA Pattern for Activity Streaming
Objective
Integrate the Model Context Protocol (MCP) with the OODA (Observe-Orient-Decide-Act) pattern for intelligent agent coordination and activity streaming.
Architecture
```
┌──────────────────────────────────────────────────────────┐
│              MCP + OODA Coordination Layer               │
├──────────────────────────────────────────────────────────┤
│                                                          │
│                   ┌─────────────┐                        │
│                   │ gRPC Gateway│                        │
│                   └───────┬─────┘                        │
│                           │                              │
│           ┌───────────────┼─────────────────┐            │
│           │               │                 │            │
│       ┌───▼────┐      ┌───▼────┐        ┌───▼────┐       │
│       │ Agent 1│      │ Agent 2│        │ Agent 3│       │
│       │Observer│◄────►│Analyzer│◄──────►│Executor│       │
│       │  OODA  │ gRPC │  OODA  │  gRPC  │  OODA  │       │
│       └────┬───┘      └────┬───┘        └────┬───┘       │
│            │               │                 │           │
│        Gainsight      GitLab API        Salesforce       │
│                                                          │
│  Each agent runs OODA loop:                              │
│  1. Observe - Collect data from integrations             │
│  2. Orient  - Semantic analysis + context                │
│  3. Decide  - Determine next action                      │
│  4. Act     - Execute capability + publish activity      │
└──────────────────────────────────────────────────────────┘
```
OODA Pattern Implementation
1. OODA Agent Base Class
// src/mcp/ooda-agent.ts
import { ActivityStreamClient } from './activity-stream-client';
/**
 * Snapshot of one OODA cycle as tracked by an agent.
 *
 * A fresh state is created at the start of every `runCycle` call and is then
 * filled in phase by phase as the cycle progresses.
 */
export interface OODAState {
/** Phase the cycle is currently in; a freshly initialized state starts at 'observe'. */
phase: 'observe' | 'orient' | 'decide' | 'act';
/** Result of the observe phase; empty until that phase has completed. */
observations: any[];
/** Result of the orient phase; `null` until that phase has completed. */
orientation: any;
/** Result of the decide phase; `null` until that phase has completed. */
decision: any;
/** Result of the act phase; `null` until that phase has completed. */
action: any;
/** Generated identifier of the form `ooda_<epoch ms>_<random base-36 suffix>`. */
cycle_id: string;
/** ISO-8601 timestamp taken when the state was initialized. */
started_at: string;
/** Free-form bag of extra data; `runCycle` stores its trigger under `trigger`. */
metadata: Record<string, any>;
}
/**
 * Base class for agents that work in Observe -> Orient -> Decide -> Act cycles.
 *
 * Subclasses implement the four phase methods; this class sequences them,
 * records the intermediate results in `currentState`, and publishes a
 * `state_change` event to the activity stream for every phase transition and
 * for the outcome of the cycle.
 */
export abstract class OODAAgent {
  protected activityClient: ActivityStreamClient;
  protected currentState: OODAState;
  protected agentId: string;

  /**
   * @param agentId Identifier attached to every published event.
   * @param activityStreamEndpoint Endpoint handed to the `ActivityStreamClient` constructor.
   */
  constructor(agentId: string, activityStreamEndpoint: string) {
    this.agentId = agentId;
    this.activityClient = new ActivityStreamClient(activityStreamEndpoint);
    this.currentState = this.initializeState();
  }

  /**
   * Run one complete OODA cycle.
   *
   * The state is reset first, so results of a previous cycle are discarded.
   * NOTE(review): `currentState` is shared per instance, so overlapping
   * `runCycle` calls on the same agent would overwrite each other's state.
   *
   * @param trigger Optional data describing why the cycle was started; stored
   *   under `currentState.metadata.trigger`.
   * @returns Whatever the `act` phase returned.
   * @throws Re-throws, unchanged, any value thrown by a phase or by event
   *   publishing on the success path. An 'error' event is published first on a
   *   best-effort basis.
   */
  async runCycle(trigger?: any): Promise<any> {
    this.currentState = this.initializeState();
    this.currentState.metadata.trigger = trigger;

    try {
      // 1. OBSERVE
      await this.transitionTo('observe');
      const observations = await this.observe();
      this.currentState.observations = observations;

      // 2. ORIENT
      await this.transitionTo('orient');
      const orientation = await this.orient(observations);
      this.currentState.orientation = orientation;

      // 3. DECIDE
      await this.transitionTo('decide');
      const decision = await this.decide(orientation);
      this.currentState.decision = decision;

      // 4. ACT
      await this.transitionTo('act');
      const action = await this.act(decision);
      this.currentState.action = action;

      // Publish cycle completion
      await this.publishCycleEvent('completed', {
        result: action,
        duration_ms: Date.now() - new Date(this.currentState.started_at).getTime()
      });

      return action;
    } catch (error: unknown) {
      // Anything can be thrown, so narrow before reading `.message`.
      const message = error instanceof Error ? error.message : String(error);

      try {
        await this.publishCycleEvent('error', { error: message });
      } catch (publishError: unknown) {
        // Reporting is best-effort: a broken activity stream must not replace
        // the failure the caller actually needs to see.
        console.error(
          `[${this.agentId}] failed to publish OODA error event`,
          publishError
        );
      }

      throw error;
    }
  }

  /**
   * Phase 1: OBSERVE - Collect data from environment
   */
  protected abstract observe(): Promise<any[]>;

  /**
   * Phase 2: ORIENT - Analyze and contextualize observations
   */
  protected abstract orient(observations: any[]): Promise<any>;

  /**
   * Phase 3: DECIDE - Determine best course of action
   */
  protected abstract decide(orientation: any): Promise<any>;

  /**
   * Phase 4: ACT - Execute the decided action
   */
  protected abstract act(decision: any): Promise<any>;

  /**
   * Move the cycle into `phase` and publish the transition.
   *
   * Because a fresh state already starts in 'observe', the first transition
   * of every cycle is reported with `from_phase` and `to_phase` both 'observe'.
   */
  private async transitionTo(phase: OODAState['phase']): Promise<void> {
    const previousPhase = this.currentState.phase;
    this.currentState.phase = phase;

    await this.activityClient.publishEvent({
      type: 'state_change',
      status: 'success',
      duration_ms: 0,
      metadata: {
        agent_id: this.agentId,
        state_machine: 'ooda',
        from_phase: previousPhase,
        to_phase: phase,
        cycle_id: this.currentState.cycle_id
      },
      timestamp: new Date().toISOString()
    });
  }

  /**
   * Publish the outcome of the current cycle.
   *
   * @param status 'completed' maps to event status 'success'; anything else to 'error'.
   * @param metadata Extra fields merged into the event metadata. They are
   *   spread last, so they win over the built-in keys on a name clash.
   */
  private async publishCycleEvent(status: string, metadata: any): Promise<void> {
    await this.activityClient.publishEvent({
      type: 'state_change',
      status: status === 'completed' ? 'success' : 'error',
      duration_ms: metadata.duration_ms ?? 0,
      metadata: {
        agent_id: this.agentId,
        ooda_cycle_id: this.currentState.cycle_id,
        observations_count: this.currentState.observations.length,
        ...metadata
      },
      timestamp: new Date().toISOString()
    });
  }

  /**
   * Build the empty state a new cycle starts from, with a fresh cycle id and
   * start timestamp.
   */
  private initializeState(): OODAState {
    // Characters 2..10 of the base-36 rendering skip the leading "0.".
    const randomSuffix = Math.random().toString(36).slice(2, 11);

    return {
      phase: 'observe',
      observations: [],
      orientation: null,
      decision: null,
      action: null,
      cycle_id: `ooda_${Date.now()}_${randomSuffix}`,
      started_at: new Date().toISOString(),
      metadata: {}
    };
  }
}
2. Example: Gainsight Observer Agent
// src/mcp/agents/gainsight-observer.ts
import { OODAAgent } from '../ooda-agent';
import { GainsightClient } from '../../integrations/gainsight';
/**
 * OODA agent that watches Gainsight health scores and escalates customers
 * whose scores are critical.
 */
export class GainsightObserverAgent extends OODAAgent {
  private gainsightClient: GainsightClient;

  constructor(activityStreamEndpoint: string) {
    super('gainsight-observer', activityStreamEndpoint);
    this.gainsightClient = new GainsightClient();
  }

  /**
   * OBSERVE: fetch the 'at-risk' health scores updated during the last
   * 24 hours and flatten each one into an observation record.
   */
  protected async observe(): Promise<any[]> {
    const lookbackMs = 24 * 60 * 60 * 1000;
    const query = {
      status: 'at-risk',
      updated_since: new Date(Date.now() - lookbackMs)
    };

    const atRiskScores = await this.gainsightClient.getHealthScores(query);

    return atRiskScores.map(entry => {
      return {
        type: 'health_score',
        customer_id: entry.customer_id,
        customer_name: entry.customer_name,
        score: entry.score,
        previous_score: entry.previous_score,
        factors: entry.factors,
        timestamp: entry.updated_at
      };
    });
  }

  /**
   * ORIENT: bucket the observations into critical (score below 30) and
   * declining (dropped by more than 10 points) customers and derive the
   * overall severity.
   */
  protected async orient(observations: any[]): Promise<any> {
    const isCritical = (item: any) => item.score < 30;
    const isDeclining = (item: any) =>
      item.score < item.previous_score && item.previous_score - item.score > 10;

    const critical = observations.filter(isCritical);
    const declining = observations.filter(isDeclining);

    return {
      at_risk_count: observations.length,
      critical_customers: critical,
      declining_customers: declining,
      severity: critical.length > 0 ? 'critical' : 'warning'
    };
  }

  /**
   * DECIDE: escalate critical customers to GitLab issues; otherwise keep
   * monitoring the declining ones.
   */
  protected async decide(orientation: any): Promise<any> {
    const escalate = orientation.severity === 'critical';
    const customers = escalate
      ? orientation.critical_customers
      : orientation.declining_customers;
    const targets = customers.map(customer => customer.customer_id);

    if (!escalate) {
      return {
        action: 'monitor',
        targets,
        priority: 'medium',
        reason: 'Declining health scores'
      };
    }

    return {
      action: 'create_gitlab_issues',
      targets,
      priority: 'high',
      reason: 'Critical health scores detected'
    };
  }

  /**
   * ACT: carry out the decision, delegating issue creation to the GitLab
   * analyzer agent when requested.
   */
  protected async act(decision: any): Promise<any> {
    if (decision.action !== 'create_gitlab_issues') {
      return {
        action: 'monitoring_continued',
        customers: decision.targets
      };
    }

    // Call GitLab Analyzer Agent via gRPC
    const payload = {
      customers: decision.targets,
      priority: decision.priority,
      source: 'gainsight-health-score'
    };
    const issues = await this.callAgent('gitlab-analyzer', payload);

    return {
      action: 'gitlab_issues_created',
      count: issues.length,
      issues
    };
  }

  /**
   * Call another agent via gRPC.
   *
   * Placeholder: currently ignores its arguments and resolves to an empty
   * array; it is meant to go through the gRPC gateway.
   */
  private async callAgent(agentId: string, data: any): Promise<any> {
    // This would use the gRPC gateway to call other agents
    // Implementation in next section
    return [];
  }
}
3. gRPC Gateway for Agent-to-Agent Communication
// src/mcp/grpc-gateway.ts
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
/**
 * Client-side gateway that lets agents invoke each other's capabilities over
 * gRPC using the `AgentService` contract from `proto/agent_communication.proto`.
 */
export class GrpcGateway {
  private clients: Map<string, any> = new Map();

  /** `AgentService` client constructor, resolved once and reused for every agent. */
  private agentServiceCtor: any;

  /**
   * Register agent for gRPC communication.
   *
   * Registering an id that is already known replaces its client; the previous
   * client is closed so its channel is not leaked.
   *
   * @param agentId Key under which the agent can later be called.
   * @param endpoint Address of the agent's gRPC server.
   * @throws Error if the `AgentService` definition cannot be found in the proto.
   */
  async registerAgent(agentId: string, endpoint: string): Promise<void> {
    const AgentService = this.resolveAgentService();

    // NOTE(review): insecure (plaintext) credentials — confirm this is only
    // used on a trusted network.
    const client = new AgentService(endpoint, grpc.credentials.createInsecure());

    const previous = this.clients.get(agentId);
    if (previous && typeof previous.close === 'function') {
      previous.close();
    }

    this.clients.set(agentId, client);
  }

  /**
   * Call agent capability via gRPC (unary).
   *
   * `data` is sent JSON-encoded and the response `output` is JSON-decoded.
   * NOTE(review): the `status_code` and `error` fields of the response are not
   * inspected here — confirm whether servers report failures through them.
   *
   * @param sourceAgentId Caller id, forwarded as `x-source-agent-id` metadata.
   * @param targetAgentId Id of a previously registered agent.
   * @param capability Name of the capability to invoke.
   * @param data JSON-serializable input for the capability.
   * @param context Optional call context; `conversation_id` and `trace_id` are
   *   forwarded as call metadata and the whole object as request metadata.
   * @returns The decoded `output` of the response.
   * @throws Error synchronously if the target agent is not registered; the
   *   returned promise rejects on gRPC errors or when `output` is not valid JSON.
   */
  async callAgent(
    sourceAgentId: string,
    targetAgentId: string,
    capability: string,
    data: any,
    context?: any
  ): Promise<any> {
    const client = this.requireClient(targetAgentId);

    return new Promise((resolve, reject) => {
      const metadata = new grpc.Metadata();
      metadata.set('x-source-agent-id', sourceAgentId);
      metadata.set('x-conversation-id', context?.conversation_id || '');
      metadata.set('x-trace-id', context?.trace_id || '');

      client.invokeCapability(
        {
          capability,
          input: JSON.stringify(data),
          metadata: context
        },
        metadata,
        (error: any, response: any) => {
          if (error) {
            reject(error);
            return;
          }

          // Decoding happens inside the gRPC callback; an exception thrown
          // here would escape the promise, so convert it into a rejection.
          try {
            resolve(JSON.parse(response.output));
          } catch (parseError: unknown) {
            const detail =
              parseError instanceof Error ? parseError.message : String(parseError);
            reject(
              new Error(
                `Agent ${targetAgentId} returned invalid JSON for capability ${capability}: ${detail}`
              )
            );
          }
        }
      );
    });
  }

  /**
   * Stream capability (for long-running operations).
   *
   * @returns The server-streaming call; chunks are delivered as stream events.
   * @throws Error if the target agent is not registered.
   */
  streamAgentCapability(
    targetAgentId: string,
    capability: string,
    data: any
  ): grpc.ClientReadableStream<any> {
    const client = this.requireClient(targetAgentId);

    return client.streamCapability({
      capability,
      input: JSON.stringify(data)
    });
  }

  /** Look up a registered client or fail with the standard "not registered" error. */
  private requireClient(targetAgentId: string): any {
    const client = this.clients.get(targetAgentId);
    if (!client) {
      throw new Error(`Agent ${targetAgentId} not registered`);
    }
    return client;
  }

  /** Load the proto once and return the `AgentService` client constructor. */
  private resolveAgentService(): any {
    if (!this.agentServiceCtor) {
      const packageDefinition = protoLoader.loadSync(
        'proto/agent_communication.proto',
        {
          keepCase: true,
          longs: String,
          enums: String,
          defaults: true,
          oneofs: true
        }
      );
      const proto = grpc.loadPackageDefinition(packageDefinition) as any;

      // The proto declares `package agent;`, so the service lives under the
      // `agent` namespace. The top-level lookup is kept as a fallback for a
      // proto without a package statement.
      const ctor = proto.agent?.AgentService ?? proto.AgentService;
      if (typeof ctor !== 'function') {
        throw new Error(
          'AgentService definition not found in proto/agent_communication.proto'
        );
      }

      this.agentServiceCtor = ctor;
    }

    return this.agentServiceCtor;
  }
}
4. MCP Server Implementation
// src/mcp/mcp-server.ts
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
  CallToolRequestSchema,
  ListToolsRequestSchema
} from '@modelcontextprotocol/sdk/types.js';
import { OODAAgent } from './ooda-agent';
/**
 * MCP server that exposes every registered OODA agent as a tool named
 * `ooda_<agentId>`; calling the tool runs one OODA cycle of that agent.
 */
export class MCPOodaServer {
  /** Prefix that turns an agent id into its MCP tool name. */
  private static readonly TOOL_PREFIX = 'ooda_';

  private server: Server;
  private agents: Map<string, OODAAgent> = new Map();

  /**
   * @param serverInfo Name and version announced to MCP clients.
   */
  constructor(serverInfo: { name: string; version: string }) {
    // NOTE(review): the `resources` capability is declared but no resource
    // handlers are installed in this class — confirm it is still wanted.
    this.server = new Server(serverInfo, {
      capabilities: {
        tools: {},
        resources: {}
      }
    });

    this.setupHandlers();
  }

  /**
   * Register OODA agent as MCP tool.
   *
   * The tool list is computed from the registry on every `tools/list`
   * request, so storing the agent is all that is needed. The low-level
   * `Server` class has no `tool()` helper; that API belongs to `McpServer`.
   */
  registerAgent(agent: OODAAgent): void {
    // `agentId` is protected on OODAAgent, hence the cast.
    const agentId = (agent as any).agentId;
    this.agents.set(agentId, agent);
  }

  /**
   * Setup MCP handlers for `tools/list` and `tools/call`.
   *
   * `setRequestHandler` is keyed by the request schema, not by the method
   * name string.
   */
  private setupHandlers(): void {
    // List available tools (agents)
    this.server.setRequestHandler(ListToolsRequestSchema, async () => {
      const tools = Array.from(this.agents.keys()).map(agentId => ({
        name: `${MCPOodaServer.TOOL_PREFIX}${agentId}`,
        description: `Run OODA cycle for ${agentId} agent`,
        inputSchema: {
          type: 'object' as const,
          properties: {
            trigger: {
              type: 'object',
              description: 'Optional trigger data for OODA cycle'
            }
          }
        }
      }));

      return { tools };
    });

    // Call tool (run OODA cycle)
    this.server.setRequestHandler(CallToolRequestSchema, async request => {
      const toolName = request.params.name;
      const prefix = MCPOodaServer.TOOL_PREFIX;
      // Strip the prefix only where it actually is a prefix.
      const agentId = toolName.startsWith(prefix)
        ? toolName.slice(prefix.length)
        : toolName;

      const agent = this.agents.get(agentId);
      if (!agent) {
        throw new Error(`Agent ${agentId} not found`);
      }

      // `arguments` is optional in a tools/call request.
      const result = await agent.runCycle(request.params.arguments?.trigger);

      return this.toToolResult(result);
    });
  }

  /** Wrap a cycle result as a single pretty-printed JSON text content item. */
  private toToolResult(result: unknown): {
    content: Array<{ type: 'text'; text: string }>;
  } {
    return {
      content: [
        {
          type: 'text',
          // JSON.stringify(undefined) is undefined, but `text` must be a string.
          text: JSON.stringify(result, null, 2) ?? 'null'
        }
      ]
    };
  }

  /**
   * Start MCP server on the stdio transport.
   */
  async start(): Promise<void> {
    const transport = new StdioServerTransport();
    await this.server.connect(transport);
    // stdout carries the protocol on stdio transport, so log to stderr.
    console.error('MCP OODA Server started');
  }
}
5. Proto Definition for Agent Communication
// proto/agent_communication.proto
// gRPC contract for agent-to-agent communication.
syntax = "proto3";
// All definitions below live in the `agent` package, so the service is
// addressed as `agent.AgentService`.
package agent;
// Service every agent exposes to its peers.
service AgentService {
// Invoke agent capability (unary)
rpc InvokeCapability(CapabilityRequest) returns (CapabilityResponse);
// Stream capability (server streaming)
rpc StreamCapability(CapabilityRequest) returns (stream CapabilityChunk);
// Bidirectional OODA coordination
rpc CoordinateOODA(stream OODAPhase) returns (stream OODAPhase);
}
// Request to run one named capability.
message CapabilityRequest {
// Name of the capability to run.
string capability = 1;
string input = 2; // JSON string
// String-to-string key/value pairs accompanying the request.
map<string, string> metadata = 3;
}
// Result of a unary capability invocation.
message CapabilityResponse {
string output = 1; // JSON string
// NOTE(review): the value space of status_code is not defined in this file — confirm with the server implementation.
int32 status_code = 2;
// NOTE(review): presumably an error description, empty on success — confirm.
string error = 3;
}
// One piece of a streamed capability result.
message CapabilityChunk {
// NOTE(review): encoding of data is not stated here; presumably JSON like the other payload fields — confirm.
string data = 1;
// NOTE(review): presumably set on the last chunk of a stream — confirm.
bool is_final = 2;
}
// One OODA phase update exchanged over CoordinateOODA.
message OODAPhase {
string agent_id = 1;
string cycle_id = 2;
string phase = 3; // observe, orient, decide, act
string data = 4; // JSON string
// NOTE(review): timestamp format is not specified here — confirm (ISO-8601 is used elsewhere for activity events).
string timestamp = 5;
}
Configuration (OSSA Manifest)
ossaVersion: "1.0"
agent:
id: gainsight-observer
name: Gainsight Observer (OODA)
version: 1.0.0
role: monitoring
runtime:
type: k8s
image: ossa/gainsight-observer:1.0.0
capabilities:
- name: ooda_cycle
description: Run OODA cycle for customer health monitoring
input_schema:
type: object
properties:
trigger:
type: object
output_schema:
type: object
integration:
protocol: grpc
grpc:
port: 50051
reflection: true
health_check: true
monitoring:
traces: true
metrics: true
logs: true
activity_streaming:
enabled: true
protocol: grpc
endpoint: activity-stream-service:50052
stream_types:
- state_changes
- capability_invocations
Acceptance Criteria
- OODA agent base class with all 4 phases
- gRPC gateway for agent-to-agent communication
- MCP server integration for OODA tools
- Example agents: Gainsight Observer, GitLab Analyzer, Salesforce Executor
- Proto definitions for agent communication
- Activity streaming for OODA state transitions
- Unit tests for OODA cycles
- Integration tests with multiple agents
- OSSA manifests for all example agents
Files to Create
src/mcp/ooda-agent.ts
src/mcp/grpc-gateway.ts
src/mcp/mcp-server.ts
src/mcp/agents/gainsight-observer.ts
src/mcp/agents/gitlab-analyzer.ts
src/mcp/agents/salesforce-executor.ts
proto/agent_communication.proto
tests/mcp/ooda-agent.test.ts
.ossa.yml