fastapi-async-patterns
Use for deep FastAPI concurrency, event-loop safety, async I/O, and performance patterns after a service structure already exists. Not a general FastAPI bootstrap skill; pair with fastapi-expert or fastapi-templates when needed.
Install
mkdir -p .claude/skills/fastapi-async-patterns && curl -L -o skill.zip "https://agentskills.codes/api/skills/download/15252" && unzip -o skill.zip -d .claude/skills/fastapi-async-patterns && rm skill.zip
Installs to .claude/skills/fastapi-async-patterns
Activation
This is the description your AI agent reads to decide when to run this skill — the better it matches your request, the more reliably it fires.
Use for deep FastAPI concurrency, event-loop safety, async I/O, and performance patterns after a service structure already exists. Not a general FastAPI bootstrap skill; pair with fastapi-expert or fastapi-templates when needed.
About this skill
FastAPI Async Patterns
Master async patterns in FastAPI for building high-performance, concurrent APIs with optimal resource usage.
All connection strings and secrets in these examples must come from environment variables or a secret manager, never from source-controlled literals.
Basic Async Route Handlers
Understanding async vs sync endpoints in FastAPI.
import asyncio
import os
import shutil
import tempfile
import time
from typing import AsyncIterator

import httpx
from fastapi import FastAPI, UploadFile, WebSocketException
app = FastAPI()
# Sync endpoint: FastAPI runs plain `def` handlers in a worker thread from its
# threadpool, so the blocking call below does not stall the event loop.
@app.get('/sync')
def sync_endpoint():
    """Return a fixed message after a blocking one-second sleep."""
    # Ties up one threadpool worker for the full second. The same call inside
    # an `async def` handler WOULD block the event loop for every request.
    time.sleep(1)
    return {'message': 'Completed after 1 second'}
# Async endpoint (non-blocking)
@app.get('/async')
async def async_endpoint():
    """Wait one second with a non-blocking sleep, then return a fixed message."""
    pause_seconds = 1
    # Awaiting the sleep hands control back to the event loop, so other
    # requests can be handled in the meantime.
    await asyncio.sleep(pause_seconds)
    payload = {'message': 'Completed after 1 second'}
    return payload
# CPU-bound work (use sync)
@app.get('/cpu-intensive')
def cpu_intensive():
    """Return the sum of squares of every integer below ten million."""
    total = 0
    for n in range(10000000):
        total += n * n
    return {'result': total}
# I/O-bound work (use async)
@app.get('/io-intensive')
async def io_intensive():
    """Fetch the example upstream resource and return its decoded JSON body.

    Uses ``httpx``, which must be imported at module level; without that
    import the first request fails with ``NameError``.
    """
    # The client is opened and closed around this single call; the await lets
    # the event loop serve other requests while the upstream responds.
    async with httpx.AsyncClient() as client:
        response = await client.get('https://api.example.com/data')
        return response.json()
Async Database Operations
Async database patterns with popular ORMs and libraries.
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
import os
import asyncpg
from motor.motor_asyncio import AsyncIOMotorClient
from tortoise import Tortoise
from tortoise.contrib.fastapi import register_tortoise
app = FastAPI()
# SQLAlchemy async setup: the connection URL comes from the environment.
DATABASE_URL = os.environ["DATABASE_URL"]
engine = create_async_engine(DATABASE_URL, future=True, echo=True)
# Factory for AsyncSession objects bound to `engine`.
AsyncSessionLocal = sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
async def get_db() -> AsyncIterator[AsyncSession]:
    """Yield one database session per request.

    The session is committed once the consumer finishes without error. On any
    ``Exception`` (from the consumer or from the commit itself) the session is
    rolled back and the exception is re-raised.

    Yields:
        An ``AsyncSession`` created by ``AsyncSessionLocal``.
    """
    # This is an async generator, so the return annotation describes what is
    # yielded rather than a plain AsyncSession return value.
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
@app.get('/users/{user_id}')
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    """Return the user whose id equals ``user_id``, or respond with 404."""
    query = select(User).where(User.id == user_id)
    rows = await db.execute(query)
    found = rows.scalar_one_or_none()
    if found:
        return found
    raise HTTPException(status_code=404, detail='User not found')
