# REST API Integration Examples

A call stage invokes a Python function registered in the call registry, so the first step in every example below is to create and register the call function that your stages reference.

This guide demonstrates how to integrate REST APIs into your workflows using call stages and the workflow orchestration system.
## Overview

The workflow system provides powerful capabilities for:

- **API data extraction**: Fetch data from REST endpoints
- **Authentication handling**: OAuth, Bearer tokens, API keys
- **Data transformation**: Process and validate API responses
- **Error handling**: Retry logic and fallback strategies
- **Batch processing**: Handle large datasets efficiently
## Basic API Call

### Simple GET Request

```yaml
extract-user-data:
  type: Workflow
  params:
    user_id: str
    api_key: str
  jobs:
    get-user:
      stages:
        - name: "Fetch User Data"
          id: fetch-user
          uses: api/get-user-data@rest
          with:
            url: "https://api.example.com/users/${{ params.user_id }}"
            headers:
              Authorization: "Bearer ${{ params.api_key }}"
              Content-Type: "application/json"
```
```python
from ddeutil.workflow import tag, Result
import requests


@tag("api", alias="get-user-data")
def get_user_data(
    url: str,
    headers: dict = None,
    result: Result = None,
) -> dict:
    """Fetch user data from REST API."""
    response = requests.get(url, headers=headers or {})
    response.raise_for_status()

    data = response.json()
    result.trace.info(f"Retrieved user: {data.get('name')}")

    return {
        "user_data": data,
        "status_code": response.status_code,
    }
```
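Because call functions are plain Python, you can smoke-test them without running the orchestrator. Below is a minimal sketch using `unittest.mock`; the stubbed response, the sample payload, and the `MagicMock` standing in for the injected `Result` are illustrative assumptions, not part of the workflow engine.

```python
from unittest.mock import MagicMock, patch


def test_get_user_data_locally():
    # Stub the HTTP layer so no real request is made.
    fake_response = MagicMock()
    fake_response.status_code = 200
    fake_response.json.return_value = {"id": 1, "name": "Ada"}

    with patch("requests.get", return_value=fake_response):
        output = get_user_data(
            url="https://api.example.com/users/1",
            headers={"Authorization": "Bearer test-token"},
            result=MagicMock(),  # stand-in for the Result the engine injects
        )

    assert output["user_data"]["name"] == "Ada"
    assert output["status_code"] == 200
```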
## Authentication Examples

### OAuth 2.0 Bearer Token

**OAuth Authentication**
```yaml
oauth-api-call:
  type: Workflow
  params:
    client_id: str
    client_secret: str
    api_endpoint: str
  jobs:
    authenticate-and-call:
      stages:
        - name: "Get OAuth Token"
          id: get-token
          uses: auth/oauth-token@bearer
          with:
            token_url: "https://api.example.com/oauth/token"
            client_id: ${{ params.client_id }}
            client_secret: ${{ params.client_secret }}
            scope: "read:users"

        - name: "Call Protected API"
          id: api-call
          uses: api/authenticated-call@rest
          with:
            url: ${{ params.api_endpoint }}
            token: ${{ stages.get-token.output.access_token }}
```
```python
from ddeutil.workflow import tag, Result
import requests


@tag("auth", alias="oauth-token")
def get_oauth_token(
    token_url: str,
    client_id: str,
    client_secret: str,
    scope: str = None,
    result: Result = None,
) -> dict:
    """Get OAuth 2.0 access token."""
    data = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    }
    if scope:
        data["scope"] = scope

    response = requests.post(token_url, data=data)
    response.raise_for_status()

    token_data = response.json()
    result.trace.info("OAuth token obtained successfully")
    return {
        "access_token": token_data["access_token"],
        "expires_in": token_data.get("expires_in", 3600),
    }


@tag("api", alias="authenticated-call")
def authenticated_api_call(
    url: str,
    token: str,
    method: str = "GET",
    data: dict = None,
    result: Result = None,
) -> dict:
    """Make authenticated API call."""
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    response = requests.request(
        method=method.upper(),
        url=url,
        headers=headers,
        json=data,
    )
    response.raise_for_status()
    return {
        "data": response.json(),
        "status_code": response.status_code,
        "headers": dict(response.headers),
    }
```
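Client-credentials tokens expire after `expires_in` seconds, so workflows that call several protected endpoints can avoid re-authenticating on every stage. The following is a hedged sketch of a small in-process cache wrapped around `get_oauth_token` above; the module-level cache and the 60-second refresh margin are assumptions for illustration, not a feature of the workflow engine.

```python
import time

# Illustrative in-process cache; swap for a shared store if jobs run in
# separate processes.
_token_cache = {"access_token": None, "expires_at": 0.0}


def get_cached_oauth_token(
    token_url: str,
    client_id: str,
    client_secret: str,
    scope: str = None,
    result: Result = None,
) -> dict:
    """Reuse a token until roughly 60 seconds before it expires."""
    if _token_cache["access_token"] and time.time() < _token_cache["expires_at"] - 60:
        return {"access_token": _token_cache["access_token"]}

    token = get_oauth_token(token_url, client_id, client_secret, scope, result)
    _token_cache["access_token"] = token["access_token"]
    _token_cache["expires_at"] = time.time() + token.get("expires_in", 3600)
    return token
```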
## Advanced API Patterns

### Pagination Handling

**Paginated API Calls**
```yaml
paginated-extraction:
  type: Workflow
  params:
    base_url: str
    api_key: str
    page_size: int
  jobs:
    extract-all-pages:
      stages:
        - name: "Extract Paginated Data"
          id: paginate
          uses: api/paginated-extract@rest
          with:
            base_url: ${{ params.base_url }}
            api_key: ${{ params.api_key }}
            page_size: ${{ params.page_size }}
            max_pages: 10
```
@tag("api", alias="paginated-extract")
def extract_paginated_data(
base_url: str,
api_key: str,
page_size: int = 100,
max_pages: int = None,
result: Result = None
) -> dict:
"""Extract data from paginated API."""
all_data = []
page = 1
headers = {"Authorization": f"Bearer {api_key}"}
while max_pages is None or page <= max_pages:
url = f"{base_url}?page={page}&size={page_size}"
response = requests.get(url, headers=headers)
response.raise_for_status()
data = response.json()
items = data.get("items", [])
if not items:
break
all_data.extend(items)
result.trace.info(f"Extracted page {page}: {len(items)} items")
# Check if there are more pages
if not data.get("has_next", False):
break
page += 1
result.trace.info(f"Total items extracted: {len(all_data)}")
return {
"data": all_data,
"total_pages": page - 1,
"total_items": len(all_data)
}
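Not every API exposes page numbers; many return a cursor or a ready-made `next` URL in each response. The sketch below follows that pattern. The alias, the `items` field, and the `next` field are assumptions to adjust for the API you call, and the code reuses the `requests`, `tag`, and `Result` imports from the example above.

```python
@tag("api", alias="cursor-extract")  # hypothetical alias for this sketch
def extract_cursor_paginated_data(
    start_url: str,
    api_key: str,
    max_pages: int = None,
    result: Result = None,
) -> dict:
    """Follow `next` links until the API stops returning one."""
    all_data = []
    url = start_url
    pages = 0
    headers = {"Authorization": f"Bearer {api_key}"}

    while url and (max_pages is None or pages < max_pages):
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        payload = response.json()

        all_data.extend(payload.get("items", []))
        url = payload.get("next")  # a missing/None cursor ends the loop
        pages += 1

    result.trace.info(f"Cursor pagination finished after {pages} pages")
    return {"data": all_data, "total_pages": pages, "total_items": len(all_data)}
```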
### Error Handling and Retries

**Resilient API Calls**
```python
import time

from ddeutil.workflow import tag, Result
import requests


@tag("api", alias="resilient-call")
def resilient_api_call(
    url: str,
    headers: dict = None,
    max_retries: int = 3,
    retry_delay: float = 1.0,
    timeout: int = 30,
    result: Result = None,
) -> dict:
    """Make an API call with retry logic and error handling."""
    headers = headers or {}
    last_exception = None

    for attempt in range(max_retries + 1):
        try:
            result.trace.info(f"API call attempt {attempt + 1}")
            response = requests.get(url, headers=headers, timeout=timeout)

            # Handle rate limiting before raising for other status codes
            if response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", retry_delay))
                result.trace.warning(f"Rate limited, waiting {retry_after}s")
                time.sleep(retry_after)
                continue

            response.raise_for_status()
            result.trace.info("API call successful")
            return {
                "data": response.json(),
                "status_code": response.status_code,
                "attempt": attempt + 1,
            }

        except requests.exceptions.RequestException as e:
            last_exception = e
            result.trace.warning(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries:
                result.trace.info(f"Retrying in {retry_delay}s...")
                time.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff

    # All retries failed (or every attempt was rate limited)
    result.trace.error(f"All {max_retries + 1} attempts failed")
    if last_exception is not None:
        raise last_exception
    raise RuntimeError(f"API call to {url} failed after {max_retries + 1} attempts")
```
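If you prefer not to hand-roll the retry loop, `requests` can delegate retries and backoff to `urllib3` through a mounted adapter. Below is a sketch assuming urllib3 1.26 or newer (where these keyword names apply); the status list and backoff factor are illustrative defaults, not values required by the workflow engine.

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def build_retrying_session(total: int = 3, backoff_factor: float = 1.0) -> requests.Session:
    """Session that retries 429/5xx responses with exponential backoff."""
    retry = Retry(
        total=total,
        backoff_factor=backoff_factor,               # grows the sleep between attempts
        status_forcelist=[429, 500, 502, 503, 504],  # statuses worth retrying
        respect_retry_after_header=True,             # honour Retry-After on 429s
    )
    session = requests.Session()
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session


# Usage inside a call function:
# response = build_retrying_session().get(url, headers=headers, timeout=30)
```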
## Data Processing Examples

### JSON Data Transformation

**API Response Processing**
```yaml
process-api-data:
  type: Workflow
  params:
    api_url: str
    target_format: str
  jobs:
    extract-and-transform:
      stages:
        - name: "Fetch Raw Data"
          id: fetch
          uses: api/get-data@rest
          with:
            url: ${{ params.api_url }}

        - name: "Transform Data"
          id: transform
          uses: data/transform-json@processor
          with:
            input_data: ${{ stages.fetch.output.data }}
            target_format: ${{ params.target_format }}

        - name: "Validate Results"
          id: validate
          uses: data/validate-schema@validator
          with:
            data: ${{ stages.transform.output.transformed_data }}
```
```python
from typing import Any, Dict, List

from ddeutil.workflow import tag, Result
from pydantic import BaseModel, ValidationError


@tag("data", alias="transform-json")
def transform_json_data(
    input_data: List[Dict[str, Any]],
    target_format: str,
    result: Result = None,
) -> dict:
    """Transform JSON data to the target format."""
    transformed_data = []

    for item in input_data:
        if target_format == "flat":
            # Flatten nested objects into a single level of keys
            flat_item = {}

            def flatten(obj, prefix=""):
                for key, value in obj.items():
                    if isinstance(value, dict):
                        flatten(value, f"{prefix}{key}_")
                    else:
                        flat_item[f"{prefix}{key}"] = value

            flatten(item)
            transformed_data.append(flat_item)

        elif target_format == "normalized":
            # Normalize field names
            normalized_item = {
                key.lower().replace(" ", "_"): value
                for key, value in item.items()
            }
            transformed_data.append(normalized_item)

    result.trace.info(f"Transformed {len(transformed_data)} records")
    return {
        "transformed_data": transformed_data,
        "original_count": len(input_data),
        "transformed_count": len(transformed_data),
    }


class UserSchema(BaseModel):
    id: int
    name: str
    email: str
    active: bool = True


@tag("data", alias="validate-schema")
def validate_data_schema(
    data: List[Dict[str, Any]],
    result: Result = None,
) -> dict:
    """Validate data against a Pydantic schema."""
    valid_records = []
    invalid_records = []

    for i, record in enumerate(data):
        try:
            user = UserSchema(**record)
            valid_records.append(user.model_dump())
        except ValidationError as e:
            invalid_records.append({
                "record_index": i,
                "record": record,
                "errors": e.errors(),
            })

    result.trace.info(
        f"Validation complete: {len(valid_records)} valid, "
        f"{len(invalid_records)} invalid"
    )
    if invalid_records:
        result.trace.warning(f"Found {len(invalid_records)} invalid records")

    return {
        "valid_records": valid_records,
        "invalid_records": invalid_records,
        "validation_success": len(invalid_records) == 0,
    }
```
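To make the `flat` mode concrete, here is a small worked example with made-up input; the `MagicMock` stands in for the injected `Result` so the snippet runs outside the engine.

```python
from unittest.mock import MagicMock

sample = [{"id": 1, "profile": {"name": "Ada", "city": "Paris"}}]

output = transform_json_data(
    input_data=sample,
    target_format="flat",
    result=MagicMock(),  # stand-in for the injected Result
)

# Nested keys are prefixed with their parent key:
# output["transformed_data"] == [{"id": 1, "profile_name": "Ada", "profile_city": "Paris"}]
```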
## Batch Processing

### Parallel API Calls

**Parallel Processing**
```yaml
parallel-api-calls:
  type: Workflow
  params:
    user_ids: list[int]
    api_base_url: str
  jobs:
    process-users:
      strategy:
        matrix:
          user_id: ${{ params.user_ids }}
        max_workers: 5
      stages:
        - name: "Process User"
          uses: api/process-single-user@batch
          with:
            user_id: ${{ matrix.user_id }}
            base_url: ${{ params.api_base_url }}
```
@tag("api", alias="process-single-user")
def process_single_user(
user_id: int,
base_url: str,
result: Result = None
) -> dict:
"""Process a single user via API."""
# Get user details
user_url = f"{base_url}/users/{user_id}"
user_response = requests.get(user_url)
user_response.raise_for_status()
user_data = user_response.json()
# Get user's orders
orders_url = f"{base_url}/users/{user_id}/orders"
orders_response = requests.get(orders_url)
orders_response.raise_for_status()
orders_data = orders_response.json()
# Process and combine data
processed_data = {
"user_id": user_id,
"user_name": user_data.get("name"),
"email": user_data.get("email"),
"total_orders": len(orders_data),
"total_spent": sum(order.get("amount", 0) for order in orders_data)
}
result.trace.info(f"Processed user {user_id}: {processed_data['total_orders']} orders")
return processed_data
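The matrix strategy above fans the work out inside the orchestrator. When you need the same fan-out from a stand-alone script, or want one call stage to own the whole batch, a thread pool gives a similar effect; the worker count and the `MagicMock` result are assumptions for this sketch.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from unittest.mock import MagicMock


def process_users_concurrently(
    user_ids: list,
    base_url: str,
    max_workers: int = 5,
) -> list:
    """Run process_single_user for each id on a small thread pool."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            pool.submit(process_single_user, uid, base_url, MagicMock()): uid
            for uid in user_ids
        }
        for future in as_completed(futures):
            results.append(future.result())  # re-raises any per-user error
    return results
```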
## Configuration and Best Practices

### Environment-based Configuration

**Configuration Management**
```yaml
# workflow-config.yml
api-workflow:
  type: Workflow
  params:
    environment: str
    api_timeout: int
  jobs:
    api-call:
      stages:
        - name: "Configure API Call"
          uses: api/configured-call@rest
          with:
            base_url: ${{ params.environment == 'prod' and 'https://api.prod.example.com' or 'https://api.dev.example.com' }}
            timeout: ${{ params.api_timeout | coalesce(30) }}
            api_key: ${API_KEY}  # From environment variable
```
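The same environment switch can live in Python instead of the template expression, which keeps the YAML short. A hedged sketch; the variable names (`API_TIMEOUT`, `API_KEY`) and the dev fallback URL are assumptions for illustration.

```python
import os


def resolve_api_config(environment: str) -> dict:
    """Pick base URL, timeout, and API key for the given environment."""
    base_urls = {
        "prod": "https://api.prod.example.com",
        "dev": "https://api.dev.example.com",
    }
    return {
        "base_url": base_urls.get(environment, base_urls["dev"]),
        "timeout": int(os.getenv("API_TIMEOUT", "30")),
        "api_key": os.environ["API_KEY"],  # fail fast if the secret is missing
    }
```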
### Error Handling Best Practices

- Always use timeouts to prevent hanging requests
- Implement retry logic with exponential backoff
- Handle rate limiting by respecting `Retry-After` headers
- Log meaningful messages for debugging and monitoring
- Validate API responses before processing
- Use environment variables for sensitive data like API keys
- Implement circuit breakers for unreliable APIs (see the sketch after this list)
- Cache responses when appropriate to reduce API calls
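For the circuit-breaker point above, here is a minimal, in-process sketch: after a run of consecutive failures it stops calling the endpoint for a cool-down period. The thresholds and single-process state are assumptions; shared workers usually need a shared store or a dedicated library.

```python
import time


class CircuitBreaker:
    """Skip calls to an endpoint after repeated failures, then retry later."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 60.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None

    def call(self, func, *args, **kwargs):
        # While open, short-circuit until the cool-down has passed.
        if self.opened_at is not None:
            if time.time() - self.opened_at < self.reset_timeout:
                raise RuntimeError("Circuit open: skipping API call")
            self.opened_at = None  # half-open: allow one trial call

        try:
            value = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.time()
            raise

        self.failures = 0  # a success closes the circuit again
        return value


# Usage: breaker.call(requests.get, url, timeout=30)
```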
## Security Considerations

- Never hardcode credentials in workflow files
- Use environment variables or secure secret management
- Validate and sanitize all API inputs
- Implement proper authentication for sensitive endpoints
- Log requests and responses carefully to avoid exposing sensitive data (see the redaction sketch after this list)
- Use HTTPS for all API communications
- Implement rate limiting to prevent abuse
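For the logging point above, a small redaction helper keeps secrets out of trace output; the key list is an assumption to extend for your own payloads.

```python
SENSITIVE_KEYS = {"authorization", "api_key", "client_secret", "access_token", "password"}


def redact(payload: dict) -> dict:
    """Return a copy with sensitive values masked before logging."""
    return {
        key: "***REDACTED***" if key.lower() in SENSITIVE_KEYS else value
        for key, value in payload.items()
    }


# result.trace.info(f"Request headers: {redact(headers)}")
```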
This comprehensive guide covers the most common API integration patterns in workflow orchestration. Adapt these examples to your specific use cases and API requirements.