agentskills.codes
FA

fastapi-async-patterns

Use for deep FastAPI concurrency, event-loop safety, async I/O, and performance patterns after a service structure already exists. Not a general FastAPI bootstrap skill; pair with fastapi-expert or fastapi-templates when needed.

Install

mkdir -p .claude/skills/fastapi-async-patterns && curl -L -o skill.zip "https://agentskills.codes/api/skills/download/15252" && unzip -o skill.zip -d .claude/skills/fastapi-async-patterns && rm skill.zip

Installs to .claude/skills/fastapi-async-patterns

Activation

This is the description your AI agent reads to decide when to run this skill — the better it matches your request, the more reliably it fires.

Use for deep FastAPI concurrency, event-loop safety, async I/O, and performance patterns after a service structure already exists. Not a general FastAPI bootstrap skill; pair with fastapi-expert or fastapi-templates when needed.
228 chars ✓ has a “when” trigger

About this skill

FastAPI Async Patterns

Master async patterns in FastAPI for building high-performance, concurrent APIs with optimal resource usage.

All connection strings and secrets in these examples must come from environment variables or a secret manager, never from source-controlled literals.

Basic Async Route Handlers

Understanding async vs sync endpoints in FastAPI.

from fastapi import FastAPI
import time
import asyncio

# Application instance that the route decorators in this snippet register on.
app = FastAPI()

# Sync endpoint: FastAPI runs plain ``def`` handlers in a worker thread, so
# the blocking call below ties up that thread for a second rather than
# stalling the event loop itself.
@app.get('/sync')
def sync_endpoint():
    blocking_seconds = 1
    time.sleep(blocking_seconds)  # Blocks only the thread serving this request
    payload = {'message': 'Completed after 1 second'}
    return payload

# Async endpoint (non-blocking): the await hands control back to the event
# loop while the one-second pause elapses.
@app.get('/async')
async def async_endpoint():
    pause_seconds = 1
    await asyncio.sleep(pause_seconds)  # Other requests can be handled meanwhile
    payload = {'message': 'Completed after 1 second'}
    return payload

# CPU-bound work (use sync): sums the squares of 0..9,999,999 in a plain
# ``def`` handler.
@app.get('/cpu-intensive')
def cpu_intensive():
    squares = (n * n for n in range(10000000))
    total = sum(squares)
    return {'result': total}

# I/O-bound work (use async): fetch a remote JSON document without blocking.
# NOTE(review): ``httpx`` is not imported anywhere in this snippet — confirm
# it is imported before this handler is used.
@app.get('/io-intensive')
async def io_intensive():
    async with httpx.AsyncClient() as http:
        upstream = await http.get('https://api.example.com/data')
        body = upstream.json()
    # The client is closed at this point; only the decoded body is returned.
    return body

Async Database Operations

Async database patterns with popular ORMs and libraries.

import os
from typing import AsyncIterator

import asyncpg
from fastapi import FastAPI, Depends, HTTPException
from motor.motor_asyncio import AsyncIOMotorClient
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from tortoise import Tortoise
from tortoise.contrib.fastapi import register_tortoise

# Application instance that the route decorators in this snippet register on.
app = FastAPI()

# SQLAlchemy async setup
# The URL is read from the environment; a missing DATABASE_URL raises
# KeyError as soon as this module is imported.
DATABASE_URL = os.environ["DATABASE_URL"]
# NOTE(review): echo=True makes the engine log the SQL it emits — confirm
# that is wanted outside development.
engine = create_async_engine(DATABASE_URL, echo=True, future=True)
# Factory for AsyncSession objects bound to ``engine``; get_db() calls it
# once per dependency resolution.
AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

