跳到主要内容

6 篇博文 含有标签「AI-Agent」

查看所有标签

用 Tavily MCP 替代智谱搜索,节省 70% 编码套餐额度

· 阅读需 4 分钟

在使用 智谱 GLM 编码套餐 进行日常开发时,发现一个容易被忽视的额度消耗问题:智谱内置的 MCP 联网搜索和网页读取工具,每次调用都消耗编码套餐额度。本文记录排查过程和用 Tavily MCP 替代的方案。

TL;DR

智谱 GLM 编码套餐内置 web-search-primeweb-reader 两个 MCP 服务,每次搜索/读取消耗对话额度。替换为 Tavily MCP 后,联网搜索使用独立免费额度(1000 次/月),不再挤占编码对话用量。

问题现象

在一次 WooCommerce 主题上架开发中,频繁使用联网搜索查文档、读网页。开发完成后查看额度:

智谱编码套餐额度使用情况

5 小时周期内已用 16%,主要消耗来自 MCP 工具调用而非编码对话本身。

查看 MCP 调用统计更直观:

MCP 工具调用量统计

网页搜索和网页读取占了 MCP 调用的大部分。这些调用完全可以不消耗编码套餐额度。

根因

智谱 GLM 编码套餐通过 open.bigmodel.cn 的 MCP 端点提供联网能力:

web-search-prime: https://open.bigmodel.cn/api/mcp/web_search_prime/mcp
web-reader: https://open.bigmodel.cn/api/mcp/web_reader/mcp
zread: https://open.bigmodel.cn/api/mcp/zread/mcp

这些是 HTTP 类型的 MCP 服务,每次调用经过智谱 API 网关,按编码套餐额度计费。而联网搜索本质上是通用能力,不应该消耗 AI 编码额度。

解决方案:用 Tavily MCP 替代

Tavily 提供独立的免费额度(1000 次/月),搜索和网页提取能力完整覆盖智谱的两个工具。

Step 1:移除智谱搜索/读取服务

claude mcp remove web-reader -s user
claude mcp remove web-search-prime -s user

Step 2:添加 Tavily MCP

claude mcp add tavily-search -s user -- npx -y tavily-mcp@latest

Step 3:配置 API Key

~/.claude.json 中补充环境变量:

{
"mcpServers": {
"tavily-search": {
"type": "stdio",
"command": "npx",
"args": ["-y", "tavily-mcp@latest"],
"env": {
"TAVILY_API_KEY": "tvly-your-key-here"
}
}
}
}

Tavily API Key 在 tavily.com 注册即可获取,免费额度 1000 次/月。

Step 4:清理残留配置

移除服务后,settings.json 中的权限引用也要清理:

// 删除 deny 中的智谱搜索引用
"deny": ["mcp__web-search-prime__web_search_prime"] // 删掉

// 删除 allow 中的 web-reader 引用
"mcp__web-reader__webReader" // 删掉

替换前后对比

维度智谱 web-search + web-readerTavily
计费方式消耗编码套餐额度独立免费额度 1000 次/月
搜索质量中文优化中英文均衡
网页提取独立工具内置 extract 功能
配置方式平台内置,无法关闭claude mcp add 一行命令
超出后挤占编码对话额度付费继续使用

注意事项

注意事项

  1. 智谱服务是平台内置的,即使移除本地配置,重启后可能自动恢复。需要在 settings.jsondeny 列表中明确屏蔽
  2. 保留 zread(GitHub 仓库阅读),这个工具消耗较少且功能独特
  3. 保留 doubao-vision(图像分析),Tavily 不覆盖这个场景

推荐

也在用智谱 GLM 编码套餐?

了解智谱 GLM 编码套餐


用 Python FastMCP 搭建自定义 MCP 工具库,按需接入任意 AI 模型

· 阅读需 8 分钟

在为客户构建 AI Agent 系统时,我们发现不同任务对模型能力和成本的需求差异很大:图像分析用视觉模型、文本补全用轻量模型、内部数据查询用本地模型。MCP(Model Context Protocol)让每个能力变成独立的工具,AI 客户端按需调用。

TL;DR

用 Python FastMCP 30 分钟搭建自定义 MCP Server,按场景和成本接入任意 OpenAI 兼容 API。本文以豆包视觉模型为例演示完整流程,并提供文本生成、图像生成、语音合成等场景的扩展模板。

MCP 解决什么问题

