Skip to content

OCSF Implementation Examples

This document provides detailed implementation examples and patterns for AOS's OCSF implementation.

Dependencies

pip install py-ocsf-models

Event Producer Implementation

from datetime import datetime
from uuid import uuid4
from py_ocsf_models.objects import Event, Activity, Finding
from py_ocsf_models.enums import CategoryType, ClassType, ActivityType
from py_ocsf_models.types import Metadata, Actor, API
from typing import Dict, Any, Optional

class OCSFEvent(Event):
    """Base OCSF event with configuration."""
    class Config:
        allow_population_by_field_name = True

    def __init__(self, **data):
        super().__init__(
            category_uid=CategoryType.APPLICATION_ACTIVITY,
            class_uid=ClassType.API_ACTIVITY,
            **data
        )

class SecurityFinding(Finding):
    """Security finding event with configuration."""
    class Config:
        allow_population_by_field_name = True

    def __init__(self, **data):
        super().__init__(
            category_uid=CategoryType.FINDINGS,
            class_uid=ClassType.SECURITY_FINDING,
            **data
        )

class OCSFAgentLogger:
    """OCSF-compliant logger for AI agent activities."""

    def __init__(self, product_name: str = "ASOP Security Layer",
                 vendor_name: str = "ASOP",
                 ocsf_version: str = "1.0.0"):
        self.product_name = product_name
        self.vendor_name = vendor_name
        self.ocsf_version = ocsf_version

    def _create_base_metadata(self) -> Dict[str, Any]:
        """Create base metadata for OCSF events."""
        return {
            "version": "1.0.0",
            "product": {
                "name": self.product_name,
                "vendor_name": self.vendor_name
            },
            "ocsf": {
                "version": self.ocsf_version
            }
        }

    def create_api_activity_event(self, 
                                agent_event: Dict[str, Any],
                                validate: bool = True) -> Dict[str, Any]:
        """Create and validate an OCSF API Activity event."""
        try:
            event_data = {
                "metadata": self._create_base_metadata(),
                "activity_id": ActivityType.CREATE,
                "activity_name": "Agent Tool Use",
                "time": int(datetime.utcnow().timestamp() * 1000),
                "severity_id": agent_event.get("severity_id", 1),
                "actor": {
                    "user": {
                        "uid": agent_event.get("agent_id"),
                        "name": agent_event.get("agent_name"),
                        "type": "AI Agent"
                    }
                },
                "api": {
                    "service": {
                        "name": agent_event.get("service_name"),
                        "version": agent_event.get("service_version")
                    },
                    "operation": agent_event.get("operation")
                },
                "unmapped": {
                    "asop": {
                        "context": {
                            "agent": {
                                "id": agent_event.get("agent_id"),
                                "name": agent_event.get("agent_name"),
                                "version": agent_event.get("agent_version"),
                                "provider": {
                                    "name": agent_event.get("provider_name"),
                                    "url": agent_event.get("provider_url")
                                }
                            },
                            "session": {
                                "id": agent_event.get("session_id")
                            },
                            "model": {
                                "id": agent_event.get("model_id"),
                                "provider": {
                                    "name": agent_event.get("model_provider")
                                }
                            }
                        },
                        "step": {
                            "id": agent_event.get("step_id"),
                            "type": agent_event.get("step_type"),
                            "turn_id": agent_event.get("turn_id"),
                            "reasoning": agent_event.get("reasoning")
                        }
                    }
                }
            }

            # Add operation details based on step type
            operation = {}
            if agent_event.get("tool_id"):
                operation = {
                    "type": "tool_execution",
                    "tool": {
                        "id": agent_event.get("tool_id"),
                        "execution_id": agent_event.get("execution_id"),
                        "inputs": agent_event.get("tool_inputs", []),
                        "outputs": agent_event.get("tool_outputs", []),
                        "is_error": agent_event.get("is_error", False)
                    }
                }
            elif agent_event.get("protocol_type"):
                operation = {
                    "type": "protocol_message",
                    "protocol": {
                        "type": agent_event.get("protocol_type"),
                        "message": agent_event.get("protocol_message", {}),
                        "reasoning": agent_event.get("protocol_reasoning")
                    }
                }
            elif agent_event.get("memory_type"):
                operation = {
                    "type": "memory_operation",
                    "action": agent_event.get("memory_type"),
                    "content": agent_event.get("memory_content", [])
                }
            elif agent_event.get("knowledge_query"):
                operation = {
                    "type": "knowledge_operation",
                    "query": agent_event.get("knowledge_query"),
                    "keywords": agent_event.get("knowledge_keywords", []),
                    "results": agent_event.get("knowledge_results", [])
                }

            if operation:
                event_data["unmapped"]["asop"]["step"]["operation"] = operation

            # Add trace context if available
            for field in ["trace_id", "span_id", "parent_span_id"]:
                if field in agent_event:
                    event_data["metadata"][field] = agent_event[field]

            if validate:
                event = OCSFEvent(**event_data)
                return event.dict(by_alias=True, exclude_none=True)
            return event_data

        except Exception as e:
            raise ValueError(f"Failed to create OCSF event: {str(e)}")

    def create_security_finding(self, 
                              finding_data: Dict[str, Any],
                              validate: bool = True) -> Dict[str, Any]:
        """Create a security finding event."""
        try:
            event_data = {
                "metadata": self._create_base_metadata(),
                "activity_id": ActivityType.CREATE,
                "activity_name": "Create",
                "time": int(datetime.utcnow().timestamp() * 1000),
                "severity_id": finding_data.get("severity_id", 3),
                "finding": {
                    "title": finding_data.get("title", "Agent Policy Violation"),
                    "desc": finding_data.get("description"),
                    "types": finding_data.get("types", ["Policy Violation"]),
                    "uid": finding_data.get("finding_id", str(uuid4()))
                },
                "resources": [
                    {
                        "type": "AI Agent",
                        "uid": finding_data.get("agent_id"),
                        "name": finding_data.get("agent_name")
                    }
                ],
                "unmapped": {
                    "policy_violated": finding_data.get("policy_name"),
                    "agent_context": finding_data.get("agent_context", {}),
                    "remediation": finding_data.get("remediation")
                }
            }

            if validate:
                event = SecurityFinding(**event_data)
                return event.dict(by_alias=True, exclude_none=True)
            return event_data

        except Exception as e:
            raise ValueError(f"Failed to create security finding: {str(e)}")

