OCSF Implementation Examples
This document provides detailed implementation examples and patterns for AOS's OCSF implementation.
Dependencies
Event Producer Implementation
from datetime import datetime
from uuid import uuid4
from py_ocsf_models.objects import Event, Activity, Finding
from py_ocsf_models.enums import CategoryType, ClassType, ActivityType
from py_ocsf_models.types import Metadata, Actor, API
from typing import Dict, Any, Optional
class OCSFEvent(Event):
    """API Activity event model with its OCSF classification pinned.

    Every instance is constructed with ``category_uid`` set to
    ``CategoryType.APPLICATION_ACTIVITY`` and ``class_uid`` set to
    ``ClassType.API_ACTIVITY``; all other fields are supplied by the caller.
    """

    class Config:
        # NOTE(review): presumably a pydantic-style model option that lets
        # fields be populated by field name as well as alias -- confirm
        # against the ``Event`` base class.
        allow_population_by_field_name = True

    def __init__(self, **data):
        # The classification is fixed for this model. Supplying either key
        # in ``data`` collides with these entries and raises TypeError.
        pinned = {
            "category_uid": CategoryType.APPLICATION_ACTIVITY,
            "class_uid": ClassType.API_ACTIVITY,
        }
        super().__init__(**pinned, **data)
class SecurityFinding(Finding):
    """Security Finding event model with its OCSF classification pinned.

    Every instance is constructed with ``category_uid`` set to
    ``CategoryType.FINDINGS`` and ``class_uid`` set to
    ``ClassType.SECURITY_FINDING``; all other fields are supplied by the
    caller.
    """

    class Config:
        # NOTE(review): presumably a pydantic-style model option that lets
        # fields be populated by field name as well as alias -- confirm
        # against the ``Finding`` base class.
        allow_population_by_field_name = True

    def __init__(self, **data):
        # The classification is fixed for this model. Supplying either key
        # in ``data`` collides with these entries and raises TypeError.
        pinned = {
            "category_uid": CategoryType.FINDINGS,
            "class_uid": ClassType.SECURITY_FINDING,
        }
        super().__init__(**pinned, **data)
