# Audits
The Audits module provides metadata audit logging for workflow execution, tracking workflow releases, parameters, and execution results for compliance and monitoring.
## Overview
The audit system provides:
- **Execution tracking**: Complete workflow execution metadata
- **Release logging**: Detailed workflow release information
- **Parameter capture**: Input parameters and context data
- **Result persistence**: Execution outcomes and status tracking
- **Query capabilities**: Search and retrieval of audit logs
## Base Classes

### BaseAudit

Abstract base class for all audit implementations, providing core audit functionality.

**Key Features**
- Workflow execution metadata capture
- Release timestamp tracking
- Context and parameter logging
- Status and result persistence
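As a sketch of how a custom backend might extend it (this assumes `BaseAudit` is a Pydantic-style model whose subclasses override `save()`; the exact abstract interface may differ):

```python
from ddeutil.workflow.audits import BaseAudit

# Module-level store for this hypothetical backend (e.g. for tests).
_STORE: list[dict] = []


class MemoryAudit(BaseAudit):
    """Hypothetical in-memory audit backend."""

    def save(self, excluded=None):
        # model_dump is the Pydantic v2 serializer; adjust if BaseAudit
        # exposes a different serialization hook.
        _STORE.append(self.model_dump(exclude=excluded))
        return self
```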
## File-based Audits

### FileAudit

File-based audit implementation that persists audit logs to the local filesystem in JSON format.

**File Audit Usage**
```python
from datetime import datetime

from ddeutil.workflow.audits import FileAudit

# Create an audit record
audit = FileAudit(
    name="data-pipeline",
    type="scheduled",
    release=datetime(2024, 1, 15, 10, 30),
    context={
        "params": {"source_table": "users", "target_env": "prod"},
        "jobs": {"extract": {"status": "SUCCESS"}},
    },
    run_id="workflow-123",
    parent_run_id=None,
    update=datetime.now(),
)

# Save the audit log
audit.save()

# The log is saved to:
# {audit_url.path}/workflow=data-pipeline/release=20240115103000/workflow-123.log
```
**Audit File Structure**

```
audits/
├── workflow=data-pipeline/
│   ├── release=20240115103000/
│   │   ├── workflow-123.log
│   │   └── workflow-124.log
│   └── release=20240116080000/
│       └── workflow-125.log
└── workflow=etl-process/
    └── release=20240115120000/
        └── workflow-126.log
```
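The `release=` directory name is the release timestamp rendered as `%Y%m%d%H%M%S`; a quick illustration of how the path component shown above is derived:

```python
from datetime import datetime

release = datetime(2024, 1, 15, 10, 30)
print(f"release={release:%Y%m%d%H%M%S}")  # release=20240115103000
```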
## Finding Audits

The `FileAudit` class provides utilities to search and retrieve audit logs.

**Audit Discovery**
```python
from datetime import datetime

from ddeutil.workflow.audits import FileAudit

# Find all audits for a workflow
for audit in FileAudit.find_audits("data-pipeline"):
    print(f"Release: {audit.release}")
    print(f"Run ID: {audit.run_id}")
    print(f"Status: {audit.context.get('status')}")
    print(f"Parameters: {audit.context.get('params')}")

# Check whether an audit exists for a specific release
exists = FileAudit.is_pointed(
    name="data-pipeline",
    release=datetime(2024, 1, 15, 10, 30),
)
```
## Audit Log Format

Each audit log is stored as JSON with the following structure:

**Audit Log Content**
```json
{
  "name": "data-pipeline",
  "type": "scheduled",
  "release": "2024-01-15T10:30:00",
  "context": {
    "params": {
      "source_table": "users",
      "target_env": "prod"
    },
    "jobs": {
      "extract": {
        "status": "SUCCESS",
        "start_time": "2024-01-15T10:30:05",
        "end_time": "2024-01-15T10:35:12"
      },
      "transform": {
        "status": "SUCCESS",
        "start_time": "2024-01-15T10:35:15",
        "end_time": "2024-01-15T10:40:22"
      }
    }
  },
  "run_id": "workflow-123",
  "parent_run_id": null,
  "update": "2024-01-15T10:40:25"
}
```
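Because each record is plain JSON, it can be inspected with standard tooling; a minimal sketch (the path below is illustrative, following the file layout shown earlier):

```python
import json
from pathlib import Path

# Illustrative path under the audit storage root
log_path = Path("audits/workflow=data-pipeline/release=20240115103000/workflow-123.log")

record = json.loads(log_path.read_text())
print(record["run_id"], record["context"]["params"])
```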
## Audit Data Model

### Field Specifications

| Field | Type | Required | Description |
|---|---|---|---|
| `name` | `str` | Yes | Workflow name |
| `type` | `str` | Yes | Execution type (`scheduled`, `manual`, `event`, `rerun`) |
| `release` | `datetime` | Yes | Workflow release timestamp |
| `context` | `dict` | Yes | Execution context including params and job results |
| `run_id` | `str` | Yes | Unique execution identifier |
| `parent_run_id` | `str \| None` | No | Parent workflow run ID for nested executions |
| `update` | `datetime` | Yes | Audit record update timestamp |
### Context Structure

The `context` field contains comprehensive execution information:
```python
context = {
    "params": {
        # Input parameters passed to the workflow
        "source_table": "users",
        "batch_date": "2024-01-15",
        "environment": "production",
    },
    "jobs": {
        # Results from each job execution
        "job_name": {
            "status": "SUCCESS|FAILED|SKIP|CANCEL",
            "start_time": "2024-01-15T10:30:00",
            "end_time": "2024-01-15T10:35:00",
            "stages": {
                # Stage-level execution details
                "stage_name": {
                    "status": "SUCCESS",
                    "output": {"key": "value"},
                },
            },
        },
    },
    "status": "SUCCESS",  # Overall workflow status
    "errors": {  # Error information if the workflow failed
        "error_type": "WorkflowError",
        "message": "Error description",
    },
}
```
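Since job results are nested under `context["jobs"]`, per-job timings can be derived directly; a small sketch that reuses the `context` dict above (timestamp parsing assumes the ISO format shown):

```python
from datetime import datetime

# Summarize status and duration for each job in the context
for job, data in context["jobs"].items():
    start = datetime.fromisoformat(data["start_time"])
    end = datetime.fromisoformat(data["end_time"])
    print(f"{job}: {data['status']} in {(end - start).total_seconds():.0f}s")
```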
## Audit Factory

### `get_audit_model`

Factory function that returns the appropriate audit implementation based on configuration.

**Dynamic Audit Creation**
```python
from datetime import datetime

from ddeutil.workflow.audits import get_audit_model

# Automatically selects the appropriate audit implementation
audit = get_audit_model(
    name="data-pipeline",
    type="scheduled",
    release=datetime.now(),
    run_id="workflow-123",
)

# Add execution context
audit.context = {
    "params": {"env": "prod"},
    "jobs": {"job1": {"status": "SUCCESS"}},
}

# Save the audit log
audit.save()
```
## Integration with Workflows

Audits are automatically created and managed during workflow execution:

**Workflow Integration**
```python
from datetime import datetime

from ddeutil.workflow import Workflow
from ddeutil.workflow.audits import FileAudit

# Load the workflow from configuration
workflow = Workflow.from_conf("data-pipeline")

# Execute with audit logging
result = workflow.release(
    release=datetime.now(),
    params={"source": "users", "target": "warehouse"},
    audit=FileAudit,  # Specify the audit implementation
)

# The audit log is created automatically with:
# - Workflow execution metadata
# - Input parameters
# - Job execution results
# - Final status and timing
```
## Configuration

Audit behavior is controlled by environment variables:

| Variable | Default | Description |
|---|---|---|
| `WORKFLOW_CORE_AUDIT_URL` | `./logs/audits` | Path for audit file storage |
| `WORKFLOW_CORE_ENABLE_WRITE_AUDIT` | `false` | Enable or disable audit logging |
| `WORKFLOW_CORE_AUDIT_EXCLUDED` | `[]` | Fields to exclude from audit logs |
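For example, enabling auditing from Python before the package reads its configuration might look like this minimal sketch (the variable names are those listed above; in real deployments, set them in the shell or a `.env` file):

```python
import os

# Turn on audit writing and point it at a custom directory
os.environ["WORKFLOW_CORE_ENABLE_WRITE_AUDIT"] = "true"
os.environ["WORKFLOW_CORE_AUDIT_URL"] = "./logs/audits"
```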
## Use Cases

### Compliance Monitoring

**Compliance Tracking**
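A minimal sketch of what compliance reporting could look like on top of `FileAudit.find_audits` (the report fields below are illustrative):

```python
from ddeutil.workflow.audits import FileAudit

# Collect who-ran-what-when metadata for a compliance report
report = [
    {
        "workflow": audit.name,
        "run_id": audit.run_id,
        "release": audit.release.isoformat(),
        "type": audit.type,
        "params": audit.context.get("params"),
        "status": audit.context.get("status"),
    }
    for audit in FileAudit.find_audits("data-pipeline")
]

for row in report:
    print(row)
```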
### Failure Analysis

**Error Investigation**
```python
from ddeutil.workflow.audits import FileAudit

# Find failed workflow executions
for audit in FileAudit.find_audits("data-pipeline"):
    if audit.context.get("status") == "FAILED":
        failed_jobs = [
            job for job, data in audit.context["jobs"].items()
            if data["status"] == "FAILED"
        ]
        print(f"Failed run: {audit.run_id}")
        print(f"Error: {audit.context.get('errors')}")
        print(f"Failed jobs: {failed_jobs}")
```
### Performance Monitoring

**Performance Analysis**
```python
from ddeutil.workflow.audits import FileAudit

# Analyze workflow performance trends
execution_times = []
for audit in FileAudit.find_audits("etl-workflow"):
    # Approximate duration: release timestamp to last audit update
    duration = (audit.update - audit.release).total_seconds()
    execution_times.append(duration)

avg_duration = sum(execution_times) / len(execution_times)
print(f"Average execution time: {avg_duration:.2f} seconds")
```
## Best Practices

- Enable auditing in production for compliance and monitoring
- Configure appropriate retention policies for audit log cleanup (see the sketch below)
- Exclude sensitive data from audit logs via `WORKFLOW_CORE_AUDIT_EXCLUDED`
- Use audit data to alert on workflow failures or performance degradation
- Analyze audit logs regularly to identify patterns and optimization opportunities
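A retention sweep could look like the following sketch (it assumes the file layout shown earlier; the root path and the 30-day cutoff are illustrative):

```python
import shutil
from datetime import datetime, timedelta
from pathlib import Path

AUDIT_ROOT = Path("./logs/audits")  # default WORKFLOW_CORE_AUDIT_URL
CUTOFF = datetime.now() - timedelta(days=30)

# Remove release directories older than the cutoff
for release_dir in AUDIT_ROOT.glob("workflow=*/release=*"):
    stamp = release_dir.name.removeprefix("release=")  # %Y%m%d%H%M%S
    if datetime.strptime(stamp, "%Y%m%d%H%M%S") < CUTOFF:
        shutil.rmtree(release_dir)
```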