SIEM Integration Examples

Splunk

# Count tool usage by agent and tool
index=security sourcetype=ocsf
| where category_uid=6 AND class_uid=6003
| where unmapped.asop.step.operation.type="tool_execution"
| stats count by 
    unmapped.asop.context.agent.name,
    unmapped.asop.step.operation.tool.id
| sort count desc

# Monitor agent activities by step type
index=security sourcetype=ocsf
| where category_uid=6
| stats count by 
    unmapped.asop.context.agent.name,
    unmapped.asop.step.type
| sort count desc

# Track protocol message patterns
index=security sourcetype=ocsf
| where unmapped.asop.step.operation.type="protocol_message"
| stats count by 
    unmapped.asop.step.operation.protocol.type,
    unmapped.asop.step.operation.protocol.message.params.message.action
| sort count desc

# Monitor model usage across agents
index=security sourcetype=ocsf
| stats count by 
    unmapped.asop.context.agent.name,
    unmapped.asop.context.model.id,
    unmapped.asop.context.model.provider.name
| sort count desc

Elasticsearch

{
  "query": {
    "bool": {
      "must": [
        { "term": { "category_uid": 6 }},
        { "term": { "class_uid": 6003 }}
      ]
    }
  },
  "aggs": {
    "by_agent": {
      "terms": {
        "field": "unmapped.asop.context.agent.name.keyword",
        "size": 10
      },
      "aggs": {
        "by_step_type": {
          "terms": {
            "field": "unmapped.asop.step.type.keyword"
          }
        },
        "by_operation_type": {
          "terms": {
            "field": "unmapped.asop.step.operation.type.keyword"
          }
        }
      }
    }
  }
}

Custom Fields Examples

1. Basic Agent Event Structure

