Skip to content

@workflow Decorator

@workflow Decorator

The @workflow decorator is Klira AI SDK’s primary decorator for instrumenting high-level processes and business workflows. It provides automatic framework detection, distributed tracing, and hierarchical context management for complex multi-step operations.

Overview

A workflow represents a high-level business process that orchestrates multiple tasks, agents, and tools to achieve a specific outcome. The @workflow decorator automatically adapts to your framework (OpenAI Agents, LangChain, CrewAI, etc.) while providing consistent observability and governance.

Basic Usage

import os
from klira.sdk import Klira
from klira.sdk.decorators import workflow
from klira.sdk.utils.context import set_hierarchy_context
# Initialize Klira AI
klira = Klira.init(
app_name="WorkflowApp",
api_key=os.getenv("KLIRA_API_KEY"),
enabled=True
)
# Set user context
set_hierarchy_context(user_id="user_123")
@workflow(
name="data_processing_workflow",
organization_id="acme_corp",
project_id="analytics_platform"
)
def process_customer_data(customer_id: str, data_source: str) -> dict:
"""Process customer data through multiple stages."""
# Extract data
raw_data = extract_data(customer_id, data_source)
# Transform data
cleaned_data = transform_data(raw_data)
# Load into analytics platform
result = load_data(cleaned_data)
return {
"customer_id": customer_id,
"records_processed": len(cleaned_data),
"status": "completed"
}

Parameters Reference

Core Parameters

ParameterTypeRequiredDescription
namestrNoWorkflow name (defaults to function name)
versionintNoWorkflow version for tracking changes
organization_idstrNoOrganization identifier for hierarchical context
project_idstrNoProject identifier for grouping workflows

Advanced Parameters

ParameterTypeDescription
**kwargsAnyFramework-specific parameters passed to underlying adapters

Framework-Specific Behavior

OpenAI Agents Integration

When used with OpenAI Agents, @workflow integrates with the native workflow system:

from agents import Agent, Runner
from klira.sdk.decorators import workflow
from klira.sdk.utils.context import set_hierarchy_context
# Set user context
set_hierarchy_context(user_id="user_123")
@workflow(
name="customer_support_workflow",
organization_id="support_team",
project_id="helpdesk_v2"
)
def create_support_workflow(customer_query: str) -> str:
"""Create a customer support workflow with multiple agents."""
# Create specialized agents
triage_agent = Agent(
name="TriageAgent",
instructions="Classify and route customer queries",
model="gpt-4"
)
resolution_agent = Agent(
name="ResolutionAgent",
instructions="Provide detailed solutions to customer problems",
model="gpt-4"
)
# Run workflow
runner = Runner()
# Step 1: Triage the query
triage_result = runner.run(
agent=triage_agent,
messages=[{"role": "user", "content": f"Classify this query: {customer_query}"}]
)
# Step 2: Generate resolution
resolution = runner.run(
agent=resolution_agent,
messages=[
{"role": "user", "content": customer_query},
{"role": "assistant", "content": triage_result.messages[-1]["content"]}
]
)
return resolution.messages[-1]["content"]

LangChain Integration

With LangChain, @workflow works with chains and agent executors:

from langchain.chains import LLMChain
from langchain.agents import AgentExecutor
from langchain_openai import ChatOpenAI
from klira.sdk.decorators import workflow
from klira.sdk.utils.context import set_hierarchy_context
# Set user context
set_hierarchy_context(user_id="user_123")
@workflow(
name="research_workflow",
organization_id="research_team",
project_id="market_analysis"
)
def research_market_trends(topic: str, depth: str = "comprehensive") -> dict:
"""Research market trends using LangChain workflow."""
llm = ChatOpenAI(temperature=0)
# Research chain
research_chain = LLMChain(
llm=llm,
prompt=PromptTemplate(
input_variables=["topic", "depth"],
template="Research {topic} with {depth} analysis. Provide structured insights."
)
)
# Analysis chain
analysis_chain = LLMChain(
llm=llm,
prompt=PromptTemplate(
input_variables=["research_data"],
template="Analyze this research data and provide actionable insights: {research_data}"
)
)
# Execute workflow
research_data = research_chain.run(topic=topic, depth=depth)
analysis = analysis_chain.run(research_data=research_data)
return {
"topic": topic,
"research_data": research_data,
"analysis": analysis,
"timestamp": datetime.now().isoformat()
}