# Direct asyncpg (lower level, faster)
# One pool is shared by the whole process. Creating a pool inside the
# dependency would open `min_size` connections and tear them down again on
# every single request, which defeats the purpose of pooling.
_asyncpg_pool = None  # created lazily by get_asyncpg_pool()


async def get_asyncpg_pool():
    """Yield the shared asyncpg pool, creating it on first use.

    Yields:
        The process-wide pool built from the ``DATABASE_DSN`` environment
        variable (10 to 20 connections).
    """
    global _asyncpg_pool
    if _asyncpg_pool is None:
        new_pool = await asyncpg.create_pool(
            os.environ["DATABASE_DSN"],
            min_size=10,
            max_size=20,
        )
        if _asyncpg_pool is None:
            _asyncpg_pool = new_pool
        else:
            # Another request finished creating the pool while this one was
            # awaiting; keep the first pool and discard the duplicate.
            await new_pool.close()
    yield _asyncpg_pool


async def close_asyncpg_pool():
    """Close the shared pool, if any; call once from the app's shutdown hook."""
    global _asyncpg_pool
    pool, _asyncpg_pool = _asyncpg_pool, None
    if pool is not None:
        await pool.close()
@app.get('/users-fast/{user_id}')
async def get_user_fast(user_id: int, pool=Depends(get_asyncpg_pool)):
    """Fetch one ``users`` row by id through asyncpg and return it as a dict.

    Responds with 404 when the query returns no row.
    """
    async with pool.acquire() as conn:
        # `$1` is a bound parameter; user_id is never spliced into the SQL text.
        record = await conn.fetchrow(
            'SELECT * FROM users WHERE id = $1', user_id
        )
        if record:
            return dict(record)
        raise HTTPException(status_code=404, detail='User not found')
# MongoDB with Motor: the connection URL comes from the environment.
mongo_client = AsyncIOMotorClient(os.environ["MONGODB_URL"])
# Handle to the `mydatabase` database used by the document endpoints.
db = mongo_client['mydatabase']
@app.get('/documents/{doc_id}')
async def get_document(doc_id: str):
    """Return the document whose ``_id`` equals ``doc_id``, or respond with 404."""
    # NOTE(review): `_id` is compared against the raw path string. Documents
    # stored with a non-string `_id` (e.g. a driver-generated ObjectId) would
    # presumably never match -- confirm how `_id` values are populated.
    found = await db.collection.find_one({'_id': doc_id})
    if found:
        return found
    raise HTTPException(status_code=404, detail='Document not found')
@app.post('/documents')
async def create_document(data: dict):
    """Insert ``data`` as a new document and return its id as a string."""
    outcome = await db.collection.insert_one(data)
    new_id = str(outcome.inserted_id)
    return {'id': new_id}
# Tortoise ORM async: attach Tortoise to the app, with the database URL read
# from the environment and models loaded from `app.models`.
register_tortoise(
    app,
    modules={'models': ['app.models']},
    db_url=os.environ["TORTOISE_DATABASE_URL"],
    add_exception_handlers=True,
    generate_schemas=True,
)
from tortoise.models import Model
from tortoise import fields
# Tortoise model for a user: integer primary key plus two text columns.
class UserModel(Model):
    # Primary key.
    id = fields.IntField(pk=True)
    # Both text columns are capped at 255 characters.
    name = fields.CharField(max_length=255)
    email = fields.CharField(max_length=255)
@app.get('/tortoise-users/{user_id}')
async def get_tortoise_user(user_id: int):
    """Return the ``UserModel`` with the given id, or respond with 404."""
    found = await UserModel.get_or_none(id=user_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail='User not found')
