api-dev
Modern API development patterns for building high-performance, scalable web services. Expert in async/await patterns, REST/GraphQL APIs, middleware, error handling, rate limiting, OpenAPI documentation, testing, and production optimizations. Framework-agnostic patterns that work with Python, Node.js, Go, and other languages.
Install
mkdir -p .claude/skills/api-dev && curl -L -o skill.zip "https://agentskills.codes/api/skills/download/16038" && unzip -o skill.zip -d .claude/skills/api-dev && rm skill.zip
Installs to .claude/skills/api-dev
Activation
This is the description your AI agent reads to decide when to run this skill — the better it matches your request, the more reliably it fires.
Modern API development patterns for building high-performance, scalable web services. Expert in async/await patterns, REST/GraphQL APIs, middleware, error handling, rate limiting, OpenAPI documentation, testing, and production optimizations. Framework-agnostic patterns that work with Python, Node.js, Go, and other languages.
About this skill
API Development Patterns & Best Practices
This skill provides comprehensive patterns for building modern APIs in 2025, focusing on async/await patterns, performance optimization, security, testing, and production-ready configurations that work across different frameworks and languages.
When to Use This Skill
Use this skill when you need to:
- Design RESTful or GraphQL APIs
- Implement async/await patterns for high performance
- Add middleware for authentication, logging, and validation
- Handle errors gracefully with proper HTTP status codes
- Implement rate limiting and throttling
- Generate OpenAPI/Swagger documentation
- Set up comprehensive testing strategies
- Optimize API performance with caching and connection pooling
- Implement API versioning and backward compatibility
- Set up monitoring and observability
Core API Design Principles
1. Async/Await Patterns for Performance
# patterns/async_patterns.py
import asyncio
import aiohttp
import aioredis
from typing import AsyncGenerator, Optional, List, Dict, Any
from contextlib import asynccontextmanager
from dataclasses import dataclass
from functools import wraps
import time
@dataclass
class RequestMetrics:
    """One timing/outcome record describing a single handled request.

    Fields are declared in the order the generated ``__init__`` accepts them
    positionally; only ``user_id`` has a default.
    """

    # Elapsed time of the call, as measured by whoever builds the record.
    duration: float
    # Status code attributed to the call.
    status_code: int
    # Name identifying what was called.
    endpoint: str
    # HTTP method label for the call.
    method: str
    # Optional caller identity; None when not supplied.
    user_id: Optional[str] = None