MCP(Model Context Protocol)是 Anthropic 提出的开放协议,标准化了 AI 应用与外部工具的通信方式。类比 USB-C:不管什么设备,插上就能用。

传统方式下,每接入一个 AI 能力就要写一套集成代码。MCP 把这个过程标准化:

AI 客户端 (Claude Code / Cursor / Cherry Studio)
↓ MCP 协议 (stdio / SSE)
MCP Server (你的工具库)
↓ HTTP API
各类 AI 模型 / 内部 API

一个 MCP Server 可以暴露多个工具,每个工具背后可以是不同的模型或 API。

为什么选 Python 而非 Node.js

MCP 官方同时提供 Python 和 TypeScript SDK。两者功能等价,选择依据:

维度Python (FastMCP)Node.js (@modelcontextprotocol/sdk)
AI 生态httpx/OpenAI SDK 原生支持需要额外依赖
代码量装饰器一行定义工具需手动注册 schema
二进制处理base64/PIL 原生简洁Buffer API 稍繁琐
数据科学pandas/numpy 随手可用需额外工具链
部署环境venv 隔离,无 Node 版本问题Node 版本兼容性常见坑

核心原因:MCP Server 的本质是「调 API + 处理数据」。Python 在 HTTP 调用、图像/文件处理、数据转换上的代码更简洁直观,依赖更少。Node.js 更适合已有 TypeScript 项目的团队。

30 分钟搭建:图像分析工具

环境准备

mkdir -p ~/.claude/mcp-servers/doubao-vision
cd ~/.claude/mcp-servers/doubao-vision
python3 -m venv venv
source venv/bin/activate
pip install mcp httpx

Server 代码

"""Doubao Vision MCP Server - OpenAI-compatible image analysis via Volcengine Ark."""

import base64
import os
from pathlib import Path

import httpx
from mcp.server.fastmcp import FastMCP

# Config - 通过环境变量注入,不硬编码
BASE_URL = os.getenv("DOUBAO_BASE_URL", "https://ark.cn-beijing.volces.com/api/v3")
API_KEY = os.getenv("DOUBAO_API_KEY", "")
MODEL = os.getenv("DOUBAO_MODEL", "doubao-1-5-vision-pro-32k-250115")

mcp = FastMCP("doubao-vision")


def _build_image_content(image_source: str) -> dict:
"""Build image content part from URL or local file path."""
if image_source.startswith(("http://", "https://")):
return {"type": "image_url", "image_url": {"url": image_source}}
# Local file: encode to base64 data URI
path = Path(image_source)
if not path.exists():
raise FileNotFoundError(f"Image not found: {image_source}")
mime_map = {
".jpg": "image/jpeg", ".jpeg": "image/jpeg",
".png": "image/png", ".gif": "image/gif",
".webp": "image/webp",
}
mime = mime_map.get(path.suffix.lower(), "image/png")
data = base64.b64encode(path.read_bytes()).decode()
return {
"type": "image_url",
"image_url": {"url": f"data:{mime};base64,{data}"},
}


@mcp.tool()
def analyze_image(image_source: str, prompt: str) -> str:
"""Analyze an image using Doubao Vision Pro model.

Supports remote URLs and local file paths.

Args:
image_source: URL or local file path to the image.
prompt: What to analyze or describe about the image.
"""
try:
image_content = _build_image_content(image_source)
except FileNotFoundError as e:
return f"Error: {e}"

messages = [
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
image_content,
],
}
]

resp = httpx.post(
f"{BASE_URL}/chat/completions",
headers={
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
},
json={
"model": MODEL,
"messages": messages,
"max_tokens": 4096,
},
timeout=60,
)

if resp.status_code != 200:
return f"API error {resp.status_code}: {resp.text}"

data = resp.json()
return data["choices"][0]["message"]["content"]


if __name__ == "__main__":
mcp.run(transport="stdio")

核心设计:

  • _build_image_content:统一处理远程 URL 和本地文件(自动 base64 编码)
  • @mcp.tool() 装饰器:函数签名和 docstring 自动生成 MCP 工具描述,AI 客户端自动发现参数
  • 环境变量配置:API Key 不硬编码,通过 .mcp.jsonenv 注入

注册到客户端

~/.claude/.mcp.json(全局)或项目 .mcp.json 中添加:

{
"mcpServers": {
"doubao-vision": {
"command": "/path/to/venv/bin/python",
"args": ["/path/to/server.py"],
"env": {
"DOUBAO_API_KEY": "your-api-key",
"DOUBAO_MODEL": "doubao-1-5-vision-pro-32k-250115",
"DOUBAO_BASE_URL": "https://ark.cn-beijing.volces.com/api/v3"
}
}
}
}

