# Audits
The Audits module provides comprehensive audit capabilities for workflow execution tracking and monitoring. It supports multiple audit backends for capturing execution metadata, status information, and detailed logging.
## Overview

The audit system provides:

- **Execution tracking**: Complete workflow execution metadata
- **Release logging**: Detailed workflow release information
- **Parameter capture**: Input parameters and context data
- **Result persistence**: Execution outcomes and status tracking
- **Query capabilities**: Search and retrieve audit logs
- **Multiple backends**: File-based JSON storage and SQLite database storage
## Single Backend Limitation

Only one audit backend can be configured for a given runtime; pointing more than one backend at the same workflow runs would produce conflicting audit data.
## Core Components

### BaseAudit

Abstract base class for all audit implementations, providing core audit functionality.

**Key Features**
- Workflow execution metadata capture
- Release timestamp tracking
- Context and parameter logging
- Status and result persistence
- Automatic configuration validation
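
As a quick illustration of extending the base class, here is a minimal sketch of a custom backend. It assumes `save` is the abstract hook that subclasses must implement; the exact abstract method set and required fields may differ by version:

```python
from ddeutil.workflow.audits import AuditData, BaseAudit


class PrintAudit(BaseAudit):
    """Toy backend that writes audit records to stdout.

    Assumes `save` is the abstract hook; check the installed version
    for the full abstract method set.
    """

    def save(self, data: AuditData, **kwargs):
        # A real backend would persist `data` here instead of printing.
        print(f"[audit] {data.name} run={data.run_id} release={data.release}")
        return self
```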
### AuditData

Data model for audit information containing workflow execution details.

**AuditData Structure**
```python
from datetime import datetime

from ddeutil.workflow.audits import AuditData

audit_data = AuditData(
    name="data-pipeline",
    release=datetime(2024, 1, 15, 10, 30),
    type="scheduled",
    context={
        "params": {"source_table": "users", "target_env": "prod"},
        "jobs": {"extract": {"status": "SUCCESS"}},
    },
    run_id="workflow-123",
    parent_run_id=None,
    runs_metadata={"execution_time": "300s", "memory_usage": "512MB"},
)
```
## File-based Audits

### FileAudit

File-based audit implementation that persists audit logs to the local filesystem in JSON format.

**File Audit Usage**
```python
from datetime import datetime

from ddeutil.workflow.audits import AuditData, FileAudit

# Create file audit instance
audit = FileAudit(
    type="file",
    path="./audits",
    extras={"enable_write_audit": True},
)

# Create audit data
data = AuditData(
    name="data-pipeline",
    release=datetime(2024, 1, 15, 10, 30),
    type="scheduled",
    context={
        "params": {"source_table": "users", "target_env": "prod"},
        "jobs": {"extract": {"status": "SUCCESS"}},
    },
    run_id="workflow-123",
    parent_run_id=None,
    runs_metadata={"execution_time": "300s"},
)

# Save audit log
audit.save(data)

# The log is written to:
# ./audits/workflow=data-pipeline/release=20240115103000/workflow-123.log
```
**Audit File Structure**

```text
audits/
├── workflow=data-pipeline/
│   ├── release=20240115103000/
│   │   ├── workflow-123.log
│   │   └── workflow-124.log
│   └── release=20240116080000/
│       └── workflow-125.log
└── workflow=etl-process/
    └── release=20240115120000/
        └── workflow-126.log
```
### Finding Audits

The `FileAudit` class provides utilities to search and retrieve audit logs.

**Audit Discovery**
```python
from datetime import datetime

from ddeutil.workflow.audits import FileAudit

audit = FileAudit(type="file", path="./audits")

# Find all audits for a workflow
for audit_data in audit.find_audits("data-pipeline"):
    print(f"Release: {audit_data.release}")
    print(f"Run ID: {audit_data.run_id}")
    print(f"Type: {audit_data.type}")
    print(f"Context: {audit_data.context}")

# Find a specific audit by release
audit_data = audit.find_audit_with_release(
    name="data-pipeline",
    release=datetime(2024, 1, 15, 10, 30),
)

# Check whether an audit exists for a specific release
exists = audit.is_pointed(data=audit_data)
```
### Audit Log Format

Each audit log is stored as JSON with the following structure:

**Audit Log Content**
```json
{
  "name": "data-pipeline",
  "release": "2024-01-15T10:30:00",
  "type": "scheduled",
  "context": {
    "params": {
      "source_table": "users",
      "target_env": "prod"
    },
    "jobs": {
      "extract": {
        "status": "SUCCESS",
        "start_time": "2024-01-15T10:30:05",
        "end_time": "2024-01-15T10:35:12"
      },
      "transform": {
        "status": "SUCCESS",
        "start_time": "2024-01-15T10:35:15",
        "end_time": "2024-01-15T10:40:22"
      }
    }
  },
  "parent_run_id": null,
  "run_id": "workflow-123",
  "runs_metadata": {
    "execution_time": "300s",
    "memory_usage": "512MB",
    "cpu_usage": "45%"
  }
}
```
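
Because each log is plain JSON at a predictable path, it can also be inspected without the audit API. A quick sketch using the path from the example above:

```python
import json
from pathlib import Path

# Path layout: <base>/workflow=<name>/release=<YYYYmmddHHMMSS>/<run_id>.log
log_path = Path(
    "./audits/workflow=data-pipeline/release=20240115103000/workflow-123.log"
)
with log_path.open() as f:
    record = json.load(f)

print(record["run_id"], list(record["context"]["jobs"]))
```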
### Cleanup Functionality

The `FileAudit` class provides cleanup functionality for old audit files:

**Audit Cleanup**
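A minimal sketch, assuming a `cleanup` method that takes a retention window in days; the method name and `max_age_days` parameter are assumptions for illustration, so check the installed version for the exact API:

```python
from ddeutil.workflow.audits import FileAudit

audit = FileAudit(type="file", path="./audits")

# Assumed API: remove audit files older than the retention window.
# `cleanup` and `max_age_days` are illustrative names.
removed = audit.cleanup(max_age_days=90)
print(f"Removed {removed} expired audit files")
```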
## Database Audits

### SQLiteAudit

SQLite-based audit implementation for scalable logging with compression support.

**SQLite Audit Usage**
```python
from datetime import datetime

from ddeutil.workflow.audits import AuditData, SQLiteAudit

# Create SQLite audit instance
audit = SQLiteAudit(
    type="sqlite",
    path="./audits/workflow_audits.db",
    extras={"enable_write_audit": True},
)

# Create audit data
data = AuditData(
    name="data-pipeline",
    release=datetime(2024, 1, 15, 10, 30),
    type="scheduled",
    context={
        "params": {"source_table": "users"},
        "jobs": {"extract": {"status": "SUCCESS"}},
    },
    run_id="workflow-123",
    runs_metadata={"execution_time": "300s"},
)

# Save audit log; records are stored in SQLite with compression for efficiency
audit.save(data)
```
### SQLite Schema

The SQLite audit creates the following table structure:

```sql
CREATE TABLE IF NOT EXISTS audits (
    workflow TEXT NOT NULL,
    release TEXT NOT NULL,
    type TEXT NOT NULL,
    context BLOB NOT NULL,
    parent_run_id TEXT,
    run_id TEXT NOT NULL,
    metadata BLOB NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (workflow, release)
)
```
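
For ad-hoc inspection you can also query this table directly with the standard library. A minimal sketch against the database path used above; note that `context` and `metadata` are compressed BLOBs, so decode them through the audit class rather than raw SQL:

```python
import sqlite3

# List the ten most recent releases of one workflow.
conn = sqlite3.connect("./audits/workflow_audits.db")
rows = conn.execute(
    """
    SELECT workflow, release, type, run_id, created_at
    FROM audits
    WHERE workflow = ?
    ORDER BY release DESC
    LIMIT 10
    """,
    ("data-pipeline",),
).fetchall()
for row in rows:
    print(row)
conn.close()
```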
### Finding SQLite Audits

The `SQLiteAudit` class provides utilities to search and retrieve audit logs:

**SQLite Audit Discovery**
```python
from datetime import datetime

from ddeutil.workflow.audits import SQLiteAudit

audit = SQLiteAudit(type="sqlite", path="./audits/workflow_audits.db")

# Find all audits for a workflow
for audit_data in audit.find_audits("data-pipeline"):
    print(f"Release: {audit_data.release}")
    print(f"Run ID: {audit_data.run_id}")
    print(f"Context: {audit_data.context}")

# Find a specific audit by release
audit_data = audit.find_audit_with_release(
    name="data-pipeline",
    release=datetime(2024, 1, 15, 10, 30),
)

# Find the latest audit for a workflow (omit the release argument)
latest_audit = audit.find_audit_with_release(name="data-pipeline")
```
### Compression Support

The SQLite backend compresses audit payloads before writing them: the `context` and `metadata` columns are stored as compressed BLOBs, which keeps high-volume audit tables compact.
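
As an illustration of the general idea (not the library's exact codec), one common approach is to gzip the JSON-serialized context before storing it in the BLOB column:

```python
import gzip
import json

# Illustrative only: compress JSON-serialized context for BLOB storage.
context = {"params": {"source_table": "users"}}
blob = gzip.compress(json.dumps(context).encode("utf-8"))

# Decompress and restore on read.
restored = json.loads(gzip.decompress(blob).decode("utf-8"))
assert restored == context
```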
### SQLite Cleanup

The `SQLiteAudit` class provides cleanup functionality for old audit records:

**SQLite Audit Cleanup**
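A minimal sketch mirroring the file-based example above, assuming the same illustrative `cleanup(max_age_days=...)` signature; check the installed version for the exact API:

```python
from ddeutil.workflow.audits import SQLiteAudit

audit = SQLiteAudit(type="sqlite", path="./audits/workflow_audits.db")

# Assumed API: delete audit rows older than the retention window.
removed = audit.cleanup(max_age_days=90)
print(f"Removed {removed} expired audit records")
```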
## Audit Data Model

### Field Specifications

| Field | Type | Required | Description |
|---|---|---|---|
| `name` | `str` | Yes | Workflow name |
| `release` | `datetime` | Yes | Workflow release timestamp |
| `type` | `str` | Yes | Execution type (`scheduled`, `manual`, `event`, `rerun`) |
| `context` | `DictData` | No | Execution context including params and job results |
| `parent_run_id` | `str \| None` | No | Parent workflow run ID for nested executions |
| `run_id` | `str` | Yes | Unique execution identifier |
| `runs_metadata` | `DictData` | No | Additional metadata for tracking audit logs |
### Context Structure

The `context` field contains comprehensive execution information:
```python
context = {
    "params": {
        # Input parameters passed to the workflow
        "source_table": "users",
        "batch_date": "2024-01-15",
        "environment": "production",
    },
    "jobs": {
        # Results from each job execution
        "job_name": {
            "status": "SUCCESS|FAILED|SKIP|CANCEL",
            "start_time": "2024-01-15T10:30:00",
            "end_time": "2024-01-15T10:35:00",
            "stages": {
                # Stage-level execution details
                "stage_name": {
                    "status": "SUCCESS",
                    "output": {"key": "value"},
                },
            },
        },
    },
    "status": "SUCCESS",  # Overall workflow status
    "errors": {  # Error information if failed
        "error_type": "WorkflowError",
        "message": "Error description",
    },
}
```
## Factory Function

### get_audit

Factory function that returns the appropriate audit implementation based on configuration.

**Dynamic Audit Creation**
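A minimal sketch of resolving a backend through the factory; which class comes back depends on the audit configuration in effect:

```python
from ddeutil.workflow.audits import get_audit

# Resolve the configured backend (file-based or SQLite) from settings.
audit = get_audit(extras={"enable_write_audit": True})
print(type(audit).__name__)  # e.g. FileAudit or SQLiteAudit
```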
## Configuration

Audit behavior is controlled by configuration settings:

| Setting | Description |
|---|---|
| `audit_conf` | Audit configuration including type and path |
| `enable_write_audit` | Enable/disable audit logging |
| `audit_url` | URL/path for audit storage |

**Configuration Example**
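These settings can be supplied per-instance through `extras`, as in the examples above; whether they are also read from environment or project configuration depends on your deployment. A minimal sketch, where the `audit_url` value is illustrative:

```python
from ddeutil.workflow.audits import get_audit

# Per-instance overrides via `extras`; the URL format shown is an assumption.
audit = get_audit(
    extras={
        "enable_write_audit": True,
        "audit_url": "file:./audits",
    }
)
```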
## Integration with Workflows

Audits are automatically created and managed during workflow execution:

**Workflow Integration**
```python
from datetime import datetime

from ddeutil.workflow import Workflow
from ddeutil.workflow.audits import get_audit

# Load workflow
workflow = Workflow.from_conf("data-pipeline")

# Get audit instance
audit = get_audit(extras={"enable_write_audit": True})

# Execute with audit logging
result = workflow.release(
    release=datetime.now(),
    params={"source": "users", "target": "warehouse"},
)

# An audit log is automatically created with:
# - Workflow execution metadata
# - Input parameters
# - Job execution results
# - Final status and timing
```
## Use Cases

### Compliance Monitoring

**Compliance Tracking**
```python
from datetime import date

from ddeutil.workflow.audits import FileAudit

audit = FileAudit(type="file", path="./audits")
target_date = date(2024, 1, 15)

# Query audits for compliance reporting
for audit_data in audit.find_audits("financial-etl"):
    if audit_data.release.date() == target_date:
        print(f"Execution: {audit_data.run_id}")
        print(f"Status: {audit_data.context.get('status')}")
        print(f"Parameters: {audit_data.context.get('params')}")
```
### Failure Analysis

**Error Investigation**
```python
from ddeutil.workflow.audits import SQLiteAudit

audit = SQLiteAudit(type="sqlite", path="./audits/workflow_audits.db")

# Find failed workflow executions
for audit_data in audit.find_audits("data-pipeline"):
    if audit_data.context.get("status") == "FAILED":
        failed_jobs = [
            job
            for job, job_data in audit_data.context["jobs"].items()
            if job_data["status"] == "FAILED"
        ]
        print(f"Failed run: {audit_data.run_id}")
        print(f"Error: {audit_data.context.get('errors')}")
        print(f"Failed jobs: {failed_jobs}")
```
### Performance Monitoring

**Performance Analysis**
```python
from ddeutil.workflow.audits import FileAudit

audit = FileAudit(type="file", path="./audits")

# Analyze workflow performance trends
execution_times = []
for audit_data in audit.find_audits("etl-workflow"):
    # Duration is recorded in runs_metadata by the examples above
    duration = audit_data.runs_metadata.get("execution_time", "0s")
    execution_times.append(duration)

print(f"Total executions: {len(execution_times)}")
```
## Best Practices

- Enable auditing in production for compliance and monitoring
- Configure appropriate retention policies for audit log cleanup
- Use SQLite for high-volume deployments to benefit from compression
- Use file-based audits for simple deployments where direct file access is convenient
- Analyze audit logs regularly to identify patterns and optimization opportunities
- Monitor audit storage usage and implement cleanup schedules