workflow_management
Manage LangGraph workflows, debug state transitions, and handle checkpoints
Install
mkdir -p .claude/skills/workflow-management && curl -L -o skill.zip "https://agentskills.codes/api/skills/download/14187" && unzip -o skill.zip -d .claude/skills/workflow-management && rm skill.zip
Installs to .claude/skills/workflow-management
LangGraph Workflow Management Skill
Purpose
Master the AARLP recruitment workflow powered by LangGraph. This skill covers workflow debugging, state management, checkpoint handling, and adding new nodes/edges to the graph.
AI Provider Selection (Amazon Nova AI Hackathon)
AARLP supports two AI providers:
- bedrock (default for hackathon) - AWS Nova models
- openai (fallback) - OpenAI GPT-4
Set via AI_PROVIDER environment variable.
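A minimal sketch of how the AI_PROVIDER lookup could be validated at startup. The helper name `resolve_ai_provider` is hypothetical, and trimming and lower-casing the value is an assumption of this sketch; the allowed values and the `bedrock` default come from the list above.

```python
import os
from typing import Mapping, Optional

ALLOWED_PROVIDERS = ("bedrock", "openai")
DEFAULT_PROVIDER = "bedrock"


def resolve_ai_provider(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the configured provider, falling back to the default.

    Hypothetical helper: the project itself reads this value through
    its Settings class.
    """
    source = os.environ if env is None else env
    raw = source.get("AI_PROVIDER", DEFAULT_PROVIDER)
    # Assumption: tolerate surrounding whitespace and mixed case.
    provider = raw.strip().lower()
    if provider not in ALLOWED_PROVIDERS:
        raise ValueError(
            f"AI_PROVIDER must be one of {ALLOWED_PROVIDERS}, got {raw!r}"
        )
    return provider
```

Failing fast on an unknown value keeps a typo in `.env` from surfacing later as a confusing client error inside a node.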
Secrets Management
Required for External API Calls:
| Secret | Provider | Purpose |
|---|---|---|
| AWS_ACCESS_KEY_ID | bedrock | Nova model access |
| AWS_SECRET_ACCESS_KEY | bedrock | Nova model access |
| OPENAI_API_KEY | openai | JD generation fallback |
| PINECONE_API_KEY | both | Shortlisting nodes |
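The table can be turned into a startup check. The sketch below is an illustration only: `REQUIRED_SECRETS` and `missing_secrets` are hypothetical names, and the per-provider grouping simply mirrors the Provider column above.

```python
from typing import Dict, List, Mapping

# Mirrors the table: PINECONE_API_KEY is needed for both providers.
REQUIRED_SECRETS: Dict[str, List[str]] = {
    "bedrock": ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "PINECONE_API_KEY"],
    "openai": ["OPENAI_API_KEY", "PINECONE_API_KEY"],
}


def missing_secrets(provider: str, env: Mapping[str, str]) -> List[str]:
    """Return the names of required secrets that are unset or empty."""
    required = REQUIRED_SECRETS[provider]
    return [name for name in required if not env.get(name)]
```

Calling `missing_secrets("bedrock", os.environ)` before building the graph gives one readable error listing every missing name, without ever printing a secret value.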
Configuration (app/core/config.py):
from typing import Literal

# Pydantic v2 ships BaseSettings in the separate pydantic-settings package
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # AI Provider Selection
    ai_provider: Literal["openai", "bedrock"] = "bedrock"
    # AWS Bedrock (Primary for hackathon)
    aws_access_key_id: str
    aws_secret_access_key: str
    aws_region: str = "us-east-1"
    bedrock_model_id: str = "amazon.nova-lite-v1:0"
    # OpenAI (Fallback)
    openai_api_key: str
    openai_model: str = "gpt-4o"

    class Config:
        env_file = ".env"
Security in Workflow Nodes:
async def api_integration_node(state: GraphState) -> GraphState:
    """Use provider-agnostic client for AI calls."""
    from app.ai.client import is_bedrock_provider
    from app.core.config import get_settings
    if is_bedrock_provider():
        from app.ai.bedrock_client import invoke_nova_model
        result = await invoke_nova_model(messages=[...])
    else:
        from app.ai.client import get_openai_client
        client = get_openai_client()
        response = await client.chat.completions.create(...)
    # NEVER log API keys
    logger.info(f"AI call completed for job {state.job_id}")
    return state.model_copy(...)
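One way to back up the "never log API keys" rule is a logging filter that masks known secret values before a record is emitted. This is a sketch under assumptions: the class name `RedactSecretsFilter` is hypothetical and nothing in the project is known to use it.

```python
import logging
from typing import Iterable


class RedactSecretsFilter(logging.Filter):
    """Replace known secret values in log messages with '***'."""

    def __init__(self, secrets: Iterable[str]) -> None:
        super().__init__()
        # Skip empty strings: replacing "" would corrupt every message.
        self._secrets = [s for s in secrets if s]

    def filter(self, record: logging.LogRecord) -> bool:
        # Render the final message first so %-style args are covered too.
        message = record.getMessage()
        for secret in self._secrets:
            message = message.replace(secret, "***")
        record.msg = message
        record.args = None
        return True  # keep the record, now redacted
```

Filters attached to a logger only see records logged on that exact logger, so attaching the filter to the handler (`handler.addFilter(...)`) is the safer choice when child loggers such as `app.workflow.nodes` are in use.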
Architecture Overview
Workflow Files Structure
app/workflow/
├── state.py # Pydantic GraphState models
├── nodes.py # Node function implementations
├── edges.py # Conditional routing logic
├── builder.py # Graph construction
├── engine.py # High-level workflow API
├── checkpoints.py # State persistence
├── constants.py # NodeName enum, constants
├── helpers.py # Shared utilities
└── exceptions.py # Workflow errors
Core Concepts
1. State Management (Pydantic-Based)
The workflow uses immutable Pydantic models for type-safe state:
from app.workflow.state import GraphState
# State is always accessed through Pydantic models
state = GraphState(
    job_id="123",
    current_node="generate_jd",
    jd=JobDescriptionState(
        title="Senior Engineer",
        status=Status.PENDING,
        approval_status=ApprovalStatus.PENDING
    )
)
# Type-safe access with autocomplete
print(state.jd.approval_status)  # ✅ IDE knows this is ApprovalStatus
Key Benefits:
- ✅ Runtime validation whenever a state model is constructed (note that model_copy(update=...) does not re-validate the updated fields)
- ✅ No manual dict-to-object conversions
- ✅ Full IDE support with autocomplete
- ✅ Nested state objects for organization
2. Checkpointing (Pause/Resume)
AARLP uses checkpoints to persist workflow state:
from app.workflow.engine import WorkflowEngine
from app.workflow.checkpoints import save_checkpoint, load_checkpoint
# Initialize with checkpointing enabled
# IMPORTANT: Use consistent thread_id format: f"job-{job_id}"
engine = WorkflowEngine(
    job_id="123",
    thread_id="job-123",  # Standard format: job-{job_id}
    enable_checkpointing=True
)
# Run until human-in-the-loop wait state
result = await engine.run_until_interrupt()
# Later, resume from checkpoint
resumed_state = await engine.resume_from_checkpoint(
    updates={"jd": {"approval_status": "APPROVED"}}
)
Checkpoint Use Cases:
- JD approval waiting
- Shortlist approval waiting
- Voice interview scheduling
- Any human-in-the-loop decision point
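The nested `updates` dictionary passed to `resume_from_checkpoint` implies that partial data is merged into the saved state. How the engine actually applies it is not shown here; the sketch below assumes a recursive dictionary merge, and `deep_merge` is a hypothetical helper.

```python
from typing import Any, Dict


def deep_merge(base: Dict[str, Any], updates: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict with `updates` merged into `base`.

    Nested dicts are merged key by key; any other value is replaced.
    Neither input is mutated.
    """
    merged = dict(base)
    for key, value in updates.items():
        current = merged.get(key)
        if isinstance(current, dict) and isinstance(value, dict):
            merged[key] = deep_merge(current, value)
        else:
            merged[key] = value
    return merged
```

With a merge like this, `{"jd": {"approval_status": "APPROVED"}}` changes only the approval status and leaves the other `jd` fields from the checkpoint intact. A shallow `dict.update` would instead replace the whole `jd` entry.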
3. Node Development
Node Function Signature
from datetime import datetime, timezone

from app.workflow.state import GraphState

async def my_node(state: GraphState) -> GraphState:
    """
    All nodes must:
    1. Accept GraphState as input
    2. Return modified GraphState
    3. Be async (for DB/API calls)
    4. Handle errors with custom exceptions
    """
    # Extract what you need (type-safe)
    job_id = state.job_id
    # Do work
    result = await some_async_operation(job_id)
    # Return an updated copy; model_copy(update=...) does not re-validate,
    # so only pass values that already have the right types
    return state.model_copy(
        update={
            "current_node": "my_node",
            "updated_at": datetime.now(timezone.utc)
        }
    )
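The node contract (take a state, await some work, return a new state rather than mutating the old one) can be tried without the project's models. The sketch below uses a frozen dataclass as a stand-in for GraphState; `DemoState` and `demo_node` are hypothetical names, and `dataclasses.replace` plays the role that `model_copy(update=...)` plays for Pydantic models.

```python
import asyncio
from dataclasses import dataclass, replace
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class DemoState:
    """Stand-in for GraphState so the sketch runs on the stdlib alone."""
    job_id: str
    current_node: str
    updated_at: Optional[datetime] = None


async def demo_node(state: DemoState) -> DemoState:
    # Placeholder for a real awaitable such as a DB or API call.
    await asyncio.sleep(0)
    # Return a new object; the input state is left untouched.
    return replace(
        state,
        current_node="demo_node",
        updated_at=datetime.now(timezone.utc),
    )
```

Running `asyncio.run(demo_node(DemoState("123", "start")))` returns a second object, which is what makes before and after comparisons in tests straightforward.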
Adding a New Node
- Define the node function in nodes.py:
async def my_new_node(state: GraphState) -> GraphState:
    """Description of what this node does."""
    try:
        # Your logic here
        logger.info(f"Processing {state.job_id} in my_new_node")
        # Update state
        return state.model_copy(
            update={"current_node": NodeName.MY_NEW_NODE}
        )
    except Exception as e:
        logger.error(f"Error in my_new_node: {e}")
        # Chain the original exception so its traceback is preserved
        raise GraphExecutionError(f"Failed at my_new_node: {e}") from e
- Add node name to constants in constants.py:
from enum import Enum

class NodeName(str, Enum):
    GENERATE_JD = "generate_jd"
    MY_NEW_NODE = "my_new_node"  # Add this
- Register in builder in builder.py:
from app.workflow.nodes import my_new_node
graph.add_node(NodeName.MY_NEW_NODE, my_new_node)
- Add edges (routing):
# Simple edge
graph.add_edge(NodeName.GENERATE_JD, NodeName.MY_NEW_NODE)
# Conditional edge
graph.add_conditional_edges(
    NodeName.MY_NEW_NODE,
    my_routing_function,  # Defined in edges.py
    {
        "success": NodeName.NEXT_NODE,
        "failure": NodeName.ERROR_NODE
    }
)
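Because the steps above touch three files, it is easy to add an enum member and forget to register its function. A possible consistency check is sketched below; `DemoNodeName`, the `registry` dictionary and `unregistered_nodes` are hypothetical stand-ins, not part of the project's builder.

```python
from enum import Enum
from typing import Callable, Dict, List, Type


class DemoNodeName(str, Enum):
    """Stand-in for the project's NodeName enum."""
    GENERATE_JD = "generate_jd"
    MY_NEW_NODE = "my_new_node"


def unregistered_nodes(names: Type[Enum], registry: Dict[str, Callable]) -> List[str]:
    """List enum values that have no node function registered."""
    return [member.value for member in names if member.value not in registry]
```

A unit test asserting that this list is empty would catch a missing registration before the graph is ever run.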
4. Conditional Routing (Edges)
Edges determine the next node based on state:
# In edges.py
def my_routing_function(state: GraphState) -> str:
    """
    Return a string key that maps to a node in the edge map.
    """
    if state.jd.approval_status == ApprovalStatus.APPROVED:
        return "approved"
    elif state.jd.approval_status == ApprovalStatus.REJECTED:
        return "rejected"
    else:
        return "waiting"

# In builder.py
graph.add_conditional_edges(
    NodeName.WAIT_JD_APPROVAL,
    my_routing_function,
    {
        "approved": NodeName.POST_JOB,
        "rejected": NodeName.GENERATE_JD,  # Regenerate
        "waiting": NodeName.WAIT_JD_APPROVAL  # Stay in wait state
    }
)
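Conceptually, a conditional edge is a two-step lookup: the routing function returns a key, and the edge map turns that key into the next node. The sketch below models only that lookup, not LangGraph's internals; `resolve_next_node` is a hypothetical helper and the node name strings in the usage note are illustrative.

```python
from typing import Any, Callable, Dict


def resolve_next_node(
    route: Callable[[Any], str],
    edge_map: Dict[str, str],
    state: Any,
) -> str:
    """Apply a routing function and look its key up in the edge map."""
    key = route(state)
    if key not in edge_map:
        # A key with no target is a wiring bug; fail loudly.
        raise KeyError(
            f"routing key {key!r} has no target; known keys: {sorted(edge_map)}"
        )
    return edge_map[key]
```

For example, with `edge_map = {"approved": "post_job", "waiting": "wait_jd_approval"}` a routing function that returns `"rejected"` raises immediately, which points at the missing map entry rather than at a stalled workflow.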
Common Workflow Patterns
Pattern 1: Human-in-the-Loop
async def wait_for_approval_node(state: GraphState) -> GraphState:
    """
    Node that interrupts execution until human approval.
    Uses special interrupt pattern.
    """
    from langgraph.types import interrupt
    # Send interrupt signal with current state
    interrupt(
        value={
            "message": "Waiting for JD approval",
            "job_id": state.job_id,
            "jd": state.jd.model_dump()
        }
    )
    # Reached only on resume: LangGraph re-runs the node from the top and
    # interrupt() then returns the resume value instead of pausing
    return state.model_copy(
        update={"current_node": NodeName.WAIT_JD_APPROVAL}
    )
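The control flow behind the interrupt pattern can be surprising: on resume the node starts again from its first line, so anything placed before the `interrupt` call runs twice. The toy model below imitates that behaviour with plain Python. `ToyRuntime` and `Interrupted` are hypothetical names and this is not LangGraph's implementation.

```python
from typing import Any, Callable, Dict, List


class Interrupted(Exception):
    """Raised by the toy runtime to pause a node."""

    def __init__(self, payload: Any) -> None:
        super().__init__("interrupted")
        self.payload = payload


class ToyRuntime:
    def __init__(self) -> None:
        self._has_resume = False
        self._resume_value: Any = None

    def interrupt(self, payload: Any) -> Any:
        # First pass: pause. After resume(): hand back the human's answer.
        if self._has_resume:
            return self._resume_value
        raise Interrupted(payload)

    def resume(self, node: Callable, state: Dict[str, Any], value: Any) -> Any:
        self._has_resume = True
        self._resume_value = value
        return node(self, state)  # the node re-runs from its first line


calls: List[str] = []


def approval_node(runtime: ToyRuntime, state: Dict[str, Any]) -> Dict[str, Any]:
    calls.append("before interrupt")  # executes on both passes
    decision = runtime.interrupt({"message": "Waiting for JD approval"})
    calls.append("after interrupt")  # executes only after resume
    return {**state, "approval_status": decision}
```

The practical consequence is that side effects placed before the interrupt, such as sending a notification, should be idempotent or moved into a separate node.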
Pattern 2: Error Handling
async def robust_node(state: GraphState) -> GraphState:
    """Node with comprehensive error handling."""
    try:
        result = await risky_operation()
        return state.model_copy(
            update={"current_node": NodeName.ROBUST_NODE}
        )
    except SpecificError as e:
        # Log and store error in state
        logger.error(f"Specific error: {e}")
        return state.model_copy(
            update={
                "error_message": str(e),
                "current_node": NodeName.ERROR_HANDLER
            }
        )
    except Exception as e:
        # Unexpected errors raise custom exception, chained to the original
        logger.critical(f"Unexpected error: {e}")
        raise GraphExecutionError(f"Failed: {e}") from e
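When an unexpected error is wrapped in a workflow exception, chaining it with `from` keeps the original exception reachable for debugging. A small self-contained illustration follows; `DemoGraphExecutionError`, `risky_operation` and `run_step` are stand-ins, not the project's code.

```python
class DemoGraphExecutionError(Exception):
    """Stand-in for the project's workflow exception."""


def risky_operation() -> None:
    raise TimeoutError("upstream timed out")


def run_step() -> None:
    try:
        risky_operation()
    except Exception as exc:
        # `from exc` stores the original error on __cause__, so the
        # traceback shows both the wrapper and the root failure.
        raise DemoGraphExecutionError(f"Failed: {exc}") from exc
```

A handler further up can then inspect `err.__cause__` to distinguish, say, a timeout from a validation failure without parsing the message string.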
Pattern 3: External API Integration
async def api_integration_node(state: GraphState) -> GraphState:
    """Call external APIs within workflow."""
    from app.ai.client import get_openai_client
    client = get_openai_client()
    # Make async API call
    response = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "..."}]
    )
    result = response.choices[0].message.content
    # Store result in state
    return state.model_copy(
        update={
            "jd": state.jd.model_copy(update={"content": result})
        }
    )
Debugging Workflows
1. Enable Debug Logging
# In config
import logging
logging.getLogger("app.workflow").setLevel(logging.DEBUG)
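Setting the level on `app.workflow` also affects its child loggers, because Python loggers inherit their effective level from the nearest configured ancestor. The sketch below shows that inheritance; the child logger name `app.workflow.nodes` is an assumption based on the file layout, and `effective_level_name` is a hypothetical helper.

```python
import logging

# Configure only the parent logger, as in the snippet above.
logging.getLogger("app.workflow").setLevel(logging.DEBUG)


def effective_level_name(logger_name: str) -> str:
    """Return the level a logger will actually apply, as a name."""
    logger = logging.getLogger(logger_name)
    return logging.getLevelName(logger.getEffectiveLevel())


# The child has no level of its own, so it inherits DEBUG from the parent.
child_level = effective_level_name("app.workflow.nodes")
```

Raising the logger level is not enough on its own: if no handler is configured, Python's last-resort handler only emits WARNING and above, so a call such as `logging.basicConfig(level=logging.DEBUG)` is usually needed to see the debug output.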
2. Inspect State at Each Node
# Add to node for debugging
logger.debug(f"State at {NodeName.MY_NODE}: {state.model_dump_json(indent=2)}")
3. Test Individual Nodes
# In tests/workflow/test_nodes.py
import pytest
from app.workflow.nodes import my_node
from app.workflow.state import GraphState

@pytest.mark.asyncio
async def test_my_node():
    # Arrange
    initial_state = GraphState(
        job_id="test-123",
        current_node="start",
        jd=JobDescriptionState(status=Status.PENDING)
    )
    # Act
    result = await my_node(initial_state)
    # Assert
    assert result.current_node == NodeName.MY_NODE
    assert result.jd.status == Status.COMPLETED
4. Validate State Transitions
# Test edge logic
from app.workflow.edges import my_routing_function

def test_routing_approved():
    state = GraphState(
        job_id="123",
        jd=JobDescriptionState(approval_status=ApprovalStatus.APPROVED)
    )
    next_node = my_routing_function(state)
    assert
---
*Content truncated.*