按需扩展:多场景工具库

一个 MCP Server 可以注册多个工具,每个工具用不同模型。以下是各场景模板:

文本生成 / 补全

@mcp.tool()
def generate_text(prompt: str, model: str = "deepseek-chat") -> str:
"""Generate text using specified model. Supports deepseek-chat, qwen-turbo, etc."""
# 复用同一个 OpenAI 兼容接口,切换 base_url 和 model
providers = {
"deepseek-chat": {"base": "https://api.deepseek.com/v1", "key": os.getenv("DEEPSEEK_API_KEY")},
"qwen-turbo": {"base": "https://dashscope.aliyuncs.com/compatible-mode/v1", "key": os.getenv("QWEN_API_KEY")},
}
cfg = providers.get(model)
if not cfg:
return f"Unknown model: {model}"
# 调用 OpenAI 兼容接口...

图像生成

@mcp.tool()
def generate_image(prompt: str, size: str = "1024x1024") -> str:
"""Generate image from text prompt using Stable Diffusion WebUI API."""
resp = httpx.post(
"http://localhost:7860/sdapi/v1/txt2img",
json={"prompt": prompt, "width": 1024, "height": 1024},
timeout=120,
)
data = resp.json()
# 保存 base64 图片到文件并返回路径
img_bytes = base64.b64decode(data["images"][0])
path = f"/tmp/mcp-gen-{hash(prompt)}.png"
Path(path).write_bytes(img_bytes)
return f"Image saved to: {path}"

语音合成 (TTS)

@mcp.tool()
def text_to_speech(text: str, voice: str = "default") -> str:
"""Convert text to speech audio file."""
resp = httpx.post(
"https://openspeech.bytedance.com/api/v1/tts",
headers={"Authorization": f"Bearer;{os.getenv('TTS_API_KEY')}"},
json={"text": text, "voice": voice, "format": "mp3"},
timeout=30,
)
path = f"/tmp/mcp-tts-{hash(text)}.mp3"
Path(path).write_bytes(resp.content)
return f"Audio saved to: {path}"

内部 API 包装

@mcp.tool()
def query_database(sql: str) -> str:
"""Execute read-only SQL query on internal database. SELECT only."""
if not sql.strip().upper().startswith("SELECT"):
return "Error: Only SELECT queries are allowed."
resp = httpx.post(
"http://internal-api.company.com/query",
json={"sql": sql},
headers={"Authorization": f"Bearer {os.getenv('INTERNAL_API_KEY')}"},
timeout=10,
)
return resp.json()["result"]

成本优化:多模型路由

不同任务用不同成本的模型,是自建工具库的核心价值:

任务类型推荐模型成本级别
简单分类/提取Qwen-Turbo / Ollama 本地极低
文案/翻译DeepSeek-Chat
图像分析豆包 Vision Pro
复杂推理Claude / GPT-4

在 MCP Server 中实现路由:

@mcp.tool()
def smart_query(task: str, content: str) -> str:
"""Route task to optimal model based on complexity."""
route = {
"classify": ("qwen-turbo", "https://dashscope.aliyuncs.com/compatible-mode/v1"),
"analyze_image": ("doubao-1-5-vision-pro-32k-250115", "https://ark.cn-beijing.volces.com/api/v3"),
"deep_reason": ("deepseek-reasoner", "https://api.deepseek.com/v1"),
}
model, base = route.get(task, route["classify"])
# 统一调用 OpenAI 兼容接口...

注意事项

  1. API Key 安全:通过环境变量注入,禁止硬编码在代码中,更不要提交到 git
  2. 超时控制:图像生成等慢任务设 120s+,简单文本任务 10-30s
  3. 错误处理:返回清晰的错误信息,AI 客户端会将其展示给用户
  4. 沙箱限制:包装内部 API 时,限制操作范围(如 SQL 只允许 SELECT)
  5. 并发考虑:httpx 同步调用即可,MCP 协议本身是串行的

扩展性:不只是工具,是 AI 基础设施

上述示例只覆盖了单个 MCP Server 的场景。实际生产中,扩展性远不止于此:

多 Server 组合

不同能力拆成独立 Server,按需加载:

{
"mcpServers": {
"doubao-vision": { "command": "python", "args": ["vision.py"] },
"deepseek-text": { "command": "python", "args": ["text.py"] },
"internal-api": { "command": "python", "args": ["api.py"] }
}
}

