MCP Client开发

MCP Client负责连接MCP服务器,并使用其提供的资源、工具和提示模板。 本文将详细介绍如何开发MCP客户端,以及如何将其与LLM应用集成。

预计阅读时间:50分钟·难度:中级·SDK版本:Python 1.0+

MCP Client概述

MCP Client是连接AI应用与MCP服务器的桥梁。 作为Client开发者,你需要管理服务器连接、发现可用能力,并将这些能力整合到AI应用中。

Client的职责

  • 连接管理:建立和维护与服务器的连接
  • 能力发现:获取服务器支持的资源和工具
  • 请求转发:将AI的请求转发到服务器
  • 结果处理:处理服务器返回的结果
  • 错误处理:优雅地处理连接和调用错误

典型架构

┌──────────────────────────────────────────────────────┐
│                    AI Application                     │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐     │
│  │   Claude   │  │  GPT-4     │  │  其他LLM    │     │
│  └─────┬──────┘  └─────┬──────┘  └─────┬──────┘     │
└────────┼───────────────┼───────────────┼─────────────┘
         │               │               │
         └───────────────┼───────────────┘
                         │
                   ┌─────┴─────┐
                   │ MCP Client│  ← 你开发的客户端
                   └─────┬─────┘
                         │
         ┌───────────────┼───────────────┐
         │               │               │
    ┌────┴────┐    ┌─────┴─────┐   ┌─────┴─────┐
    │ Server 1│    │ Server 2  │   │ Server 3  │
    └─────────┘    └───────────┘   └───────────┘

连接服务器

Python客户端连接

from mcp import ClientSession
from mcp.client.stdio import stdio_client

async def connect_to_server():
    # 启动服务器进程并建立连接
    server_params = {
        "command": "python",
        "args": ["my_server.py"]
    }
    
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # 初始化连接
            await session.initialize()
            
            # 使用服务器能力
            resources = await session.list_resources()
            tools = await session.list_tools()
            
            # 进行操作...
            result = await session.call_tool(
                "my_tool",
                {"arg": "value"}
            )

TypeScript客户端连接

import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";

async function connectToServer() {
  const transport = new StdioClientTransport({
    command: "python",
    args: ["my_server.py"]
  });
  
  const client = new Client({
    name: "my-client",
    version: "1.0.0"
  }, {
    capabilities: {}
  });
  
  await client.connect(transport);
  
  // 使用服务器能力
  const resources = await client.request(
    { method: "resources/list" },
    ListResourcesResultSchema
  );
  
  const tools = await client.request(
    { method: "tools/list" },
    ListToolsResultSchema
  );
}

能力发现

连接建立后,Client需要了解Server支持哪些能力。 通过初始化响应和能力查询获取这些信息。

获取服务器能力

async def discover_capabilities(session: ClientSession):
    """发现服务器能力"""
    
    # 初始化时获取服务器信息
    init_result = await session.initialize()
    print(f"服务器: {init_result.serverInfo.name}")
    print(f"版本: {init_result.serverInfo.version}")
    print(f"协议版本: {init_result.protocolVersion}")
    
    # 检查支持的能力
    capabilities = init_result.capabilities
    
    # 资源能力
    if capabilities.resources:
        resources = await session.list_resources()
        print(f"可用资源: {len(resources.resources)}个")
        for r in resources.resources:
            print(f"  - {r.name}: {r.uri}")
    
    # 工具能力
    if capabilities.tools:
        tools = await session.list_tools()
        print(f"可用工具: {len(tools.tools)}个")
        for t in tools.tools:
            print(f"  - {t.name}: {t.description}")
    
    # 提示模板能力
    if capabilities.prompts:
        prompts = await session.list_prompts()
        print(f"提示模板: {len(prompts.prompts)}个")
        for p in prompts.prompts:
            print(f"  - {p.name}: {p.description}")

使用资源

资源操作示例

async def use_resources(session: ClientSession):
    """使用服务器资源"""
    
    # 1. 列出所有资源
    resources = await session.list_resources()
    
    # 2. 搜索特定资源
    for resource in resources.resources:
        if "config" in resource.name.lower():
            print(f"找到配置资源: {resource.uri}")
    
    # 3. 读取资源内容
    if resources.resources:
        first_resource = resources.resources[0]
        content = await session.read_resource(first_resource.uri)
        print(f"资源内容: {content.contents[0].text}")
    
    # 4. 订阅资源更新(如果支持)
    if session.server_capabilities.resources.subscribe:
        await session.subscribe_resource(first_resource.uri)
        # 监听更新通知
        async for notification in session.resource_updates:
            print(f"资源更新: {notification.uri}")

调用工具

工具调用示例