async def get_db() -> AsyncIterator[AsyncSession]:
    """Yield a database session, committing on success and rolling back on error.

    This is an async generator meant to be used through ``Depends``: the
    session is opened before ``yield`` and the commit/rollback happens when
    the generator is resumed afterwards.

    Yields:
        An ``AsyncSession`` produced by ``AsyncSessionLocal``.

    Raises:
        Exception: Any error raised while the session was in use, or by the
            commit itself, is re-raised after the rollback.
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            # Discard partial work before letting the error propagate.
            await session.rollback()
            raise

# Look up a single user through the SQLAlchemy session dependency;
# responds 404 when no row matches.
@app.get('/users/{user_id}')
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    query = select(User).where(User.id == user_id)
    found = (await db.execute(query)).scalar_one_or_none()
    if found:
        return found
    raise HTTPException(status_code=404, detail='User not found')

# Direct asyncpg (lower level, faster)
# One pool is shared by the whole process. Creating and closing a pool per
# request would open ``min_size`` fresh connections every time and defeat the
# purpose of pooling.
_asyncpg_pool = None


async def get_asyncpg_pool():
    """Yield the process-wide asyncpg pool, creating it on first use.

    Intended for use with ``Depends``. The pool is NOT closed when the
    request finishes; call ``close_asyncpg_pool()`` from the application's
    shutdown/lifespan hook to release it.

    Yields:
        The shared pool returned by ``asyncpg.create_pool``.
    """
    global _asyncpg_pool
    if _asyncpg_pool is None:
        candidate = await asyncpg.create_pool(
            os.environ["DATABASE_DSN"],
            min_size=10,
            max_size=20,
        )
        # Another request may have finished creating the pool while this one
        # was awaiting; keep the first pool and discard the duplicate.
        if _asyncpg_pool is None:
            _asyncpg_pool = candidate
        else:
            await candidate.close()
    yield _asyncpg_pool


async def close_asyncpg_pool():
    """Close the shared pool if it exists; safe to call more than once."""
    global _asyncpg_pool
    pool, _asyncpg_pool = _asyncpg_pool, None
    if pool is not None:
        await pool.close()

# Fetch one user row over a pooled asyncpg connection and return it as a
# plain dict; responds 404 when the row is missing.
@app.get('/users-fast/{user_id}')
async def get_user_fast(user_id: int, pool = Depends(get_asyncpg_pool)):
    sql = 'SELECT * FROM users WHERE id = $1'
    async with pool.acquire() as connection:
        record = await connection.fetchrow(sql, user_id)
        if record:
            return dict(record)
        raise HTTPException(status_code=404, detail='User not found')

# MongoDB with Motor
# The client is built at import time from MONGODB_URL; a missing variable
# raises KeyError.
mongo_client = AsyncIOMotorClient(os.environ["MONGODB_URL"])
# Handle to the ``mydatabase`` database used by the document routes below.
db = mongo_client.mydatabase

# Return the document whose ``_id`` equals the path parameter, or 404.
# NOTE(review): ``doc_id`` is compared as a plain string, while
# create_document returns ``str(result.inserted_id)`` — presumably ids
# generated by the driver will not match this filter; confirm the intended
# ``_id`` type.
@app.get('/documents/{doc_id}')
async def get_document(doc_id: str):
    lookup = {'_id': doc_id}
    found = await db.collection.find_one(lookup)
    if found:
        return found
    raise HTTPException(status_code=404, detail='Document not found')

# Insert the request body as a new document and report its id as a string.
@app.post('/documents')
async def create_document(data: dict):
    outcome = await db.collection.insert_one(data)
    new_id = str(outcome.inserted_id)
    return {'id': new_id}

# Tortoise ORM async
# Hooks Tortoise ORM into ``app`` using the URL from TORTOISE_DATABASE_URL
# (KeyError at import time if unset) and the models found in ``app.models``.
# NOTE(review): generate_schemas=True presumably creates tables on startup —
# confirm this is acceptable where migrations are managed separately.
register_tortoise(
    app,
    db_url=os.environ["TORTOISE_DATABASE_URL"],
    modules={'models': ['app.models']},
    generate_schemas=True,
    add_exception_handlers=True,
)

from tortoise.models import Model
from tortoise import fields

class UserModel(Model):
    # Tortoise model for a user record, queried by get_tortoise_user.
    id = fields.IntField(pk=True)  # integer primary key
    name = fields.CharField(max_length=255)  # text, at most 255 characters
    email = fields.CharField(max_length=255)  # text, at most 255 characters

# Fetch a UserModel by primary key; responds 404 when none exists.
@app.get('/tortoise-users/{user_id}')
async def get_tortoise_user(user_id: int):
    found = await UserModel.get_or_none(id=user_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail='User not found')

Background Tasks

Fire-and-forget tasks without blocking the response.

from fastapi import BackgroundTasks, FastAPI
import asyncio
from datetime import datetime

# Application instance that the route decorators in this snippet register on.
app = FastAPI()

# Simple background task
async def send_email(email: str, message: str):
    """Stand-in for a mail delivery: wait two seconds, then print a receipt."""
    simulated_send_seconds = 2
    await asyncio.sleep(simulated_send_seconds)  # Simulate email sending
    print(f'Email sent to {email}: {message}')

# Queue send_email to run after the response is returned and acknowledge
# immediately.
@app.post('/send-email')
async def send_email_endpoint(
    email: str,
    message: str,
    background_tasks: BackgroundTasks
):
    task_args = (email, message)
    background_tasks.add_task(send_email, *task_args)
    acknowledgement = {'status': 'Email will be sent in background'}
    return acknowledgement

# Multiple background tasks
async def log_activity(user_id: int, action: str):
    """Wait half a second, then print a timestamped line for the action."""
    write_delay_seconds = 0.5
    await asyncio.sleep(write_delay_seconds)
    # The timestamp is taken after the wait, i.e. when the line is written.
    print(f'[{datetime.now()}] User {user_id} performed: {action}')

async def update_analytics(action: str):
    """Wait one second, then print that analytics were updated for ``action``."""
    update_delay_seconds = 1
    await asyncio.sleep(update_delay_seconds)
    print(f'Analytics updated for action: {action}')

# Queue both follow-up jobs for the action, then acknowledge the request.
@app.post('/users/{user_id}/action')
async def perform_action(
    user_id: int,
    action: str,
    background_tasks: BackgroundTasks
):
    # Tasks are queued in this order: activity log first, analytics second.
    queued_jobs = (
        (log_activity, (user_id, action)),
        (update_analytics, (action,)),
    )
    for job, job_args in queued_jobs:
        background_tasks.add_task(job, *job_args)
    return {'status': 'Action logged'}

# Background cleanup
async def cleanup_temp_files(file_path: str, delay: float = 60):
    """Delete ``file_path`` after waiting ``delay`` seconds.

    Args:
        file_path: Path of the file to remove.
        delay: Seconds to wait before removing the file. Defaults to 60,
            the delay this task has always used.

    A file that no longer exists when the delay elapses is not an error;
    nothing is printed in that case.
    """
    import os

    await asyncio.sleep(delay)  # Wait before cleanup
    try:
        # Remove directly instead of checking existence first: the file could
        # vanish between the check and the removal.
        os.remove(file_path)
    except FileNotFoundError:
        return
    print(f'Cleaned up: {file_path}')

# Store the upload in a uniquely named temporary file, schedule its removal,
# and report where it was written.
@app.post('/upload')
async def upload_file(
    file: UploadFile,
    background_tasks: BackgroundTasks
):
    import os
    import tempfile

    # file.filename is chosen by the client, so it must never be used as a
    # path (e.g. '../../etc/cron.d/x'). Only its extension is kept; the rest
    # of the name is generated by mkstemp, which also prevents two uploads
    # with the same name from overwriting each other.
    client_name = os.path.basename(file.filename or '')
    suffix = os.path.splitext(client_name)[1]
    content = await file.read()

    def _store_upload():
        fd, stored_path = tempfile.mkstemp(prefix='upload-', suffix=suffix)
        with os.fdopen(fd, 'wb') as out:
            out.write(content)
        return stored_path

    # Disk writes block, so run them in the default executor instead of on
    # the event loop.
    loop = asyncio.get_running_loop()
    temp_path = await loop.run_in_executor(None, _store_upload)

    # Schedule cleanup
    background_tasks.add_task(cleanup_temp_files, temp_path)
    return {'filename': file.filename, 'path': temp_path}

WebSocket Handling

Real-time bidirectional communication patterns.

from fastapi import WebSocket, WebSocketDisconnect, Depends
from typing import List
import json

# Application instance that the WebSocket routes in this snippet register on.
app = FastAPI()

# Simple WebSocket
@app.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket):
    """Accept the connection and echo every text frame back until disconnect."""
    await websocket.accept()
    try:
        while True:
            incoming = await websocket.receive_text()
            reply = f'Echo: {incoming}'
            await websocket.send_text(reply)
    except WebSocketDisconnect:
        # receive_text raises this once the peer goes away; that ends the loop.
        print('Client disconnected')

# WebSocket with authentication
async def get_current_user_ws(websocket: WebSocket):
    """Resolve the connecting user from the ``token`` query parameter.

    Args:
        websocket: The incoming connection; only its query string is read.

    Returns:
        Whatever ``decode_token`` returns for a valid token.

    Raises:
        WebSocketException: With close code 1008 (policy violation) when the
            token is missing or ``verify_token`` rejects it.
    """
    # HTTPException describes an HTTP response and does not apply to a
    # WebSocket handshake; WebSocketException lets the framework close the
    # socket with the given code, so no manual close() is needed here.
    from fastapi import WebSocketException

    token = websocket.query_params.get('token')
    if not token or not verify_token(token):
        raise WebSocketException(code=1008)  # Policy violation
    return decode_token(token)

@app.websocket('/ws/authenticated')
async def authenticated_websocket(
    websocket: WebSocket,
    user = Depends(get_current_user_ws)
):
    """Greet the authenticated user, then relay each message prefixed by name."""
    await websocket.accept()
    try:
        greeting = f'Welcome {user["name"]}'
        await websocket.send_text(greeting)
        while True:
            incoming = await websocket.receive_text()
            relayed = f'{user["name"]}: {incoming}'
            await websocket.send_text(relayed)
    except WebSocketDisconnect:
        print(f'User {user["name"]} disconnected')

# Broadcasting to multiple connections
class ConnectionManager:
    """Keeps track of accepted WebSocket connections and fans messages out."""

    def __init__(self):
        # Connections in the order they were accepted.
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept ``websocket`` and start tracking it."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking ``websocket``; unknown connections are ignored.

        Being idempotent matters because ``broadcast`` may already have
        dropped a dead connection before its handler calls this.
        """
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send ``message`` to a single connection."""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Send ``message`` to every tracked connection.

        A connection whose send fails is dropped and the remaining
        connections still receive the message.
        """
        # Iterate over a snapshot: other handlers can connect or disconnect
        # while this coroutine is suspended in send_text.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                # Best-effort fan-out: one broken peer must not abort the
                # broadcast for everyone else.
                self.disconnect(connection)

