workflow_management

Manage LangGraph workflows, debug state transitions, and handle checkpoints

Install

mkdir -p .claude/skills/workflow-management && curl -L -o skill.zip "https://agentskills.codes/api/skills/download/14187" && unzip -o skill.zip -d .claude/skills/workflow-management && rm skill.zip

Installs to .claude/skills/workflow-management

About this skill

LangGraph Workflow Management Skill

Purpose

Master the AARLP recruitment workflow powered by LangGraph. This skill covers workflow debugging, state management, checkpoint handling, and adding new nodes/edges to the graph.

AI Provider Selection (Amazon Nova AI Hackathon)

AARLP supports two AI providers:

  • bedrock (default for hackathon) - AWS Nova models
  • openai (fallback) - OpenAI GPT-4

Set via AI_PROVIDER environment variable.
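
As a minimal sketch, the AI_PROVIDER lookup could be validated at startup roughly as follows. The helper name `resolve_ai_provider` is hypothetical and not part of AARLP. Only the variable name, the two allowed values, and the bedrock default come from the text above.

```python
import os
from typing import Mapping, Optional

ALLOWED_PROVIDERS = ("bedrock", "openai")
DEFAULT_PROVIDER = "bedrock"  # documented default for the hackathon


def resolve_ai_provider(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the configured provider, falling back to the default.

    Hypothetical helper: reads AI_PROVIDER, normalizes case and
    whitespace, and rejects anything outside the two documented values.
    """
    source = os.environ if env is None else env
    raw = source.get("AI_PROVIDER", DEFAULT_PROVIDER)
    provider = raw.strip().lower()
    if provider not in ALLOWED_PROVIDERS:
        raise ValueError(
            f"AI_PROVIDER must be one of {ALLOWED_PROVIDERS}, got {raw!r}"
        )
    return provider
```

Failing fast on an unknown value keeps a typo such as `AI_PROVIDER=bedrok` from silently selecting the wrong client later in the workflow.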

Secrets Management

Required for External API Calls:

Secret                  Provider  Purpose
AWS_ACCESS_KEY_ID       bedrock   Nova model access
AWS_SECRET_ACCESS_KEY   bedrock   Nova model access
OPENAI_API_KEY          openai    JD generation fallback
PINECONE_API_KEY        both      Shortlisting nodes
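
The table can be turned into a small pre-flight check. This is a sketch under the assumption that secrets arrive as environment variables. `REQUIRED_SECRETS` and `missing_secrets` are illustrative names, not AARLP code.

```python
from typing import Dict, List, Mapping

# Secret names per provider, taken from the table above.
REQUIRED_SECRETS: Dict[str, List[str]] = {
    "bedrock": ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "PINECONE_API_KEY"],
    "openai": ["OPENAI_API_KEY", "PINECONE_API_KEY"],
}


def missing_secrets(provider: str, env: Mapping[str, str]) -> List[str]:
    """Return the names (never the values) of unset or blank secrets."""
    return [
        name
        for name in REQUIRED_SECRETS[provider]
        if not env.get(name, "").strip()
    ]
```

Reporting only the names means the check itself can be logged safely.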

Configuration (app/core/config.py):

from typing import Literal

# Assumes Pydantic v2, where BaseSettings lives in the pydantic-settings package
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # AI Provider Selection
    ai_provider: Literal["openai", "bedrock"] = "bedrock"
    
    # AWS Bedrock (Primary for hackathon)
    aws_access_key_id: str
    aws_secret_access_key: str
    aws_region: str = "us-east-1"
    bedrock_model_id: str = "amazon.nova-lite-v1:0"
    
    # OpenAI (Fallback)
    openai_api_key: str
    openai_model: str = "gpt-4o"
    
    class Config:
        env_file = ".env"

Security in Workflow Nodes:

async def api_integration_node(state: GraphState) -> GraphState:
    """Use provider-agnostic client for AI calls."""
    from app.ai.client import is_bedrock_provider
    from app.core.config import get_settings
    
    if is_bedrock_provider():
        from app.ai.bedrock_client import invoke_nova_model
        result = await invoke_nova_model(messages=[...])
    else:
        from app.ai.client import get_openai_client
        client = get_openai_client()
        response = await client.chat.completions.create(...)
    
    # NEVER log API keys
    logger.info(f"AI call completed for job {state.job_id}")
    
    return state.model_copy(...)
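
One way to back up the "never log API keys" rule is a logging filter that masks known secret values before a record is written. The sketch below uses only the standard library. `RedactSecretsFilter` is a hypothetical name, and it covers formatted log messages only, not exception tracebacks.

```python
import logging
from typing import Iterable


class RedactSecretsFilter(logging.Filter):
    """Replace known secret values with a placeholder before emission."""

    def __init__(self, secrets: Iterable[str]) -> None:
        super().__init__()
        # Skip empty strings so an unset key cannot match every message.
        self._secrets = [s for s in secrets if s]

    def filter(self, record: logging.LogRecord) -> bool:
        message = record.getMessage()  # merges %-style args into the text
        for secret in self._secrets:
            message = message.replace(secret, "***REDACTED***")
        record.msg = message
        record.args = None  # args are already merged into msg
        return True  # keep the record, only its text changes
```

Attaching the filter to a handler (rather than to a single logger) applies it to every record that handler emits, including records propagated from child loggers.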

Architecture Overview

Workflow Files Structure

app/workflow/
├── state.py          # Pydantic GraphState models
├── nodes.py          # Node function implementations
├── edges.py          # Conditional routing logic
├── builder.py        # Graph construction
├── engine.py         # High-level workflow API
├── checkpoints.py    # State persistence
├── constants.py      # NodeName enum, constants
├── helpers.py        # Shared utilities
└── exceptions.py     # Workflow errors

Core Concepts

1. State Management (Pydantic-Based)

The workflow uses immutable Pydantic models for type-safe state:

from app.workflow.state import GraphState

# State is always accessed through Pydantic models
state = GraphState(
    job_id="123",
    current_node="generate_jd",
    jd=JobDescriptionState(
        title="Senior Engineer",
        status=Status.PENDING,
        approval_status=ApprovalStatus.PENDING
    )
)

# Type-safe access with autocomplete
print(state.jd.approval_status)  # ✅ IDE knows this is ApprovalStatus

Key Benefits:

  • ✅ Runtime validation whenever a state model is constructed (note that model_copy(update=...) does not re-validate the updated fields)
  • ✅ No manual dict-to-object conversions
  • ✅ Full IDE support with autocomplete
  • ✅ Nested state objects for organization
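
The copy-on-update style behind these benefits can be sketched with the standard library alone. The block below uses frozen dataclasses as a stand-in for the Pydantic models, an assumption made to keep the example dependency-free. `JdState` and `State` are hypothetical, trimmed-down shapes, and `dataclasses.replace` plays the role that `model_copy(update=...)` plays in the real code.

```python
from dataclasses import dataclass, field, replace


@dataclass(frozen=True)
class JdState:
    title: str = ""
    approval_status: str = "PENDING"


@dataclass(frozen=True)
class State:
    job_id: str
    current_node: str = "start"
    jd: JdState = field(default_factory=JdState)


def approve_jd(state: State) -> State:
    """Return a new State with the nested JD approved.

    Both the outer and the nested object are copied, so the input
    state is left exactly as it was.
    """
    return replace(state, jd=replace(state.jd, approval_status="APPROVED"))
```

Because nothing is mutated in place, an earlier state can be kept for comparison or checkpointing without defensive copies.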

2. Checkpointing (Pause/Resume)

AARLP uses checkpoints to persist workflow state:

from app.workflow.engine import WorkflowEngine
from app.workflow.checkpoints import save_checkpoint, load_checkpoint

# Initialize with checkpointing enabled
# IMPORTANT: Use consistent thread_id format: f"job-{job_id}"
engine = WorkflowEngine(
    job_id="123",
    thread_id="job-123",  # Standard format: job-{job_id}
    enable_checkpointing=True
)

# Run until human-in-the-loop wait state
result = await engine.run_until_interrupt()

# Later, resume from checkpoint
resumed_state = await engine.resume_from_checkpoint(
    updates={"jd": {"approval_status": "APPROVED"}}
)

Checkpoint Use Cases:

  • JD approval waiting
  • Shortlist approval waiting
  • Voice interview scheduling
  • Any human-in-the-loop decision point
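
To make the pause/resume cycle concrete, here is a dependency-free sketch of what a checkpoint store has to do: persist state under a thread id, then merge human input into it on resume. `InMemoryCheckpointStore`, `thread_id_for`, and `resume` are hypothetical names. The real `checkpoints.py` and `WorkflowEngine` may work differently.

```python
import json
from typing import Any, Dict, Optional


def thread_id_for(job_id: str) -> str:
    """Build the thread id in the documented job-{job_id} format."""
    return f"job-{job_id}"


class InMemoryCheckpointStore:
    """Hypothetical stand-in for a persistent checkpoint backend."""

    def __init__(self) -> None:
        self._blobs: Dict[str, str] = {}

    def save(self, thread_id: str, state: Dict[str, Any]) -> None:
        # Serialize to JSON so later caller mutations cannot leak in.
        self._blobs[thread_id] = json.dumps(state)

    def load(self, thread_id: str) -> Optional[Dict[str, Any]]:
        blob = self._blobs.get(thread_id)
        return None if blob is None else json.loads(blob)


def resume(
    store: InMemoryCheckpointStore,
    thread_id: str,
    updates: Dict[str, Dict[str, Any]],
) -> Dict[str, Any]:
    """Load a checkpoint and merge one level of nested updates into it."""
    state = store.load(thread_id)
    if state is None:
        raise KeyError(f"no checkpoint for {thread_id}")
    for section, changes in updates.items():
        state.setdefault(section, {}).update(changes)
    store.save(thread_id, state)
    return state
```

The merge is deliberately shallow per section, so an update such as `{"jd": {"approval_status": "APPROVED"}}` changes one field and leaves the rest of the JD untouched.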

3. Node Development

Node Function Signature

from datetime import datetime, timezone

from app.workflow.state import GraphState

async def my_node(state: GraphState) -> GraphState:
    """
    All nodes must:
    1. Accept GraphState as input
    2. Return modified GraphState
    3. Be async (for DB/API calls)
    4. Handle errors with custom exceptions
    """
    
    # Extract what you need (type-safe)
    job_id = state.job_id
    
    # Do work
    result = await some_async_operation(job_id)
    
    # Return updated state (model_copy does not re-validate the update values)
    return state.model_copy(
        update={
            "current_node": "my_node",
            "updated_at": datetime.now(timezone.utc)
        }
    )

Adding a New Node

  1. Define the node function in nodes.py:
async def my_new_node(state: GraphState) -> GraphState:
    """Description of what this node does."""
    try:
        # Your logic here
        logger.info(f"Processing {state.job_id} in my_new_node")
        
        # Update state
        return state.model_copy(
            update={"current_node": NodeName.MY_NEW_NODE}
        )
    except Exception as e:
        logger.error(f"Error in my_new_node: {e}")
        raise GraphExecutionError(f"Failed at my_new_node: {e}") from e
  2. Add node name to constants in constants.py:
class NodeName(str, Enum):
    GENERATE_JD = "generate_jd"
    MY_NEW_NODE = "my_new_node"  # Add this
  3. Register in builder in builder.py:
from app.workflow.nodes import my_new_node

graph.add_node(NodeName.MY_NEW_NODE, my_new_node)
  4. Add edges (routing):
# Simple edge
graph.add_edge(NodeName.GENERATE_JD, NodeName.MY_NEW_NODE)

# Conditional edge
graph.add_conditional_edges(
    NodeName.MY_NEW_NODE,
    my_routing_function,  # Defined in edges.py
    {
        "success": NodeName.NEXT_NODE,
        "failure": NodeName.ERROR_NODE
    }
)
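
A common slip when following these steps is routing to a node that was never registered. The check below is a sketch of how that could be caught in a unit test. The `NodeName` enum here is a local stand-in with illustrative members, and `unregistered_targets` is a hypothetical helper.

```python
from enum import Enum
from typing import Iterable, List, Mapping


class NodeName(str, Enum):
    # Local stand-in for app.workflow.constants.NodeName
    GENERATE_JD = "generate_jd"
    MY_NEW_NODE = "my_new_node"
    NEXT_NODE = "next_node"
    ERROR_NODE = "error_node"


def unregistered_targets(
    registered: Iterable[NodeName],
    edge_map: Mapping[str, NodeName],
) -> List[str]:
    """Return routing keys whose target node was never registered."""
    known = set(registered)
    return sorted(key for key, target in edge_map.items() if target not in known)
```

An empty result means every key in the conditional edge map points at a node the builder knows about.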

4. Conditional Routing (Edges)

Edges determine the next node based on state:

# In edges.py
def my_routing_function(state: GraphState) -> str:
    """
    Return a string key that maps to a node in the edge map.
    """
    if state.jd.approval_status == ApprovalStatus.APPROVED:
        return "approved"
    elif state.jd.approval_status == ApprovalStatus.REJECTED:
        return "rejected"
    else:
        return "waiting"

# In builder.py
graph.add_conditional_edges(
    NodeName.WAIT_JD_APPROVAL,
    my_routing_function,
    {
        "approved": NodeName.POST_JOB,
        "rejected": NodeName.GENERATE_JD,  # Regenerate
        "waiting": NodeName.WAIT_JD_APPROVAL  # Stay in wait state
    }
)
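
The mechanics of conditional edges can be illustrated without LangGraph. The following is a simplified stand-in runner, not LangGraph's implementation: state is a plain dict, nodes are synchronous, and `run_graph` is a hypothetical function. It shows how a routing key is looked up in the edge map and why a self-routing wait node needs either an interrupt or a step limit.

```python
from typing import Callable, Dict, List, Tuple

State = Dict[str, str]
Node = Callable[[State], State]
Router = Callable[[State], str]
Routes = Dict[str, Tuple[Router, Dict[str, str]]]


def route_approval(state: State) -> str:
    """Same decision as my_routing_function, on a plain dict."""
    status = state.get("approval_status")
    if status == "APPROVED":
        return "approved"
    if status == "REJECTED":
        return "rejected"
    return "waiting"


def run_graph(
    nodes: Dict[str, Node],
    routes: Routes,
    start: str,
    state: State,
    max_steps: int = 10,
) -> Tuple[State, List[str]]:
    """Run nodes in sequence, using each node's router to pick the next.

    A node with no entry in `routes` ends the run. `max_steps` guards
    against loops such as a wait node that keeps routing to itself.
    """
    visited: List[str] = []
    current = start
    for _ in range(max_steps):
        visited.append(current)
        state = nodes[current](state)
        if current not in routes:
            return state, visited
        router, edge_map = routes[current]
        current = edge_map[router(state)]
    raise RuntimeError(f"exceeded {max_steps} steps, next node: {current}")
```

With a rejected JD, the run visits the wait node, regenerates, returns to the wait node, and only then posts the job.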

Common Workflow Patterns

Pattern 1: Human-in-the-Loop

async def wait_for_approval_node(state: GraphState) -> GraphState:
    """
    Node that interrupts execution until human approval.
    Uses special interrupt pattern.
    """
    from langgraph.types import interrupt
    
    # Send interrupt signal with current state
    interrupt(
        value={
            "message": "Waiting for JD approval",
            "job_id": state.job_id,
            "jd": state.jd.model_dump()
        }
    )
    
    # Reached only after resume: LangGraph re-runs this node from the top,
    # and interrupt() then returns the resume value instead of pausing
    return state.model_copy(
        update={"current_node": NodeName.WAIT_JD_APPROVAL}
    )
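
One consequence of this pattern is easy to miss: on resume the node runs again from its first line, so anything placed before the interrupt executes twice. The sketch below imitates that behavior with plain Python. It is a simplified stand-in, not LangGraph's implementation, and `Paused`, `run_node`, and `side_effects` are hypothetical names.

```python
from typing import Any, Callable, Dict, List, Tuple

_NO_RESUME = object()  # sentinel: no resume value was supplied
side_effects: List[str] = []  # records work done before the interrupt


class Paused(Exception):
    """Raised by the stand-in interrupt() to hand a payload to the caller."""

    def __init__(self, payload: Any) -> None:
        super().__init__("workflow paused")
        self.payload = payload


def run_node(
    node: Callable[[Dict[str, Any], Callable[[Any], Any]], Dict[str, Any]],
    state: Dict[str, Any],
    resume: Any = _NO_RESUME,
) -> Tuple[str, Any]:
    """Run a node once; return ("paused", payload) or ("done", new_state)."""

    def interrupt(payload: Any) -> Any:
        if resume is _NO_RESUME:
            raise Paused(payload)  # first pass: stop and surface the payload
        return resume  # resumed pass: hand back the human's answer

    try:
        return "done", node(state, interrupt)
    except Paused as pause:
        return "paused", pause.payload


def wait_for_approval(
    state: Dict[str, Any], interrupt: Callable[[Any], Any]
) -> Dict[str, Any]:
    side_effects.append(f"notify:{state['job_id']}")  # runs on every pass
    decision = interrupt({"message": "Waiting for JD approval", "job_id": state["job_id"]})
    return {**state, "approval_status": decision}
```

Running the node once without and once with a resume value leaves two entries in `side_effects`, which is why work placed before an interrupt should be idempotent or moved into a separate node.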

Pattern 2: Error Handling

async def robust_node(state: GraphState) -> GraphState:
    """Node with comprehensive error handling."""
    try:
        result = await risky_operation()
        
        return state.model_copy(
            update={"current_node": NodeName.ROBUST_NODE}
        )
    
    except SpecificError as e:
        # Log and store error in state
        logger.error(f"Specific error: {e}")
        return state.model_copy(
            update={
                "error_message": str(e),
                "current_node": NodeName.ERROR_HANDLER
            }
        )
    
    except Exception as e:
        # Unexpected errors raise custom exception
        logger.critical(f"Unexpected error: {e}")
        raise GraphExecutionError(f"Failed: {e}") from e
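
The two branches behave differently for the caller: a recoverable error comes back as data in the state, while an unexpected one surfaces as an exception that keeps the original cause attached. A self-contained sketch, using plain dicts for state and locally defined stand-in exception classes (both are assumptions made for illustration):

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class GraphExecutionError(Exception):
    """Local stand-in for the workflow's custom exception."""


class RecoverableError(Exception):
    """Hypothetical error the workflow knows how to route around."""


async def guarded(
    operation: Callable[[], Awaitable[Any]],
    state: Dict[str, Any],
) -> Dict[str, Any]:
    """Run operation(); record expected failures, re-raise the rest."""
    try:
        result = await operation()
        return {**state, "result": result, "error_message": None}
    except RecoverableError as exc:
        # Expected failure: store it so an edge can route to the handler.
        return {**state, "error_message": str(exc), "current_node": "error_handler"}
    except Exception as exc:
        # Unexpected failure: wrap it, keeping the original as __cause__.
        raise GraphExecutionError(f"Failed: {exc}") from exc
```

Chaining with `from exc` preserves the original traceback, which makes the wrapped error far easier to diagnose from logs.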

Pattern 3: External API Integration

async def api_integration_node(state: GraphState) -> GraphState:
    """Call external APIs within workflow."""
    from app.ai.client import get_openai_client
    
    client = get_openai_client()
    
    # Make async API call
    response = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "..."}]
    )
    
    result = response.choices[0].message.content
    
    # Store result in state
    return state.model_copy(
        update={
            "jd": state.jd.model_copy(update={"content": result})
        }
    )
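
External calls inside a node usually benefit from a timeout and a bounded retry. The helper below is a sketch of one way to do that with `asyncio` only. `call_with_retry` and its parameters are hypothetical, and which exceptions count as retryable is an assumption that would depend on the client library in use.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def call_with_retry(
    make_call: Callable[[], Awaitable[T]],
    attempts: int = 3,
    timeout_s: float = 10.0,
    base_delay_s: float = 0.5,
) -> T:
    """Await make_call() with a per-attempt timeout and exponential backoff.

    Retries only on timeouts and connection errors; any other exception
    propagates immediately. The last failure is re-raised unchanged.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await asyncio.wait_for(make_call(), timeout=timeout_s)
        except (asyncio.TimeoutError, ConnectionError):
            if attempt == attempts:
                raise
            # Delay doubles each time: base, 2*base, 4*base, ...
            await asyncio.sleep(base_delay_s * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

`make_call` is a zero-argument factory rather than an awaitable because a coroutine object can be awaited only once, so each attempt needs a fresh one.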

Debugging Workflows

1. Enable Debug Logging

# In config
import logging
logging.getLogger("app.workflow").setLevel(logging.DEBUG)
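
Setting the level alone produces no output unless some handler is attached, because Python's fallback handler only emits WARNING and above. A sketch of a small setup helper follows. `enable_workflow_debug` and the handler name are hypothetical.

```python
import logging
import sys
from typing import Optional, TextIO

_HANDLER_NAME = "workflow-debug"  # used to avoid adding the handler twice


def enable_workflow_debug(stream: Optional[TextIO] = None) -> logging.Logger:
    """Set app.workflow to DEBUG and attach one stream handler to it."""
    logger = logging.getLogger("app.workflow")
    logger.setLevel(logging.DEBUG)
    if not any(h.name == _HANDLER_NAME for h in logger.handlers):
        handler = logging.StreamHandler(sys.stderr if stream is None else stream)
        handler.set_name(_HANDLER_NAME)
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
        )
        logger.addHandler(handler)
    return logger
```

Child loggers such as `app.workflow.nodes` inherit the level and propagate to this handler, so a single call covers the whole package.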

2. Inspect State at Each Node

# Add to node for debugging
logger.debug(f"State at {NodeName.MY_NODE}: {state.model_dump_json(indent=2)}")
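
Dumping the full state on every node gets noisy quickly. As an alternative sketch, a decorator can log only the top-level fields a node changed. This version assumes dict-shaped state for simplicity. With Pydantic models the same comparison could be made on `model_dump()` output. `log_transition` is a hypothetical name.

```python
import functools
import logging
from typing import Any, Awaitable, Callable, Dict

logger = logging.getLogger("app.workflow.debug")

State = Dict[str, Any]
NodeFn = Callable[[State], Awaitable[State]]


def log_transition(node: NodeFn) -> NodeFn:
    """Wrap an async node so changed top-level fields are logged at DEBUG."""

    @functools.wraps(node)
    async def wrapper(state: State) -> State:
        new_state = await node(state)
        # Map each changed or added key to its (old, new) pair.
        changed = {
            key: (state.get(key), value)
            for key, value in new_state.items()
            if state.get(key) != value
        }
        logger.debug("%s changed %s", node.__name__, changed)
        return new_state

    return wrapper
```

Keys that a node removes are not reported by this sketch, since it iterates over the new state only.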

3. Test Individual Nodes

# In tests/workflow/test_nodes.py
import pytest
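from app.workflow.constants import NodeName
from app.workflow.nodes import my_node
# JobDescriptionState and Status are assumed to be defined alongside GraphState
from app.workflow.state import GraphState, JobDescriptionState, Status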

@pytest.mark.asyncio
async def test_my_node():
    # Arrange
    initial_state = GraphState(
        job_id="test-123",
        current_node="start",
        jd=JobDescriptionState(status=Status.PENDING)
    )
    
    # Act
    result = await my_node(initial_state)
    
    # Assert
    assert result.current_node == NodeName.MY_NODE
    assert result.jd.status == Status.COMPLETED

4. Validate State Transitions

# Test edge logic
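from app.workflow.edges import my_routing_function
# ApprovalStatus and JobDescriptionState are assumed to be defined in state.py
from app.workflow.state import ApprovalStatus, GraphState, JobDescriptionState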

def test_routing_approved():
    state = GraphState(
        job_id="123",
        jd=JobDescriptionState(approval_status=ApprovalStatus.APPROVED)
    )
    
    next_node = my_routing_function(state)
    assert next_node == "approved"  # key my_routing_function returns for APPROVED

---

*Content truncated.*