CrewAI Integration

For CrewAI, @workflow orchestrates crew creation and execution:

from crewai import Agent, Task, Crew, Process
from klira.sdk.decorators import workflow
@workflow(
name="content_creation_workflow",
organization_id="marketing_team",
project_id="content_strategy"
)
def create_marketing_content(topic: str, target_audience: str) -> str:
"""Create marketing content using CrewAI workflow."""
# Define agents
researcher = Agent(
role="Content Researcher",
goal="Research comprehensive information about the topic",
backstory="Expert researcher with deep knowledge of market trends",
verbose=True
)
writer = Agent(
role="Content Writer",
goal="Create engaging content based on research",
backstory="Skilled writer who creates compelling marketing content",
verbose=True
)
# Define tasks
research_task = Task(
description=f"Research {topic} for {target_audience} audience",
agent=researcher,
expected_output="Comprehensive research report with key insights"
)
writing_task = Task(
description=f"Write marketing content about {topic} for {target_audience}",
agent=writer,
expected_output="Engaging marketing content ready for publication"
)
# Create and run crew
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, writing_task],
process=Process.sequential,
verbose=True
)
result = crew.kickoff()
return str(result)

Async Workflow Support

The @workflow decorator supports both synchronous and asynchronous workflows:

import asyncio
from klira.sdk.decorators import workflow
@workflow(
name="async_data_pipeline",
organization_id="data_team",
project_id="real_time_analytics"
)
async def process_streaming_data(stream_id: str, batch_size: int = 100) -> dict:
"""Process streaming data asynchronously."""
processed_count = 0
error_count = 0
async def process_batch(batch_data):
nonlocal processed_count, error_count
try:
# Simulate async processing
await asyncio.sleep(0.1)
processed_count += len(batch_data)
return True
except Exception:
error_count += 1
return False
# Simulate streaming data processing
for i in range(0, 1000, batch_size):
batch_data = [f"record_{j}" for j in range(i, min(i + batch_size, 1000))]
await process_batch(batch_data)
return {
"stream_id": stream_id,
"processed_count": processed_count,
"error_count": error_count,
"status": "completed"
}
# Usage
async def main():
result = await process_streaming_data("stream_001", batch_size=50)
print(f"Processed {result['processed_count']} records")
# asyncio.run(main())

Error Handling and Resilience

The @workflow decorator includes built-in error handling and recovery mechanisms:

from klira.sdk.decorators import workflow
from klira.sdk.utils.error_handler import handle_errors
@workflow(
name="resilient_workflow",
organization_id="production_team",
project_id="critical_systems"
)
@handle_errors(fail_closed=False, max_retries=3)
def resilient_data_processing(data_source: str) -> dict:
"""Workflow with built-in error handling and retries."""
try:
# Step 1: Validate input
if not data_source or len(data_source) < 3:
raise ValueError("Invalid data source provided")
# Step 2: Process data with potential failures
results = []
for i in range(5):
try:
# Simulate processing that might fail
if i == 2: # Simulate a failure
raise ConnectionError("Temporary connection issue")
results.append(f"processed_item_{i}")
except ConnectionError as e:
logger.warning(f"Retrying after connection error: {e}")
# Built-in retry mechanism will handle this
raise
return {
"status": "success",
"processed_items": results,
"data_source": data_source
}
except Exception as e:
# Workflow-level error handling
logger.error(f"Workflow failed: {e}")
return {
"status": "failed",
"error": str(e),
"data_source": data_source
}

Hierarchical Context Management

Workflows automatically establish hierarchical context for nested operations:

from klira.sdk.decorators import workflow, task, agent
@workflow(
name="hierarchical_workflow",
organization_id="enterprise_corp",
project_id="customer_onboarding"
)
def onboard_customer(customer_data: dict) -> dict:
"""Multi-step customer onboarding workflow."""
# Context is automatically propagated to nested decorators
validation_result = validate_customer_data(customer_data)
if validation_result["valid"]:
account_result = create_customer_account(customer_data)
notification_result = send_welcome_notification(customer_data["email"])
return {
"status": "completed",
"customer_id": account_result["customer_id"],
"validation": validation_result,
"account": account_result,
"notification": notification_result
}
else:
return {
"status": "failed",
"validation": validation_result
}
@task(name="validate_customer_data") # Inherits workflow context
def validate_customer_data(data: dict) -> dict:
"""Validate customer data."""
required_fields = ["name", "email", "phone"]
missing_fields = [field for field in required_fields if field not in data]
return {
"valid": len(missing_fields) == 0,
"missing_fields": missing_fields
}
@agent(name="account_creation_agent") # Inherits workflow context
def create_customer_account(data: dict) -> dict:
"""Create customer account."""
import uuid
return {
"customer_id": str(uuid.uuid4()),
"account_status": "active",
"created_at": datetime.now().isoformat()
}
@task(name="notification_task") # Inherits workflow context
def send_welcome_notification(email: str) -> dict:
"""Send welcome notification."""
# Simulate email sending
return {
"email": email,
"notification_sent": True,
"sent_at": datetime.now().isoformat()
}

Performance Monitoring

Workflows include comprehensive performance monitoring:

import time
from klira.sdk.decorators import workflow
from klira.sdk.tracing import create_span, set_span_attribute
@workflow(
name="performance_monitored_workflow",
organization_id="performance_team",
project_id="optimization_analysis"
)
def performance_critical_workflow(dataset_size: int) -> dict:
"""Workflow with detailed performance monitoring."""
start_time = time.time()
# Custom performance tracking
with create_span("data_loading") as span:
set_span_attribute(span, "dataset.size", dataset_size)
# Simulate data loading
load_time = min(dataset_size / 1000, 5.0) # Cap at 5 seconds
time.sleep(load_time)
set_span_attribute(span, "data_loading.duration_seconds", load_time)
# Processing phase
with create_span("data_processing") as span:
set_span_attribute(span, "processing.algorithm", "optimized_batch")
# Simulate processing
process_time = min(dataset_size / 2000, 3.0) # Cap at 3 seconds
time.sleep(process_time)
set_span_attribute(span, "data_processing.duration_seconds", process_time)
set_span_attribute(span, "processing.throughput_records_per_second", dataset_size / process_time)
total_time = time.time() - start_time
return {
"dataset_size": dataset_size,
"total_duration_seconds": total_time,
"load_duration_seconds": load_time,
"process_duration_seconds": process_time,
"throughput_records_per_second": dataset_size / total_time,
"status": "completed"
}

Multi-Framework Workflows

Workflows can orchestrate multiple frameworks within a single process:

from klira.sdk.decorators import workflow
from klira.sdk.utils.framework_detection import detect_framework
@workflow(
name="multi_framework_workflow",
organization_id="integration_team",
project_id="hybrid_ai_system"
)
def hybrid_ai_workflow(user_query: str) -> dict:
"""Workflow that uses multiple AI frameworks."""
results = {}
# Use LangChain for initial processing
try:
from langchain_openai import ChatOpenAI
from langchain.chains import LLMChain
langchain_llm = ChatOpenAI(temperature=0)
langchain_result = langchain_llm.invoke([{"role": "user", "content": f"Analyze: {user_query}"}])
results["langchain_analysis"] = langchain_result.content
except ImportError:
results["langchain_analysis"] = "LangChain not available"
# Use OpenAI Agents for specialized processing
try:
from agents import Agent, Runner
specialist_agent = Agent(
name="SpecialistAgent",
instructions="Provide specialized insights based on the analysis",
model="gpt-4"
)
runner = Runner()
specialist_result = runner.run(
agent=specialist_agent,
messages=[{"role": "user", "content": f"Based on this analysis: {results.get('langchain_analysis', '')}, provide specialized insights for: {user_query}"}]
)
results["specialist_insights"] = specialist_result.messages[-1]["content"]
except ImportError:
results["specialist_insights"] = "OpenAI Agents not available"
# Use CrewAI for collaborative processing
try:
from crewai import Agent, Task, Crew, Process
reviewer = Agent(
role="Quality Reviewer",
goal="Review and synthesize all analyses",
backstory="Expert reviewer who synthesizes multiple AI perspectives"
)
review_task = Task(
description=f"Review and synthesize these analyses: {results}",
agent=reviewer,
expected_output="Comprehensive synthesis of all analyses"
)
crew = Crew(
agents=[reviewer],
tasks=[review_task],
process=Process.sequential
)
synthesis = crew.kickoff()
results["final_synthesis"] = str(synthesis)
except ImportError:
results["final_synthesis"] = "CrewAI not available"
# Detect which frameworks were actually used
detected_framework = detect_framework()
results["detected_framework"] = detected_framework
results["query"] = user_query
return results

Testing Workflows

Here’s how to test workflow-decorated functions:

import pytest
from unittest.mock import Mock, patch
from klira.sdk import Klira
from klira.sdk.decorators import workflow
# Test setup
@pytest.fixture
def klira_setup():
"""Setup Klira AI for testing."""
return Klira.init(app_name="WorkflowTest", enabled=True)
def test_basic_workflow(klira_setup):
"""Test basic workflow functionality."""
@workflow(
name="test_workflow",
organization_id="test_org",
project_id="test_project"
)
def simple_workflow(input_data: str) -> dict:
return {
"processed": input_data.upper(),
"status": "success"
}
# Test execution
result = simple_workflow("hello world")
assert result["processed"] == "HELLO WORLD"
assert result["status"] == "success"
assert hasattr(simple_workflow, "__name__")
def test_async_workflow(klira_setup):
"""Test async workflow functionality."""
@workflow(
name="async_test_workflow",
organization_id="test_org",
project_id="test_project"
)
async def async_workflow(delay: float) -> dict:
await asyncio.sleep(delay)
return {
"delay": delay,
"status": "completed"
}
# Test async execution
async def run_test():
result = await async_workflow(0.1)
assert result["delay"] == 0.1
assert result["status"] == "completed"
asyncio.run(run_test())
def test_workflow_error_handling(klira_setup):
"""Test workflow error handling."""
@workflow(
name="error_test_workflow",
organization_id="test_org",
project_id="test_project"
)
def error_workflow(should_fail: bool) -> dict:
if should_fail:
raise ValueError("Intentional test error")
return {"status": "success"}
# Test successful execution
result = error_workflow(False)
assert result["status"] == "success"
# Test error handling
with pytest.raises(ValueError):
error_workflow(True)
@patch('klira.sdk.utils.framework_detection.detect_framework')
def test_workflow_framework_detection(mock_detect, klira_setup):
"""Test workflow framework detection."""
mock_detect.return_value = "langchain"
@workflow(
name="framework_test_workflow",
organization_id="test_org",
project_id="test_project"
)
def framework_workflow() -> str:
from klira.sdk.utils.framework_detection import detect_framework
return detect_framework()
result = framework_workflow()
assert result == "langchain"
mock_detect.assert_called()

Best Practices

1. Naming Conventions

# Good: Descriptive workflow names
@workflow(name="customer_onboarding_workflow")
@workflow(name="data_processing_pipeline")
@workflow(name="ai_content_generation_workflow")
# Avoid: Generic or unclear names
@workflow(name="workflow1")
@workflow(name="process")
@workflow(name="main")

2. Hierarchical Organization

