Python `ai` SDK — models, providers, streams, events, tools, agents, hooks, MCP, AI SDK UI, structured output, and media generation
Install
mkdir -p .claude/skills/ai-vercel-labs && curl -L -o skill.zip "https://agentskills.codes/api/skills/download/15391" && unzip -o skill.zip -d .claude/skills/ai-vercel-labs && rm skill.zipInstalls to .claude/skills/ai-vercel-labs
Activation
This is the description your AI agent reads to decide when to run this skill — the better it matches your request, the more reliably it fires.
Python `ai` SDK — models, providers, streams, events, tools, agents, hooks, MCP, AI SDK UI, structured output, and media generationAbout this skill
ai
Use this skill when working with the Python ai SDK.
uv add ai
Direct OpenAI-compatible and Anthropic-compatible providers require optional
extras: uv add "ai[openai]" or uv add "ai[anthropic]". AI Gateway works
with the base package.
import ai
Quick start
import asyncio
import ai
@ai.tool
async def get_weather(city: str) -> str:
"""Get current weather for a city."""
return f"Sunny, 72F in {city}"
async def main() -> None:
model = ai.get_model("gateway:anthropic/claude-sonnet-4")
agent = ai.agent(tools=[get_weather])
messages = [
ai.system_message("You are a helpful weather assistant."),
ai.user_message("What's the weather in Tokyo?"),
]
async with agent.run(model, messages) as stream:
async for event in stream:
if isinstance(event, ai.events.TextDelta):
print(event.chunk, end="", flush=True)
print(stream.output)
if __name__ == "__main__":
asyncio.run(main())
ai.stream(...) and agent.run(...) are async context managers. Iterate events
inside the context. After iteration, read final state from the stream object.
Models and providers
model = ai.get_model() # reads AI_SDK_DEFAULT_MODEL
model = ai.get_model("anthropic/claude-sonnet-4") # unprefixed: gateway route
model = ai.get_model("gateway:anthropic/claude-sonnet-4")
model = ai.get_model("openai:gpt-5.4") # direct provider route
model = ai.get_model("anthropic:claude-sonnet-4-6")
- Gateway credentials use
AI_GATEWAY_API_KEY. - Direct providers use provider-specific env vars such as
OPENAI_API_KEYandANTHROPIC_API_KEY. - Use
ai.get_provider(...)when you need a custom base URL, API key, headers, or client. - Use
await ai.probe(model)to check credentials and model availability.
provider = ai.get_provider(
"openai",
base_url="http://localhost:1234/v1",
api_key="your_access_token_here",
)
model = ai.Model("local-model", provider=provider)
models = await ai.get_provider("anthropic").list_models()
Request-scoped provider options go through params:
params = {
"providerOptions": {
"gateway": {"sort": "cost"},
"anthropic": {"speed": "fast"},
}
}
async with ai.stream(model, messages, params=params) as stream:
async for event in stream:
...
Messages and events
Messages are Pydantic models with typed parts. Use builders for common roles and parts:
ai.system_message("Be concise.")
ai.user_message("Describe this image:", ai.file_part(image_bytes, media_type="image/png"))
ai.assistant_message(ai.thinking("scratchpad"), "Final answer")
ai.tool_result_part("tc-1", result={"temp": 72}, tool_name="get_weather")
ai.tool_message(tool_call_id="tc-1", result=72, tool_name="get_weather")
Common message properties:
message.text,message.reasoning.message.tool_calls,message.tool_results.message.builtin_tool_calls,message.builtin_tool_returns.message.files,message.images,message.videos.message.get_output()ormessage.get_output(MyModel).
Streams and agents yield event objects from ai.events:
async with ai.stream(model, messages, tools=tools) as stream:
async for event in stream:
if isinstance(event, ai.events.TextDelta):
print(event.chunk, end="", flush=True)
elif isinstance(event, ai.events.ToolEnd):
print(event.tool_call.tool_name, event.tool_call.tool_args)
elif isinstance(event, ai.events.ToolCallResult):
for result in event.results:
print(result.tool_name, result.result)
elif isinstance(event, ai.events.HookEvent):
print(event.hook.hook_id, event.hook.status)
elif isinstance(event, ai.events.PartialToolCallResult):
print(event.label, event.value)
After iteration:
stream.message # final assistant message for ai.stream
stream.messages # updated agent history for agent.run
stream.text # text output for ai.stream
stream.output # text or parsed Pydantic output
stream.tool_calls # function tool calls from ai.stream
stream.usage # latest reported usage
Serialize and restore history with Pydantic JSON:
encoded = [message.model_dump(mode="json") for message in stream.messages]
restored = [ai.messages.Message.model_validate(item) for item in encoded]
Direct streaming
Use ai.stream when you want one model response and will handle any function
tool calls yourself:
async with ai.stream(model, messages, tools=[get_weather.tool]) as stream:
async for event in stream:
if isinstance(event, ai.events.TextDelta):
print(event.chunk, end="", flush=True)
for call in stream.tool_calls:
print(call.tool_name, call.tool_args)
Use structured output with a Pydantic model:
import pydantic
class Forecast(pydantic.BaseModel):
city: str
temperature: float
async with ai.stream(model, messages, output_type=Forecast) as stream:
async for event in stream:
...
forecast = stream.output
Tools
A function tool is an async Python function decorated with @ai.tool. The
function name becomes the tool name, the docstring becomes the description, and
the signature becomes a Pydantic-validated JSON schema.
@ai.tool
async def scan_sector(sector: str, depth: int = 1) -> str:
"""Scan a sector at the requested depth."""
return f"{sector}: clear at depth {depth}"
Use schema-only tools with ai.stream when the SDK should not execute them:
tool = ai.Tool(
kind="function",
name="get_weather",
args=ai.tools.FunctionToolArgs(
description="Get current weather for a city.",
params={
"type": "object",
"properties": {"city": {"type": "string"}},
"required": ["city"],
},
),
)
Provider-executed tools run outside your process:
tools = [ai.providers.anthropic.tools.web_search(max_uses=3)]
async with ai.stream(model, messages, tools=tools) as stream:
async for event in stream:
if isinstance(event, ai.events.BuiltinToolResult):
print(event.result.tool_name, event.result.result)
Tool validation failures and exceptions become ToolCallResult events with
error result parts. The original exception is on event.exception for logging.
if isinstance(event, ai.events.ToolCallResult) and event.exception:
log_exception(event.exception)
Streaming tools
Async-generator tools yield partial values while they run. An aggregator turns those values into the final tool result the model sees.
@ai.tool
async def draft_reply(topic: str) -> ai.StreamingTextTool:
"""Draft a reply."""
yield "Checking "
yield f"records for {topic}."
@ai.tool
async def fetch(url: str) -> ai.StreamingStatusTool[str]:
"""Fetch a URL with status updates."""
yield "connecting"
yield "downloading"
yield body # last yield is the tool result
@ai.tool
async def research(topic: str) -> ai.SubAgentTool:
"""Research a topic with a subagent."""
subagent = ai.agent(tools=[...])
async with subagent.run(model, [ai.user_message(topic)]) as stream:
async for event in stream:
yield event
For custom aggregation, annotate an async-generator return type with
Annotated[AsyncGenerator[T], ai.agents.Aggregate(...)]. Built-in
aggregators: ai.agents.ConcatAggregator, ai.agents.LastAggregator, and
ai.agents.MessageAggregator.
Agents
Use an agent when the SDK should execute Python tools, append tool results, and continue until the assistant returns a final answer.
agent = ai.agent(tools=[get_weather])
async with agent.run(model, messages) as stream:
async for event in stream:
if isinstance(event, ai.events.TextDelta):
print(event.chunk, end="", flush=True)
history = stream.messages
answer = stream.output
Pass structured output and provider params through agent.run:
async with agent.run(
model,
[ai.user_message("Return a JSON forecast.")],
output_type=Forecast,
params={"temperature": 0},
) as stream:
async for event in stream:
...
forecast = stream.output
Custom agent loops
Subclass ai.Agent and override loop for custom scheduling, routing,
logging, persistence, or approval logic.
from collections.abc import AsyncGenerator
class CustomAgent(ai.Agent):
async def loop(self, context: ai.Context) -> AsyncGenerator[ai.events.AgentEvent]:
while context.keep_running():
async with (
ai.stream(context=context) as stream,
ai.ToolRunner() as tool_runner,
):
async for event in ai.util.merge(stream, tool_runner.events()):
yield event
if isinstance(event, ai.events.ToolEnd):
tool_call = context.resolve(event.tool_call)
tool_runner.schedule(tool_call)
context.add(stream.message)
context.add(tool_runner.get_tool_message())
Loop helpers: context.model, context.messages, context.tools,
context.output_type, context.params, context.resolve(...),
context.keep_running(), and context.add(...).
Multi-agent
Use ai.SubAgentTool for agent-as-tool workflows. Use ai.yield_from(...)
inside custom loops to fan out streams and forward nested events as
PartialToolCallResult values with labels.
async with (
researcher.run(model, research_messages) as research_stream,
analyst.run(model, analyst_messages) as analyst_stream,
):
research_text, analyst_text = await asyncio.gather(
ai.yield_from(
research_stream,
label="researcher"
---
*Content truncated.*