# Single module-level manager used by the chat endpoint below.
manager = ConnectionManager()

@app.websocket('/ws/chat/{client_id}')
async def chat_endpoint(websocket: WebSocket, client_id: str):
    """Join the chat, relay this client's messages to everyone, announce leaving."""
    await manager.connect(websocket)
    joined_notice = f'Client {client_id} joined the chat'
    await manager.broadcast(joined_notice)
    try:
        while True:
            incoming = await websocket.receive_text()
            await manager.broadcast(f'Client {client_id}: {incoming}')
    except WebSocketDisconnect:
        # Stop tracking this socket first so the farewell is not sent to it.
        manager.disconnect(websocket)
        left_notice = f'Client {client_id} left the chat'
        await manager.broadcast(left_notice)

# WebSocket with JSON messages
@app.websocket('/ws/json')
async def json_websocket(websocket: WebSocket):
    """Answer 'ping' with 'pong' and acknowledge 'message' frames.

    Frames with any other ``type`` are read and ignored.
    """
    await websocket.accept()
    try:
        while True:
            frame = await websocket.receive_json()
            kind = frame.get('type')

            if kind == 'ping':
                await websocket.send_json({'type': 'pong'})
                continue

            if kind == 'message':
                reply = {
                    'type': 'response',
                    'data': f'Received: {frame.get("content")}'
                }
                await websocket.send_json(reply)
    except WebSocketDisconnect:
        print('Client disconnected')

Server-Sent Events (SSE)

One-way streaming from server to client.

from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio

# Application instance that the SSE routes in this snippet register on.
app = FastAPI()

# Stream ten 'message' events, one per second, as a server-sent event
# response.
@app.get('/sse')
async def sse_endpoint():
    async def event_generator():
        # Each event is produced only after its one-second wait has elapsed.
        for sequence in range(10):
            await asyncio.sleep(1)
            event = {
                'event': 'message',
                'data': f'Message {sequence}'
            }
            yield event

    stream = event_generator()
    return EventSourceResponse(stream)

# SSE with real-time updates
@app.get('/sse/updates')
async def sse_updates():
    async def update_generator():
        while True:
            # Simulate fetching updates
            await asyncio.sleep(2)
            update = await fetch_latest_update()
          

---

*Content truncated.*

Search skills

Search the agent skills registry