agentskills.codes

Modern API development patterns for building high-performance, scalable web services. Expert in async/await patterns, REST/GraphQL APIs, middleware, error handling, rate limiting, OpenAPI documentation, testing, and production optimizations. Framework-agnostic patterns that work with Python, Node.js, Go, and other languages.

Install

mkdir -p .claude/skills/api-dev && curl -L -o skill.zip "https://agentskills.codes/api/skills/download/16038" && unzip -o skill.zip -d .claude/skills/api-dev && rm skill.zip

Installs to .claude/skills/api-dev

Activation

This is the description your AI agent reads to decide when to run this skill — the better it matches your request, the more reliably it fires.

Modern API development patterns for building high-performance, scalable web services. Expert in async/await patterns, REST/GraphQL APIs, middleware, error handling, rate limiting, OpenAPI documentation, testing, and production optimizations. Framework-agnostic patterns that work with Python, Node.js, Go, and other languages.
326 chars · no explicit “when” trigger · longer than Claude Code's old 250-char listing cap (fine on current versions)

About this skill

API Development Patterns & Best Practices

This skill provides comprehensive patterns for building modern APIs in 2025, focusing on async/await patterns, performance optimization, security, testing, and production-ready configurations that work across different frameworks and languages.

When to Use This Skill

Use this skill when you need to:

  • Design RESTful or GraphQL APIs
  • Implement async/await patterns for high performance
  • Add middleware for authentication, logging, and validation
  • Handle errors gracefully with proper HTTP status codes
  • Implement rate limiting and throttling
  • Generate OpenAPI/Swagger documentation
  • Set up comprehensive testing strategies
  • Optimize API performance with caching and connection pooling
  • Implement API versioning and backward compatibility
  • Set up monitoring and observability

Core API Design Principles

1. Async/Await Patterns for Performance

# patterns/async_patterns.py
import asyncio
import aiohttp
import aioredis
from typing import AsyncGenerator, Optional, List, Dict, Any
from contextlib import asynccontextmanager
from dataclasses import dataclass
from functools import wraps
import time

@dataclass
class RequestMetrics:
    """One monitoring record describing a single handled request.

    Instances are plain data; the first four fields are required and may be
    passed positionally in the order declared below.
    """
    # Elapsed time of the call (the decorator in this file reports seconds).
    duration: float
    # HTTP-style status associated with the outcome.
    status_code: int
    # Name identifying what was called.
    endpoint: str
    # HTTP verb associated with the call.
    method: str
    # Optional identifier of the calling user; absent by default.
    user_id: Optional[str] = None

