Activity Stream Service with Semantic Analysis and Conversation Tracking (gRPC Port 50052)
Objective
Build a centralized activity stream service that receives real-time agent activity via gRPC streaming, performs semantic analysis, tracks multi-turn conversations, and stores events in a time-series database.
Dependencies
- OSSA Issue #21: Activity streaming schema extensions
- agent-mesh Issue #6: Mesh observer for event publishing
- Requires: Phoenix/Arize, embedding models, time-series storage
Scope
- gRPC Streaming Server (port 50052) - Receive activity events from all agents
- Semantic Analysis Engine - Embed and analyze conversation content
- Conversation Tracker - Track multi-turn conversations with context
- Time-Series Storage - Historical activity analysis (TimescaleDB/InfluxDB)
- Phoenix/Arize Integration - Distributed tracing with agent context
- Real-Time Analytics - Live metrics and insights
Architecture
Activity Stream Service (Port 50052)

   ┌──────────────┐
   │ gRPC Server  │◄───── Streams from agents (port 50052)
   │ (Streaming)  │
   └──────┬───────┘
          │
          ▼
   ┌──────────────────────────────────┐
   │      Activity Event Router       │
   └──────┬────────────┬──────────────┘
          │            │
          ▼            ▼
   ┌──────────┐   ┌────────────────┐
   │ Semantic │   │ Conversation   │
   │ Analysis │   │ Tracker        │
   │ Engine   │   │                │
   └────┬─────┘   └───────┬────────┘
        │                 │
        ▼                 ▼
   ┌────────────────────────────────┐
   │      Time-Series Storage       │
   │     (TimescaleDB/InfluxDB)     │
   └──────────────┬─────────────────┘
                  │
                  ▼
   ┌──────────────────────────────┐
   │   Phoenix/Arize Exporter     │
   └──────────────────────────────┘
Implementation
1. gRPC Streaming Server
// src/activity-stream/grpc-server.ts
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
import { randomUUID } from 'node:crypto';
import { SemanticAnalysisEngine } from './semantic-analysis';
import { ConversationTracker } from './conversation-tracker';
import { TimeSeriesStore } from './timeseries-store';
import { PhoenixExporter } from './phoenix-exporter';
export class ActivityStreamServer {
private server: grpc.Server;
private semanticEngine: SemanticAnalysisEngine;
private conversationTracker: ConversationTracker;
private timeseriesStore: TimeSeriesStore;
private phoenixExporter: PhoenixExporter;
constructor(options: ActivityStreamOptions) {
const packageDefinition = protoLoader.loadSync(
'proto/activity_stream.proto',
{
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
}
);
const proto = grpc.loadPackageDefinition(packageDefinition);
this.server = new grpc.Server();
// Initialize components
this.semanticEngine = new SemanticAnalysisEngine({
embeddingModel: options.embeddingModel || 'text-embedding-3-small'
});
this.conversationTracker = new ConversationTracker({
contextWindow: options.contextWindow || 10,
similarityThreshold: options.similarityThreshold || 0.85
});
this.timeseriesStore = new TimeSeriesStore({
type: options.storageType || 'timescaledb',
connectionString: options.storageConnection
});
this.phoenixExporter = new PhoenixExporter({
project: options.phoenixProject
});
// Register service
this.server.addService(
(proto as any).activity.ActivityStreamService.service,
{
streamActivities: this.handleActivityStream.bind(this)
}
);
}
/**
* Handle bidirectional activity stream
*/
private handleActivityStream(
call: grpc.ServerDuplexStream<ActivityEvent, ActivityAck>
): void {
console.log('New activity stream connected');
call.on('data', async (event: ActivityEvent) => {
try {
// Route event to appropriate handlers
await this.processEvent(event);
// Acknowledge with the event's trace_id, or a fresh UUID if none was provided
call.write({
event_id: event.metadata.trace_id || randomUUID(),
status: 'received'
});
} catch (error) {
console.error('Error processing activity event:', error);
call.write({
event_id: event.metadata.trace_id || randomUUID(),
status: 'error'
});
}
});
call.on('end', () => {
console.log('Activity stream ended');
call.end();
});
call.on('error', (error) => {
console.error('Activity stream error:', error);
});
}
/**
* Process activity event through pipeline
*/
private async processEvent(event: ActivityEvent): Promise<void> {
// 1. Store raw event
await this.timeseriesStore.insertEvent(event);
// 2. Handle by type
switch (event.type) {
case 'capability_invocation':
await this.handleCapabilityInvocation(event);
break;
case 'conversation':
await this.handleConversation(event);
break;
case 'state_change':
await this.handleStateChange(event);
break;
case 'error':
await this.handleError(event);
break;
}
// 3. Export to Phoenix/Arize
await this.phoenixExporter.exportEvent(event);
}
/**
* Handle conversation events with semantic analysis
*/
private async handleConversation(event: ActivityEvent): Promise<void> {
const { conversation_id, message, response } = event.metadata;
// Generate embeddings
const messageEmbedding = await this.semanticEngine.embed(message);
const responseEmbedding = await this.semanticEngine.embed(response);
// Semantic analysis
const analysis = await this.semanticEngine.analyze({
message,
response,
messageEmbedding,
responseEmbedding
});
// Track conversation
const turn = await this.conversationTracker.addTurn({
conversation_id,
message,
response,
embedding: messageEmbedding,
analysis,
timestamp: event.timestamp
});
// Store enriched data
await this.timeseriesStore.insertConversationTurn({
...turn,
agent_id: event.metadata.agent_id,
capability: event.metadata.capability
});
}
private async handleCapabilityInvocation(event: ActivityEvent): Promise<void> {
// Track invocation metrics
await this.timeseriesStore.insertMetric({
metric: 'capability_invocation',
agent_id: event.metadata.agent_id,
capability: event.metadata.capability,
duration_ms: event.duration_ms,
status: event.status,
timestamp: event.timestamp
});
}
private async handleStateChange(event: ActivityEvent): Promise<void> {
// State changes are already persisted as raw events in processEvent; no extra enrichment yet
}
private async handleError(event: ActivityEvent): Promise<void> {
// Errors are already persisted as raw events; alerting/aggregation can hook in here later
}
async start(port: number = 50052): Promise<void> {
// Connect the time-series store before accepting traffic
await this.timeseriesStore.connect();
return new Promise((resolve, reject) => {
this.server.bindAsync(
`0.0.0.0:${port}`,
grpc.ServerCredentials.createInsecure(),
(error, boundPort) => {
if (error) {
reject(error);
return;
}
this.server.start();
console.log(`Activity Stream Service listening on port ${boundPort}`);
resolve();
}
);
});
}
async stop(): Promise<void> {
return new Promise((resolve) => {
this.server.tryShutdown(() => {
resolve();
});
});
}
}
interface ActivityEvent {
type: 'capability_invocation' | 'conversation' | 'state_change' | 'error';
status: 'success' | 'error';
duration_ms: number;
metadata: {
agent_id: string;
conversation_id?: string;
trace_id?: string;
capability?: string;
message?: string;
response?: string;
[key: string]: any;
};
timestamp: string;
error?: {
message: string;
stack?: string;
};
}
interface ActivityAck {
event_id: string;
status: 'received' | 'error';
}
interface ActivityStreamOptions {
embeddingModel?: string;
contextWindow?: number;
similarityThreshold?: number;
storageType?: 'timescaledb' | 'influxdb';
storageConnection: string;
phoenixProject: string;
}
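For reference, a minimal client-side sketch of how an agent might publish to this stream. It assumes proto/activity_stream.proto defines ActivityStreamService.streamActivities as a bidirectional stream of ActivityEvent/ActivityAck (matching the server registration above); the address and field values are illustrative.
// Hypothetical agent-side publisher (sketch only, not part of the service)
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';

const packageDefinition = protoLoader.loadSync('proto/activity_stream.proto', { keepCase: true });
const proto = grpc.loadPackageDefinition(packageDefinition) as any;

const client = new proto.activity.ActivityStreamService(
  'localhost:50052',
  grpc.credentials.createInsecure()
);

// Open the bidirectional stream and log acknowledgments as they arrive
const stream = client.streamActivities();
stream.on('data', (ack: { event_id: string; status: string }) => {
  console.log(`ack ${ack.event_id}: ${ack.status}`);
});

// Publish a single capability-invocation event
stream.write({
  type: 'capability_invocation',
  status: 'success',
  duration_ms: 42,
  timestamp: new Date().toISOString(),
  metadata: { agent_id: 'agent-1', capability: 'summarize', trace_id: 'trace-123' }
});

stream.end();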
2. Semantic Analysis Engine
// src/activity-stream/semantic-analysis.ts
import OpenAI from 'openai';
export interface SemanticAnalysis {
sentiment: 'positive' | 'negative' | 'neutral';
intent: string;
entities: Array<{ type: string; value: string; confidence: number }>;
topics: string[];
similarity_score?: number;
}
export class SemanticAnalysisEngine {
private openai: OpenAI;
private embeddingModel: string;
constructor(options: { embeddingModel: string }) {
this.openai = new OpenAI();
this.embeddingModel = options.embeddingModel;
}
/**
* Generate embedding for text
*/
async embed(text: string): Promise<number[]> {
const response = await this.openai.embeddings.create({
model: this.embeddingModel,
input: text
});
return response.data[0].embedding;
}
/**
* Perform semantic analysis on conversation turn
*/
async analyze(params: {
message: string;
response: string;
messageEmbedding: number[];
responseEmbedding: number[];
}): Promise<SemanticAnalysis> {
// Calculate semantic similarity
const similarity = this.cosineSimilarity(
params.messageEmbedding,
params.responseEmbedding
);
// Extract entities, sentiment, intent using LLM
const analysis = await this.openai.chat.completions.create({
model: 'gpt-4o-mini',
messages: [
{
role: 'system',
content: `Analyze this conversation turn. Return JSON with:
{
"sentiment": "positive|negative|neutral",
"intent": "user intent description",
"entities": [{"type": "entity type", "value": "entity value", "confidence": 0-1}],
"topics": ["topic1", "topic2"]
}`
},
{
role: 'user',
content: `User: ${params.message}\nAssistant: ${params.response}`
}
],
response_format: { type: 'json_object' }
});
const result = JSON.parse(analysis.choices[0].message.content ?? '{}');
return {
...result,
similarity_score: similarity
};
}
/**
* Calculate cosine similarity between two vectors
*/
private cosineSimilarity(a: number[], b: number[]): number {
const dotProduct = a.reduce((sum, val, i) => sum + val * b[i], 0);
const magnitudeA = Math.sqrt(a.reduce((sum, val) => sum + val * val, 0));
const magnitudeB = Math.sqrt(b.reduce((sum, val) => sum + val * val, 0));
return dotProduct / (magnitudeA * magnitudeB);
}
}
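A short usage example of the engine above (hypothetical values; requires OPENAI_API_KEY in the environment):
// Sketch: embed both sides of a turn once, then reuse the embeddings for analysis
async function analyzeExampleTurn(): Promise<void> {
  const engine = new SemanticAnalysisEngine({ embeddingModel: 'text-embedding-3-small' });

  const message = 'Can you summarize the last deployment?';
  const response = 'The last deployment finished at 14:02 UTC with no errors.';

  const [messageEmbedding, responseEmbedding] = await Promise.all([
    engine.embed(message),
    engine.embed(response)
  ]);

  const analysis = await engine.analyze({ message, response, messageEmbedding, responseEmbedding });
  // analysis.similarity_score is the message/response cosine similarity;
  // sentiment, intent, entities and topics come from the gpt-4o-mini call
  console.log(analysis);
}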
3. Conversation Tracker
// src/activity-stream/conversation-tracker.ts
import { SemanticAnalysis } from './semantic-analysis';
export interface ConversationTurn {
conversation_id: string;
turn_id: number;
parent_turn_id?: number;
message: string;
response: string;
embedding: number[];
analysis: SemanticAnalysis;
timestamp: string;
}
export interface ConversationContext {
conversation_id: string;
turns: ConversationTurn[];
current_turn: number;
started_at: string;
last_activity: string;
}
export class ConversationTracker {
private conversations: Map<string, ConversationContext>;
private contextWindow: number;
private similarityThreshold: number;
constructor(options: { contextWindow: number; similarityThreshold: number }) {
this.conversations = new Map();
this.contextWindow = options.contextWindow;
this.similarityThreshold = options.similarityThreshold;
}
/**
* Add a new turn to conversation
*/
async addTurn(turn: Omit<ConversationTurn, 'turn_id' | 'parent_turn_id'>): Promise<ConversationTurn> {
let context = this.conversations.get(turn.conversation_id);
if (!context) {
context = {
conversation_id: turn.conversation_id,
turns: [],
current_turn: 0,
started_at: turn.timestamp,
last_activity: turn.timestamp
};
this.conversations.set(turn.conversation_id, context);
}
// Find similar previous turns
const similarTurn = this.findSimilarTurn(context, turn.embedding);
const completeTurn: ConversationTurn = {
...turn,
turn_id: context.current_turn++,
parent_turn_id: similarTurn?.turn_id
};
context.turns.push(completeTurn);
context.last_activity = turn.timestamp;
// Maintain context window
if (context.turns.length > this.contextWindow) {
context.turns.shift();
}
return completeTurn;
}
/**
* Find semantically similar previous turn
*/
private findSimilarTurn(
context: ConversationContext,
embedding: number[]
): ConversationTurn | undefined {
let maxSimilarity = 0;
let similarTurn: ConversationTurn | undefined;
for (const turn of context.turns) {
const similarity = this.cosineSimilarity(turn.embedding, embedding);
if (similarity > maxSimilarity && similarity >= this.similarityThreshold) {
maxSimilarity = similarity;
similarTurn = turn;
}
}
return similarTurn;
}
/**
* Get conversation context
*/
getContext(conversation_id: string): ConversationContext | undefined {
return this.conversations.get(conversation_id);
}
/**
* Get recent turns with context
*/
getRecentTurns(conversation_id: string, count: number = 5): ConversationTurn[] {
const context = this.conversations.get(conversation_id);
if (!context) return [];
return context.turns.slice(-count);
}
private cosineSimilarity(a: number[], b: number[]): number {
const dotProduct = a.reduce((sum, val, i) => sum + val * b[i], 0);
const magnitudeA = Math.sqrt(a.reduce((sum, val) => sum + val * val, 0));
const magnitudeB = Math.sqrt(b.reduce((sum, val) => sum + val * val, 0));
return dotProduct / (magnitudeA * magnitudeB);
}
}
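The tracker keeps every conversation in memory for the life of the process. To stay within the memory target in Performance Requirements below, an eviction pass for idle conversations would be needed; a minimal sketch (hypothetical helper, not part of the class above — in practice it would live inside ConversationTracker or take an injected map):
// Sketch: drop conversations that have been idle longer than maxIdleMs.
// Could be driven by a setInterval wherever the tracker is owned.
export function evictIdleConversations(
  conversations: Map<string, ConversationContext>,
  maxIdleMs: number,
  now: number = Date.now()
): number {
  let evicted = 0;
  for (const [id, context] of conversations) {
    if (now - Date.parse(context.last_activity) > maxIdleMs) {
      conversations.delete(id);
      evicted++;
    }
  }
  return evicted;
}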
4. Time-Series Storage
// src/activity-stream/timeseries-store.ts
import { Client as PostgresClient } from 'pg';
export class TimeSeriesStore {
private client: PostgresClient;
constructor(options: { type: string; connectionString: string }) {
// Only TimescaleDB is implemented here; InfluxDB support would need a separate client
if (options.type !== 'timescaledb') {
throw new Error(`Unsupported time-series storage type: ${options.type}`);
}
this.client = new PostgresClient({
connectionString: options.connectionString
});
}
async connect(): Promise<void> {
await this.client.connect();
await this.initializeSchema();
}
private async initializeSchema(): Promise<void> {
// pgvector provides the vector(1536) column type used below; TimescaleDB provides create_hypertable
await this.client.query(`
CREATE EXTENSION IF NOT EXISTS timescaledb;
CREATE EXTENSION IF NOT EXISTS vector;
`);
await this.client.query(`
CREATE TABLE IF NOT EXISTS activity_events (
time TIMESTAMPTZ NOT NULL,
event_type TEXT NOT NULL,
agent_id TEXT NOT NULL,
conversation_id TEXT,
trace_id TEXT,
capability TEXT,
status TEXT,
duration_ms INTEGER,
metadata JSONB,
PRIMARY KEY (time, agent_id, event_type)
);
SELECT create_hypertable('activity_events', 'time', if_not_exists => TRUE);
CREATE INDEX IF NOT EXISTS idx_activity_agent ON activity_events (agent_id, time DESC);
CREATE INDEX IF NOT EXISTS idx_activity_conversation ON activity_events (conversation_id, time DESC);
`);
await this.client.query(`
CREATE TABLE IF NOT EXISTS conversation_turns (
time TIMESTAMPTZ NOT NULL,
conversation_id TEXT NOT NULL,
turn_id INTEGER NOT NULL,
parent_turn_id INTEGER,
agent_id TEXT NOT NULL,
capability TEXT,
message TEXT,
response TEXT,
embedding vector(1536),
sentiment TEXT,
intent TEXT,
topics TEXT[],
similarity_score FLOAT,
PRIMARY KEY (time, conversation_id, turn_id)
);
SELECT create_hypertable('conversation_turns', 'time', if_not_exists => TRUE);
CREATE INDEX IF NOT EXISTS idx_conversation_turns ON conversation_turns (conversation_id, time DESC);
`);
}
async insertEvent(event: any): Promise<void> {
await this.client.query(
`INSERT INTO activity_events
(time, event_type, agent_id, conversation_id, trace_id, capability, status, duration_ms, metadata)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`,
[
event.timestamp,
event.type,
event.metadata.agent_id,
event.metadata.conversation_id,
event.metadata.trace_id,
event.metadata.capability,
event.status,
event.duration_ms,
JSON.stringify(event.metadata)
]
);
}
async insertConversationTurn(turn: any): Promise<void> {
await this.client.query(
`INSERT INTO conversation_turns
(time, conversation_id, turn_id, parent_turn_id, agent_id, capability, message, response, embedding, sentiment, intent, topics, similarity_score)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)`,
[
turn.timestamp,
turn.conversation_id,
turn.turn_id,
turn.parent_turn_id,
turn.agent_id,
turn.capability,
turn.message,
turn.response,
JSON.stringify(turn.embedding),
turn.analysis.sentiment,
turn.analysis.intent,
turn.analysis.topics,
turn.analysis.similarity_score
]
);
}
async queryActivityByAgent(agentId: string, hours: number = 24): Promise<any[]> {
// Interval parameters cannot be interpolated inside a quoted literal,
// so the window is built with make_interval instead of INTERVAL '$2 hours'
const result = await this.client.query(
`SELECT * FROM activity_events
WHERE agent_id = $1 AND time > NOW() - make_interval(hours => $2::int)
ORDER BY time DESC`,
[agentId, hours]
);
return result.rows;
}
async queryConversation(conversationId: string): Promise<any[]> {
const result = await this.client.query(
`SELECT * FROM conversation_turns
WHERE conversation_id = $1
ORDER BY time ASC, turn_id ASC`,
[conversationId]
);
return result.rows;
}
async insertMetric(metric: {
metric: string;
agent_id: string;
capability?: string;
duration_ms?: number;
status?: string;
timestamp: string;
}): Promise<void> {
// Metrics reuse the activity_events hypertable; the "metric:" prefix keeps the row's
// primary key from colliding with the raw event row written by insertEvent()
await this.client.query(
`INSERT INTO activity_events
(time, event_type, agent_id, capability, status, duration_ms, metadata)
VALUES ($1, $2, $3, $4, $5, $6, $7)`,
[
metric.timestamp,
`metric:${metric.metric}`,
metric.agent_id,
metric.capability,
metric.status,
metric.duration_ms,
JSON.stringify(metric)
]
);
}
}
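5. Phoenix/Arize Exporter
The exporter is referenced by the server and listed under Files to Create but not specified here. A minimal sketch, assuming spans are emitted through the OpenTelemetry API and an OTLP trace exporter pointed at Phoenix/Arize is configured elsewhere in the process (Phoenix typically accepts OTLP over HTTP at /v1/traces); the span names and attribute keys below are illustrative, not a fixed schema.
// src/activity-stream/phoenix-exporter.ts (sketch)
import { trace, SpanStatusCode, Tracer } from '@opentelemetry/api';

export class PhoenixExporter {
  private tracer: Tracer;

  constructor(options: { project: string }) {
    this.tracer = trace.getTracer(options.project);
  }

  async exportEvent(event: {
    type: string;
    status: string;
    duration_ms: number;
    timestamp: string;
    metadata: Record<string, any>;
  }): Promise<void> {
    const start = new Date(event.timestamp);

    // One span per activity event, carrying agent context as attributes
    const span = this.tracer.startSpan(`activity.${event.type}`, {
      startTime: start,
      attributes: {
        'agent.id': event.metadata.agent_id,
        'agent.capability': event.metadata.capability ?? '',
        'conversation.id': event.metadata.conversation_id ?? '',
        'activity.status': event.status
      }
    });

    if (event.status === 'error') {
      span.setStatus({ code: SpanStatusCode.ERROR });
    }

    // End the span at start + reported duration so the trace reflects the real latency
    span.end(new Date(start.getTime() + (event.duration_ms ?? 0)));
  }
}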
Configuration (OSSA Manifest)
ossaVersion: "1.0"
agent:
  id: activity-stream-service
  name: Activity Stream Service
  version: 1.0.0
  role: monitoring
runtime:
  type: k8s
  image: ossa/activity-stream-service:1.0.0
  resources:
    cpu: 2000m
    memory: 4Gi
integration:
  protocol: grpc
  grpc:
    port: 50052
    reflection: true
    health_check: true
    streaming:
      enabled: true
      max_concurrent_streams: 1000
      keepalive_time_ms: 10000
monitoring:
  traces: true
  metrics: true
  logs: true
  activity_streaming:
    enabled: false  # This IS the activity stream service
  phoenix_arize:
    enabled: true
    project: activity-stream-production
    export_interval_seconds: 60
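With this manifest, the container entry point only needs to construct the server and start it; the constructor wires the analysis engine, tracker, store, and exporter, and start() connects the store. A hypothetical entry point, with the file name, env var, and option values assumed:
// src/activity-stream/index.ts (hypothetical entry point)
import { ActivityStreamServer } from './grpc-server';

async function main(): Promise<void> {
  const server = new ActivityStreamServer({
    embeddingModel: 'text-embedding-3-small',
    contextWindow: 10,
    similarityThreshold: 0.85,
    storageType: 'timescaledb',
    // TIMESCALE_URL is an assumed env var name, not part of the OSSA manifest above
    storageConnection: process.env.TIMESCALE_URL ?? 'postgres://localhost:5432/activity',
    phoenixProject: 'activity-stream-production'
  });

  await server.start(50052);
}

main().catch((error) => {
  console.error('Failed to start Activity Stream Service:', error);
  process.exit(1);
});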
Acceptance Criteria
- gRPC streaming server on port 50052
- Semantic analysis with text-embedding-3-small
- Conversation tracking with context window
- TimescaleDB integration for time-series storage
- Phoenix/Arize exporter with agent context
- Real-time analytics and metrics
- Support for 1000+ concurrent streams
- Handle 10,000+ events/second
- <5ms processing latency per event
- Unit tests for all components
- Integration tests with agent-mesh
- OSSA manifest
Files to Create
src/activity-stream/grpc-server.ts
src/activity-stream/semantic-analysis.ts
src/activity-stream/conversation-tracker.ts
src/activity-stream/timeseries-store.ts
src/activity-stream/phoenix-exporter.ts
proto/activity_stream.proto
tests/activity-stream/
.ossa.yml
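A starting point for tests/activity-stream/ could be a unit test for the conversation tracker; the test runner (Vitest here) and relative paths are assumptions:
// tests/activity-stream/conversation-tracker.test.ts (sketch)
import { describe, it, expect } from 'vitest';
import { ConversationTracker } from '../../src/activity-stream/conversation-tracker';

describe('ConversationTracker', () => {
  it('links a new turn to the most similar previous turn', async () => {
    const tracker = new ConversationTracker({ contextWindow: 10, similarityThreshold: 0.8 });
    const analysis = { sentiment: 'neutral' as const, intent: 'status check', entities: [], topics: ['deploy'] };

    const first = await tracker.addTurn({
      conversation_id: 'c1',
      message: 'What is the deploy status?',
      response: 'Deploy finished.',
      embedding: [1, 0, 0],
      analysis,
      timestamp: new Date().toISOString()
    });

    const second = await tracker.addTurn({
      conversation_id: 'c1',
      message: 'And the one before that?',
      response: 'Also fine.',
      embedding: [0.9, 0.1, 0], // cosine similarity to [1, 0, 0] ≈ 0.99, above the 0.8 threshold
      analysis,
      timestamp: new Date().toISOString()
    });

    expect(first.turn_id).toBe(0);
    expect(second.parent_turn_id).toBe(first.turn_id);
  });
});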
Performance Requirements
- Throughput: 10,000 events/second
- Latency: <5ms per event processing
- Memory: <4GB for 1M active conversations
- Storage: Efficient compression for time-series data
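At 10,000 events/second, one INSERT per event will not hold up; events would need to be buffered and written in multi-row batches. A sketch of that idea (BatchingEventWriter and insertEvents are assumed names, not part of the code above):
// Sketch: buffer events and flush them as one bulk insert per interval or per batch size
class BatchingEventWriter {
  private buffer: any[] = [];

  constructor(
    private store: { insertEvents(events: any[]): Promise<void> }, // assumed bulk variant of insertEvent
    private maxBatchSize = 500,
    private flushIntervalMs = 100
  ) {
    setInterval(() => this.flushSafely(), this.flushIntervalMs).unref();
  }

  add(event: any): void {
    this.buffer.push(event);
    if (this.buffer.length >= this.maxBatchSize) this.flushSafely();
  }

  private flushSafely(): void {
    this.flush().catch((error) => console.error('Batch flush failed:', error));
  }

  private async flush(): Promise<void> {
    if (this.buffer.length === 0) return;
    const batch = this.buffer.splice(0, this.buffer.length);
    await this.store.insertEvents(batch);
  }
}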
Related Issues
- OSSA #21: Activity streaming schema
- agent-mesh #6: Mesh observer
- agent-studio #TBD: Dashboard visualization