MCP实践案例

通过完整的实践案例,深入理解MCP协议的应用场景和最佳实践。本文将介绍文件系统、数据库、GitHub集成等常见场景的MCP实现。

预计阅读时间:60分钟·难度:高级·包含完整代码示例

实践案例概述

本章将通过多个实际案例,展示如何使用MCP协议构建完整的AI应用集成方案。每个案例都包含完整的代码和详细的实现说明。

案例列表

  • 文件系统服务器:安全访问本地文件
  • 数据库服务器:执行SQL查询
  • GitHub集成:操作GitHub仓库
  • 网页搜索工具:集成搜索引擎(本章暂未给出该案例的示例代码)
  • 多服务器集成:整合多个MCP服务器

文件系统服务器

文件系统服务器是最常见的MCP服务器类型,允许AI安全地访问和操作本地文件。

完整实现

# filesystem_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Resource, Tool, TextContent
import os
import json
from pathlib import Path

class FileSystemServer:
    """MCP server that exposes files under an allow-list of directories.

    Every path received from a client is resolved and checked against the
    allow-list before any file-system operation is performed.
    """

    def __init__(self, allowed_dirs: list[str]):
        """Create the server and register its MCP handlers.

        Args:
            allowed_dirs: Directories the server may read from and write to.
                Each one is resolved to an absolute path up front.
        """
        # Resolve once so later containment checks compare like with like.
        self.allowed_dirs = [Path(d).resolve() for d in allowed_dirs]
        self.server = Server("filesystem")
        self._setup_handlers()

    def _is_path_allowed(self, path: Path) -> bool:
        """Return True if *path* is an allowed directory or lies inside one.

        The path is resolved first, so ``..`` segments and symlinks cannot be
        used to step outside the allow-list.
        """
        resolved = path.resolve()
        # Compare whole path components rather than string prefixes: with a
        # prefix test, allowing "/tmp" would also admit "/tmp_evil/secret".
        return any(
            resolved == allowed or allowed in resolved.parents
            for allowed in self.allowed_dirs
        )

    def _setup_handlers(self):
        """Register the resource and tool handlers on ``self.server``."""

        @self.server.list_resources()
        async def list_resources():
            """List every file found under the allowed directories."""
            resources = []
            for allowed_dir in self.allowed_dirs:
                for root, _, files in os.walk(allowed_dir):
                    for file in files:
                        file_path = Path(root) / file
                        # A symlinked file may resolve outside the allow-list.
                        if self._is_path_allowed(file_path):
                            resources.append(Resource(
                                uri=f"file://{file_path}",
                                name=file,
                                description=f"文件: {file_path.relative_to(allowed_dir)}",
                                mimeType=self._get_mime_type(file)
                            ))
            return resources

        @self.server.read_resource()
        async def read_resource(uri: str):
            """Return the text content of the file behind a ``file://`` URI.

            Raises:
                PermissionError: If the path is outside the allowed directories.
            """
            # str() also accepts URL objects; removeprefix strips only the
            # leading scheme instead of every "file://" found in the path.
            path = Path(str(uri).removeprefix("file://"))
            if not self._is_path_allowed(path):
                raise PermissionError(f"无权访问: {uri}")

            # Explicit encoding keeps results independent of the host locale.
            return path.read_text(encoding="utf-8")

        @self.server.list_tools()
        async def list_tools():
            """Describe the four file tools and their JSON input schemas."""
            return [
                Tool(
                    name="read_file",
                    description="读取文件内容",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {"type": "string", "description": "文件路径"}
                        },
                        "required": ["path"]
                    }
                ),
                Tool(
                    name="write_file",
                    description="写入文件内容",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {"type": "string"},
                            "content": {"type": "string"}
                        },
                        "required": ["path", "content"]
                    }
                ),
                Tool(
                    name="list_directory",
                    description="列出目录内容",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {"type": "string"}
                        },
                        "required": ["path"]
                    }
                ),
                Tool(
                    name="search_files",
                    description="搜索文件",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "pattern": {"type": "string"},
                            "directory": {"type": "string"}
                        },
                        "required": ["pattern"]
                    }
                )
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: dict):
            """Run the named tool and return its result as text content.

            Paths outside the allow-list yield an error message instead of an
            exception; I/O errors are left to propagate to the caller.
            """
            if name == "read_file":
                path = Path(arguments["path"])
                if not self._is_path_allowed(path):
                    return [TextContent(type="text", text="错误: 无权访问该文件")]
                return [TextContent(type="text", text=path.read_text(encoding="utf-8"))]

            elif name == "write_file":
                path = Path(arguments["path"])
                if not self._is_path_allowed(path):
                    return [TextContent(type="text", text="错误: 无权写入该文件")]
                path.write_text(arguments["content"], encoding="utf-8")
                return [TextContent(type="text", text=f"文件已写入: {path}")]

            elif name == "list_directory":
                path = Path(arguments["path"])
                if not self._is_path_allowed(path):
                    return [TextContent(type="text", text="错误: 无权访问该目录")]
                # Sorted so the listing is stable from one call to the next.
                result = "\n".join(
                    f"{'[DIR] ' if item.is_dir() else '[FILE]'} {item.name}"
                    for item in sorted(path.iterdir())
                )
                return [TextContent(type="text", text=result)]

            elif name == "search_files":
                import fnmatch
                pattern = arguments["pattern"]
                # Without "directory" the search starts at the process's
                # working directory, which must itself be allowed.
                directory = Path(arguments.get("directory", "."))
                if not self._is_path_allowed(directory):
                    return [TextContent(type="text", text="错误: 无权访问该目录")]

                matches = [
                    str(Path(root) / file)
                    for root, _, files in os.walk(directory)
                    for file in files
                    if fnmatch.fnmatch(file, pattern)
                ]
                return [TextContent(type="text", text="\n".join(matches))]

            return [TextContent(type="text", text="未知工具")]

    def _get_mime_type(self, filename: str) -> str:
        """Map a file name's extension to a MIME type.

        Unknown extensions fall back to ``application/octet-stream``.
        """
        ext = Path(filename).suffix.lower()
        mimes = {
            '.txt': 'text/plain',
            '.md': 'text/markdown',
            '.json': 'application/json',
            '.py': 'text/x-python',
            '.js': 'text/javascript',
            '.ts': 'text/typescript',
            '.html': 'text/html',
            '.css': 'text/css'
        }
        return mimes.get(ext, 'application/octet-stream')

    async def run(self):
        """Serve requests over stdio until the streams close."""
        async with stdio_server() as (read, write):
            await self.server.run(read, write, self.server.create_initialization_options())