{
  "unmapped": {
    "asop": {
      "context": {
        "agent": {
          "id": "agent-123",
          "name": "CustomerServiceAgent",
          "version": "1.0.0",
          "provider": {
            "name": "ASOP",
            "url": "https://example.asop"
          }
        },
        "session": {
          "id": "session-789"
        },
        "model": {
          "id": "gpt-4",
          "provider": {
            "name": "OpenAI"
          }
        }
      },
      "step": {
        "id": "step-abc",
        "type": "toolCall",
        "turn_id": "turn-456",
        "reasoning": "User requested customer information"
      }
    }
  }
}

2. Tool Execution Event

{
  "unmapped": {
    "asop": {
      "step": {
        "operation": {
          "type": "tool_execution",
          "tool": {
            "id": "database_query",
            "execution_id": "exec-123",
            "inputs": [
              {
                "name": "query",
                "value": "SELECT * FROM customers WHERE id = ?"
              }
            ],
            "outputs": [
              {
                "kind": "text",
                "text": "Query executed successfully"
              }
            ],
            "is_error": false
          }
        }
      }
    }
  }
}

3. Protocol Message Event

{
  "unmapped": {
    "asop": {
      "step": {
        "operation": {
          "type": "protocol_message",
          "protocol": {
            "type": "A2A",
            "message": {
              "jsonrpc": "2.0",
              "method": "message/stream",
              "id": "stream-123",
              "params": {
                "message": {
                  "messageId": "msg-789",
                  "role": "agent",
                  "kind": "message",
                  "parts": [
                    {
                      "kind": "text",
                      "text": "Streaming response..."
                    }
                  ]
                }
              }
            }
          }
        }
      }
    }
  }
}

Multi-Agent Workflow Example

[
  {
    "category_uid": 6,
    "class_uid": 6003,
    "activity_name": "Agent Request",
    "metadata": {
      "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
      "span_id": "00f067aa0ba902b7"
    },
    "actor": { "user": { "name": "PlannerAgent" }},
    "api": {
      "operation": "task_delegation"
    },
    "unmapped": {
      "agent_context": {
        "agent": {
          "id": "planner-123",
          "name": "PlannerAgent",
          "version": "1.0.0",
          "provider": {
            "name": "ASOP",
            "url": "https://example.asop"
          }
        },
        "session": {
          "id": "collab-789"
        },
        "turn": {
          "id": "turn-456"
        },
        "step": {
          "id": "step-abc",
          "type": "protocolMessage"
        },
        "model": {
          "id": "gpt-4",
          "provider": {
            "name": "OpenAI"
          }
        },
        "reasoning": "Task requires specialized database access"
      }
    }
  },
  {
    "category_uid": 6,
    "class_uid": 6003,
    "activity_name": "Agent Response",
    "metadata": {
      "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
      "span_id": "00f067aa0ba902b8",
      "parent_span_id": "00f067aa0ba902b7"
    },
    "actor": { "user": { "name": "ExecutorAgent" }},
    "api": {
      "operation": "task_acceptance"
    },
    "unmapped": {
      "agent_context": {
        "agent": {
          "id": "executor-456",
          "name": "ExecutorAgent",
          "version": "1.0.0"
        },
        "session": {
          "id": "collab-789"
        },
        "reasoning": "Accepting database query task"
      }
    }
  }
]

Implementation Guidelines

1. Error Handling

  • Always validate events before sending
  • Implement proper exception handling
  • Log validation failures for debugging
  • Include trace context in error reports

2. Performance Considerations

  • Use async validation for high-volume events
  • Implement event batching where appropriate
  • Consider caching validated event templates
  • Monitor SIEM ingestion performance

3. Security Guidelines

  • Sanitize all user inputs before event creation
  • Implement rate limiting for event generation
  • Use secure transport for SIEM communication
  • Regular trace of logged data for sensitive information

4. Schema Versioning

Always specify OCSF schema version in metadata:

{
  "metadata": {
    "ocsf": {
      "version": "1.0.0"
    }
  }
}

5. Trace Context

  • Include trace_id and span_id in metadata for workflow correlation
  • Link related events across agent workflows
  • Use parent_span_id for hierarchical relationships

6. Validation Best Practices

  • Use py-ocsf-models validators for schema compliance
  • Implement custom validation for agent-specific fields
  • Validate events before sending to SIEM
  • Handle validation errors gracefully

Resources