MCP实践案例
通过完整的实践案例,深入理解MCP协议的应用场景和最佳实践。 本文将介绍文件系统、数据库、GitHub集成等常见场景的MCP实现。
预计阅读时间:60分钟·难度:高级·包含完整代码示例
实践案例概述
本章将通过多个实际案例,展示如何使用MCP协议构建完整的AI应用集成方案。 每个案例都包含完整的代码和详细的实现说明。
案例列表
- 文件系统服务器:安全访问本地文件
- 数据库服务器:执行SQL查询
- GitHub集成:操作GitHub仓库
- 网页搜索工具:集成搜索引擎
- 多服务器集成:整合多个MCP服务器
文件系统服务器
文件系统服务器是最常见的MCP服务器类型,允许AI安全地访问和操作本地文件。
完整实现
# filesystem_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Resource, Tool, TextContent
import os
import json
from pathlib import Path
class FileSystemServer:
def __init__(self, allowed_dirs: list[str]):
self.allowed_dirs = [Path(d).resolve() for d in allowed_dirs]
self.server = Server("filesystem")
self._setup_handlers()
def _is_path_allowed(self, path: Path) -> bool:
"""检查路径是否在允许的目录内"""
resolved = path.resolve()
return any(
str(resolved).startswith(str(allowed))
for allowed in self.allowed_dirs
)
def _setup_handlers(self):
@self.server.list_resources()
async def list_resources():
resources = []
for allowed_dir in self.allowed_dirs:
for root, _, files in os.walk(allowed_dir):
for file in files:
file_path = Path(root) / file
if self._is_path_allowed(file_path):
resources.append(Resource(
uri=f"file://{file_path}",
name=file,
description=f"文件: {file_path.relative_to(allowed_dir)}",
mimeType=self._get_mime_type(file)
))
return resources
@self.server.read_resource()
async def read_resource(uri: str):
path = Path(uri.replace("file://", ""))
if not self._is_path_allowed(path):
raise PermissionError(f"无权访问: {uri}")
with open(path, 'r') as f:
content = f.read()
return content
@self.server.list_tools()
async def list_tools():
return [
Tool(
name="read_file",
description="读取文件内容",
inputSchema={
"type": "object",
"properties": {
"path": {"type": "string", "description": "文件路径"}
},
"required": ["path"]
}
),
Tool(
name="write_file",
description="写入文件内容",
inputSchema={
"type": "object",
"properties": {
"path": {"type": "string"},
"content": {"type": "string"}
},
"required": ["path", "content"]
}
),
Tool(
name="list_directory",
description="列出目录内容",
inputSchema={
"type": "object",
"properties": {
"path": {"type": "string"}
},
"required": ["path"]
}
),
Tool(
name="search_files",
description="搜索文件",
inputSchema={
"type": "object",
"properties": {
"pattern": {"type": "string"},
"directory": {"type": "string"}
},
"required": ["pattern"]
}
)
]
@self.server.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "read_file":
path = Path(arguments["path"])
if not self._is_path_allowed(path):
return [TextContent(type="text", text="错误: 无权访问该文件")]
with open(path, 'r') as f:
return [TextContent(type="text", text=f.read())]
elif name == "write_file":
path = Path(arguments["path"])
if not self._is_path_allowed(path):
return [TextContent(type="text", text="错误: 无权写入该文件")]
with open(path, 'w') as f:
f.write(arguments["content"])
return [TextContent(type="text", text=f"文件已写入: {path}")]
elif name == "list_directory":
path = Path(arguments["path"])
if not self._is_path_allowed(path):
return [TextContent(type="text", text="错误: 无权访问该目录")]
items = list(path.iterdir())
result = "\n".join([
f"{'[DIR] ' if i.is_dir() else '[FILE]'} {i.name}"
for i in items
])
return [TextContent(type="text", text=result)]
elif name == "search_files":
import fnmatch
pattern = arguments["pattern"]
directory = Path(arguments.get("directory", "."))
if not self._is_path_allowed(directory):
return [TextContent(type="text", text="错误: 无权访问该目录")]
matches = []
for root, _, files in os.walk(directory):
for file in files:
if fnmatch.fnmatch(file, pattern):
matches.append(str(Path(root) / file))
return [TextContent(type="text", text="\n".join(matches))]
return [TextContent(type="text", text="未知工具")]
def _get_mime_type(self, filename: str) -> str:
ext = Path(filename).suffix.lower()
mimes = {
'.txt': 'text/plain',
'.md': 'text/markdown',
'.json': 'application/json',
'.py': 'text/x-python',
'.js': 'text/javascript',
'.ts': 'text/typescript',
'.html': 'text/html',
'.css': 'text/css'
}
return mimes.get(ext, 'application/octet-stream')
async def run(self):
async with stdio_server() as (read, write):
await self.server.run(read, write, self.server.create_initialization_options())
# 启动服务器
if __name__ == "__main__":
import asyncio
server = FileSystemServer(
allowed_dirs=["/workspace/projects", "/tmp"]
)
asyncio.run(server.run())数据库服务器
PostgreSQL MCP服务器
# database_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Resource, Tool, TextContent
import asyncpg
import json
class DatabaseServer:
def __init__(self, connection_string: str):
self.connection_string = connection_string
self.server = Server("postgres")
self.pool = None
self._setup_handlers()
def _setup_handlers(self):
@self.server.list_resources()
async def list_resources():
async with self.pool.acquire() as conn:
tables = await conn.fetch("""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
""")
return [
Resource(
uri=f"postgres://table/{t['table_name']}",
name=t['table_name'],
description=f"数据库表: {t['table_name']}",
mimeType="application/json"
)
for t in tables
]
@self.server.read_resource()
async def read_resource(uri: str):
if uri.startswith("postgres://table/"):
table_name = uri.split("/")[-1]
async with self.pool.acquire() as conn:
rows = await conn.fetch(f"SELECT * FROM {table_name} LIMIT 100")
return json.dumps([dict(r) for r in rows], default=str)
raise ValueError(f"未知资源: {uri}")
@self.server.list_tools()
async def list_tools():
return [
Tool(
name="execute_query",
description="执行SQL查询(只读)",
inputSchema={
"type": "object",
"properties": {
"sql": {"type": "string", "description": "SELECT查询语句"}
},
"required": ["sql"]
}
),
Tool(
name="describe_table",
description="获取表结构信息",
inputSchema={
"type": "object",
"properties": {
"table_name": {"type": "string"}
},
"required": ["table_name"]
}
),
Tool(
name="get_schema",
description="获取数据库架构概览",
inputSchema={"type": "object", "properties": {}}
)
]
@self.server.call_tool()
async def call_tool(name: str, arguments: dict):
async with self.pool.acquire() as conn:
if name == "execute_query":
sql = arguments["sql"].strip()
# 安全检查:只允许SELECT
if not sql.upper().startswith("SELECT"):
return [TextContent(type="text", text="错误: 只允许SELECT查询")]
try:
rows = await conn.fetch(sql)
result = [dict(r) for r in rows]
return [TextContent(type="text", text=json.dumps(result, default=str))]
except Exception as e:
return [TextContent(type="text", text=f"查询错误: {str(e)}")]
elif name == "describe_table":
table_name = arguments["table_name"]
columns = await conn.fetch("""
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = $1
""", table_name)
return [TextContent(type="text", text=json.dumps([dict(c) for c in columns]))]
elif name == "get_schema":
tables = await conn.fetch("""
SELECT t.table_name,
string_agg(c.column_name || ' (' || c.data_type || ')', ', ') as columns
FROM information_schema.tables t
LEFT JOIN information_schema.columns c ON t.table_name = c.table_name
WHERE t.table_schema = 'public'
GROUP BY t.table_name
""")
return [TextContent(type="text", text=json.dumps([dict(t) for t in tables]))]
async def run(self):
self.pool = await asyncpg.create_pool(self.connection_string)
async with stdio_server() as (read, write):
await self.server.run(read, write, self.server.create_initialization_options())
if __name__ == "__main__":
import asyncio
server = DatabaseServer("postgresql://user:pass@localhost/mydb")
asyncio.run(server.run())GitHub集成
GitHub MCP服务器核心实现
# github_server.py (简化版)
from mcp.server import Server
from mcp.types import Tool, TextContent
import httpx
class GitHubServer:
def __init__(self, token: str):
self.token = token
self.server = Server("github")
self.base_url = "https://api.github.com"
self._setup_handlers()
async def _github_request(self, method: str, path: str, **kwargs):
async with httpx.AsyncClient() as client:
headers = {
"Authorization": f"Bearer {self.token}",
"Accept": "application/vnd.github.v3+json"
}
url = f"{self.base_url}{path}"
response = await client.request(method, url, headers=headers, **kwargs)
return response.json()
def _setup_handlers(self):
@self.server.list_tools()
async def list_tools():
return [
Tool(
name="search_repositories",
description="搜索GitHub仓库",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string"},
"limit": {"type": "number", "default": 10}
},
"required": ["query"]
}
),
Tool(
name="get_repository",
description="获取仓库详情",
inputSchema={
"type": "object",
"properties": {
"owner": {"type": "string"},
"repo": {"type": "string"}
},
"required": ["owner", "repo"]
}
),
Tool(
name="list_issues",
description="列出仓库Issues",
inputSchema={
"type": "object",
"properties": {
"owner": {"type": "string"},
"repo": {"type": "string"},
"state": {"type": "string", "enum": ["open", "closed", "all"]}
},
"required": ["owner", "repo"]
}
),
Tool(
name="create_issue",
description="创建Issue",
inputSchema={
"type": "object",
"properties": {
"owner": {"type": "string"},
"repo": {"type": "string"},
"title": {"type": "string"},
"body": {"type": "string"}
},
"required": ["owner", "repo", "title"]
}
)
]
@self.server.call_tool()
async def call_tool(name: str, arguments: dict):
import json
if name == "search_repositories":
result = await self._github_request(
"GET", "/search/repositories",
params={"q": arguments["query"], "per_page": arguments.get("limit", 10)}
)
repos = [{
"name": r["full_name"],
"stars": r["stargazers_count"],
"description": r["description"]
} for r in result.get("items", [])]
return [TextContent(type="text", text=json.dumps(repos))]
elif name == "get_repository":
result = await self._github_request(
"GET", f"/repos/{arguments['owner']}/{arguments['repo']}"
)
return [TextContent(type="text", text=json.dumps({
"name": result["full_name"],
"description": result["description"],
"stars": result["stargazers_count"],
"language": result["language"],
"topics": result.get("topics", [])
}))]
elif name == "list_issues":
result = await self._github_request(
"GET", f"/repos/{arguments['owner']}/{arguments['repo']}/issues",
params={"state": arguments.get("state", "open")}
)
issues = [{
"number": i["number"],
"title": i["title"],
"state": i["state"]
} for i in result]
return [TextContent(type="text", text=json.dumps(issues))]
elif name == "create_issue":
result = await self._github_request(
"POST", f"/repos/{arguments['owner']}/{arguments['repo']}/issues",
json={
"title": arguments["title"],
"body": arguments.get("body", "")
}
)
return [TextContent(type="text", text=json.dumps({
"number": result["number"],
"url": result["html_url"]
}))]网页搜索工具
搜索集成服务器
# search_server.py
from mcp.server import Server
from mcp.types import Tool, TextContent
import httpx
from bs4 import BeautifulSoup
class SearchServer:
def __init__(self):
self.server = Server("web-search")
self._setup_handlers()
def _setup_handlers(self):
@self.server.list_tools()
async def list_tools():
return [
Tool(
name="web_search",
description="网络搜索",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string"},
"num_results": {"type": "number", "default": 5}
},
"required": ["query"]
}
),
Tool(
name="fetch_page",
description="获取网页内容",
inputSchema={
"type": "object",
"properties": {
"url": {"type": "string"}
},
"required": ["url"]
}
)
]
@self.server.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "web_search":
# 使用DuckDuckGo或其他搜索API
async with httpx.AsyncClient() as client:
response = await client.get(
"https://api.duckduckgo.com/",
params={
"q": arguments["query"],
"format": "json",
"limit": arguments.get("num_results", 5)
}
)
results = response.json()
# 处理结果...
return [TextContent(type="text", text=str(results))]
elif name == "fetch_page":
async with httpx.AsyncClient() as client:
response = await client.get(arguments["url"])
soup = BeautifulSoup(response.text, 'html.parser')
# 提取文本
text = soup.get_text(separator='\n', strip=True)
return [TextContent(type="text", text=text[:5000])] # 限制长度多服务器集成
集成多个MCP服务器
# multi_client.py
from mcp import ClientSession
from mcp.client.stdio import stdio_client
class MultiServerClient:
def __init__(self):
self.servers = {}
async def connect_all(self, config: dict):
"""连接配置中的所有服务器"""
for name, server_config in config.items():
session = await self.connect_server(name, server_config)
self.servers[name] = session
async def connect_server(self, name: str, config: dict):
"""连接单个服务器"""
transport = await stdio_client({
"command": config["command"],
"args": config.get("args", [])
})
session = ClientSession(*transport)
await session.initialize()
return session
async def get_all_tools(self):
"""获取所有服务器的工具"""
all_tools = {}
for name, session in self.servers.items():
tools = await session.list_tools()
for tool in tools.tools:
all_tools[f"{name}_{tool.name}"] = {
"server": name,
"tool": tool
}
return all_tools
async def call_tool(self, full_name: str, arguments: dict):
"""调用工具(格式:server_toolname)"""
server_name, tool_name = full_name.split("_", 1)
session = self.servers.get(server_name)
if session:
return await session.call_tool(tool_name, arguments)
raise ValueError(f"服务器不存在: {server_name}")
# 使用示例
async def main():
client = MultiServerClient()
config = {
"fs": {"command": "python", "args": ["filesystem_server.py"]},
"db": {"command": "python", "args": ["database_server.py"]},
"github": {"command": "python", "args": ["github_server.py"]}
}
await client.connect_all(config)
tools = await client.get_all_tools()
print(f"可用工具: {list(tools.keys())}")生产部署
部署检查清单
- ☐ 安全配置:验证所有权限设置
- ☐ 日志配置:设置适当的日志级别
- ☐ 错误处理:确保所有异常被捕获
- ☐ 性能优化:连接池、缓存等
- ☐ 监控告警:设置健康检查
- ☐ 备份恢复:数据和配置备份
Docker部署配置
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
CMD python -c "from mcp.server import health_check; health_check()" || exit 1
CMD ["python", "server.py"]问题排查
连接失败
检查命令路径、环境变量、权限设置。
工具调用超时
增加超时时间,优化工具执行效率。
权限错误
检查allowed_dirs配置,确认路径正确。
调试技巧
使用MCP Inspector查看通信日志,添加详细的日志记录。