async def use_tools(session: ClientSession):
    """使用服务器工具"""
    
    # 1. 获取可用工具
    tools = await session.list_tools()
    
    # 2. 查找特定工具
    tool_names = [t.name for t in tools.tools]
    
    # 3. 调用工具
    if "read_file" in tool_names:
        result = await session.call_tool(
            "read_file",
            {"path": "/workspace/README.md"}
        )
        
        # 处理结果
        for content in result.content:
            if content.type == "text":
                print(f"文件内容: {content.text}")
    
    # 4. 错误处理
    try:
        result = await session.call_tool(
            "run_command",
            {"command": "ls -la"}
        )
        if result.isError:
            print(f"工具执行失败: {result.content}")
        else:
            print(f"执行结果: {result.content}")
    except Exception as e:
        print(f"调用失败: {e}")

使用提示模板

提示模板使用示例

async def use_prompts(session: ClientSession):
    """使用提示模板"""
    
    # 1. 列出可用模板
    prompts = await session.list_prompts()
    
    # 2. 获取模板详情
    for prompt in prompts.prompts:
        print(f"模板: {prompt.name}")
        if prompt.arguments:
            print(f"参数: {[a.name for a in prompt.arguments]}")
    
    # 3. 获取填充后的提示
    result = await session.get_prompt(
        "code_review",
        {"language": "Python", "focus": "安全性"}
    )
    
    # 4. 使用提示
    for message in result.messages:
        if message.role == "user":
            prompt_text = message.content.text
            print(f"生成的提示: {prompt_text}")
            # 可以直接发送给LLM

与LLM集成

与Claude集成示例

import anthropic
from mcp import ClientSession

async def chat_with_tools(session: ClientSession, user_message: str):
    """结合MCP工具的聊天功能"""
    
    # 获取可用工具
    tools = await session.list_tools()
    
    # 转换为Claude工具格式
    claude_tools = [
        {
            "name": t.name,
            "description": t.description,
            "input_schema": t.inputSchema
        }
        for t in tools.tools
    ]
    
    # 创建Claude客户端
    client = anthropic.Anthropic()
    
    # 发送消息
    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=4096,
        messages=[{"role": "user", "content": user_message}],
        tools=claude_tools
    )
    
    # 处理工具调用
    while response.stop_reason == "tool_use":
        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                # 调用MCP工具
                result = await session.call_tool(
                    block.name,
                    block.input
                )
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": result.content[0].text
                })
        
        # 继续对话
        response = client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=4096,
            messages=[
                {"role": "user", "content": user_message},
                {"role": "assistant", "content": response.content},
                {"role": "user", "content": tool_results}
            ],
            tools=claude_tools
        )
    
    return response.content[0].text

会话管理

会话生命周期管理

class MCPClientManager:
    """MCP客户端管理器"""
    
    def __init__(self):
        self.sessions: dict[str, ClientSession] = {}
    
    async def connect_server(self, name: str, config: dict):
        """连接服务器"""
        transport = await self._create_transport(config)
        session = ClientSession(transport)
        await session.initialize()
        self.sessions[name] = session
        return session
    
    async def disconnect_server(self, name: str):
        """断开服务器"""
        if name in self.sessions:
            await self.sessions[name].close()
            del self.sessions[name]
    
    async def disconnect_all(self):
        """断开所有服务器"""
        for name in list(self.sessions.keys()):
            await self.disconnect_server(name)
    
    async def get_all_tools(self):
        """获取所有服务器的工具"""
        all_tools = []
        for name, session in self.sessions.items():
            tools = await session.list_tools()
            for tool in tools.tools:
                all_tools.append({
                    "server": name,
                    "tool": tool
                })
        return all_tools

错误处理

健壮的错误处理

from mcp.types import McpError

async def safe_tool_call(session, tool_name, args, retries=3):
    """安全的工具调用"""
    for attempt in range(retries):
        try:
            result = await session.call_tool(tool_name, args)
            
            if result.isError:
                # 工具执行错误
                error_msg = result.content[0].text
                return {"success": False, "error": error_msg}
            
            return {"success": True, "data": result.content}
            
        except McpError as e:
            # MCP协议错误
            if e.code == -32601:  # 方法不存在
                return {"success": False, "error": "工具不存在"}
            elif e.code == -32602:  # 无效参数
                return {"success": False, "error": f"参数错误: {e.message}"}
            elif attempt < retries - 1:
                # 可重试错误
                await asyncio.sleep(1)
                continue
            else:
                return {"success": False, "error": str(e)}
                
        except Exception as e:
            # 其他错误
            if attempt < retries - 1:
                await asyncio.sleep(1)
                continue
            return {"success": False, "error": f"未知错误: {str(e)}"}

最佳实践

1. 连接池管理

复用连接,避免频繁创建和销毁。

2. 超时设置

为所有操作设置合理的超时时间。

3. 缓存能力信息

缓存工具和资源列表,减少重复查询。

4. 用户确认

对于敏感操作,请求用户确认后再执行。

5. 日志记录

记录所有工具调用和结果,便于调试。

----