好处:一个 Server 挂了不影响其他;不同团队成员维护不同 Server。

共享配置抽取

多个 Server 复用同一套 provider 配置,用 Python module 共享:

# providers.py - 统一管理所有 API 配置
PROVIDERS = {
"doubao": {
"base_url": "https://ark.cn-beijing.volces.com/api/v3",
"api_key_env": "DOUBAO_API_KEY",
},
"deepseek": {
"base_url": "https://api.deepseek.com/v1",
"api_key_env": "DEEPSEEK_API_KEY",
},
}

支持 MCP Resources 和 Prompts

MCP 不只有 Tools,还有 Resources(静态数据)和 Prompts(预设提示词):

# Resources: 让 AI 读取你的内部文档
@mcp.resource("docs://api-spec")
def get_api_spec() -> str:
return Path("openapi.yaml").read_text()

# Prompts: 预设常用分析模板
@mcp.prompt()
def code_review_prompt(code: str) -> str:
return f"Review this code for security issues and performance:\n\n{code}"

从本地到远程部署

stdio 传输适合本地开发。生产环境可切换到 SSE 传输,部署为 HTTP 服务:

# 本地开发
mcp.run(transport="stdio")

# 生产部署(远程 AI 客户端通过 HTTP 访问)
mcp.run(transport="sse", host="0.0.0.0", port=8080)

进阶方向

方向说明
缓存层相同请求缓存结果,降低 API 调用成本
限流/配额按用户或工具限制调用频率
审计日志记录每次工具调用的输入输出,用于调试和合规
流式响应对长文本生成使用 SSE streaming,减少用户等待
工具组合一个工具的输出作为另一个的输入,构建 pipeline

对类似需求感兴趣?联系合作

用抽象类统一多搜索 API,错误返回而非抛异常

· 阅读需 5 分钟

在为客户构建 AI Agent 平台时遇到此问题:需要支持多个搜索提供商(Tavily、Serper、Brave、Bing),同时确保工具调用失败时不会中断 Agent 对话流程。

TL;DR

  1. 定义 SearchProvider 抽象基类 + SearchResult 数据模型,统一接口和输出格式
  2. 每个提供商继承基类,实现 search() 方法,内部做响应字段映射
  3. 关键设计:错误时返回包含错误信息的 SearchResult 对象,而非抛异常

问题现象

直接调用不同搜索 API 的问题:

# Tavily: POST 请求,results[].url
response = await client.post("https://api.tavily.com/search", ...)

# Serper: POST 请求,organic[].link
response = await client.post("https://google.serper.dev/search", ...)

# Brave: GET 请求,web.results[].description
response = await client.get("https://api.search.brave.com/res/v1/web/search", ...)

# Bing: GET 请求,webPages.value[].snippet
response = await client.get("https://api.bing.microsoft.com/v7.0/search", ...)

问题

  1. 请求方式、认证头、响应结构各不相同
  2. 切换提供商需要改调用方代码
  3. raise Exception 会中断 AI Agent 的流式对话

根因

  1. 缺少抽象层:调用方直接依赖具体实现,违反依赖倒置原则
  2. 错误处理策略不统一:异常会沿调用栈向上传播,在流式场景下导致整个对话中断

对于 AI Agent 工具调用场景,Agent 需要根据错误信息决定是否重试、换用其他工具、或向用户说明情况——而不是直接崩溃。

解决方案

1. 定义抽象基类和数据模型

# base.py
from abc import ABC, abstractmethod
from typing import List
from pydantic import BaseModel


class SearchResult(BaseModel):
"""Unified search result."""
title: str
link: str
snippet: str


class SearchProvider(ABC):
"""Base class for search providers."""

def __init__(self, api_key: str):
self.api_key = api_key

@abstractmethod
async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
"""Execute search and return results."""
pass

2. 实现具体提供商

Tavily(AI 优化搜索,支持 rate limit / quota 错误码):

# tavily.py
import httpx
import logging
from typing import List
from .base import SearchProvider, SearchResult

logger = logging.getLogger(__name__)


class TavilySearch(SearchProvider):
"""Tavily Search API implementation."""

async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
try:
async with httpx.AsyncClient(timeout=15.0) as client:
response = await client.post(
"https://api.tavily.com/search",
headers={"Authorization": f"Bearer {self.api_key}"},
json={
"query": query,
"max_results": max_results,
"search_depth": "basic"
}
)