Background Tasks
Fire-and-forget tasks without blocking the response.
from fastapi import BackgroundTasks, FastAPI
import asyncio
from datetime import datetime
app = FastAPI()
# Simple background task
async def send_email(email: str, message: str):
    """Pretend to send an email: wait two seconds, then print a confirmation."""
    simulated_send_seconds = 2
    await asyncio.sleep(simulated_send_seconds)  # Simulate email sending
    print(f'Email sent to {email}: {message}')
@app.post('/send-email')
async def send_email_endpoint(
    email: str,
    message: str,
    background_tasks: BackgroundTasks
):
    """Queue ``send_email`` as a background task and answer immediately."""
    task_args = (email, message)
    background_tasks.add_task(send_email, *task_args)
    return {'status': 'Email will be sent in background'}
# Multiple background tasks
async def log_activity(user_id: int, action: str):
    """Wait half a second, then print a timestamped line for the user's action."""
    half_second = 0.5
    await asyncio.sleep(half_second)
    print(f'[{datetime.now()}] User {user_id} performed: {action}')
async def update_analytics(action: str):
    """Wait one second, then print that analytics were updated for ``action``."""
    one_second = 1
    await asyncio.sleep(one_second)
    print(f'Analytics updated for action: {action}')
@app.post('/users/{user_id}/action')
async def perform_action(
    user_id: int,
    action: str,
    background_tasks: BackgroundTasks
):
    """Queue the activity-log and analytics tasks, then answer immediately."""
    # Tasks are queued in this order: activity log first, analytics second.
    queued = (
        (log_activity, (user_id, action)),
        (update_analytics, (action,)),
    )
    for task, task_args in queued:
        background_tasks.add_task(task, *task_args)
    return {'status': 'Action logged'}
# Background cleanup
async def cleanup_temp_files(file_path: str, delay: float = 60):
    """Delete ``file_path`` after waiting ``delay`` seconds.

    Args:
        file_path: Path of the file to remove.
        delay: Seconds to wait before removing the file; defaults to the
            original fixed 60-second wait.

    A file that no longer exists when the wait ends is ignored silently.
    """
    import os

    await asyncio.sleep(delay)  # Wait before cleanup
    try:
        # Remove directly instead of checking existence first: the file may
        # vanish between a check and the removal.
        os.remove(file_path)
    except FileNotFoundError:
        return
    print(f'Cleaned up: {file_path}')
@app.post('/upload')
async def upload_file(
    file: UploadFile,
    background_tasks: BackgroundTasks
):
    """Save the upload to a fresh temp file and schedule its later removal.

    Returns:
        A dict with the client-supplied ``filename`` (echoed, never used as a
        path) and the server-side ``path`` the content was written to.
    """
    # The client-controlled filename is not used to build the path: a name
    # such as '../../etc/cron.d/x' would escape /tmp, and two uploads with the
    # same name would overwrite each other. mkstemp picks a unique name.
    fd, temp_path = tempfile.mkstemp(prefix='upload-', dir='/tmp')
    loop = asyncio.get_running_loop()
    try:
        with os.fdopen(fd, 'wb') as f:
            # Disk writes block, so copy in a worker thread to keep the event
            # loop free; copyfileobj streams in chunks instead of loading the
            # whole upload into memory.
            await loop.run_in_executor(None, shutil.copyfileobj, file.file, f)
    except Exception:
        # Do not leave a partial file behind when the copy fails.
        os.remove(temp_path)
        raise
    # Schedule cleanup
    background_tasks.add_task(cleanup_temp_files, temp_path)
    return {'filename': file.filename, 'path': temp_path}
WebSocket Handling
Real-time bidirectional communication patterns.
from fastapi import WebSocket, WebSocketDisconnect, Depends
from typing import List
import json
app = FastAPI()
# Simple WebSocket
@app.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket):
    """Accept the connection and echo every text frame until the client leaves."""
    await websocket.accept()
    try:
        # Loop ends only when receive_text raises WebSocketDisconnect.
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f'Echo: {data}')
    except WebSocketDisconnect:
        print('Client disconnected')