# Start the server when this file is executed as a script.
if __name__ == "__main__":
    import asyncio

    # Directories the example server is allowed to touch.
    server = FileSystemServer(allowed_dirs=["/workspace/projects", "/tmp"])
    serve_forever = server.run()
    asyncio.run(serve_forever)

数据库服务器

PostgreSQL MCP服务器

# database_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Resource, Tool, TextContent
import asyncpg
import json

class DatabaseServer:
    """MCP server giving read-only access to the ``public`` schema of a PostgreSQL database."""

    def __init__(self, connection_string: str):
        """Store the connection string and register the MCP handlers.

        Args:
            connection_string: DSN passed to ``asyncpg.create_pool`` in ``run``.
        """
        self.connection_string = connection_string
        self.server = Server("postgres")
        # Created in run(); handlers cannot be used before then.
        self.pool = None
        self._setup_handlers()

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return *name* as a double-quoted SQL identifier.

        Embedded double quotes are doubled, so the value can never terminate
        the identifier and inject additional SQL.
        """
        return '"' + name.replace('"', '""') + '"'

    def _setup_handlers(self):
        """Register the resource and tool handlers on ``self.server``."""

        @self.server.list_resources()
        async def list_resources():
            """List every table of the ``public`` schema as a resource."""
            async with self.pool.acquire() as conn:
                tables = await conn.fetch("""
                    SELECT table_name 
                    FROM information_schema.tables 
                    WHERE table_schema = 'public'
                """)
                return [
                    Resource(
                        uri=f"postgres://table/{t['table_name']}",
                        name=t['table_name'],
                        description=f"数据库表: {t['table_name']}",
                        mimeType="application/json"
                    )
                    for t in tables
                ]

        @self.server.read_resource()
        async def read_resource(uri: str):
            """Return up to 100 rows of the table named by *uri* as JSON.

            Raises:
                ValueError: If the URI is not a table URI or names a table
                    that does not exist in the ``public`` schema.
            """
            uri_text = str(uri)  # also accepts URL objects
            if not uri_text.startswith("postgres://table/"):
                raise ValueError(f"未知资源: {uri}")

            table_name = uri_text.split("/")[-1]
            async with self.pool.acquire() as conn:
                # The name comes from the client: confirm it is a real table
                # through a parameterized lookup before it goes into SQL text.
                exists = await conn.fetchval("""
                    SELECT 1
                    FROM information_schema.tables
                    WHERE table_schema = 'public' AND table_name = $1
                """, table_name)
                if not exists:
                    raise ValueError(f"未知资源: {uri}")

                # Identifiers cannot be bound as parameters, so quote it.
                quoted = self._quote_identifier(table_name)
                rows = await conn.fetch(f"SELECT * FROM public.{quoted} LIMIT 100")
                return json.dumps([dict(r) for r in rows], default=str)

        @self.server.list_tools()
        async def list_tools():
            """Describe the three database tools and their JSON input schemas."""
            return [
                Tool(
                    name="execute_query",
                    description="执行SQL查询(只读)",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "sql": {"type": "string", "description": "SELECT查询语句"}
                        },
                        "required": ["sql"]
                    }
                ),
                Tool(
                    name="describe_table",
                    description="获取表结构信息",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "table_name": {"type": "string"}
                        },
                        "required": ["table_name"]
                    }
                ),
                Tool(
                    name="get_schema",
                    description="获取数据库架构概览",
                    inputSchema={"type": "object", "properties": {}}
                )
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: dict):
            """Run the named tool and return its result as JSON text content."""
            async with self.pool.acquire() as conn:
                if name == "execute_query":
                    sql = arguments["sql"].strip()
                    # First line of defence: only statements starting with SELECT.
                    if not sql.upper().startswith("SELECT"):
                        return [TextContent(type="text", text="错误: 只允许SELECT查询")]

                    try:
                        # The prefix test alone does not stop a SELECT with
                        # side effects (SELECT ... INTO, writing functions);
                        # a read-only transaction makes the server reject them.
                        async with conn.transaction(readonly=True):
                            rows = await conn.fetch(sql)
                        result = [dict(r) for r in rows]
                        return [TextContent(type="text", text=json.dumps(result, default=str))]
                    except Exception as e:
                        # Tool boundary: report the failure to the caller as text.
                        return [TextContent(type="text", text=f"查询错误: {str(e)}")]

                elif name == "describe_table":
                    table_name = arguments["table_name"]
                    # Restricted to "public" so a same-named table in another
                    # schema does not add its columns to the answer.
                    columns = await conn.fetch("""
                        SELECT column_name, data_type, is_nullable
                        FROM information_schema.columns
                        WHERE table_schema = 'public' AND table_name = $1
                        ORDER BY ordinal_position
                    """, table_name)
                    return [TextContent(type="text", text=json.dumps([dict(c) for c in columns]))]

                elif name == "get_schema":
                    # Join on schema as well as name; joining on the name alone
                    # mixes in columns of same-named tables from other schemas.
                    tables = await conn.fetch("""
                        SELECT t.table_name, 
                               string_agg(c.column_name || ' (' || c.data_type || ')', ', '
                                          ORDER BY c.ordinal_position) as columns
                        FROM information_schema.tables t
                        LEFT JOIN information_schema.columns c
                               ON t.table_schema = c.table_schema
                              AND t.table_name = c.table_name
                        WHERE t.table_schema = 'public'
                        GROUP BY t.table_name
                    """)
                    return [TextContent(type="text", text=json.dumps([dict(t) for t in tables]))]

            # Answer explicitly instead of falling through and returning None.
            return [TextContent(type="text", text="未知工具")]

    async def run(self):
        """Create the connection pool, serve over stdio, then close the pool."""
        self.pool = await asyncpg.create_pool(self.connection_string)
        try:
            async with stdio_server() as (read, write):
                await self.server.run(read, write, self.server.create_initialization_options())
        finally:
            # Release the database connections even if serving fails.
            await self.pool.close()

if __name__ == "__main__":
    import asyncio
    import os

    # Prefer DATABASE_URL so real credentials stay out of the source file; the
    # literal is only the placeholder used when the variable is not set.
    dsn = os.environ.get("DATABASE_URL", "postgresql://user:pass@localhost/mydb")
    server = DatabaseServer(dsn)
    asyncio.run(server.run())

GitHub集成

GitHub MCP服务器核心实现

# github_server.py (简化版)
from mcp.server import Server
from mcp.types import Tool, TextContent
import httpx

class GitHubServer:
    """MCP server exposing a few GitHub REST API operations as tools."""

    def __init__(self, token: str):
        """Store the API token and register the MCP handlers.

        Args:
            token: Token sent as a Bearer credential with every request.
        """
        self.token = token
        self.server = Server("github")
        self.base_url = "https://api.github.com"
        self._setup_handlers()

    @staticmethod
    def _repo_path(owner: str, repo: str, suffix: str = "") -> str:
        """Build ``/repos/{owner}/{repo}{suffix}`` with both names URL-escaped.

        Escaping keeps a value such as ``"a/../b"`` inside its own path
        segment instead of letting it address a different API endpoint.
        """
        from urllib.parse import quote

        return f"/repos/{quote(owner, safe='')}/{quote(repo, safe='')}{suffix}"

    async def _github_request(self, method: str, path: str, **kwargs):
        """Send one authenticated request and return the decoded JSON body.

        Args:
            method: HTTP method name.
            path: API path appended to ``self.base_url``.
            **kwargs: Passed through to ``httpx.AsyncClient.request``.

        Raises:
            httpx.HTTPStatusError: If GitHub answers with a 4xx/5xx status.
        """
        async with httpx.AsyncClient() as client:
            headers = {
                "Authorization": f"Bearer {self.token}",
                "Accept": "application/vnd.github.v3+json"
            }
            url = f"{self.base_url}{path}"
            response = await client.request(method, url, headers=headers, **kwargs)
            # Fail here; otherwise an error payload reaches the handlers and
            # surfaces as an unrelated KeyError or TypeError.
            response.raise_for_status()
            return response.json()

    def _setup_handlers(self):
        """Register the tool handlers on ``self.server``."""

        @self.server.list_tools()
        async def list_tools():
            """Describe the four GitHub tools and their JSON input schemas."""
            return [
                Tool(
                    name="search_repositories",
                    description="搜索GitHub仓库",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {"type": "string"},
                            # per_page must be a whole number from 1 to 100.
                            "limit": {
                                "type": "integer",
                                "default": 10,
                                "minimum": 1,
                                "maximum": 100
                            }
                        },
                        "required": ["query"]
                    }
                ),
                Tool(
                    name="get_repository",
                    description="获取仓库详情",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "owner": {"type": "string"},
                            "repo": {"type": "string"}
                        },
                        "required": ["owner", "repo"]
                    }
                ),
                Tool(
                    name="list_issues",
                    description="列出仓库Issues",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "owner": {"type": "string"},
                            "repo": {"type": "string"},
                            "state": {"type": "string", "enum": ["open", "closed", "all"]}
                        },
                        "required": ["owner", "repo"]
                    }
                ),
                Tool(
                    name="create_issue",
                    description="创建Issue",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "owner": {"type": "string"},
                            "repo": {"type": "string"},
                            "title": {"type": "string"},
                            "body": {"type": "string"}
                        },
                        "required": ["owner", "repo", "title"]
                    }
                )
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: dict):
            """Run the named tool and return its result as JSON text content.

            HTTP and network failures are reported as a text message rather
            than raised.
            """
            import json

            try:
                if name == "search_repositories":
                    # Clamp to the range the API accepts for per_page.
                    per_page = max(1, min(int(arguments.get("limit", 10)), 100))
                    result = await self._github_request(
                        "GET", "/search/repositories",
                        params={"q": arguments["query"], "per_page": per_page}
                    )
                    repos = [{
                        "name": r["full_name"],
                        "stars": r["stargazers_count"],
                        "description": r["description"]
                    } for r in result.get("items", [])]
                    return [TextContent(type="text", text=json.dumps(repos))]

                elif name == "get_repository":
                    result = await self._github_request(
                        "GET", self._repo_path(arguments["owner"], arguments["repo"])
                    )
                    return [TextContent(type="text", text=json.dumps({
                        "name": result["full_name"],
                        "description": result["description"],
                        "stars": result["stargazers_count"],
                        "language": result["language"],
                        "topics": result.get("topics", [])
                    }))]

                elif name == "list_issues":
                    result = await self._github_request(
                        "GET",
                        self._repo_path(arguments["owner"], arguments["repo"], "/issues"),
                        params={"state": arguments.get("state", "open")}
                    )
                    # This endpoint also returns pull requests, which carry a
                    # "pull_request" key; keep real issues only.
                    issues = [{
                        "number": i["number"],
                        "title": i["title"],
                        "state": i["state"]
                    } for i in result if "pull_request" not in i]
                    return [TextContent(type="text", text=json.dumps(issues))]

                elif name == "create_issue":
                    result = await self._github_request(
                        "POST",
                        self._repo_path(arguments["owner"], arguments["repo"], "/issues"),
                        json={
                            "title": arguments["title"],
                            "body": arguments.get("body", "")
                        }
                    )
                    return [TextContent(type="text", text=json.dumps({
                        "number": result["number"],
                        "url": result["html_url"]
                    }))]
            except httpx.HTTPStatusError as e:
                return [TextContent(
                    type="text",
                    text=f"GitHub API错误: {e.response.status_code} {e.response.text}"
                )]
            except httpx.RequestError as e:
                return [TextContent(type="text", text=f"GitHub请求失败: {e}")]

            # Answer explicitly instead of falling through and returning None.
            return [TextContent(type="text", text="未知工具")]

多服务器集成

集成多个MCP服务器

# multi_client.py
from mcp import ClientSession
from mcp.client.stdio import stdio_client

class MultiServerClient:
    """Client that connects to several MCP servers and routes tool calls.

    Tools are addressed as ``<server>_<tool>``. Call ``close()`` when done,
    from the same task that opened the connections.
    """

    def __init__(self):
        """Create a client with no connections."""
        from contextlib import AsyncExitStack

        # Maps server name -> initialized ClientSession.
        self.servers = {}
        # Owns every transport and session so close() can release them all.
        self._exit_stack = AsyncExitStack()

    async def connect_all(self, config: dict):
        """Connect to every server listed in *config*.

        Args:
            config: Maps a server name to a dict with ``command`` and
                optionally ``args`` and ``env``.
        """
        for name, server_config in config.items():
            self.servers[name] = await self.connect_server(name, server_config)

    async def connect_server(self, name: str, config: dict):
        """Start one server as a subprocess and return its initialized session.

        Args:
            name: Server name (not used to establish the connection).
            config: Dict with ``command`` and optionally ``args`` and ``env``.
        """
        from mcp import StdioServerParameters

        params = StdioServerParameters(
            command=config["command"],
            args=config.get("args", []),
            env=config.get("env"),
        )
        # stdio_client and ClientSession are async context managers; entering
        # them on the exit stack keeps them open after this method returns.
        read, write = await self._exit_stack.enter_async_context(stdio_client(params))
        session = await self._exit_stack.enter_async_context(ClientSession(read, write))
        await session.initialize()
        return session

    async def get_all_tools(self):
        """Return the tools of all servers, keyed by ``<server>_<tool>``."""
        all_tools = {}
        for name, session in self.servers.items():
            tools = await session.list_tools()
            for tool in tools.tools:
                all_tools[f"{name}_{tool.name}"] = {
                    "server": name,
                    "tool": tool
                }
        return all_tools

    def _resolve_tool(self, full_name: str) -> tuple[str, str]:
        """Split ``<server>_<tool>`` into ``(server, tool)``.

        Matches against the connected server names instead of cutting at the
        first underscore, so a server called ``my_db`` is routed correctly.
        The longest matching server name wins.

        Raises:
            ValueError: If no connected server name prefixes *full_name*.
        """
        candidates = [
            name for name in self.servers
            if full_name.startswith(f"{name}_") and len(full_name) > len(name) + 1
        ]
        if not candidates:
            raise ValueError(f"服务器不存在: {full_name.split('_', 1)[0]}")
        server_name = max(candidates, key=len)
        return server_name, full_name[len(server_name) + 1:]

    async def call_tool(self, full_name: str, arguments: dict):
        """Call a tool addressed as ``<server>_<tool>``.

        Raises:
            ValueError: If the server part does not match a connected server.
        """
        server_name, tool_name = self._resolve_tool(full_name)
        return await self.servers[server_name].call_tool(tool_name, arguments)

    async def close(self):
        """Close every session and transport opened by this client."""
        await self._exit_stack.aclose()
        self.servers.clear()

# Usage example
async def main():
    """Connect to the three example servers and print the combined tool names."""
    # Every example server is started as "python <script>".
    scripts = (
        ("fs", "filesystem_server.py"),
        ("db", "database_server.py"),
        ("github", "github_server.py"),
    )
    config = {
        server_name: {"command": "python", "args": [script]}
        for server_name, script in scripts
    }

    client = MultiServerClient()
    await client.connect_all(config)

    tools = await client.get_all_tools()
    print(f"可用工具: {list(tools.keys())}")

生产部署

部署检查清单

  • ☐ 安全配置:验证所有权限设置
  • ☐ 日志配置:设置适当的日志级别
  • ☐ 错误处理:确保所有异常被捕获
  • ☐ 性能优化:连接池、缓存等
  • ☐ 监控告警:设置健康检查
  • ☐ 备份恢复:数据和配置备份

Docker部署配置

# Dockerfile
FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Health check: the mcp package does not appear to provide a health_check
# helper, so importing one fails every probe. This only verifies that the
# interpreter and the mcp package are usable inside the container; replace it
# with a real probe if the server exposes a network endpoint.
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
    CMD python -c "import mcp.server" || exit 1

CMD ["python", "server.py"]

问题排查

连接失败

检查命令路径、环境变量、权限设置。

工具调用超时

适当增大客户端的请求超时时间,并优化工具本身的执行效率(例如为数据库查询加上LIMIT、避免遍历过大的目录)。

权限错误

检查allowed_dirs配置,确认路径正确。注意服务器会先用resolve()把路径解析为绝对路径(包括符号链接和“..”),再与允许的目录比较,因此相对路径或指向允许目录之外的符号链接都会被拒绝。

调试技巧

使用MCP Inspector查看客户端与服务器之间的通信消息,并在服务器代码中添加详细的日志记录。使用stdio传输时,日志应写入stderr,避免污染stdout上的协议消息。

----