1. 项目介绍 #
MySQL MCP 是一个基于 Model Context Protocol (MCP) 的智能数据库查询工具,它能够将自然语言查询转换为 SQL 语句并执行,让用户可以用日常语言与 MySQL 数据库进行交互。
1.1 核心特性 #
智能连接管理 - 支持多种数据库连接配置
自然语言查询 - 用中文描述需求,自动生成 SQL
AI 驱动 - 集成 OpenAI API,智能解析查询意图
MCP 协议 - 标准化的工具接口,易于集成
1.2 技术架构 #
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 自然语言查询 │ → │ AI 意图解析 │ → │ SQL 生成执行 │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ MCP 工具接口 │ │ LLM 客户端 │ │ MySQL 连接 │
└─────────────────┘ └─────────────────┘ └─────────────────┘1.3 主要功能 #
- 数据库连接 - 支持自定义主机、端口、用户认证
- 信息查询 - 获取数据库结构、表信息等元数据
- SQL 执行 - 直接执行 SQL 语句并返回结果
- 智能查询 - 自然语言转 SQL,支持复杂查询需求
1.4 项目目录结构 #
mysql-mcp/
├── 📄 __init__.py # 模块包初始化文件
├── 📄 config.py # 配置和常量管理
├── 📄 models.py # 数据模型和数据库表结构
├── 📄 llm_client.py # LLM客户端和API调用
├── 📄 database.py # 数据库连接管理
├── 📄 query_parser.py # 自然语言查询解析
├── 📄 sql_builder.py # SQL语句构建
├── 📄 mysql_server.py # MySQL MCP服务器核心逻辑
├── 📄 mcp_tools.py # MCP工具定义
├── 📄 main.py # 主程序入口
├── 📄 pyproject.toml # 项目配置
├── 📄 uv.lock # 依赖锁定文件
├── 📄 .env # 环境变量
└── 📄 .python-version # Python版本1.5. 模块依赖关系图 #