def with_metrics(func):
    """Wrap an async callable so each call reports a ``RequestMetrics`` record.

    The wrapper times the awaited call, derives a status code from the result
    or from the raised exception, and hands the record to ``send_metrics``.
    The wrapped call's return value or exception is always propagated
    unchanged; a failure while sending metrics is logged, never raised.

    Recorded fields:
        endpoint: ``func.__name__``.
        method: the ``method`` keyword argument when the caller passed one;
            otherwise the upper-cased function name if it is an HTTP verb
            (so a function named ``post`` reports ``POST``), else ``'GET'``.
        status_code: ``result.status_code`` when the result has that
            attribute, otherwise 200. For exceptions: the exception's
            ``status_code`` attribute, else an integer ``status`` attribute,
            else 500.

    Args:
        func: Coroutine function to instrument.

    Returns:
        An async wrapper around ``func`` (metadata preserved via ``wraps``).
    """
    # Function-scope import: logging is not imported at file level here.
    import logging

    logger = logging.getLogger(__name__)
    http_verbs = {'GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'HEAD', 'OPTIONS'}
    verb_from_name = func.__name__.upper()
    default_method = verb_from_name if verb_from_name in http_verbs else 'GET'

    @wraps(func)
    async def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the measured duration cannot go
        # negative or jump when the system wall clock is adjusted.
        started = time.perf_counter()
        status_code = 200
        endpoint = func.__name__
        method = kwargs.get('method', default_method)
        try:
            result = await func(*args, **kwargs)
            if hasattr(result, 'status_code'):
                status_code = result.status_code
            return result
        except Exception as exc:
            status_code = getattr(exc, 'status_code', None)
            if status_code is None:
                # Some client errors (e.g. aiohttp's ClientResponseError)
                # expose the HTTP code as ``status`` instead.
                fallback = getattr(exc, 'status', None)
                status_code = fallback if isinstance(fallback, int) else 500
            raise
        finally:
            metrics = RequestMetrics(
                duration=time.perf_counter() - started,
                status_code=status_code,
                endpoint=endpoint,
                method=method
            )
            # An exception escaping a ``finally`` block would replace the
            # wrapped call's own result/exception, so reporting problems are
            # logged instead of propagated.
            try:
                await send_metrics(metrics)
            except Exception:
                logger.exception("Failed to send request metrics for %s", endpoint)
    return wrapper
async def send_metrics(metrics: RequestMetrics):
    """Deliver one metrics record to the monitoring backend.

    Placeholder: the body intentionally does nothing. Replace it with the
    integration for whichever system is in use (the original notes name
    Prometheus, Datadog, or custom analytics as examples).
    """
    return None
class AsyncAPIClient:
    """Async JSON HTTP client backed by one lazily created, pooled aiohttp session.

    Can be used as an async context manager; leaving the ``async with`` block
    closes the session. No retry logic is implemented: a failed request raises
    to the caller on the first attempt.
    """

    def __init__(self, base_url: str, timeout: int = 30):
        """Store configuration; no network resources are created yet.

        Args:
            base_url: Prefix prepended to every ``endpoint`` given to
                ``get``/``post``.
            timeout: Value passed as ``total`` to ``aiohttp.ClientTimeout``.
        """
        self.base_url = base_url
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        # Created on first use by _get_session().
        self._session: Optional[aiohttp.ClientSession] = None
        # Guards session creation so concurrent first requests share one session.
        self._session_lock = asyncio.Lock()

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared session, creating it if missing or closed."""
        if self._session is None or self._session.closed:
            async with self._session_lock:
                # Re-check under the lock: another task may have created the
                # session while this one was waiting to acquire it.
                if self._session is None or self._session.closed:
                    connector = aiohttp.TCPConnector(
                        limit=100,  # Total connection pool size
                        limit_per_host=30,  # Connections per host
                        force_close=False,
                        enable_cleanup_closed=True
                    )
                    self._session = aiohttp.ClientSession(
                        connector=connector,
                        timeout=self.timeout
                    )
        return self._session

    def _build_url(self, endpoint: str) -> str:
        """Concatenate ``base_url`` and ``endpoint`` without doubling the slash.

        Only the ``.../`` + ``/...`` case is altered; every other combination
        is joined exactly as given.
        """
        if self.base_url.endswith("/") and endpoint.startswith("/"):
            return f"{self.base_url}{endpoint[1:]}"
        return f"{self.base_url}{endpoint}"

    @with_metrics
    async def get(
        self,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Send a GET request and return the decoded JSON body.

        Args:
            endpoint: Path appended to ``base_url``.
            params: Optional query-string parameters.
            headers: Optional request headers.

        Returns:
            The response body parsed by ``response.json()``.

        Raises:
            aiohttp.ClientResponseError: via ``raise_for_status()`` when the
                server answers with an error status.
        """
        session = await self._get_session()
        url = self._build_url(endpoint)
        async with session.get(
            url,
            params=params,
            headers=headers
        ) as response:
            response.raise_for_status()
            return await response.json()

    @with_metrics
    async def post(
        self,
        endpoint: str,
        data: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Send a POST request and return the decoded JSON body.

        Args:
            endpoint: Path appended to ``base_url``.
            data: Optional payload forwarded as aiohttp's ``data`` argument.
            json: Optional payload forwarded as aiohttp's ``json`` argument.
            headers: Optional request headers.

        Returns:
            The response body parsed by ``response.json()``.

        Raises:
            aiohttp.ClientResponseError: via ``raise_for_status()`` when the
                server answers with an error status.
        """
        session = await self._get_session()
        url = self._build_url(endpoint)
        async with session.post(
            url,
            data=data,
            json=json,
            headers=headers
        ) as response:
            response.raise_for_status()
            return await response.json()

    async def close(self):
        """Close the session if one is open; safe to call repeatedly."""
        session = self._session
        # Drop the reference first so a later request builds a fresh session
        # instead of inspecting a stale, closed one.
        self._session = None
        if session is not None and not session.closed:
            await session.close()

    async def __aenter__(self):
        """Enter the async context; returns this client."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the async context, closing the session."""
        await self.close()
2. Circuit Breaker Pattern
# patterns/circuit_breaker.py
import asyncio
import time
from enum import Enum
from typing import Callable, Any, Optional
from functools import wraps
class CircuitState(Enum):
    """States a ``CircuitBreaker`` moves between."""

    CLOSED = "closed"  # calls pass through; failures are being counted
    OPEN = "open"  # calls are rejected until ``timeout`` seconds have passed
    HALF_OPEN = "half_open"  # timeout elapsed; calls pass through as a probe


class CircuitBreaker:
    """Decorator object that stops calling a failing async function for a while.

    After ``failure_threshold`` consecutive failures the breaker opens and
    rejects calls immediately. Once ``timeout`` seconds have elapsed since the
    last failure, the next call is let through (half-open); if it succeeds the
    breaker closes again, and if it fails the breaker re-opens.

    One instance holds one shared state, so every function decorated with the
    same instance trips together.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60,
        expected_exception: Exception = Exception
    ):
        """
        Args:
            failure_threshold: Consecutive failures needed to open the circuit.
            timeout: Seconds to stay open before allowing a probe call.
            expected_exception: Exception class (anything valid in an
                ``except`` clause) that counts as a failure. Other exceptions
                propagate without affecting the breaker.
        """
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        # time.monotonic() reading of the most recent failure; None until one occurs.
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def __call__(self, func: Callable) -> Callable:
        """Decorate the coroutine function ``func`` with this breaker.

        Raises (from the returned wrapper):
            Exception: with message "Circuit breaker is OPEN" while the
                circuit is open and the timeout has not elapsed.
            Whatever ``func`` raises, re-raised unchanged.
        """
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                else:
                    # Raised outside the try below, so a rejection is never
                    # counted as another failure.
                    raise Exception("Circuit breaker is OPEN")
            try:
                result = await func(*args, **kwargs)
            except self.expected_exception:
                self._record_failure()
                raise
            # Any success ends the failure streak. Skipped while OPEN so a
            # call that began before the circuit opened cannot silently
            # close it again.
            if self.state != CircuitState.OPEN:
                self._reset()
            return result
        return wrapper

    def _should_attempt_reset(self) -> bool:
        """Return True once ``timeout`` seconds have passed since the last failure."""
        if self.last_failure_time is None:
            # State was set to OPEN without a recorded failure; nothing to wait for.
            return True
        return time.monotonic() - self.last_failure_time >= self.timeout

    def _record_failure(self):
        """Count one failure and open the circuit when the threshold is reached."""
        self.failure_count += 1
        # Monotonic clock: the open period is unaffected by wall-clock changes.
        self.last_failure_time = time.monotonic()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

    def _reset(self):
        """Clear the failure streak and close the circuit."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED
# Usage example
@CircuitBreaker(failure_threshold=3, timeout=30)
async def external_api_call():
    """Fetch JSON from the example API behind a circuit breaker.

    The breaker opens after 3 consecutive failures and allows a probe call
    once 30 seconds have passed.

    Returns:
        The decoded JSON response body.

    Raises:
        Exception: "Circuit breaker is OPEN" while calls are being rejected.
        aiohttp.ClientError: on connection problems or an error HTTP status.
    """
    # Local import: patterns/circuit_breaker.py has no file-level aiohttp import.
    import aiohttp

    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com") as response:
            # Without this, error responses return normally and the breaker
            # never sees a failure to count.
            response.raise_for_status()
            return await response.json()
3. Rate Limiting with Redis
# patterns/rate_limiting.py
import asyncio
import aioredis
import time
from typing import Optional
from fastapi import HTTPException, Request
from starlette.middleware.base import BaseHTTPMiddleware
class RateLimiter:
    """Sliding-window rate limiter stored in a Redis sorted set per key.

    Each allowed request is added to the key's sorted set with its timestamp
    as score; entries older than the window are trimmed before counting.

    NOTE(review): trim, count and add are separate Redis commands, so
    concurrent requests for the same key can briefly exceed the limit. Wrap
    them in a transaction or Lua script if a strict bound is required.
    """

    def __init__(
        self,
        redis_url: str,
        requests_per_minute: int = 100,
        window_size: int = 60
    ):
        """
        Args:
            redis_url: URL handed to ``aioredis.from_url`` on first use.
            requests_per_minute: Default number of requests allowed per window.
            window_size: Default window length in seconds.
        """
        # Connection is opened lazily by initialize().
        self.redis = None
        self.redis_url = redis_url
        self.requests_per_minute = requests_per_minute
        self.window_size = window_size

    async def initialize(self):
        """Initialize Redis connection"""
        self.redis = await aioredis.from_url(self.redis_url)

    async def is_allowed(
        self,
        key: str,
        limit: Optional[int] = None,
        window: Optional[int] = None
    ) -> bool:
        """Record the request under ``key`` if it fits within the limit.

        Args:
            key: Sorted-set key identifying the caller/resource being limited.
            limit: Maximum requests per window; ``None`` uses the instance
                default. ``0`` is honoured and rejects every request.
            window: Window length in seconds; ``None`` or ``0`` uses the
                instance default (a zero-length window is meaningless).

        Returns:
            True if the request was recorded and may proceed, False if the
            window is already full (rejected requests are not recorded).
        """
        # Local import keeps this block self-contained; uuid is stdlib.
        import uuid

        if self.redis is None:
            await self.initialize()
        # ``limit or default`` would silently turn an explicit 0 into the default.
        if limit is None:
            limit = self.requests_per_minute
        window = window or self.window_size
        current_time = time.time()
        # Remove old requests from the sliding window
        await self.redis.zremrangebyscore(
            key,
            0,
            current_time - window
        )
        # Count current requests in window
        current_requests = await self.redis.zcard(key)
        if current_requests >= limit:
            return False
        # Sorted-set members are unique, so two requests sharing a timestamp
        # would collapse into one entry; a random suffix keeps each distinct.
        member = f"{current_time}:{uuid.uuid4().hex}"
        await self.redis.zadd(key, {member: current_time})
        # Let idle keys disappear once their newest entry has aged out.
        await self.redis.expire(key, window)
        return True
class RateLimitMiddleware(BaseHTTPMiddleware):
    """Middleware that rejects requests exceeding a per-identifier, per-path limit."""

    def __init__(
        self,
        app,
        redis_url: str,
        requests_per_minute: int = 100,
        identifier_func=None
    ):
        """
        Args:
            app: The ASGI application being wrapped.
            redis_url: Redis URL forwarded to ``RateLimiter``.
            requests_per_minute: Limit forwarded to ``RateLimiter``.
            identifier_func: Optional callable ``(request) -> str`` naming the
                caller; defaults to the client's IP address.
        """
        super().__init__(app)
        self.limiter = RateLimiter(redis_url, requests_per_minute)
        self.identifier_func = identifier_func or self._default_identifier

    def _default_identifier(self, request: Request) -> str:
        """Identify the caller by client IP, or ``"unknown"`` when unavailable."""
        client = request.client
        # Starlette documents ``request.client`` as possibly None, so guard
        # the attribute access instead of crashing the request.
        return client.host if client is not None else "unknown"

    async def dispatch(self, request: Request, call_next):
        """Answer 429 when the caller is over the limit, else continue the chain.

        Returns:
            A JSON 429 response with a ``Retry-After`` header when limited,
            otherwise the downstream response.
        """
        # Local import keeps this block self-contained; starlette is already
        # a dependency of this module.
        from starlette.responses import JSONResponse

        identifier = self.identifier_func(request)
        key = f"rate_limit:{identifier}:{request.url.path}"
        if not await self.limiter.is_allowed(key):
            # Return the response directly: an HTTPException raised inside
            # BaseHTTPMiddleware is not routed through the app's exception
            # handlers and would reach the client as a 500.
            return JSONResponse(
                status_code=429,
                content={"detail": "Rate limit exceeded"},
                # Upper bound: the window is fully cleared after this long.
                headers={"Retry-After": str(self.limiter.window_size)}
            )
        return await call_next(request)
4. API Versioning Strategy
# patterns/versioning.py
from enum import Enum
from typing import Optional, Dict, Any, Type
from abc import ABC, abstractmethod
---
*Content truncated.*