def with_metrics(func):
    """Decorator that reports a ``RequestMetrics`` record for every call.

    Wraps an async callable. Each call is timed, an HTTP-style status is
    derived from the result or from the raised exception, and the record is
    handed to ``send_metrics``. Metrics are best-effort: a failure while
    building or sending the record is logged and never replaces the wrapped
    call's return value or exception.

    Args:
        func: Async callable to instrument. Its ``__name__`` is reported as
            the endpoint.

    Returns:
        An async wrapper that forwards all arguments to ``func``.
    """
    # Function-scope import: the surrounding import block has no ``logging``.
    import logging

    logger = logging.getLogger(__name__)
    endpoint = getattr(func, '__name__', repr(func))

    # When the wrapped callable is named after an HTTP verb (``get``,
    # ``post``, ...), report that verb instead of always claiming 'GET'.
    http_verbs = {'GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'HEAD', 'OPTIONS'}
    verb_from_name = endpoint.upper()
    default_method = verb_from_name if verb_from_name in http_verbs else 'GET'

    @wraps(func)
    async def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the duration cannot go negative or
        # jump when the system clock is adjusted mid-call.
        start_time = time.perf_counter()
        status_code = 200
        # An explicit ``method=`` keyword argument still takes precedence.
        method = kwargs.get('method', default_method)

        try:
            result = await func(*args, **kwargs)
            if hasattr(result, 'status_code'):
                status_code = result.status_code
            return result
        except Exception as exc:
            # Prefer ``status_code``; fall back to ``status`` (the attribute
            # used by e.g. aiohttp's ClientResponseError); otherwise 500.
            status_code = getattr(exc, 'status_code', None)
            if status_code is None:
                fallback = getattr(exc, 'status', None)
                status_code = fallback if isinstance(fallback, int) else 500
            raise
        finally:
            duration = time.perf_counter() - start_time
            try:
                metrics = RequestMetrics(
                    duration=duration,
                    status_code=status_code,
                    endpoint=endpoint,
                    method=method
                )
                # Send metrics to monitoring system
                await send_metrics(metrics)
            except Exception:
                # An exception escaping a ``finally`` block would replace the
                # call's own result/exception, so log it and carry on.
                logger.exception("Failed to send metrics for %s", endpoint)

    return wrapper

async def send_metrics(metrics: RequestMetrics):
    """Deliver one metrics record to the monitoring backend.

    Placeholder hook: as written it does nothing with ``metrics`` and
    returns ``None``. Replace the body with the call into whichever system
    is in use (the examples named here are Prometheus, Datadog, or custom
    analytics).
    """
    # Intentionally a no-op until a backend is wired in.
    return None

class AsyncAPIClient:
    """Async JSON client that sends every request through one shared aiohttp session.

    The session (and its connection pool) is created lazily on first use and
    re-created if it has been closed. The client can be used as an async
    context manager, in which case the session is closed on exit.
    """

    def __init__(self, base_url: str, timeout: int = 30):
        """Store the settings; no session is opened until the first request.

        Args:
            base_url: Prefix placed directly in front of every endpoint.
            timeout: Total per-request timeout passed to ``aiohttp.ClientTimeout``.
        """
        self.base_url = base_url
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self._session: Optional[aiohttp.ClientSession] = None
        # Serialises session creation between concurrent coroutines.
        self._session_lock = asyncio.Lock()

    def _needs_new_session(self) -> bool:
        """Return True when there is no session yet or the current one is closed."""
        current = self._session
        return current is None or current.closed

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared session, building it first if necessary.

        The first check runs without the lock; the check is repeated while
        holding the lock so that coroutines that waited on it do not build a
        second session.
        """
        if self._needs_new_session():
            async with self._session_lock:
                if self._needs_new_session():
                    pool = aiohttp.TCPConnector(
                        limit=100,            # overall connection cap
                        limit_per_host=30,    # cap per host
                        force_close=False,
                        enable_cleanup_closed=True,
                    )
                    self._session = aiohttp.ClientSession(
                        connector=pool,
                        timeout=self.timeout,
                    )
        return self._session

    @with_metrics
    async def get(
        self,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Issue a GET to ``base_url`` + ``endpoint`` and return the parsed JSON body.

        A single attempt is made; no retry is performed here. Error statuses
        surface as whatever ``response.raise_for_status()`` raises.
        """
        http = await self._get_session()
        target = "{}{}".format(self.base_url, endpoint)

        async with http.get(target, params=params, headers=headers) as reply:
            reply.raise_for_status()
            payload = await reply.json()
        return payload

    @with_metrics
    async def post(
        self,
        endpoint: str,
        data: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Issue a POST to ``base_url`` + ``endpoint`` and return the parsed JSON body.

        ``data`` and ``json`` are forwarded unchanged to the session's
        ``post`` call. A single attempt is made; no retry is performed here.
        Error statuses surface as whatever ``response.raise_for_status()``
        raises.
        """
        http = await self._get_session()
        target = "{}{}".format(self.base_url, endpoint)

        async with http.post(target, data=data, json=json, headers=headers) as reply:
            reply.raise_for_status()
            payload = await reply.json()
        return payload

    async def close(self):
        """Close the underlying session if one has been created."""
        session = self._session
        if not session:
            return
        await session.close()

    async def __aenter__(self):
        """Enter the async context; the client itself is the context value."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the async context, closing the session."""
        await self.close()

2. Circuit Breaker Pattern

# patterns/circuit_breaker.py
import asyncio
import time
from enum import Enum
from typing import Callable, Any, Optional
from functools import wraps

class CircuitState(Enum):
    """States a ``CircuitBreaker`` moves between."""
    CLOSED = "closed"        # calls are passed through to the function
    OPEN = "open"            # calls are rejected without running the function
    HALF_OPEN = "half_open"  # cool-down elapsed; the next call is a trial


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open.

    Subclasses ``Exception`` so code that caught the previous generic
    ``Exception`` keeps working, while new code can tell a rejection apart
    from a failure of the protected call.
    """


class CircuitBreaker:
    """Decorator object that stops calling a failing async function for a while.

    The circuit starts CLOSED. Once ``failure_threshold`` consecutive expected
    failures have been recorded it becomes OPEN and calls are rejected. After
    ``timeout`` seconds the next call is let through as a trial (HALF_OPEN):
    success closes the circuit again, an expected failure re-opens it.

    One instance holds one shared state; every function decorated with the
    same instance shares that state.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60,
        expected_exception: type = Exception
    ):
        """Configure the breaker.

        Args:
            failure_threshold: Consecutive expected failures that open the circuit.
            timeout: Seconds the circuit stays open before a trial call is allowed.
            expected_exception: Exception class (or tuple of classes) counted
                as a failure. Other exceptions propagate without being counted.
        """
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def __call__(self, func: Callable) -> Callable:
        """Wrap the async callable ``func`` with this breaker.

        Raises (from the wrapper):
            CircuitBreakerOpenError: The circuit is open and the cool-down has
                not elapsed; ``func`` is not invoked.
            Exception: Anything ``func`` raises is re-raised unchanged.
        """
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if self.state == CircuitState.OPEN:
                if not self._should_attempt_reset():
                    raise CircuitBreakerOpenError("Circuit breaker is OPEN")
                self.state = CircuitState.HALF_OPEN

            try:
                result = await func(*args, **kwargs)
            except self.expected_exception:
                self._record_failure()
                raise

            if self.state == CircuitState.HALF_OPEN:
                # The trial call succeeded: close the circuit.
                self._reset()
            elif self.state == CircuitState.CLOSED:
                # A success ends the current failure streak; without this,
                # isolated failures spread over time would add up and
                # eventually open the circuit.
                self.failure_count = 0
            # If the circuit was opened while this call was in flight, leave
            # it open; only a trial call may close it.
            return result

        return wrapper

    def _should_attempt_reset(self) -> bool:
        """Return True once ``timeout`` seconds have passed since the last failure."""
        if self.last_failure_time is None:
            # No failure has been recorded, so there is no cool-down to wait for.
            return True
        return time.time() - self.last_failure_time >= self.timeout

    def _record_failure(self):
        """Count one expected failure and open the circuit when warranted."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        trial_failed = self.state == CircuitState.HALF_OPEN
        if trial_failed or self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

    def _reset(self):
        """Return to the CLOSED state with a clean failure count."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

# Usage example
# The decorator is an instance of the CircuitBreaker class defined above;
# there is no ``circuit_breaker`` name in this module.
@CircuitBreaker(failure_threshold=3, timeout=30)
async def external_api_call():
    """Fetch JSON from an external API through the circuit breaker.

    After 3 consecutive failures the breaker opens and further calls are
    rejected until 30 seconds have passed.
    """
    # Function-scope import: this snippet's import list does not include aiohttp.
    import aiohttp

    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com") as response:
            return await response.json()

3. Rate Limiting with Redis

# patterns/rate_limiting.py
import asyncio
import aioredis
import time
from typing import Optional
from fastapi import HTTPException, Request
from starlette.middleware.base import BaseHTTPMiddleware

class RateLimiter:
    """Sliding-window rate limiter stored in a Redis sorted set.

    Each allowed request adds one member, scored with its timestamp, to the
    sorted set at ``key``. A request is allowed while fewer than ``limit``
    members fall inside the last ``window`` seconds.

    NOTE: the trim / count / add steps are issued as separate commands, so
    concurrent requests for the same key can each pass the check before any
    of them is recorded.
    """

    def __init__(
        self,
        redis_url: str,
        requests_per_minute: int = 100,
        window_size: int = 60
    ):
        """Store the configuration; the Redis connection is opened lazily.

        Args:
            redis_url: URL handed to ``aioredis.from_url``.
            requests_per_minute: Default request limit per window.
            window_size: Default window length in seconds.
        """
        self.redis = None
        self.redis_url = redis_url
        self.requests_per_minute = requests_per_minute
        self.window_size = window_size

    async def initialize(self):
        """Initialize Redis connection"""
        self.redis = await aioredis.from_url(self.redis_url)

    async def is_allowed(
        self,
        key: str,
        limit: Optional[int] = None,
        window: Optional[int] = None
    ) -> bool:
        """Record a request under ``key`` if it fits the limit.

        Args:
            key: Sorted-set key identifying the caller/resource.
            limit: Maximum requests per window; ``None`` uses the default.
                An explicit ``0`` is honoured and rejects every request.
            window: Window length in seconds; ``None`` (or ``0``) uses the default.

        Returns:
            True if the request was recorded and may proceed, False if the
            limit has already been reached (nothing is recorded in that case).
        """
        # Function-scope import: the surrounding import block has no ``uuid``.
        import uuid

        if self.redis is None:
            await self.initialize()

        # ``is None`` rather than ``or``: a limit of 0 is a valid request to
        # block everything and must not fall back to the default.
        if limit is None:
            limit = self.requests_per_minute
        window = window or self.window_size
        current_time = time.time()

        # Remove old requests from the sliding window
        await self.redis.zremrangebyscore(
            key,
            0,
            current_time - window
        )

        # Count current requests in window
        current_requests = await self.redis.zcard(key)

        if current_requests >= limit:
            return False

        # Sorted-set members are unique, so a member made only of the
        # timestamp would merge two requests that share it into one entry.
        # The random suffix keeps every request as its own member.
        member = f"{current_time}:{uuid.uuid4().hex}"
        await self.redis.zadd(key, {member: current_time})
        # Let idle keys disappear once the whole window has passed.
        await self.redis.expire(key, window)

        return True

class RateLimitMiddleware(BaseHTTPMiddleware):
    """FastAPI/Starlette middleware that applies a ``RateLimiter`` per client and path.

    Requests over the limit receive a 429 JSON response; all others are
    passed to the next handler.
    """

    def __init__(
        self,
        app,
        redis_url: str,
        requests_per_minute: int = 100,
        identifier_func=None
    ):
        """Set up the middleware.

        Args:
            app: The ASGI application being wrapped.
            redis_url: Redis URL passed to the ``RateLimiter``.
            requests_per_minute: Request limit passed to the ``RateLimiter``.
            identifier_func: Optional callable ``(request) -> str`` naming the
                caller; defaults to the client's host address.
        """
        super().__init__(app)
        self.limiter = RateLimiter(redis_url, requests_per_minute)
        self.identifier_func = identifier_func or self._default_identifier

    def _default_identifier(self, request: Request) -> str:
        """Identify the caller by remote host address.

        ``request.client`` can be ``None`` when the server supplies no client
        address; such requests share the single bucket ``"unknown"`` instead
        of failing with ``AttributeError``.
        """
        client = request.client
        return client.host if client is not None else "unknown"

    async def dispatch(self, request: Request, call_next):
        """Reject the request with 429 when over the limit, else forward it."""
        # Function-scope import: the surrounding import block does not import it.
        from starlette.responses import JSONResponse

        identifier = self.identifier_func(request)
        key = f"rate_limit:{identifier}:{request.url.path}"

        if not await self.limiter.is_allowed(key):
            # Return the response directly. An HTTPException raised from
            # middleware is not seen by the application's exception handlers
            # and would reach the client as a server error, not a 429.
            return JSONResponse(
                status_code=429,
                content={"detail": "Rate limit exceeded"},
                # After one full window every currently counted request has
                # aged out, so this is a safe upper bound for retrying.
                headers={"Retry-After": str(self.limiter.window_size)},
            )

        return await call_next(request)

4. API Versioning Strategy

# patterns/versioning.py
from enum import Enum
from typing import Optional, Dict, Any, Type
from abc import ABC, abstractmethod



---

*Content truncated.*

Search skills

Search the agent skills registry