class OCSFAgentLogger:
    """OCSF-compliant logger for AI agent activities.

    Builds OCSF event dictionaries (API Activity and Security Finding) from
    flat input dictionaries and, when requested, validates them through the
    ``OCSFEvent`` / ``SecurityFinding`` models before returning them.
    """

    # Trace-context keys copied from the agent event into event metadata.
    _TRACE_FIELDS = ("trace_id", "span_id", "parent_span_id")

    def __init__(self, product_name: str = "ASOP Security Layer",
                 vendor_name: str = "ASOP",
                 ocsf_version: str = "1.0.0"):
        """Store the product identity and OCSF schema version.

        Args:
            product_name: Value reported as ``metadata.product.name``.
            vendor_name: Value reported as ``metadata.product.vendor_name``.
            ocsf_version: OCSF schema version reported in event metadata.
        """
        self.product_name = product_name
        self.vendor_name = vendor_name
        self.ocsf_version = ocsf_version

    @staticmethod
    def _current_time_ms() -> int:
        """Return the current time as integer milliseconds since the epoch."""
        # Local import: the file's import block only brings in ``datetime``.
        from datetime import timezone

        # An aware datetime is required here. ``datetime.utcnow()`` returns a
        # naive value, and ``.timestamp()`` interprets naive values as local
        # time, which skews the result by the host's UTC offset.
        return int(datetime.now(timezone.utc).timestamp() * 1000)

    def _create_base_metadata(self) -> Dict[str, Any]:
        """Create base metadata for OCSF events.

        Returns:
            A new dict with the schema version, the product name/vendor and
            an ``ocsf.version`` entry. A fresh dict is returned on each call
            so callers may add keys (e.g. trace context) without sharing
            state between events.
        """
        return {
            # Use the configured schema version instead of a hard-coded
            # literal so ``version`` and ``ocsf.version`` cannot disagree.
            "version": self.ocsf_version,
            "product": {
                "name": self.product_name,
                "vendor_name": self.vendor_name,
            },
            "ocsf": {
                "version": self.ocsf_version,
            },
        }

    @staticmethod
    def _build_operation(agent_event: Dict[str, Any]) -> Dict[str, Any]:
        """Build the step ``operation`` payload for an agent event.

        The first truthy key among ``tool_id``, ``protocol_type``,
        ``memory_type`` and ``knowledge_query`` selects the operation type.
        Returns an empty dict when none of them is present.
        """
        if agent_event.get("tool_id"):
            return {
                "type": "tool_execution",
                "tool": {
                    "id": agent_event.get("tool_id"),
                    "execution_id": agent_event.get("execution_id"),
                    "inputs": agent_event.get("tool_inputs", []),
                    "outputs": agent_event.get("tool_outputs", []),
                    "is_error": agent_event.get("is_error", False),
                },
            }
        if agent_event.get("protocol_type"):
            return {
                "type": "protocol_message",
                "protocol": {
                    "type": agent_event.get("protocol_type"),
                    "message": agent_event.get("protocol_message", {}),
                    "reasoning": agent_event.get("protocol_reasoning"),
                },
            }
        if agent_event.get("memory_type"):
            return {
                "type": "memory_operation",
                "action": agent_event.get("memory_type"),
                "content": agent_event.get("memory_content", []),
            }
        if agent_event.get("knowledge_query"):
            return {
                "type": "knowledge_operation",
                "query": agent_event.get("knowledge_query"),
                "keywords": agent_event.get("knowledge_keywords", []),
                "results": agent_event.get("knowledge_results", []),
            }
        return {}

    def create_api_activity_event(self,
                                  agent_event: Dict[str, Any],
                                  validate: bool = True) -> Dict[str, Any]:
        """Create and validate an OCSF API Activity event.

        Args:
            agent_event: Flat dict describing the agent step. Keys are read
                with ``.get`` so missing keys become ``None`` (or the
                defaults shown in the code); ``severity_id`` defaults to 1.
            validate: When true, the event is passed through ``OCSFEvent``
                and the model's dict form (aliases applied, ``None`` values
                dropped) is returned. When false, the raw dict is returned.

        Returns:
            The OCSF API Activity event as a dict.

        Raises:
            ValueError: If building or validating the event fails for any
                reason; the original exception is attached as the cause.
        """
        try:
            event_data = {
                "metadata": self._create_base_metadata(),
                "activity_id": ActivityType.CREATE,
                "activity_name": "Agent Tool Use",
                "time": self._current_time_ms(),
                "severity_id": agent_event.get("severity_id", 1),
                "actor": {
                    "user": {
                        "uid": agent_event.get("agent_id"),
                        "name": agent_event.get("agent_name"),
                        "type": "AI Agent",
                    },
                },
                "api": {
                    "service": {
                        "name": agent_event.get("service_name"),
                        "version": agent_event.get("service_version"),
                    },
                    "operation": agent_event.get("operation"),
                },
                # Agent-specific context lives under ``unmapped.asop``.
                "unmapped": {
                    "asop": {
                        "context": {
                            "agent": {
                                "id": agent_event.get("agent_id"),
                                "name": agent_event.get("agent_name"),
                                "version": agent_event.get("agent_version"),
                                "provider": {
                                    "name": agent_event.get("provider_name"),
                                    "url": agent_event.get("provider_url"),
                                },
                            },
                            "session": {
                                "id": agent_event.get("session_id"),
                            },
                            "model": {
                                "id": agent_event.get("model_id"),
                                "provider": {
                                    "name": agent_event.get("model_provider"),
                                },
                            },
                        },
                        "step": {
                            "id": agent_event.get("step_id"),
                            "type": agent_event.get("step_type"),
                            "turn_id": agent_event.get("turn_id"),
                            "reasoning": agent_event.get("reasoning"),
                        },
                    },
                },
            }

            # Add operation details based on step type.
            operation = self._build_operation(agent_event)
            if operation:
                event_data["unmapped"]["asop"]["step"]["operation"] = operation

            # Add trace context if available.
            for field in self._TRACE_FIELDS:
                if field in agent_event:
                    event_data["metadata"][field] = agent_event[field]

            if validate:
                event = OCSFEvent(**event_data)
                return event.dict(by_alias=True, exclude_none=True)
            return event_data
        except Exception as e:
            # Callers rely on a single ValueError for any failure; chain the
            # original exception so the root cause stays in the traceback.
            raise ValueError(f"Failed to create OCSF event: {e}") from e

    def create_security_finding(self,
                                finding_data: Dict[str, Any],
                                validate: bool = True) -> Dict[str, Any]:
        """Create a security finding event.

        Args:
            finding_data: Flat dict describing the finding. Keys are read
                with ``.get``; ``severity_id`` defaults to 3, ``title`` to
                "Agent Policy Violation", ``types`` to
                ``["Policy Violation"]`` and ``finding_id`` to a new UUID4
                string.
            validate: When true, the event is passed through
                ``SecurityFinding`` and the model's dict form (aliases
                applied, ``None`` values dropped) is returned. When false,
                the raw dict is returned.

        Returns:
            The OCSF Security Finding event as a dict.

        Raises:
            ValueError: If building or validating the event fails for any
                reason; the original exception is attached as the cause.
        """
        try:
            event_data = {
                "metadata": self._create_base_metadata(),
                "activity_id": ActivityType.CREATE,
                "activity_name": "Create",
                "time": self._current_time_ms(),
                "severity_id": finding_data.get("severity_id", 3),
                "finding": {
                    "title": finding_data.get("title", "Agent Policy Violation"),
                    "desc": finding_data.get("description"),
                    "types": finding_data.get("types", ["Policy Violation"]),
                    "uid": finding_data.get("finding_id", str(uuid4())),
                },
                "resources": [
                    {
                        "type": "AI Agent",
                        "uid": finding_data.get("agent_id"),
                        "name": finding_data.get("agent_name"),
                    },
                ],
                "unmapped": {
                    "policy_violated": finding_data.get("policy_name"),
                    "agent_context": finding_data.get("agent_context", {}),
                    "remediation": finding_data.get("remediation"),
                },
            }

            if validate:
                event = SecurityFinding(**event_data)
                return event.dict(by_alias=True, exclude_none=True)
            return event_data
        except Exception as e:
            # Callers rely on a single ValueError for any failure; chain the
            # original exception so the root cause stays in the traceback.
            raise ValueError(f"Failed to create security finding: {e}") from e