1.6. 模块职责分工 #
1.6.1. 基础设施层 #
config.py: 配置管理、环境变量、日志设置models.py: 数据模型、数据库表结构定义
1.6.2. 外部服务层 #
llm_client.py: LLM API调用、响应处理database.py: 数据库连接、SQL执行、结果处理
1.6.3. 业务逻辑层 #
query_parser.py: 自然语言查询意图识别sql_builder.py: SQL语句生成、错误分析mysql_server.py: 核心业务流程、模块协调
1.6.4. 接口层 #
mcp_tools.py: MCP工具函数封装main.py: 服务器启动、工具注册
1.7. 初始化项目 #
# 初始化项目
uv init mysql-mcp
# 安装依赖
uv add "mcp[cli]" openai sqlalchemy pymysql python-dotenv
# 启动MCP服务器
mcp dev main.py2. 启动MCP服务器 #
2.1. config.py #
config.py
# 导入logging模块,用于日志记录
import logging
# 定义一个函数用于配置日志格式和日志级别
def setup_logging():
# 设置日志的基本配置,包括日志级别和输出格式
logging.basicConfig(
level=logging.INFO, # 设置日志级别为INFO
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", # 设置日志输出格式
)
# 返回一个以当前模块名命名的logger实例
return logging.getLogger(__name__)
# 获取logger实例,供其他模块使用
logger = setup_logging()
2.2. main.py #
main.py
# 从mcp.server模块导入FastMCP类
+from mcp.server import FastMCP
# 从config模块导入logger日志记录器
+from config import logger
# 创建一个FastMCP服务器实例,指定数据库为"mysql-database"
+mcp = FastMCP("mysql-database")
# 判断当前模块是否为主程序入口
if __name__ == "__main__":
+ try:
# 记录服务启动的日志信息
+ logger.info("MySQL MCP服务启动中...")
# 启动FastMCP服务器
+ mcp.run()
+ except KeyboardInterrupt:
# 捕获键盘中断(如Ctrl+C),记录服务器已停止的日志
+ logger.info("服务器已停止")
+ except Exception as e:
# 捕获其他异常,记录错误日志
+ logger.error(f"服务器运行时出错: {str(e)}")
# 导入traceback模块用于打印详细的异常信息
+ import traceback
# 打印异常的详细堆栈信息
+ traceback.print_exc()
2.3. pyproject.toml #
pyproject.toml
[project]
name = "mysql-mcp"
version = "0.1.0"
description = "MySQL MCP Server"
readme = "README.md"
requires-python = ">=3.12"
+dependencies = [
+ "mcp[cli]>=1.13.1",
+ "openai>=1.101.0",
+ "pymysql>=1.1.1",
+ "python-dotenv>=1.1.1",
+ "sqlalchemy>=2.0.43",
+]3. 连接MYSQL数据库 #
3.1. database.py #
database.py
# 导入类型注解相关内容(Any表示任意类型,Dict表示字典,Optional表示可选类型)
from typing import Any, Dict, Optional
# 导入SQLAlchemy的create_engine和text,用于数据库操作
from sqlalchemy import create_engine, text
# 导入日志记录器logger和数据库连接数据类DatabaseConnection
from config import logger
# 导入数据库连接数据类DatabaseConnection
from models import DatabaseConnection, DatabaseSchema
# 定义数据库连接管理器类
class DatabaseManager:
"""数据库连接管理器"""
# 构造方法,初始化数据库连接对象为None
def __init__(self):
# 数据库连接对象,初始为None
self.db_connection: Optional[DatabaseConnection] = None
self.schema = DatabaseSchema()
# 异步方法,用于连接数据库
async def connect_database(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""连接数据库"""
try:
# 如果已经存在数据库连接,则先关闭
if self.db_connection:
self.db_connection.close()
# 构建数据库连接字符串,格式为mysql+pymysql://user:password@host:port/database
connection_string = (
f"mysql+pymysql://{config['user']}:{config['password']}@"
f"{config['host']}:{config['port']}/{config['database']}"
)
# 创建SQLAlchemy引擎对象
engine = create_engine(
connection_string,
echo=False, # 不输出SQL语句日志
pool_pre_ping=True, # 启用连接池健康检查
pool_recycle=3600, # 连接回收时间为3600秒
)
# 测试数据库连接,执行一条简单的SQL语句
with engine.connect() as conn:
conn.execute(text("SELECT 1"))
# 保存数据库连接信息到db_connection属性
self.db_connection = DatabaseConnection(
host=config["host"],
port=config["port"],
user=config["user"],
password=config["password"],
database=config["database"],
engine=engine,
)
# 创建数据库表
await self.create_tables()
# 返回连接成功的信息,包括主机、端口和数据库名
return {
"success": True,
"message": f"成功连接到数据库 {config['database']}",
"connection_info": {
"host": config["host"],
"port": config["port"],
"database": config["database"],
},
}
# 捕获异常,记录错误日志并返回失败信息
except Exception as e:
# 记录数据库连接失败的错误日志
logger.error(f"数据库连接失败: {str(e)}")
# 返回失败信息
return {
"success": False,
"message": f"数据库连接失败: {str(e)}",
}
# 异步方法:创建数据库表
async def create_tables(self):
"""创建数据库表"""
if self.db_connection and self.db_connection.engine:
self.schema.get_metadata().create_all(self.db_connection.engine)
# 关闭数据库连接的方法
def close(self) -> None:
"""关闭数据库连接"""
# 如果存在数据库连接,则关闭并置为None
if self.db_connection:
self.db_connection.close()
self.db_connection = None
3.2. mcp_tools.py #
mcp_tools.py
# 导入json模块,用于处理JSON数据
import json
# 从config模块导入logger日志记录器
from config import logger
# 从mysql_server模块导入MySQLMCPServer类
from mysql_server import MySQLMCPServer
# 创建MySQLMCPServer的实例,用于后续数据库操作
mysql_server = MySQLMCPServer()
# 定义一个函数,用于获取MySQLMCPServer实例
def get_mysql_server() -> MySQLMCPServer:
# 返回MySQLMCPServer实例
"""获取MySQL服务器实例"""
return mysql_server
# 定义一个异步函数,用于连接MySQL数据库
async def connect_database(
# 数据库主机地址,默认为localhost
host: str = "localhost",
# 数据库端口号,默认为3306
port: int = 3306,
# 数据库用户名,默认为root
user: str = "root",
# 数据库密码,默认为123456
password: str = "123456",
# 要连接的数据库名称,默认为school
database: str = "school",
) -> str:
# 说明该函数用于连接MySQL数据库
"""连接MySQL数据库"""
try:
# 构造数据库连接配置字典
config = {
"host": host,
"port": port,
"user": user,
"password": password,
"database": database,
}
# 调用MySQLMCPServer的connect_database方法进行异步连接
result = await mysql_server.connect_database(config)
# 将连接结果转换为格式化的JSON字符串并返回
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
# 如果连接过程中发生异常,构造错误信息字典
error_result = {"success": False, "message": f"连接失败: {str(e)}"}
# 记录错误日志
logger.error(f"连接失败: {str(e)}")
# 返回错误信息的JSON字符串
return json.dumps(error_result, ensure_ascii=False, indent=2)
3.3. models.py #
models.py
# 导入类型注解Any和Optional,Any表示任意类型,Optional表示可选类型
from typing import Any, Optional
# 导入dataclass装饰器,用于简化数据类的定义
from dataclasses import dataclass
# 导入SQLAlchemy相关内容,用于定义数据库表结构
from sqlalchemy import (
MetaData, # 用于管理数据库元数据
Table, # 用于定义数据库表
Column, # 用于定义表的字段
Integer, # 整数类型
String, # 字符串类型
Date, # 日期类型
Enum, # 枚举类型
DECIMAL, # 定点数类型
TIMESTAMP, # 时间戳类型
)
# 导入datetime模块中的datetime类,用于处理日期和时间
from datetime import datetime
# 从config模块导入logger日志记录器
from config import logger
# 使用dataclass装饰器定义数据库连接信息的数据类
@dataclass
class DatabaseConnection:
"""数据库连接信息"""
# 主机名
host: str
# 端口号
port: int
# 用户名
user: str
# 密码
password: str
# 数据库名
database: str
# SQLAlchemy引擎对象,可选
engine: Optional[Any] = None
# 会话对象(暂未使用),可选
session: Optional[Any] = None
# 定义关闭数据库连接的方法
def close(self) -> None:
"""关闭数据库连接"""
# 如果engine存在,则释放数据库引擎资源
if self.engine:
try:
# 调用dispose方法释放资源
self.engine.dispose()
# 记录数据库连接已关闭的信息
logger.info("数据库连接已关闭")
except Exception as e:
# 记录关闭数据库连接失败的错误信息
logger.error(f"关闭数据库连接失败: {str(e)}")
# 定义数据库表结构管理类
class DatabaseSchema:
"""数据库表结构定义"""
# 构造方法,初始化元数据并设置表结构
def __init__(self):
# 创建SQLAlchemy元数据对象
self.metadata = MetaData()
# 调用私有方法设置数据库表结构
self._setup_tables()
# 私有方法,定义所有数据库表结构
def _setup_tables(self) -> None:
"""设置数据库表结构"""
# 定义学生表
Table(
"students", # 表名
self.metadata, # 关联的元数据对象
Column(
"student_id", Integer, primary_key=True, autoincrement=True
), # 学生ID,主键,自增
Column(
"student_no", String(20), nullable=False, unique=True
), # 学号,唯一且不能为空
Column("student_name", String(20), nullable=False), # 学生姓名,不能为空
Column("gender", Enum("男", "女"), default="男"), # 性别,枚举类型,默认男
Column("class_name", String(20), nullable=False), # 班级名称,不能为空
Column("birth_date", Date), # 出生日期
Column("parent_phone", String(15)), # 家长电话
Column(
"create_time", TIMESTAMP, default=datetime.now
), # 创建时间,默认当前时间
)
# 定义科目表
Table(
"subjects", # 表名
self.metadata, # 关联的元数据对象
Column(
"subject_id", Integer, primary_key=True, autoincrement=True
), # 科目ID,主键,自增
Column(
"subject_name", String(20), nullable=False, unique=True
), # 科目名称,唯一且不能为空
Column("teacher_name", String(20)), # 任课老师姓名
Column(
"create_time", TIMESTAMP, default=datetime.now
), # 创建时间,默认当前时间
)
# 定义成绩表
Table(
"scores", # 表名
self.metadata, # 关联的元数据对象
Column(
"score_id", Integer, primary_key=True, autoincrement=True
), # 成绩ID,主键,自增
Column("student_id", Integer, nullable=False), # 学生ID,不能为空
Column("subject_id", Integer, nullable=False), # 科目ID,不能为空
Column("exam_date", Date, nullable=False), # 考试日期,不能为空
Column(
"exam_type",
Enum("期中", "期末", "单元测试", "月考"),
default="期末", # 考试类型,枚举,默认期末
),
Column("score", DECIMAL(5, 2), nullable=False), # 分数,定点数,不能为空
Column(
"grade_level", Enum("优秀", "良好", "及格", "不及格")
), # 等级,枚举类型
Column(
"create_time", TIMESTAMP, default=datetime.now
), # 创建时间,默认当前时间
)
# 获取元数据对象的方法
def get_metadata(self):
"""获取元数据对象"""
return self.metadata
3.4. mysql_server.py #
mysql_server.py
# 导入Any和Dict类型注解,用于类型提示
from typing import Any, Dict
# 从database模块导入DatabaseManager数据库管理器
from database import DatabaseManager
# 定义MySQLMCPServer类,表示MySQL MCP服务器
class MySQLMCPServer:
"""MySQL MCP 服务器"""
# 构造方法,初始化数据库连接管理器
def __init__(self):
# 创建一个DatabaseManager实例,管理数据库连接
self.db_manager = DatabaseManager()
# 异步方法,用于连接数据库
async def connect_database(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""连接数据库"""
# 调用数据库管理器的connect_database方法进行异步连接,并返回结果
return await self.db_manager.connect_database(config)
# 关闭服务器和数据库连接的方法
def close(self) -> None:
"""关闭服务器和数据库连接"""
# 调用数据库管理器的close方法,关闭数据库连接
self.db_manager.close()
3.5. config.py #
config.py
# 导入logging模块,用于日志记录
import logging
# 定义一个函数用于配置日志格式和日志级别
def setup_logging():
# 设置日志的基本配置,包括日志级别和输出格式
logging.basicConfig(
# 设置日志级别为INFO
+ level=logging.INFO,
# 设置日志输出格式
+ format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回一个以当前模块名命名的logger实例
return logging.getLogger(__name__)
# 获取logger实例,供其他模块使用
logger = setup_logging()
3.6. main.py #
main.py
# 从mcp.server模块导入FastMCP类
from mcp.server import FastMCP
# 从config模块导入logger日志记录器
from config import logger
+from mcp_tools import connect_database
# 创建一个FastMCP服务器实例,指定数据库为"mysql-database"
mcp = FastMCP("mysql-database")
# 使用装饰器注册MCP工具
+@mcp.tool()
# 定义一个异步函数,用于连接数据库
+async def connect_database_tool(
# 数据库主机地址,默认为localhost
+ host: str = "localhost",
# 数据库端口号,默认为3306
+ port: int = 3306,
# 数据库用户名,默认为root
+ user: str = "root",
# 数据库密码,默认为123456
+ password: str = "123456",
# 要连接的数据库名称,默认为school
+ database: str = "school",
+) -> str:
# 函数文档字符串,说明该函数用于连接MySQL数据库
+ """连接MySQL数据库"""
# 调用connect_database函数,异步连接数据库,并返回结果
+ return await connect_database(host, port, user, password, database)
# 判断当前模块是否为主程序入口
if __name__ == "__main__":
try:
# 记录服务启动的日志信息
logger.info("MySQL MCP服务启动中...")
# 启动FastMCP服务器
mcp.run()
except KeyboardInterrupt:
# 捕获键盘中断(如Ctrl+C),记录服务器已停止的日志
logger.info("服务器已停止")
except Exception as e:
# 捕获其他异常,记录错误日志
logger.error(f"服务器运行时出错: {str(e)}")
# 导入traceback模块用于打印详细的异常信息
import traceback
# 打印异常的详细堆栈信息
traceback.print_exc()
4. 查看数据库信息 #
4.1. database.py #
database.py
# 导入类型注解相关内容(Any表示任意类型,Dict表示字典,Optional表示可选类型)
from typing import Any, Dict, Optional
# 导入SQLAlchemy的create_engine和text,用于数据库操作
from sqlalchemy import create_engine, text
# 导入日志记录器logger和数据库连接数据类DatabaseConnection
from config import logger
from models import DatabaseConnection, DatabaseSchema
# 定义数据库连接管理器类
class DatabaseManager:
"""数据库连接管理器"""
# 构造方法,初始化数据库连接对象为None
def __init__(self):
# 数据库连接对象,初始为None
self.db_connection: Optional[DatabaseConnection] = None
self.schema = DatabaseSchema()
# 异步方法:创建数据库表
async def create_tables(self):
"""创建数据库表"""
if self.db_connection and self.db_connection.engine:
self.schema.get_metadata().create_all(self.db_connection.engine)
# 异步方法,用于连接数据库
async def connect_database(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""连接数据库"""
try:
# 如果已经存在数据库连接,则先关闭
if self.db_connection:
self.db_connection.close()
# 构建数据库连接字符串,格式为mysql+pymysql://user:password@host:port/database
connection_string = (
f"mysql+pymysql://{config['user']}:{config['password']}@"
f"{config['host']}:{config['port']}/{config['database']}"
)
# 创建SQLAlchemy引擎对象
engine = create_engine(
connection_string,
echo=False, # 不输出SQL语句日志
pool_pre_ping=True, # 启用连接池健康检查
pool_recycle=3600, # 连接回收时间为3600秒
)
# 测试数据库连接,执行一条简单的SQL语句
with engine.connect() as conn:
conn.execute(text("SELECT 1"))
# 保存数据库连接信息到db_connection属性
self.db_connection = DatabaseConnection(
host=config["host"],
port=config["port"],
user=config["user"],
password=config["password"],
database=config["database"],
engine=engine,
)
# 创建数据库表
await self.create_tables()
# 返回连接成功的信息,包括主机、端口和数据库名
return {
"success": True,
"message": f"成功连接到数据库 {config['database']}",
"connection_info": {
"host": config["host"],
"port": config["port"],
"database": config["database"],
},
}
# 捕获异常,记录错误日志并返回失败信息
except Exception as e:
# 记录数据库连接失败的错误日志
logger.error(f"数据库连接失败: {str(e)}")
# 返回失败信息
return {
"success": False,
"message": f"数据库连接失败: {str(e)}",
}
# 定义获取当前数据库连接信息的方法,返回一个字典
+ def get_connection_info(self) -> Dict[str, Any]:
# 方法文档字符串,说明功能
+ """获取当前数据库连接信息"""
# 判断是否存在数据库连接
+ if self.db_connection:
# 如果已连接,返回连接状态及相关信息
+ return {
+ "connected": True,
+ "host": self.db_connection.host, # 数据库主机地址
+ "port": self.db_connection.port, # 数据库端口号
+ "database": self.db_connection.database, # 数据库名称
+ "user": self.db_connection.user, # 数据库用户名
+ }
+ else:
# 如果未连接,返回未连接状态及提示信息
+ return {"connected": False, "message": "数据库未连接"}
# 关闭数据库连接的方法
def close(self) -> None:
"""关闭数据库连接"""
# 如果存在数据库连接,则关闭并置为None
if self.db_connection:
self.db_connection.close()
self.db_connection = None
4.2. main.py #
main.py
# 从mcp.server模块导入FastMCP类
from mcp.server import FastMCP
# 从config模块导入logger日志记录器
from config import logger
+from mcp_tools import connect_database, get_database_info
# 创建一个FastMCP服务器实例,指定数据库为"mysql-database"
mcp = FastMCP("mysql-database")
# 使用装饰器注册MCP工具
@mcp.tool()
# 定义一个异步函数,用于连接数据库
async def connect_database_tool(
# 数据库主机地址,默认为localhost
host: str = "localhost",
# 数据库端口号,默认为3306
port: int = 3306,
# 数据库用户名,默认为root
user: str = "root",
# 数据库密码,默认为123456
password: str = "123456",
# 要连接的数据库名称,默认为school
database: str = "school",
) -> str:
# 函数文档字符串,说明该函数用于连接MySQL数据库
"""连接MySQL数据库"""
# 调用connect_database函数,异步连接数据库,并返回结果
return await connect_database(host, port, user, password, database)
+@mcp.tool()
+async def get_database_info_tool() -> str:
+ """获取当前数据库连接信息"""
+ return await get_database_info()
# 判断当前模块是否为主程序入口
if __name__ == "__main__":
try:
# 记录服务启动的日志信息
logger.info("MySQL MCP服务启动中...")
# 启动FastMCP服务器
mcp.run()
except KeyboardInterrupt:
# 捕获键盘中断(如Ctrl+C),记录服务器已停止的日志
logger.info("服务器已停止")
except Exception as e:
# 捕获其他异常,记录错误日志
logger.error(f"服务器运行时出错: {str(e)}")
# 导入traceback模块用于打印详细的异常信息
import traceback
# 打印异常的详细堆栈信息
traceback.print_exc()
4.3. mcp_tools.py #
mcp_tools.py
# 导入json模块,用于处理JSON数据
import json
# 从config模块导入logger日志记录器
from config import logger
# 从mysql_server模块导入MySQLMCPServer类
from mysql_server import MySQLMCPServer
# 创建MySQLMCPServer的实例,用于后续数据库操作
mysql_server = MySQLMCPServer()
# 定义一个函数,用于获取MySQLMCPServer实例
def get_mysql_server() -> MySQLMCPServer:
# 返回MySQLMCPServer实例
"""获取MySQL服务器实例"""
return mysql_server
# 定义一个异步函数,用于连接MySQL数据库
async def connect_database(
# 数据库主机地址,默认为localhost
host: str = "localhost",
# 数据库端口号,默认为3306
port: int = 3306,
# 数据库用户名,默认为root
user: str = "root",
# 数据库密码,默认为123456
password: str = "123456",
# 要连接的数据库名称,默认为school
database: str = "school",
) -> str:
# 说明该函数用于连接MySQL数据库
"""连接MySQL数据库"""
try:
# 构造数据库连接配置字典
config = {
"host": host,
"port": port,
"user": user,
"password": password,
"database": database,
}
# 调用MySQLMCPServer的connect_database方法进行异步连接
result = await mysql_server.connect_database(config)
# 将连接结果转换为格式化的JSON字符串并返回
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
# 如果连接过程中发生异常,构造错误信息字典
error_result = {"success": False, "message": f"连接失败: {str(e)}"}
# 记录错误日志
logger.error(f"连接失败: {str(e)}")
# 返回错误信息的JSON字符串
return json.dumps(error_result, ensure_ascii=False, indent=2)
# 定义MCP工具:获取当前数据库连接信息
+async def get_database_info() -> str:
+ """获取当前数据库连接信息"""
+ try:
# 获取数据库连接信息
+ info = mysql_server.db_manager.get_connection_info()
# 返回信息的JSON字符串
+ return json.dumps(info, ensure_ascii=False, indent=2)
+ except Exception as e:
# 获取信息失败时返回错误信息
+ error_result = {"success": False, "message": f"获取信息失败: {str(e)}"}
+ return json.dumps(error_result, ensure_ascii=False, indent=2)
4.4. mysql_server.py #
mysql_server.py
# 导入Any和Dict类型注解,用于类型提示
from typing import Any, Dict
# 从database模块导入DatabaseManager数据库管理器
from database import DatabaseManager
# 定义MySQLMCPServer类,表示MySQL MCP服务器
class MySQLMCPServer:
"""MySQL MCP 服务器"""
# 构造方法,初始化数据库连接管理器
def __init__(self):
# 创建一个DatabaseManager实例,管理数据库连接
self.db_manager = DatabaseManager()
# 异步方法,用于连接数据库
async def connect_database(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""连接数据库"""
# 调用数据库管理器的connect_database方法进行异步连接,并返回结果
return await self.db_manager.connect_database(config)
# 关闭服务器和数据库连接的方法
def close(self) -> None:
"""关闭服务器和数据库连接"""
# 调用数据库管理器的close方法,关闭数据库连接
self.db_manager.close()
5. 执行SQL语句 #
5.1. llm_client.py #
llm_client.py
# 导入类型注解相关内容
from typing import Optional
# 导入OpenAI库,用于调用LLM
from openai import OpenAI
# 导入配置,包括API密钥、基础URL和日志记录器
from config import OPENAI_API_KEY, OPENAI_BASE_URL, logger
# 创建OpenAI客户端实例,传入API密钥和基础URL
client = OpenAI(
api_key=OPENAI_API_KEY,
base_url=OPENAI_BASE_URL,
)
# 定义LLMClient类,用于封装LLM相关操作
class LLMClient:
"""LLM客户端类"""
# 静态方法:调用LLM API,返回响应内容
@staticmethod
def call_llm(query: str) -> Optional[str]:
"""调用LLM API,返回响应内容"""
try:
# 调用LLM的chat.completions.create方法,传入模型、消息和超时参数
response = client.chat.completions.create(
model="deepseek-chat",
messages=[{"role": "user", "content": query}],
timeout=30, # 添加超时设置
)
# 返回LLM的回复内容
return response.choices[0].message.content
except Exception as e:
# 记录错误日志
logger.error(f"LLM API调用失败: {str(e)}")
# 返回None表示调用失败
return None
5.2. sql_builder.py #
sql_builder.py
# 导入配置和LLM客户端
from config import logger
from llm_client import LLMClient
# 定义SQLBuilder类,用于构建SQL语句和分析SQL错误
class SQLBuilder:
# SQL语句构建器说明文档
"""SQL语句构建器"""
# 静态方法:分析SQL错误并给出建议
@staticmethod
def analyze_sql_error(error_message: str, sql: str) -> str:
# 方法说明文档
"""使用LLM分析SQL错误并给出建议"""
try:
# 构造LLM提示词,要求分析错误并给建议
prompt = f"""
请分析以下MySQL错误信息,并给出解决建议:
错误信息: {error_message}
SQL语句: {sql}
请给出:
1. 错误原因分析
2. 具体修复建议
3. 正确SQL示例(如有必要)
请用中文简明回答。
"""
# 调用LLM获取建议
suggestion = LLMClient.call_llm(prompt)
# 如果LLM返回建议,则去除首尾空白后返回
if suggestion:
return suggestion.strip()
# 如果没有建议,返回默认提示
return "无法分析错误,请检查SQL语法和数据库连接。"
# 捕获异常,分析失败时返回默认提示
except Exception as e:
# 记录警告日志
logger.warning(f"错误分析失败: {str(e)}")
# 返回默认提示
return "无法分析错误,请检查SQL语法和数据库连接。"
5.3. config.py #
config.py
# 导入logging模块,用于日志记录
import logging
# 导入os模块,用于操作系统相关功能
+import os
# 定义一个函数用于配置日志格式和日志级别
def setup_logging():
# 设置日志的基本配置,包括日志级别和输出格式
logging.basicConfig(
# 设置日志级别为INFO
level=logging.INFO,
# 设置日志输出格式
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回一个以当前模块名命名的logger实例
return logging.getLogger(__name__)
# 获取logger实例,供其他模块使用
logger = setup_logging()
# 构建默认数据库连接配置,从环境变量获取各项参数,如果没有设置则使用默认值
+DEFAULT_DB_CONFIG = {
# 获取数据库主机地址,默认localhost
+ "host": os.getenv("DB_HOST", "localhost"),
# 获取数据库端口号,默认3306,并转换为整数类型
+ "port": int(os.getenv("DB_PORT", "3306")),
# 获取数据库用户名,默认root
+ "user": os.getenv("DB_USER", "root"),
# 获取数据库密码,默认123456
+ "password": os.getenv("DB_PASSWORD", "123456"),
# 获取数据库名称,默认school
+ "database": os.getenv("DB_NAME", "school"),
+}
# 从环境变量中获取OpenAI API密钥,如果没有设置则使用默认值
+OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "sk-8c78c2e5832b4f3e84ee56b94460dfe9")
# 从环境变量中获取OpenAI Base URL,如果没有设置则使用默认值
+OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.deepseek.com")
5.4. database.py #
database.py
# 导入类型注解相关内容(Any表示任意类型,Dict表示字典,Optional表示可选类型)
from typing import Any, Dict, Optional
# 导入SQLAlchemy的create_engine和text,用于数据库操作
from sqlalchemy import create_engine, text
# 导入日期和时间相关类
+from datetime import datetime, date
# 导入Decimal类,用于高精度浮点数
+from decimal import Decimal
# 导入日志记录器logger和数据库连接数据类DatabaseConnection
from config import logger
from models import DatabaseConnection
# 定义数据库连接管理器类
class DatabaseManager:
"""数据库连接管理器"""
# 构造方法,初始化数据库连接对象为None
def __init__(self):
# 数据库连接对象,初始为None
self.db_connection: Optional[DatabaseConnection] = None
# 异步方法,用于连接数据库
async def connect_database(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""连接数据库"""
try:
# 如果已经存在数据库连接,则先关闭
if self.db_connection:
self.db_connection.close()
# 构建数据库连接字符串,格式为mysql+pymysql://user:password@host:port/database
connection_string = (
f"mysql+pymysql://{config['user']}:{config['password']}@"
f"{config['host']}:{config['port']}/{config['database']}"
)
# 创建SQLAlchemy引擎对象
engine = create_engine(
connection_string,
echo=False, # 不输出SQL语句日志
pool_pre_ping=True, # 启用连接池健康检查
pool_recycle=3600, # 连接回收时间为3600秒
)
# 测试数据库连接,执行一条简单的SQL语句
with engine.connect() as conn:
conn.execute(text("SELECT 1"))
# 保存数据库连接信息到db_connection属性
self.db_connection = DatabaseConnection(
host=config["host"],
port=config["port"],
user=config["user"],
password=config["password"],
database=config["database"],
engine=engine,
)
# 返回连接成功的信息,包括主机、端口和数据库名
return {
"success": True,
"message": f"成功连接到数据库 {config['database']}",
"connection_info": {
"host": config["host"],
"port": config["port"],
"database": config["database"],
},
}
# 捕获异常,记录错误日志并返回失败信息
except Exception as e:
# 记录数据库连接失败的错误日志
logger.error(f"数据库连接失败: {str(e)}")
# 返回失败信息
return {
"success": False,
"message": f"数据库连接失败: {str(e)}",
}
# 定义获取当前数据库连接信息的方法,返回一个字典
def get_connection_info(self) -> Dict[str, Any]:
# 方法文档字符串,说明功能
"""获取当前数据库连接信息"""
# 判断是否存在数据库连接
if self.db_connection:
# 如果已连接,返回连接状态及相关信息
return {
"connected": True,
"host": self.db_connection.host, # 数据库主机地址
"port": self.db_connection.port, # 数据库端口号
"database": self.db_connection.database, # 数据库名称
"user": self.db_connection.user, # 数据库用户名
}
else:
# 如果未连接,返回未连接状态及提示信息
return {"connected": False, "message": "数据库未连接"}
# 异步方法:执行SQL语句
+ async def execute_query(self, sql: str) -> Dict[str, Any]:
# 执行SQL语句
+ """执行SQL语句"""
# 如果未连接数据库,返回错误信息
+ if not self.db_connection or not self.db_connection.engine:
+ return {
+ "success": False,
+ "message": "数据库未连接,请先连接数据库。",
+ }
+ try:
# 获取数据库连接
+ with self.db_connection.engine.connect() as conn:
# 执行SQL语句
+ result = conn.execute(text(sql))
# 如果是SELECT查询
+ if sql.strip().upper().startswith("SELECT"):
# 获取所有结果行
+ rows = result.fetchall()
# 获取列名
+ columns = result.keys()
# 转换为可序列化格式
+ serializable_rows = []
+ for row in rows:
# 每一行转为字典
+ row_dict = {}
+ for i, col in enumerate(columns):
+ value = row[i]
# 日期类型转为字符串
+ if isinstance(value, (date, datetime)):
+ value = value.isoformat()
# Decimal类型转为float
+ elif isinstance(value, Decimal):
+ value = float(value)
# 赋值到字典
+ row_dict[col] = value
# 添加到结果列表
+ serializable_rows.append(row_dict)
# 返回查询结果
+ return {
+ "success": True,
+ "message": f"查询成功,共返回{len(serializable_rows)}条记录",
+ "data": serializable_rows,
+ "columns": list(columns),
+ "sql": sql,
+ }
+ else:
# 非查询操作,提交事务
+ conn.commit()
# 返回操作成功信息
+ return {
+ "success": True,
+ "message": "操作执行成功",
+ "affected_rows": result.rowcount,
+ "sql": sql,
+ }
# 捕获异常
+ except Exception as e:
# 记录SQL执行失败日志
+ logger.error(f"SQL执行失败: {str(e)}")
# 返回失败信息
+ return {
+ "success": False,
+ "message": f"SQL执行失败: {str(e)}",
+ "error": str(e),
+ }
# 关闭数据库连接的方法
def close(self) -> None:
"""关闭数据库连接"""
# 如果存在数据库连接,则关闭并置为None
if self.db_connection:
self.db_connection.close()
self.db_connection = None
5.5. main.py #
main.py
# 从mcp.server模块导入FastMCP类
from mcp.server import FastMCP
# 从config模块导入logger日志记录器
from config import logger
+from mcp_tools import connect_database, get_database_info, execute_sql
# 创建一个FastMCP服务器实例,指定数据库为"mysql-database"
mcp = FastMCP("mysql-database")
# 使用装饰器注册MCP工具
@mcp.tool()
# 定义一个异步函数,用于连接数据库
async def connect_database_tool(
# 数据库主机地址,默认为localhost
host: str = "localhost",
# 数据库端口号,默认为3306
port: int = 3306,
# 数据库用户名,默认为root
user: str = "root",
# 数据库密码,默认为123456
password: str = "123456",
# 要连接的数据库名称,默认为school
database: str = "school",
) -> str:
# 函数文档字符串,说明该函数用于连接MySQL数据库
"""连接MySQL数据库"""
# 调用connect_database函数,异步连接数据库,并返回结果
return await connect_database(host, port, user, password, database)
@mcp.tool()
async def get_database_info_tool() -> str:
"""获取当前数据库连接信息"""
return await get_database_info()
# 注册MCP工具:执行SQL语句
+@mcp.tool()
+async def execute_sql_tool(sql: str) -> str:
+ """直接执行SQL语句"""
# 调用execute_sql函数执行SQL语句
+ return await execute_sql(sql)
# 判断当前模块是否为主程序入口
if __name__ == "__main__":
try:
# 记录服务启动的日志信息
logger.info("MySQL MCP服务启动中...")
# 启动FastMCP服务器
mcp.run()
except KeyboardInterrupt:
# 捕获键盘中断(如Ctrl+C),记录服务器已停止的日志
logger.info("服务器已停止")
except Exception as e:
# 捕获其他异常,记录错误日志
logger.error(f"服务器运行时出错: {str(e)}")
# 导入traceback模块用于打印详细的异常信息
import traceback
# 打印异常的详细堆栈信息
traceback.print_exc()
5.6. mcp_tools.py #
mcp_tools.py
# 导入json模块,用于处理JSON数据
import json
# 从config模块导入logger日志记录器
from config import logger
# 从mysql_server模块导入MySQLMCPServer类
from mysql_server import MySQLMCPServer
# 创建MySQLMCPServer的实例,用于后续数据库操作
mysql_server = MySQLMCPServer()
# 定义一个函数,用于获取MySQLMCPServer实例
def get_mysql_server() -> MySQLMCPServer:
# 返回MySQLMCPServer实例
"""获取MySQL服务器实例"""
return mysql_server
# 定义一个异步函数,用于连接MySQL数据库
async def connect_database(
# 数据库主机地址,默认为localhost
host: str = "localhost",
# 数据库端口号,默认为3306
port: int = 3306,
# 数据库用户名,默认为root
user: str = "root",
# 数据库密码,默认为123456
password: str = "123456",
# 要连接的数据库名称,默认为school
database: str = "school",
) -> str:
# 说明该函数用于连接MySQL数据库
"""连接MySQL数据库"""
try:
# 构造数据库连接配置字典
config = {
"host": host,
"port": port,
"user": user,
"password": password,
"database": database,
}
# 调用MySQLMCPServer的connect_database方法进行异步连接
result = await mysql_server.connect_database(config)
# 将连接结果转换为格式化的JSON字符串并返回
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
# 如果连接过程中发生异常,构造错误信息字典
error_result = {"success": False, "message": f"连接失败: {str(e)}"}
# 记录错误日志
logger.error(f"连接失败: {str(e)}")
# 返回错误信息的JSON字符串
return json.dumps(error_result, ensure_ascii=False, indent=2)
# 定义MCP工具:获取当前数据库连接信息
async def get_database_info() -> str:
"""获取当前数据库连接信息"""
try:
# 获取数据库连接信息
info = mysql_server.db_manager.get_connection_info()
# 返回信息的JSON字符串
return json.dumps(info, ensure_ascii=False, indent=2)
except Exception as e:
# 获取信息失败时返回错误信息
error_result = {"success": False, "message": f"获取信息失败: {str(e)}"}
return json.dumps(error_result, ensure_ascii=False, indent=2)
# 定义异步函数:直接执行SQL语句
+async def execute_sql(sql: str) -> str:
# 函数文档字符串,说明功能
+ """直接执行SQL语句"""
+ try:
# 调用MySQLMCPServer的execute_query方法执行SQL
+ result = await mysql_server.execute_query(sql)
# 将结果转换为JSON字符串并返回
+ return json.dumps(result, ensure_ascii=False, indent=2)
+ except Exception as e:
# 捕获异常,构造错误信息字典
+ error_result = {"success": False, "message": f"SQL执行失败: {str(e)}"}
# 返回错误信息的JSON字符串
+ return json.dumps(error_result, ensure_ascii=False, indent=2)
5.7. mysql_server.py #
mysql_server.py
# 导入Any和Dict类型注解,用于类型提示
from typing import Any, Dict
# 从database模块导入DatabaseManager数据库管理器
from database import DatabaseManager
+from config import DEFAULT_DB_CONFIG
+from sql_builder import SQLBuilder
# 定义MySQLMCPServer类,表示MySQL MCP服务器
class MySQLMCPServer:
"""MySQL MCP 服务器"""
# 构造方法,初始化数据库连接管理器
def __init__(self):
# 创建一个DatabaseManager实例,管理数据库连接
self.db_manager = DatabaseManager()
# 异步方法,用于连接数据库
async def connect_database(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""连接数据库"""
# 调用数据库管理器的connect_database方法进行异步连接,并返回结果
return await self.db_manager.connect_database(config)
# 异步方法:执行SQL语句
+ async def execute_query(self, sql: str) -> Dict[str, Any]:
+ """执行SQL语句"""
# 如果当前没有数据库连接,则先连接默认数据库
+ if not self.db_manager.db_connection:
+ await self.db_manager.connect_database(DEFAULT_DB_CONFIG)
# 调用数据库管理器执行SQL查询
+ result = await self.db_manager.execute_query(sql)
# 如果查询失败且包含错误信息,则用LLM分析SQL错误
+ if not result.get("success") and "error" in result:
+ error_analysis = SQLBuilder.analyze_sql_error(result["error"], sql)
# 将错误分析结果添加到返回结果中
+ result["error_analysis"] = error_analysis
# 返回查询结果
+ return result
# 关闭服务器和数据库连接的方法
def close(self) -> None:
"""关闭服务器和数据库连接"""
# 调用数据库管理器的close方法,关闭数据库连接
self.db_manager.close()
6. 自然语言查询 #
6.1. .env #
.env
OPENAI_API_KEY=sk-e9fc8cde99dc47f8970d514dd7d941f2
OPENAI_BASE_URL=https://api.deepseek.com
DB_HOST=localhost
DB_PORT=3306
DB_USER=root
DB_PASSWORD=123456
DB_NAME=school6.2. query_parser.py #
query_parser.py
# 导入Any, Dict, Optional用于类型注解
from typing import Any, Dict, Optional
# 导入日志记录器logger和LLM客户端
from config import logger
from llm_client import LLMClient
# 定义自然语言查询解析器类
class QueryParser:
# 类文档字符串,说明该类用于自然语言查询解析
"""自然语言查询解析器"""
# 静态方法,解析自然语言查询
@staticmethod
def parse_natural_language(query: str) -> Optional[Dict[str, Any]]:
# 方法文档字符串,说明功能
"""解析自然语言查询"""
try:
# 构造LLM提示词,要求分析中文查询并返回结构化JSON
prompt = f"""
请分析以下中文查询,识别查询意图、涉及的表、条件和操作类型。
查询内容: {query}
请以JSON格式返回结果,包含以下字段:
- operation: 操作类型 (SELECT/INSERT/UPDATE/DELETE)
- tables: 涉及的表名列表 (students/subjects/scores)
- conditions: 查询条件列表,每个条件包含 (字段, 操作符, 值)
- limit: 返回记录数上限(可选)
- order_by: 排序字段(可选)
示例输出:
{{
"operation": "SELECT",
"tables": ["students", "scores"],
"conditions": [["gender", "=", "男"], ["score", ">=", "80"]],
"limit": 10,
"order_by": "score DESC"
}}
"""
# 调用LLM客户端,传入prompt,获取解析结果
parsed_result = LLMClient.call_llm_with_json(prompt)
# 如果LLM返回结果为None,说明解析失败,返回默认解析结果
if parsed_result is None:
logger.warning("LLM返回格式无效,使用默认解析")
return {
"operation": "SELECT",
"tables": ["students"],
"conditions": [],
"limit": None,
"order_by": None,
"original_query": query,
}
# 正常返回解析结果,若某字段缺失则使用默认值
return {
"operation": parsed_result.get("operation", "SELECT"),
"tables": parsed_result.get(
"tables", ["students", "subjects", "scores"]
),
"conditions": parsed_result.get("conditions", []),
"limit": parsed_result.get("limit"),
"order_by": parsed_result.get("order_by"),
"original_query": query,
}
# 捕获异常,解析失败时返回默认解析结果
except Exception as e:
logger.warning(f"LLM解析失败,使用默认解析: {str(e)}")
return {
"operation": "SELECT",
"tables": ["students"],
"conditions": [],
"limit": None,
"order_by": None,
"original_query": query,
}
6.3. config.py #
config.py
# 导入logging模块,用于日志记录
import logging
# 导入os模块,用于操作系统相关功能
import os
+from dotenv import load_dotenv
# 加载 .env 文件中的环境变量
+load_dotenv()
# 定义一个函数用于配置日志格式和日志级别
def setup_logging():
# 设置日志的基本配置,包括日志级别和输出格式
logging.basicConfig(
# 设置日志级别为INFO
level=logging.INFO,
# 设置日志输出格式
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 返回一个以当前模块名命名的logger实例
return logging.getLogger(__name__)
# 获取logger实例,供其他模块使用
logger = setup_logging()
# 构建默认数据库连接配置,从环境变量获取各项参数,如果没有设置则使用默认值
DEFAULT_DB_CONFIG = {
# 获取数据库主机地址,默认localhost
"host": os.getenv("DB_HOST", "localhost"),
# 获取数据库端口号,默认3306,并转换为整数类型
"port": int(os.getenv("DB_PORT", "3306")),
# 获取数据库用户名,默认root
"user": os.getenv("DB_USER", "root"),
# 获取数据库密码,默认123456
"password": os.getenv("DB_PASSWORD", "123456"),
# 获取数据库名称,默认school
"database": os.getenv("DB_NAME", "school"),
}
# 从环境变量中获取OpenAI API密钥,如果没有设置则使用默认值
+OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "sk-e9fc8cde99dc47f8970d514dd7d941f2")
# 从环境变量中获取OpenAI Base URL,如果没有设置则使用默认值
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.deepseek.com")
6.4. llm_client.py #
llm_client.py
# 导入类型注解相关内容
+from typing import Any, Dict, Optional
# 导入json模块,用于处理JSON数据
+import json
# 导入OpenAI库,用于调用LLM
from openai import OpenAI
# 导入配置,包括API密钥、基础URL和日志记录器
from config import OPENAI_API_KEY, OPENAI_BASE_URL, logger
# 创建OpenAI客户端实例,传入API密钥和基础URL
client = OpenAI(
+ api_key="sk-e9fc8cde99dc47f8970d514dd7d941f2",
base_url=OPENAI_BASE_URL,
)
# 定义LLMClient类,用于封装LLM相关操作
class LLMClient:
"""LLM客户端类"""
# 静态方法:调用LLM API,返回响应内容
@staticmethod
def call_llm(query: str) -> Optional[str]:
"""调用LLM API,返回响应内容"""
try:
# 调用LLM的chat.completions.create方法,传入模型、消息和超时参数
response = client.chat.completions.create(
model="deepseek-chat",
messages=[{"role": "user", "content": query}],
timeout=30, # 添加超时设置
)
# 返回LLM的回复内容
return response.choices[0].message.content
except Exception as e:
# 记录错误日志
logger.error(f"LLM API调用失败: {str(e)}")
# 返回None表示调用失败
return None
# 静态方法:调用LLM并解析JSON响应
+ @staticmethod
+ def call_llm_with_json(query: str) -> Optional[Dict[str, Any]]:
+ """调用LLM并解析JSON响应"""
# 调用LLM获取内容
+ content = LLMClient.call_llm(query)
# 如果内容为空,返回None
+ if not content:
+ return None
+ try:
# 如果内容中包含```json代码块,提取其中的JSON部分
+ if "```json" in content:
+ content = content.split("```json")[1].split("```")[0].strip()
+ return json.loads(content)
# 如果内容中包含```代码块,提取其中的JSON部分
+ elif "```" in content:
+ content = content.split("```")[1].split("```")[0].strip()
+ return json.loads(content)
# 否则返回None
+ return None
+ except json.JSONDecodeError as e:
# 记录JSON解析失败日志
+ logger.error(f"JSON解析失败: {str(e)}")
# 返回None表示解析失败
+ return None
# 静态方法:调用LLM并提取SQL语句
+ @staticmethod
+ def call_llm_with_sql(query: str) -> Optional[str]:
+ """调用LLM并提取SQL语句"""
# 调用LLM获取内容
+ content = LLMClient.call_llm(query)
# 如果内容为空,返回None
+ if not content:
+ return None
+ try:
# 如果内容中包含```sql代码块,提取其中的SQL部分
+ if "```sql" in content:
+ content = content.split("```sql")[1].split("```")[0].strip()
+ return content
# 如果内容中包含```代码块,提取其中的SQL部分
+ elif "```" in content:
+ content = content.split("```")[1].split("```")[0].strip()
+ return content
+ else:
# 否则直接返回内容
+ return content
+ except Exception as e:
# 记录SQL提取失败日志
+ logger.error(f"SQL提取失败: {str(e)}")
# 返回None表示提取失败
+ return None
6.5. main.py #
main.py
# 从mcp.server模块导入FastMCP类
from mcp.server import FastMCP
# 从config模块导入logger日志记录器
from config import logger
+from mcp_tools import (
+ connect_database,
+ get_database_info,
+ execute_sql,
+ natural_language_query,
+)
# 创建一个FastMCP服务器实例,指定数据库为"mysql-database"
mcp = FastMCP("mysql-database")
# 使用装饰器注册MCP工具
@mcp.tool()
# 定义一个异步函数,用于连接数据库
async def connect_database_tool(
# 数据库主机地址,默认为localhost
host: str = "localhost",
# 数据库端口号,默认为3306
port: int = 3306,
# 数据库用户名,默认为root
user: str = "root",
# 数据库密码,默认为123456
password: str = "123456",
# 要连接的数据库名称,默认为school
database: str = "school",
) -> str:
# 函数文档字符串,说明该函数用于连接MySQL数据库
"""连接MySQL数据库"""
# 调用connect_database函数,异步连接数据库,并返回结果
return await connect_database(host, port, user, password, database)
@mcp.tool()
async def get_database_info_tool() -> str:
"""获取当前数据库连接信息"""
return await get_database_info()
# 注册MCP工具:执行SQL语句
@mcp.tool()
async def execute_sql_tool(sql: str) -> str:
"""直接执行SQL语句"""
# 调用execute_sql函数执行SQL语句
return await execute_sql(sql)
# 注册MCP工具:自然语言查询数据库
+@mcp.tool()
+async def natural_language_query_tool(query: str) -> str:
+ """使用自然语言查询MySQL数据库"""
# 调用natural_language_query函数执行自然语言查询
+ return await natural_language_query(query)
# 判断当前模块是否为主程序入口
if __name__ == "__main__":
try:
# 记录服务启动的日志信息
logger.info("MySQL MCP服务启动中...")
# 启动FastMCP服务器
mcp.run()
except KeyboardInterrupt:
# 捕获键盘中断(如Ctrl+C),记录服务器已停止的日志
logger.info("服务器已停止")
except Exception as e:
# 捕获其他异常,记录错误日志
logger.error(f"服务器运行时出错: {str(e)}")
# 导入traceback模块用于打印详细的异常信息
import traceback
# 打印异常的详细堆栈信息
traceback.print_exc()
6.6. mcp_tools.py #
mcp_tools.py
# 导入json模块,用于处理JSON数据
import json
# 从config模块导入logger日志记录器
from config import logger
# 从mysql_server模块导入MySQLMCPServer类
from mysql_server import MySQLMCPServer
# 创建MySQLMCPServer的实例,用于后续数据库操作
mysql_server = MySQLMCPServer()
# 定义一个函数,用于获取MySQLMCPServer实例
def get_mysql_server() -> MySQLMCPServer:
# 返回MySQLMCPServer实例
"""获取MySQL服务器实例"""
return mysql_server
# 定义一个异步函数,用于连接MySQL数据库
async def connect_database(
# 数据库主机地址,默认为localhost
host: str = "localhost",
# 数据库端口号,默认为3306
port: int = 3306,
# 数据库用户名,默认为root
user: str = "root",
# 数据库密码,默认为123456
password: str = "123456",
# 要连接的数据库名称,默认为school
database: str = "school",
) -> str:
# 说明该函数用于连接MySQL数据库
"""连接MySQL数据库"""
try:
# 构造数据库连接配置字典
config = {
"host": host,
"port": port,
"user": user,
"password": password,
"database": database,
}
# 调用MySQLMCPServer的connect_database方法进行异步连接
result = await mysql_server.connect_database(config)
# 将连接结果转换为格式化的JSON字符串并返回
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
# 如果连接过程中发生异常,构造错误信息字典
error_result = {"success": False, "message": f"连接失败: {str(e)}"}
# 记录错误日志
logger.error(f"连接失败: {str(e)}")
# 返回错误信息的JSON字符串
return json.dumps(error_result, ensure_ascii=False, indent=2)
# 定义MCP工具:获取当前数据库连接信息
async def get_database_info() -> str:
"""获取当前数据库连接信息"""
try:
# 获取数据库连接信息
info = mysql_server.db_manager.get_connection_info()
# 返回信息的JSON字符串
return json.dumps(info, ensure_ascii=False, indent=2)
except Exception as e:
# 获取信息失败时返回错误信息
error_result = {"success": False, "message": f"获取信息失败: {str(e)}"}
return json.dumps(error_result, ensure_ascii=False, indent=2)
# 定义异步函数:直接执行SQL语句
async def execute_sql(sql: str) -> str:
# 函数文档字符串,说明功能
"""直接执行SQL语句"""
try:
# 调用MySQLMCPServer的execute_query方法执行SQL
result = await mysql_server.execute_query(sql)
# 将结果转换为JSON字符串并返回
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
# 捕获异常,构造错误信息字典
error_result = {"success": False, "message": f"SQL执行失败: {str(e)}"}
# 返回错误信息的JSON字符串
return json.dumps(error_result, ensure_ascii=False, indent=2)
# 定义异步函数:自然语言查询数据库
+async def natural_language_query(query: str) -> str:
# 函数文档字符串,说明功能
+ """使用自然语言查询MySQL数据库"""
# 记录查询日志
+ logger.info(f"MySQL数据库自然语言查询: {query}")
+ try:
# 调用MySQLMCPServer的natural_language_query方法执行查询
+ result = await mysql_server.natural_language_query(query)
# 记录查询结果日志
+ logger.info(f"自然语言查询结果: {result}")
# 将结果转换为JSON字符串并返回
+ return json.dumps(result, ensure_ascii=False, indent=2)
+ except Exception as e:
# 捕获异常,构造错误信息字典
+ error_result = {"success": False, "message": f"查询失败: {str(e)}"}
# 返回错误信息的JSON字符串
+ return json.dumps(error_result, ensure_ascii=False, indent=2)
6.7. mysql_server.py #
mysql_server.py
# 导入Any和Dict类型注解,用于类型提示
from typing import Any, Dict
# 从database模块导入DatabaseManager数据库管理器
from database import DatabaseManager
# 导入logger日志记录器和默认数据库配置
+from config import logger, DEFAULT_DB_CONFIG
# 导入SQL语句构建器
from sql_builder import SQLBuilder
# 导入自然语言查询解析器
+from query_parser import QueryParser
# 导入LLM客户端
+from llm_client import LLMClient
# 定义MySQLMCPServer类,表示MySQL MCP服务器
class MySQLMCPServer:
"""MySQL MCP 服务器"""
# 构造方法,初始化数据库连接管理器
def __init__(self):
# 创建一个DatabaseManager实例,管理数据库连接
self.db_manager = DatabaseManager()
# 异步方法,用于连接数据库
async def connect_database(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""连接数据库"""
# 调用数据库管理器的connect_database方法进行异步连接,并返回结果
return await self.db_manager.connect_database(config)
# 异步方法:执行SQL语句
async def execute_query(self, sql: str) -> Dict[str, Any]:
"""执行SQL语句"""
# 如果当前没有数据库连接,则先连接默认数据库
if not self.db_manager.db_connection:
await self.db_manager.connect_database(DEFAULT_DB_CONFIG)
# 调用数据库管理器执行SQL查询
result = await self.db_manager.execute_query(sql)
# 如果查询失败且包含错误信息,则用LLM分析SQL错误
if not result.get("success") and "error" in result:
error_analysis = SQLBuilder.analyze_sql_error(result["error"], sql)
# 将错误分析结果添加到返回结果中
result["error_analysis"] = error_analysis
# 返回查询结果
return result
# 异步方法:处理自然语言查询
+ async def natural_language_query(self, query: str) -> Dict[str, Any]:
+ """自然语言查询处理"""
+ try:
# 使用QueryParser解析自然语言查询
+ parsed = QueryParser.parse_natural_language(query)
# 如果解析失败,返回错误信息
+ if not parsed:
+ return {
+ "success": False,
+ "message": "无法解析查询意图",
+ "original_query": query,
+ }
# 记录解析结果日志
+ logger.info(f"解析结果: {parsed}")
# 使用SQLBuilder根据解析结果生成SQL语句
+ sql = SQLBuilder.build_sql_query(parsed)
# 如果SQL生成失败,返回错误信息
+ if not sql:
+ return {
+ "success": False,
+ "message": "无法生成SQL语句",
+ "original_query": query,
+ "parsed_intent": parsed,
+ }
# 记录生成的SQL日志
+ logger.info(f"生成SQL: {sql}")
# 执行SQL查询
+ result = await self.execute_query(sql)
# 记录查询结果日志
+ logger.info(f"查询结果: {result}")
# 如果查询成功且有数据,生成结果解释
+ if result.get("success") and "data" in result:
+ result["explanation"] = self._explain_query_result(
+ query, result, parsed
+ )
# 返回完整的查询流程结果
+ return {
+ "success": True,
+ "original_query": query,
+ "parsed_intent": parsed,
+ "generated_sql": sql,
+ "execution_result": result,
+ }
# 捕获异常,记录错误日志并返回错误信息
+ except Exception as e:
+ logger.error(f"自然语言查询处理失败: {str(e)}")
+ return {
+ "success": False,
+ "message": f"自然语言查询处理失败: {str(e)}",
+ "original_query": query,
+ }
# 私有方法:使用LLM对查询结果进行解释
+ def _explain_query_result(
+ self, original_query: str, result: Dict[str, Any], parsed: Dict[str, Any]
+ ) -> str:
+ """使用LLM对查询结果进行解释"""
+ try:
# 初始化数据摘要
+ data_summary = ""
# 如果有查询数据
+ if result.get("data"):
# 统计数据条数
+ data_count = len(result["data"])
+ if data_count > 0:
# 取前3条数据作为示例
+ sample_data = result["data"][:3]
# 生成数据摘要字符串
+ data_summary = f"共查询到{data_count}条记录,示例数据: {json.dumps(sample_data, ensure_ascii=False)}"
+ else:
# 没有数据时的摘要
+ data_summary = "未查询到任何记录"
# 构造LLM提示词,要求简要解释查询结果
+ prompt = f"""
+ 请根据以下信息,用简明中文解释查询结果:
+ 原始查询: {original_query}
+ 查询意图: {parsed.get('operation', 'SELECT')} 操作,涉及表: {', '.join(parsed.get('tables', []))}
+ 查询结果: {data_summary}
+ 请给出:
+ 1. 查询结果的简要总结
+ 2. 若结果为空,可能的原因
+ 3. 如有需要,给出进一步查询建议
+ 答案不超过100字。
+ """
# 调用LLMClient获取解释内容
+ explanation = LLMClient.call_llm(prompt)
# 如果LLM返回了解释,则去除首尾空白后返回
+ if explanation:
+ return explanation.strip()
# 如果没有解释,返回默认提示
+ return "查询已完成,请查看数据。"
# 捕获异常,记录警告日志并返回默认提示
+ except Exception as e:
+ logger.warning(f"结果解释失败: {str(e)}")
+ return "查询已完成,请查看数据。"
# 关闭服务器和数据库连接的方法
def close(self) -> None:
"""关闭服务器和数据库连接"""
# 调用数据库管理器的close方法,关闭数据库连接
self.db_manager.close()
6.8. sql_builder.py #
sql_builder.py
# 导入配置和LLM客户端
from config import logger
# 导入json模块,用于处理JSON数据
+import json
# 导入类型注解相关内容
+from typing import Any, Dict, Optional
from llm_client import LLMClient
# 定义SQLBuilder类,用于构建SQL语句和分析SQL错误
class SQLBuilder:
# SQL语句构建器说明文档
"""SQL语句构建器"""
# 静态方法:分析SQL错误并给出建议
@staticmethod
def analyze_sql_error(error_message: str, sql: str) -> str:
# 方法说明文档
"""使用LLM分析SQL错误并给出建议"""
try:
# 构造LLM提示词,要求分析错误并给建议
prompt = f"""
请分析以下MySQL错误信息,并给出解决建议:
错误信息: {error_message}
SQL语句: {sql}
请给出:
1. 错误原因分析
2. 具体修复建议
3. 正确SQL示例(如有必要)
请用中文简明回答。
"""
# 调用LLM获取建议
suggestion = LLMClient.call_llm(prompt)
# 如果LLM返回建议,则去除首尾空白后返回
if suggestion:
return suggestion.strip()
# 如果没有建议,返回默认提示
return "无法分析错误,请检查SQL语法和数据库连接。"
# 捕获异常,分析失败时返回默认提示
except Exception as e:
# 记录警告日志
logger.warning(f"错误分析失败: {str(e)}")
# 返回默认提示
return "无法分析错误,请检查SQL语法和数据库连接。"
# 静态方法:根据解析结果构建SQL查询语句
+ @staticmethod
+ def build_sql_query(parsed: Dict[str, Any]) -> Optional[str]:
# 方法说明文档
+ """构建SQL查询语句"""
+ try:
# 构造LLM提示词,要求根据解析结果生成SQL
+ prompt = f"""
+ 请根据以下解析结果生成MySQL SQL语句:
+ 解析结果: {json.dumps(parsed, ensure_ascii=False)}
+ 数据库表结构:
+ - students: student_id(PK), student_no, student_name, gender, class_name, birth_date, parent_phone, create_time
+ - subjects: subject_id(PK), subject_name, teacher_name, create_time
+ - scores: score_id(PK), student_id, subject_id, exam_date, exam_type, score, grade_level, create_time
+ 要求:
+ 1. 如果是SELECT查询,选择合适字段,避免SELECT *
+ 2. 多表查询时使用合适的JOIN
+ 3. 添加合适的WHERE条件
+ 4. 有order_by时加ORDER BY
+ 5. 有limit时加LIMIT
+ 6. 只返回完整SQL语句,不要有解释
+ 示例: SELECT s.student_name, s.class_name, sc.score FROM students s JOIN scores sc ON s.student_id = sc.student_id WHERE sc.score >= 80 ORDER BY sc.score DESC LIMIT 10
+ """
# 调用LLM获取SQL语句
+ sql = LLMClient.call_llm_with_sql(prompt)
# 如果生成失败,返回默认SQL
+ if sql is None:
+ logger.warning("LLM生成SQL失败,使用默认SQL")
+ return "SELECT * FROM students LIMIT 10"
# 返回生成的SQL语句
+ return sql
# 捕获异常,生成SQL失败时返回默认SQL
+ except Exception as e:
# 生成SQL失败时记录警告日志
+ logger.warning(f"LLM生成SQL失败,使用默认SQL: {str(e)}")
# 返回默认SQL
+ return "SELECT * FROM students LIMIT 10"
7. @dataclass #
@dataclass 是 Python 3.7+ 引入的一个装饰器,它位于 dataclasses 模块中。它的主要作用是自动生成数据类的样板代码,大大简化了数据类的定义。
@dataclass 被用来装饰 DatabaseConnection 类:
@dataclass
class DatabaseConnection:
"""数据库连接信息"""
host: str
port: int
user: str
password: str
database: str
engine: Optional[Any] = None
session: Optional[Any] = None7.1. 自动生成 __init__ 方法 #
# 使用 @dataclass 后,Python 自动生成:
def __init__(self, host: str, port: int, user: str, password: str,
database: str, engine: Optional[Any] = None,
session: Optional[Any] = None):
self.host = host
self.port = port
self.user = user
self.password = password
self.database = database
self.engine = engine
self.session = session7.2. 自动生成 __repr__ 方法 #
# 自动生成字符串表示,便于调试:
def __repr__(self):
return f"DatabaseConnection(host='{self.host}', port={self.port}, ...)"7.3. 自动生成 __eq__ 方法 #
# 自动生成相等性比较方法:
def __eq__(self, other):
if not isinstance(other, DatabaseConnection):
return False
return (self.host == other.host and
self.port == other.port and
self.user == other.user and
# ... 其他字段比较)7.4. 不使用 @dataclass 的传统写法 #
如果不使用 @dataclass,您需要手动编写这些方法:
class DatabaseConnection:
def __init__(self, host: str, port: int, user: str, password: str,
database: str, engine: Optional[Any] = None,
session: Optional[Any] = None):
self.host = host
self.port = port
self.user = user
self.password = password
self.database = database
self.engine = engine
self.session = session
def __repr__(self):
return f"DatabaseConnection(host='{self.host}', port={self.port}, ...)"
def __eq__(self, other):
# 复杂的相等性比较逻辑
pass7.5. @dataclass 的优势 #
- 代码简洁性:减少了大量样板代码
- 类型安全:支持类型注解,便于IDE提供智能提示
- 自动生成:自动生成常用的魔术方法
- 可定制性:可以通过参数控制生成哪些方法
- 维护性:修改字段时,相关方法会自动更新