# Use consistent organization and project IDs
@workflow(
name="user_management_workflow",
organization_id="acme_corp", # Company/org level
project_id="customer_portal_v2" # Project level
)

3. Error Handling

# Always include comprehensive error handling
@workflow(name="robust_workflow")
def robust_workflow(data: dict) -> dict:
try:
# Validate inputs
if not data:
raise ValueError("No data provided")
# Process with error recovery
result = process_data(data)
return {"status": "success", "result": result}
except Exception as e:
logger.error(f"Workflow failed: {e}")
return {"status": "error", "error": str(e)}

4. Performance Considerations

# Use spans for performance tracking
@workflow(name="performance_aware_workflow")
def performance_workflow(large_dataset: list) -> dict:
with create_span("data_validation") as span:
set_span_attribute(span, "dataset.size", len(large_dataset))
# Validation logic here
with create_span("data_processing") as span:
# Processing logic here
pass
return {"processed": len(large_dataset)}

5. Version Management

# Use versions for tracking workflow changes
@workflow(
name="evolving_workflow",
version=2, # Increment when making breaking changes
organization_id="acme_corp",
project_id="production_system"
)
def evolving_workflow_v2(data: dict) -> dict:
# New implementation
pass

Common Patterns

1. Data Pipeline Workflow

@workflow(name="etl_pipeline")
def etl_pipeline(source: str, destination: str) -> dict:
"""Extract, Transform, Load pipeline."""
# Extract
with create_span("extract") as span:
data = extract_data(source)
set_span_attribute(span, "records.extracted", len(data))
# Transform
with create_span("transform") as span:
transformed_data = transform_data(data)
set_span_attribute(span, "records.transformed", len(transformed_data))
# Load
with create_span("load") as span:
load_result = load_data(transformed_data, destination)
set_span_attribute(span, "records.loaded", load_result["count"])
return {
"source": source,
"destination": destination,
"records_processed": len(transformed_data),
"status": "completed"
}

2. AI Agent Orchestration

@workflow(name="multi_agent_workflow")
def multi_agent_workflow(task_description: str) -> dict:
"""Orchestrate multiple AI agents."""
results = {}
# Research phase
research_result = research_agent(task_description)
results["research"] = research_result
# Analysis phase
analysis_result = analysis_agent(research_result)
results["analysis"] = analysis_result
# Synthesis phase
synthesis_result = synthesis_agent(results)
results["synthesis"] = synthesis_result
return results

3. Event-Driven Workflow

@workflow(name="event_driven_workflow")
async def event_driven_workflow(event_data: dict) -> dict:
"""Process events asynchronously."""
event_type = event_data.get("type")
if event_type == "user_signup":
return await handle_user_signup(event_data)
elif event_type == "order_placed":
return await handle_order_placed(event_data)
elif event_type == "payment_received":
return await handle_payment_received(event_data)
else:
return {"status": "unknown_event", "event_type": event_type}

Troubleshooting

Common Issues

  1. Framework Not Detected
# Issue: Framework detection fails
# Solution: Ensure framework is properly imported before decoration
# Before decorator application
import langchain # or your framework
from klira.sdk.decorators import workflow
@workflow(name="my_workflow")
def my_workflow():
pass
  1. Context Not Propagated
# Issue: Nested functions don't inherit context
# Solution: Ensure nested functions are also decorated
@workflow(name="parent_workflow")
def parent_workflow():
return nested_operation() # This should also be decorated
@task(name="nested_operation") # Add appropriate decorator
def nested_operation():
pass
  1. Performance Issues
# Issue: Workflow is slow
# Solution: Use spans to identify bottlenecks
@workflow(name="optimized_workflow")
def optimized_workflow():
with create_span("expensive_operation") as span:
# Monitor this operation specifically
result = expensive_operation()
set_span_attribute(span, "operation.duration", time.time() - start)
return result

The @workflow decorator is the foundation of Klira AI SDK’s observability system. It provides automatic framework adaptation, comprehensive tracing, and hierarchical context management that scales from simple functions to complex multi-framework AI systems.