SIEM Integration Examples
Splunk
# Count tool usage by agent and tool
# (dotted field names are single-quoted inside `where`, where a bare `.` means concatenation)
index=security sourcetype=ocsf
| where category_uid=6 AND class_uid=6003
| where 'unmapped.asop.step.operation.type'="tool_execution"
| stats count by
    unmapped.asop.context.agent.name,
    unmapped.asop.step.operation.tool.id
| sort count desc
# Monitor agent activities by step type
index=security sourcetype=ocsf
| where category_uid=6
| stats count by
    unmapped.asop.context.agent.name,
    unmapped.asop.step.type
| sort count desc
# Track protocol message patterns
# (dotted field names are single-quoted inside `where`, where a bare `.` means concatenation)
index=security sourcetype=ocsf
| where 'unmapped.asop.step.operation.type'="protocol_message"
| stats count by
    unmapped.asop.step.operation.protocol.type,
    unmapped.asop.step.operation.protocol.message.params.message.action
| sort count desc
# Monitor model usage across agents
index=security sourcetype=ocsf
| stats count by
    unmapped.asop.context.agent.name,
    unmapped.asop.context.model.id,
    unmapped.asop.context.model.provider.name
| sort count desc
Elasticsearch
{
"query": {
"bool": {
"must": [
{ "term": { "category_uid": 6 }},
{ "term": { "class_uid": 6003 }}
]
}
},
"aggs": {
"by_agent": {
"terms": {
"field": "unmapped.asop.context.agent.name.keyword",
"size": 10
},
"aggs": {
"by_step_type": {
"terms": {
"field": "unmapped.asop.step.type.keyword"
}
},
"by_operation_type": {
"terms": {
"field": "unmapped.asop.step.operation.type.keyword"
}
}
}
}
}
}
Implementation Guidelines
1. Error Handling
- Always validate events before sending
- Implement proper exception handling
- Log validation failures for debugging
- Include trace context in error reports
2. Performance Considerations
- Use async validation for high-volume events
- Implement event batching where appropriate
- Consider caching validated event templates
- Monitor SIEM ingestion performance
3. Security Guidelines
- Sanitize all user inputs before event creation
- Implement rate limiting for event generation
- Use secure transport for SIEM communication
- Regularly audit logged data for sensitive information
4. Schema Versioning
Always specify the OCSF schema version in event metadata, for example `"metadata": {"version": "1.0.0"}`.
5. Trace Context
- Include trace_id and span_id in metadata for workflow correlation
- Link related events across agent workflows
- Use parent_span_id for hierarchical relationships
6. Validation Best Practices
- Use py-ocsf-models validators for schema compliance
- Implement custom validation for agent-specific fields
- Validate events before sending to SIEM
- Handle validation errors gracefully