MCP Client开发
MCP Client负责连接MCP服务器,并使用其提供的资源、工具和提示模板。 本文将详细介绍如何开发MCP客户端,以及如何将其与LLM应用集成。
预计阅读时间:50分钟·难度:中级·SDK版本:Python 1.0+
MCP Client概述
MCP Client是连接AI应用与MCP服务器的桥梁。 作为Client开发者,你需要管理服务器连接、发现可用能力,并将这些能力整合到AI应用中。
Client的职责
- 连接管理:建立和维护与服务器的连接
- 能力发现:获取服务器支持的资源和工具
- 请求转发:将AI的请求转发到服务器
- 结果处理:处理服务器返回的结果
- 错误处理:优雅地处理连接和调用错误
典型架构
┌──────────────────────────────────────────────────────┐
│ AI Application │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ Claude │ │ GPT-4 │ │ 其他LLM │ │
│ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ │
└────────┼───────────────┼───────────────┼─────────────┘
│ │ │
└───────────────┼───────────────┘
│
┌─────┴─────┐
│ MCP Client│ ← 你开发的客户端
└─────┬─────┘
│
┌───────────────┼───────────────┐
│ │ │
┌────┴────┐ ┌─────┴─────┐ ┌─────┴─────┐
│ Server 1│ │ Server 2 │ │ Server 3 │
└─────────┘ └───────────┘ └───────────┘连接服务器
Python客户端连接
from mcp import ClientSession
from mcp.client.stdio import stdio_client
async def connect_to_server():
# 启动服务器进程并建立连接
server_params = {
"command": "python",
"args": ["my_server.py"]
}
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
# 初始化连接
await session.initialize()
# 使用服务器能力
resources = await session.list_resources()
tools = await session.list_tools()
# 进行操作...
result = await session.call_tool(
"my_tool",
{"arg": "value"}
)TypeScript客户端连接
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
async function connectToServer() {
const transport = new StdioClientTransport({
command: "python",
args: ["my_server.py"]
});
const client = new Client({
name: "my-client",
version: "1.0.0"
}, {
capabilities: {}
});
await client.connect(transport);
// 使用服务器能力
const resources = await client.request(
{ method: "resources/list" },
ListResourcesResultSchema
);
const tools = await client.request(
{ method: "tools/list" },
ListToolsResultSchema
);
}能力发现
连接建立后,Client需要了解Server支持哪些能力。 通过初始化响应和能力查询获取这些信息。
获取服务器能力
async def discover_capabilities(session: ClientSession):
"""发现服务器能力"""
# 初始化时获取服务器信息
init_result = await session.initialize()
print(f"服务器: {init_result.serverInfo.name}")
print(f"版本: {init_result.serverInfo.version}")
print(f"协议版本: {init_result.protocolVersion}")
# 检查支持的能力
capabilities = init_result.capabilities
# 资源能力
if capabilities.resources:
resources = await session.list_resources()
print(f"可用资源: {len(resources.resources)}个")
for r in resources.resources:
print(f" - {r.name}: {r.uri}")
# 工具能力
if capabilities.tools:
tools = await session.list_tools()
print(f"可用工具: {len(tools.tools)}个")
for t in tools.tools:
print(f" - {t.name}: {t.description}")
# 提示模板能力
if capabilities.prompts:
prompts = await session.list_prompts()
print(f"提示模板: {len(prompts.prompts)}个")
for p in prompts.prompts:
print(f" - {p.name}: {p.description}")使用资源
资源操作示例
async def use_resources(session: ClientSession):
"""使用服务器资源"""
# 1. 列出所有资源
resources = await session.list_resources()
# 2. 搜索特定资源
for resource in resources.resources:
if "config" in resource.name.lower():
print(f"找到配置资源: {resource.uri}")
# 3. 读取资源内容
if resources.resources:
first_resource = resources.resources[0]
content = await session.read_resource(first_resource.uri)
print(f"资源内容: {content.contents[0].text}")
# 4. 订阅资源更新(如果支持)
if session.server_capabilities.resources.subscribe:
await session.subscribe_resource(first_resource.uri)
# 监听更新通知
async for notification in session.resource_updates:
print(f"资源更新: {notification.uri}")调用工具
工具调用示例
async def use_tools(session: ClientSession):
"""使用服务器工具"""
# 1. 获取可用工具
tools = await session.list_tools()
# 2. 查找特定工具
tool_names = [t.name for t in tools.tools]
# 3. 调用工具
if "read_file" in tool_names:
result = await session.call_tool(
"read_file",
{"path": "/workspace/README.md"}
)
# 处理结果
for content in result.content:
if content.type == "text":
print(f"文件内容: {content.text}")
# 4. 错误处理
try:
result = await session.call_tool(
"run_command",
{"command": "ls -la"}
)
if result.isError:
print(f"工具执行失败: {result.content}")
else:
print(f"执行结果: {result.content}")
except Exception as e:
print(f"调用失败: {e}")使用提示模板
提示模板使用示例
async def use_prompts(session: ClientSession):
"""使用提示模板"""
# 1. 列出可用模板
prompts = await session.list_prompts()
# 2. 获取模板详情
for prompt in prompts.prompts:
print(f"模板: {prompt.name}")
if prompt.arguments:
print(f"参数: {[a.name for a in prompt.arguments]}")
# 3. 获取填充后的提示
result = await session.get_prompt(
"code_review",
{"language": "Python", "focus": "安全性"}
)
# 4. 使用提示
for message in result.messages:
if message.role == "user":
prompt_text = message.content.text
print(f"生成的提示: {prompt_text}")
# 可以直接发送给LLM与LLM集成
与Claude集成示例
import anthropic
from mcp import ClientSession
async def chat_with_tools(session: ClientSession, user_message: str):
"""结合MCP工具的聊天功能"""
# 获取可用工具
tools = await session.list_tools()
# 转换为Claude工具格式
claude_tools = [
{
"name": t.name,
"description": t.description,
"input_schema": t.inputSchema
}
for t in tools.tools
]
# 创建Claude客户端
client = anthropic.Anthropic()
# 发送消息
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=4096,
messages=[{"role": "user", "content": user_message}],
tools=claude_tools
)
# 处理工具调用
while response.stop_reason == "tool_use":
tool_results = []
for block in response.content:
if block.type == "tool_use":
# 调用MCP工具
result = await session.call_tool(
block.name,
block.input
)
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": result.content[0].text
})
# 继续对话
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=4096,
messages=[
{"role": "user", "content": user_message},
{"role": "assistant", "content": response.content},
{"role": "user", "content": tool_results}
],
tools=claude_tools
)
return response.content[0].text会话管理
会话生命周期管理
class MCPClientManager:
"""MCP客户端管理器"""
def __init__(self):
self.sessions: dict[str, ClientSession] = {}
async def connect_server(self, name: str, config: dict):
"""连接服务器"""
transport = await self._create_transport(config)
session = ClientSession(transport)
await session.initialize()
self.sessions[name] = session
return session
async def disconnect_server(self, name: str):
"""断开服务器"""
if name in self.sessions:
await self.sessions[name].close()
del self.sessions[name]
async def disconnect_all(self):
"""断开所有服务器"""
for name in list(self.sessions.keys()):
await self.disconnect_server(name)
async def get_all_tools(self):
"""获取所有服务器的工具"""
all_tools = []
for name, session in self.sessions.items():
tools = await session.list_tools()
for tool in tools.tools:
all_tools.append({
"server": name,
"tool": tool
})
return all_tools错误处理
健壮的错误处理
from mcp.types import McpError
async def safe_tool_call(session, tool_name, args, retries=3):
"""安全的工具调用"""
for attempt in range(retries):
try:
result = await session.call_tool(tool_name, args)
if result.isError:
# 工具执行错误
error_msg = result.content[0].text
return {"success": False, "error": error_msg}
return {"success": True, "data": result.content}
except McpError as e:
# MCP协议错误
if e.code == -32601: # 方法不存在
return {"success": False, "error": "工具不存在"}
elif e.code == -32602: # 无效参数
return {"success": False, "error": f"参数错误: {e.message}"}
elif attempt < retries - 1:
# 可重试错误
await asyncio.sleep(1)
continue
else:
return {"success": False, "error": str(e)}
except Exception as e:
# 其他错误
if attempt < retries - 1:
await asyncio.sleep(1)
continue
return {"success": False, "error": f"未知错误: {str(e)}"}最佳实践
1. 连接池管理
复用连接,避免频繁创建和销毁。
2. 超时设置
为所有操作设置合理的超时时间。
3. 缓存能力信息
缓存工具和资源列表,减少重复查询。
4. 用户确认
对于敏感操作,请求用户确认后再执行。
5. 日志记录
记录所有工具调用和结果,便于调试。
上一篇
← MCP Server下一篇
MCP实践案例 →