# 错误时返回 SearchResult,而非 raise
if response.status_code == 429:
return [SearchResult(
title="Rate Limited",
link="",
snippet="Search quota exceeded. Please try again later."
)]

if response.status_code == 401:
return [SearchResult(
title="Auth Error",
link="",
snippet="Search API key is invalid."
)]

if response.status_code == 402:
return [SearchResult(
title="Quota Exceeded",
link="",
snippet="Monthly search quota depleted."
)]

response.raise_for_status()
data = response.json()

# 字段映射:Tavily 的 url -> 统一的 link
results = []
for item in data.get("results", [])[:max_results]:
results.append(SearchResult(
title=item.get("title", ""),
link=item.get("url", ""),
snippet=item.get("content", "")
))
return results

except httpx.TimeoutException:
logger.warning(f"Tavily API timeout: {query[:50]}")
return [SearchResult(title="Timeout", link="", snippet="Search timed out.")]
except Exception as e:
logger.error(f"Tavily search error: {e}")
return [SearchResult(title="Error", link="", snippet=f"Search failed: {str(e)}")]

Serper(Google Search API):

# serper.py
class SerperSearch(SearchProvider):
"""Serper (Google Search) API implementation."""

async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
try:
async with httpx.AsyncClient(timeout=15.0) as client:
response = await client.post(
"https://google.serper.dev/search",
headers={"X-API-KEY": self.api_key, "Content-Type": "application/json"},
json={"q": query, "num": max_results}
)

if response.status_code == 401:
return [SearchResult(title="Auth Error", link="", snippet="Serper API key is invalid.")]

response.raise_for_status()
data = response.json()

# 字段映射:Serper 的 organic[].link -> 统一的 link
results = []
for item in data.get("organic", [])[:max_results]:
results.append(SearchResult(
title=item.get("title", ""),
link=item.get("link", ""),
snippet=item.get("snippet", "")
))
return results

except httpx.TimeoutException:
return [SearchResult(title="Timeout", link="", snippet="Search timed out.")]
except Exception as e:
return [SearchResult(title="Error", link="", snippet=f"Search failed: {str(e)}")]

BraveBing 实现类似,区别在于请求方式和响应字段映射。

3. 调用方使用

# 使用时只需依赖抽象
async def execute_search(provider: SearchProvider, query: str) -> List[SearchResult]:
results = await provider.search(query)

# 检查是否有错误(通过 title 或 snippet 判断)
if results and not results[0].link:
error_msg = results[0].snippet
# Agent 可以根据错误信息决定下一步操作
return f"Search failed: {error_msg}"

return results


# 切换提供商只需换实例
provider = TavilySearch(api_key="xxx")
# provider = SerperSearch(api_key="xxx")
results = await execute_search(provider, "Python async best practices")

关键设计决策

决策原因
错误返回 SearchResult 而非 raiseAI Agent 对话是流式流程,异常会中断整个对话
用 Pydantic BaseModel 定义输出自动校验 + IDE 提示 + JSON 序列化
抽象类用 ABC 而非 Protocol需要共享 __init__ 逻辑(api_key 存储)
超时统一 15 秒搜索是用户体验关键路径,不能太慢

对类似需求感兴趣?联系合作

绕过 Supabase Auth 实现 Playwright E2E 测试免登录

· 阅读需 4 分钟

在为客户构建 AI Agent SaaS 平台时遇到此问题,记录根因与解法。

TL;DR

E2E 测试不应该依赖真实的 OAuth 登录流程。通过在 useAuth hook 中检测 localStorage 的测试标记,直接注入 mock 认证状态,跳过 Supabase 初始化。同时将 Zustand store 的 loading 默认值改为 false,避免 AuthGuard 卡在无限 spinner。

问题现象

使用 Playwright 测试 React SPA 时,页面被 AuthGuard 组件保护,需要 Supabase 认证才能访问。测试启动后,页面一直显示 loading spinner,无法进入业务流程。

// AuthGuard 组件 - 测试时卡在这里
export function AuthGuard({ children }: AuthGuardProps) {
const { isAuthenticated, loading } = useAuth()

if (loading) {
return <Spinner /> // 永远显示 spinner
}

if (!isAuthenticated) {
return <Navigate to="/login" />
}

return <>{children}</>
}

测试代码尝试模拟登录,但 Supabase Auth SDK 内部状态无法通过简单的 API mock 控制。

根因