# WebSocket with authentication
async def get_current_user_ws(websocket: WebSocket):
    """Authenticate a WebSocket connection from its ``token`` query parameter.

    Returns:
        Whatever ``decode_token`` returns for the supplied token.

    Raises:
        WebSocketException: close code 1008 (policy violation) when the token
            is missing or rejected by ``verify_token``.
    """
    token = websocket.query_params.get('token')
    if not token or not verify_token(token):
        # HTTPException describes an HTTP response and does not apply to a
        # WebSocket route. Raising WebSocketException lets the framework close
        # the socket with this code, so no manual close() is needed.
        raise WebSocketException(code=1008, reason='Unauthorized')
    return decode_token(token)
@app.websocket('/ws/authenticated')
async def authenticated_websocket(
    websocket: WebSocket,
    user=Depends(get_current_user_ws)
):
    """Greet the authenticated user, then relay their messages back with their name."""
    await websocket.accept()
    try:
        await websocket.send_text(f'Welcome {user["name"]}')
        # Loop ends only when receive_text raises WebSocketDisconnect.
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f'{user["name"]}: {data}')
    except WebSocketDisconnect:
        print(f'User {user["name"]} disconnected')
# Broadcasting to multiple connections
class ConnectionManager:
    """Track open WebSocket connections and send text messages to them."""

    def __init__(self):
        # Connections in the order they were accepted.
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept ``websocket`` and start tracking it."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking ``websocket``; a connection that is not tracked is ignored."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send ``message`` to a single connection."""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Send ``message`` to every connection tracked when the call starts."""
        # Iterate over a snapshot: each await yields to the event loop, where
        # another handler may call disconnect(); mutating the list while
        # iterating it would silently skip recipients.
        for connection in list(self.active_connections):
            await connection.send_text(message)


manager = ConnectionManager()
@app.websocket('/ws/chat/{client_id}')
async def chat_endpoint(websocket: WebSocket, client_id: str):
    """Join the chat, relay this client's messages to everyone, announce departure."""
    await manager.connect(websocket)
    await manager.broadcast(f'Client {client_id} joined the chat')
    try:
        # Loop ends only when receive_text raises WebSocketDisconnect.
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(f'Client {client_id}: {data}')
    except WebSocketDisconnect:
        # Remove this socket first so the farewell is not sent to it.
        manager.disconnect(websocket)
        await manager.broadcast(f'Client {client_id} left the chat')
# WebSocket with JSON messages
@app.websocket('/ws/json')
async def json_websocket(websocket: WebSocket):
    """Answer JSON frames by their ``type``: 'ping' gets a pong, 'message' gets an echo.

    Frames with any other type are read and ignored.
    """
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            kind = data.get('type')
            if kind == 'ping':
                await websocket.send_json({'type': 'pong'})
                continue
            if kind == 'message':
                reply = {
                    'type': 'response',
                    'data': f'Received: {data.get("content")}'
                }
                await websocket.send_json(reply)
    except WebSocketDisconnect:
        print('Client disconnected')
Server-Sent Events (SSE)
One-way streaming from server to client.
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio
app = FastAPI()
@app.get('/sse')
async def sse_endpoint():
    """Stream ten 'message' events, one per second, as Server-Sent Events."""
    async def event_generator():
        # Each event is produced after a one-second non-blocking pause.
        for i in range(10):
            await asyncio.sleep(1)
            event = {
                'event': 'message',
                'data': f'Message {i}'
            }
            yield event

    stream = event_generator()
    return EventSourceResponse(stream)
# SSE with real-time updates
@app.get('/sse/updates')
async def sse_updates():
async def update_generator():
while True:
# Simulate fetching updates
await asyncio.sleep(2)
update = await fetch_latest_update()
---
*Content truncated.*