1. Supabase Auth 是异步初始化的

useAuth hook 在 useEffect 中调用 supabase.auth.getSession(),这是异步操作。测试环境下网络请求可能失败或超时,导致状态永远停留在 loading: true

2. Zustand Store 的默认值问题

// authStore.ts - 问题代码
export const useAuthStore = create<AuthState>()(
persist(
(set) => ({
user: null,
token: null,
loading: true, // 👈 默认值是 true
// ...
}),
{ name: 'auth-storage' }
)
)

测试启动时,loading: true + 异步初始化失败 = 永远 loading。

3. OAuth 流程无法自动化

即使能 mock API,OAuth 的重定向流程需要真实浏览器交互,E2E 测试无法可靠模拟。

解决方案

步骤 1:在 useAuth hook 中添加测试模式检测

// hooks/useAuth.ts
export function useAuth() {
const { user, token, loading, setUser, setToken, setLoading } = useAuthStore()

useEffect(() => {
const initAuth = async () => {
// 👇 优先检测测试模式
const testAuthUser = localStorage.getItem('test-auth-user')
const testAuthToken = localStorage.getItem('test-auth-token')

if (testAuthUser && testAuthToken) {
try {
const userData = JSON.parse(testAuthUser) as User
setUser(userData)
setToken(testAuthToken)
setLoading(false)
console.log('[useAuth] Using test mode auth')
return // 👈 直接返回,跳过 Supabase 初始化
} catch (e) {
console.error('Failed to parse test auth user:', e)
}
}

// 👇 正常模式:走 Supabase Auth
try {
const { data: { session } } = await supabase.auth.getSession()
if (session) {
setUser(session.user as User)
setToken(session.access_token)
}
} catch (error) {
console.error('Auth init failed:', error)
} finally {
setLoading(false)
}
}

initAuth()

// 👇 测试模式下跳过 auth state listener
if (localStorage.getItem('test-auth-user')) {
return
}

const { data: { subscription } } = supabase.auth.onAuthStateChange(
async (event, session) => {
// ... 正常的 auth state 处理
}
)

return () => subscription.unsubscribe()
}, [])
}

步骤 2:修改 Zustand Store 默认值

// stores/authStore.ts
export const useAuthStore = create<AuthState>()(
persist(
(set) => ({
user: null,
token: null,
loading: false, // 👈 改为 false,让 useAuth hook 控制状态
setUser: (user) => set({ user }),
setToken: (token) => set({ token }),
setLoading: (loading) => set({ loading }),
logout: () => set({ user: null, token: null, loading: false }),
}),
{
name: 'auth-storage',
partialize: (state) => ({ user: state.user, token: state.token }),
}
)
)

步骤 3:在 Playwright Fixture 中注入测试认证

// e2e/fixtures.ts
import { test as base } from '@playwright/test'

export const mockUser = {
id: 'test-user-id',
email: 'test@example.com',
created_at: '2024-01-01T00:00:00Z',
}

export const test = base.extend({
authenticatedPage: async ({ page }, use) => {
// 先访问页面以设置 localStorage 的 origin
await page.goto('/login')

// 👇 注入测试认证状态到 localStorage
await page.evaluate(
({ user, token }) => {
localStorage.setItem('test-auth-user', JSON.stringify(user))
localStorage.setItem('test-auth-token', token)
},
{ user: mockUser, token: 'mock-access-token' }
)

// 导航到受保护页面,useAuth 会检测到测试模式
await page.goto('/dashboard')

await use(page)
},
})

步骤 4:在测试中使用

// e2e/dashboard.spec.ts
import { test, expect } from './fixtures'

test('dashboard shows user agents', async ({ authenticatedPage }) => {
// authenticatedPage 已经通过认证,无需登录
await expect(authenticatedPage.getByText('Test Agent')).toBeVisible()
})

完整代码结构

agent-frontend/
├── e2e/
│ ├── fixtures.ts # Playwright fixture + mock 数据
│ ├── dashboard.spec.ts # 测试用例
│ └── ...
├── src/
│ ├── hooks/
│ │ └── useAuth.ts # 测试模式检测
│ └── stores/
│ └── authStore.ts # loading: false 默认值
└── playwright.config.ts

关键要点

  1. 测试模式 key 使用特殊前缀test-auth-* 不会在生产环境中出现
  2. 检测优先于初始化:先检查 localStorage,再走 Supabase Auth
  3. 跳过 auth listener:测试模式下不需要监听 auth state 变化
  4. loading 默认值改为 false:让 hook 显式控制 loading 状态

对类似需求感兴趣?联系合作

修复 FastAPI SSE 客户端断开时的 CancelledError

· 阅读需 2 分钟

在为客户构建 AI 客服自动化系统时遇到此问题,记录根因与解法。

TL;DR

FastAPI 的 StreamingResponse 在客户端断开连接时会取消生成器任务,导致 asyncio.CancelledError。正确做法是在生成器中捕获该异常并 re-raise,否则会导致异常日志污染和资源泄漏。

问题现象

使用 SSE(Server-Sent Events)实现流式对话时,客户端断开连接后,服务端日志出现大量异常:

ERROR:    Exception in ASGI application
...
asyncio.CancelledError

代码原本写法:

async def event_stream():
async for event in engine.execute(body.message):
yield event

return StreamingResponse(event_stream(), media_type="text/event-stream")

根因

FastAPI/Starlette 的 StreamingResponse 在客户端断开时,会取消正在执行的生成器任务。Python 的 async for 循环被取消时会抛出 asyncio.CancelledError

如果不处理这个异常,它会向上传播,被 ASGI 服务器捕获并记录为错误日志。更严重的是,生成器内的资源(如数据库连接、HTTP 客户端)可能无法正确释放。

解决方案

在生成器内部捕获 CancelledError,记录日志后 必须 re-raise

import asyncio
import logging

logger = logging.getLogger(__name__)

async def event_stream():
try:
async for event in engine.execute(body.message):
yield event
except asyncio.CancelledError:
# 客户端断开连接,正常行为
logger.info("Client disconnected")
raise # 必须 re-raise 以正确终止生成器

return StreamingResponse(event_stream(), media_type="text/event-stream")

为什么必须 re-raise?

CancelledError 是 Python 取消协程的标准机制。捕获后如果不 re-raise:

  1. 生成器不会正确终止
  2. StreamingResponse 认为响应正常完成
  3. 可能导致资源泄漏

FAQ

Q: FastAPI SSE 客户端断开后为什么报 CancelledError?

A: 这是 Python asyncio 的设计行为。客户端断开时,Starlette 取消生成器任务,触发 CancelledError。正确处理方式是捕获并 re-raise。

Q: 捕获 CancelledError 后不 re-raise 会怎样?

A: 生成器无法正确终止,可能导致数据库连接、HTTP 客户端等资源泄漏。同时 StreamingResponse 会误认为响应正常完成。

Q: 如何区分正常断开和异常断开?

A: CancelledError 本身就是正常断开的信号。如果需要在断开时执行清理逻辑(如更新状态),在 except 块中处理后再 re-raise。

集成 Supabase Auth 到 FastAPI 的三个坑

· 阅读需 4 分钟

在为客户构建 SaaS 认证系统时遇到此问题,记录根因与解法。

TL;DR

Supabase Auth + FastAPI 集成有三个常见坑:JWKS 路径不是标准路径、ES256 签名需转换为 DER 格式、用户首次登录时本地数据库无记录。本文提供完整解决方案。

问题现象

坑 1:JWKS 路径 404

GET https://xxx.supabase.co/.well-known/jwks.json
# 404 Not Found

所有 JWT 验证请求返回 401 Invalid Token。

坑 2:ES256 签名验证失败

from jose import jwt
payload = jwt.decode(token, key, algorithms=["ES256"])
# JWTError: Signature verification failed

明明公钥是对的,但签名验证总是失败。

坑 3:用户首次登录无本地记录

# 创建 Agent 时
agent = Agent(user_id=current_user["user_id"], ...)
db.add(agent)
# ForeignKeyViolation: user_id 不存在

Supabase Auth 用户通过了 JWT 验证,但本地 agent_users 表没有该用户记录。

根因

坑 1:Supabase 非标准 JWKS 路径

标准 OAuth/OIDC 服务器 JWKS 在 /.well-known/jwks.json,但 Supabase 把认证服务放在 /auth/v1/ 子路径下:

标准路径Supabase 路径
/.well-known/jwks.json/auth/v1/.well-known/jwks.json

坑 2:ES256 原始签名 vs DER 格式

Supabase JWT 使用 ES256(P-256 曲线)签名。JWT 中的签名是 raw 格式r || s 拼接,64 字节),但 Python cryptography 库的 verify() 方法需要 DER-encoded ASN.1 格式

Raw:     r (32 bytes) || s (32 bytes) = 64 bytes
DER: 0x30 <len> 0x02 <r_len> <r> 0x02 <s_len> <s>

python-josejwt.decode() 在处理 ES256 时有兼容性问题,需要手动验证签名。

坑 3:认证与数据分离

Supabase Auth 是独立服务,用户注册/登录后只存在于 Supabase 的 auth.users 表。本地数据库的 agent_users 表需要手动同步。

解决方案

1. 正确的 JWKS URL

# config.py
class Settings(BaseSettings):
supabase_url: str = "https://xxx.supabase.co"

@property
def jwks_url(self) -> str:
# 关键:/auth/v1/ 前缀
return f"{self.supabase_url}/auth/v1/.well-known/jwks.json"

2. ES256 签名验证(完整代码)

import json
import base64
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature

def _base64url_decode(data: str) -> bytes:
"""Base64url 解码,自动补 padding"""
rem = len(data) % 4
if rem > 0:
data += "=" * (4 - rem)
return base64.urlsafe_b64decode(data)

def _raw_to_der_signature(raw_sig: bytes) -> bytes:
"""将 raw ECDSA 签名 (r||s) 转为 DER 格式"""
# P-256: r 和 s 各 32 字节
r = int.from_bytes(raw_sig[:32], "big")
s = int.from_bytes(raw_sig[32:], "big")
return encode_dss_signature(r, s)

def verify_es256_signature(token: str, public_key_jwk: dict) -> dict:
"""验证 ES256 JWT 签名,返回 payload"""
parts = token.split(".")
if len(parts) != 3:
raise ValueError("Invalid JWT format")

header_b64, payload_b64, signature_b64 = parts

# 1. 构建 EC 公钥
x = _base64url_decode(public_key_jwk["x"])
y = _base64url_decode(public_key_jwk["y"])
x_int = int.from_bytes(x, "big")
y_int = int.from_bytes(y, "big")

public_key = ec.EllipticCurvePublicNumbers(
x_int, y_int, ec.SECP256R1()
).public_key(default_backend())

# 2. 验证签名
message = f"{header_b64}.{payload_b64}".encode()
raw_signature = _base64url_decode(signature_b64)
der_signature = _raw_to_der_signature(raw_signature)

public_key.verify(
der_signature,
message,
ec.ECDSA(hashes.SHA256())
)

# 3. 返回 payload
return json.loads(_base64url_decode(payload_b64))

3. 用户同步服务

# app/services/user_service.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.user import AgentUser

async def ensure_user_exists(
db: AsyncSession,
user_id: str,
email: str,
plan: str = "free"
) -> AgentUser:
"""确保用户存在于本地数据库(从 Supabase Auth 同步)"""
# 检查是否存在
result = await db.execute(
select(AgentUser).where(AgentUser.user_id == user_id)
)
user = result.scalar_one_or_none()

if user:
return user

# 创建新用户
user = AgentUser(
user_id=user_id,
email=email,
plan=plan,
role="user"
)
db.add(user)
await db.commit()
await db.refresh(user)
return user

4. 在创建资源前调用

# app/routers/agents.py
@router.post("/")
async def create_agent(
input: CreateAgentInput,
db: AsyncSession = Depends(get_db),
current_user: dict = Depends(get_current_user)
):
# 关键:确保用户存在
user = await ensure_user_exists(
db,
user_id=current_user["user_id"],
email=current_user["email"],
plan=current_user["plan"]
)

# 现在可以安全创建 Agent
agent = Agent(
user_id=user.user_id,
name=input.name,
llm_config=input.llm_config.model_dump()
)
...

FAQ

Q: Supabase JWT 验证返回 404 怎么办?

A: Supabase 的 JWKS 路径是 /auth/v1/.well-known/jwks.json,不是标准的 /.well-known/jwks.json。检查你的 JWKS URL 配置。

Q: python-jose 验证 ES256 签名失败怎么解决?

A: python-jose 对 ES256 支持不完善。使用 cryptography 库手动验证,需要将 JWT 的 raw 签名(r||s 64字节)转换为 DER 格式。

Q: FastAPI 如何同步 Supabase Auth 用户到本地数据库?

A: 在需要用户记录的 API(如创建资源)入口处调用 ensure_user_exists(),从 JWT 提取用户信息并同步到本地表。

Q: Supabase JWT 中的 user_id 在哪个字段?

A: sub 字段包含用户 UUID,email 字段包含邮箱,app_metadata.plan 包含订阅计划(自定义字段)。