ai
  • index
  • 1.首页
  • 2.介绍
  • 3.架构概览
  • 4.服务器概念
  • 5.客户端概念
  • 6.版本控制
  • 7.连接到远程MCP服务器
  • 8.连接到本地MCP服务器
  • json_rpc
  • 9.构建一个MCP服务器
  • 10.检查员
  • 11.构建一个MCP客户端
  • 14.架构
  • 15.基础协议概述
  • 16.生命周期
  • 17.传输
  • 18.授权
  • 19.安全最佳实践
  • 20.取消
  • 21.Ping
  • 22.进展
  • 23.Roots
  • 24.采样
  • 25.启发
  • 26.服务器特性
  • 27.提示词
  • 28.资源
  • 29.工具
  • 30.完成
  • 31.日志记录
  • 32.分页
  • 33.架构参考
  • URI模板
  • 12.实现
  • http.server
  • 动态客户端注册协议
  • 受保护资源元数据
  • 授权服务器元数据
  • JWKS
  • PKCE
  • PyJWT
  • secrets
  • watchfiles
  • 实现authorization
  • 实现cancel
  • 实现completion
  • 实现logging
  • 实现pagination
  • 实现process
  • 实现transport
  • psutil
  • pytz
  • zoneinfo
  • contextlib
  • Starlette
  • mcp.1.starter
  • mcp.2.Resource
  • mcp.3.structured_output
  • mcp.4.prompts
  • mcp.5.context
  • mcp.6.streamable
  • mcp.7.lowlevel
  • mcp.8.Completion
  • mcp.9.Elicitation
  • mcp.10.oauth
  • mcp.11.integration
  • mcp.12.best
  • mysql-mcp
  • databases
  • uvicorn
  • asynccontextmanager
  • AsyncExitStack
  • streamable
  • aiohttp
  • publish
  • email
  • schedule
  • twine
  • 1.教学文档总览
  • 2.教师使用指南
  • 3.教学系统快速参考
  • 4.新生入门指南
  • 5.学生使用指南
  • 1. 项目介绍
    • 1.1 核心特性
    • 1.2 技术架构
    • 1.3 主要功能
    • 1.4 项目目录结构
    • 1.5. 模块依赖关系图
    • 1.6. 模块职责分工
      • 1.6.1. 基础设施层
      • 1.6.2. 外部服务层
      • 1.6.3. 业务逻辑层
      • 1.6.4. 接口层
    • 1.7. 初始化项目
  • 2. 启动MCP服务器
    • 2.1. config.py
    • 2.2. main.py
    • 2.3. pyproject.toml
  • 3. 连接MYSQL数据库
    • 3.1. database.py
    • 3.2. mcp_tools.py
    • 3.3. models.py
    • 3.4. mysql_server.py
    • 3.5. config.py
    • 3.6. main.py
  • 4. 查看数据库信息
    • 4.1. database.py
    • 4.2. main.py
    • 4.3. mcp_tools.py
    • 4.4. mysql_server.py
  • 5. 执行SQL语句
    • 5.1. llm_client.py
    • 5.2. sql_builder.py
    • 5.3. config.py
    • 5.4. database.py
    • 5.5. main.py
    • 5.6. mcp_tools.py
    • 5.7. mysql_server.py
  • 6. 自然语言查询
    • 6.1. .env
    • 6.2. query_parser.py
    • 6.3. config.py
    • 6.4. llm_client.py
    • 6.5. main.py
    • 6.6. mcp_tools.py
    • 6.7. mysql_server.py
    • 6.8. sql_builder.py
  • 7. @dataclass
    • 7.1. 自动生成 __init__ 方法
    • 7.2. 自动生成 __repr__ 方法
    • 7.3. 自动生成 __eq__ 方法
    • 7.4. 不使用 @dataclass 的传统写法
    • 7.5. @dataclass 的优势

1. 项目介绍 #

MySQL MCP 是一个基于 Model Context Protocol (MCP) 的智能数据库查询工具,它能够将自然语言查询转换为 SQL 语句并执行,让用户可以用日常语言与 MySQL 数据库进行交互。

  • school.sql

1.1 核心特性 #

智能连接管理 - 支持多种数据库连接配置
自然语言查询 - 用中文描述需求,自动生成 SQL
AI 驱动 - 集成 OpenAI API,智能解析查询意图
MCP 协议 - 标准化的工具接口,易于集成

1.2 技术架构 #

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   自然语言查询   │ →  │   AI 意图解析    │ →  │   SQL 生成执行   │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │
         ▼                       ▼                       ▼
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   MCP 工具接口   │    │   LLM 客户端    │    │   MySQL 连接    │
└─────────────────┘    └─────────────────┘    └─────────────────┘

1.3 主要功能 #

  • 数据库连接 - 支持自定义主机、端口、用户认证
  • 信息查询 - 获取数据库结构、表信息等元数据
  • SQL 执行 - 直接执行 SQL 语句并返回结果
  • 智能查询 - 自然语言转 SQL,支持复杂查询需求

1.4 项目目录结构 #

mysql-mcp/
├── 📄 __init__.py              #  模块包初始化文件
├── 📄 config.py                #  配置和常量管理
├── 📄 models.py                #  数据模型和数据库表结构
├── 📄 llm_client.py            #  LLM客户端和API调用
├── 📄 database.py              #  数据库连接管理
├── 📄 query_parser.py          #  自然语言查询解析
├── 📄 sql_builder.py           #  SQL语句构建
├── 📄 mysql_server.py          #  MySQL MCP服务器核心逻辑
├── 📄 mcp_tools.py             #  MCP工具定义
├── 📄 main.py                  #  主程序入口
├── 📄 pyproject.toml           #  项目配置
├── 📄 uv.lock                  #  依赖锁定文件
├── 📄 .env                     #  环境变量
└── 📄 .python-version          #  Python版本

1.5. 模块依赖关系图 #

1.6. 模块职责分工 #

1.6.1. 基础设施层 #

  • config.py: 配置管理、环境变量、日志设置
  • models.py: 数据模型、数据库表结构定义

1.6.2. 外部服务层 #

  • llm_client.py: LLM API调用、响应处理
  • database.py: 数据库连接、SQL执行、结果处理

1.6.3. 业务逻辑层 #

  • query_parser.py: 自然语言查询意图识别
  • sql_builder.py: SQL语句生成、错误分析
  • mysql_server.py: 核心业务流程、模块协调

1.6.4. 接口层 #

  • mcp_tools.py: MCP工具函数封装
  • main.py: 服务器启动、工具注册

1.7. 初始化项目 #

# 初始化项目
uv init mysql-mcp
# 安装依赖
uv add "mcp[cli]" openai  sqlalchemy  pymysql python-dotenv
# 启动MCP服务器
mcp dev main.py

2. 启动MCP服务器 #

2.1. config.py #

config.py

# 导入logging模块,用于日志记录
import logging

# 定义一个函数用于配置日志格式和日志级别
def setup_logging():
    # 设置日志的基本配置,包括日志级别和输出格式
    logging.basicConfig(
        level=logging.INFO,  # 设置日志级别为INFO
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",  # 设置日志输出格式
    )
    # 返回一个以当前模块名命名的logger实例
    return logging.getLogger(__name__)

# 获取logger实例,供其他模块使用
logger = setup_logging()

2.2. main.py #

main.py

# 从mcp.server模块导入FastMCP类
+from mcp.server import FastMCP

# 从config模块导入logger日志记录器
+from config import logger

# 创建一个FastMCP服务器实例,指定数据库为"mysql-database"
+mcp = FastMCP("mysql-database")

# 判断当前模块是否为主程序入口
if __name__ == "__main__":
+   try:
        # 记录服务启动的日志信息
+       logger.info("MySQL MCP服务启动中...")
        # 启动FastMCP服务器
+       mcp.run()
+   except KeyboardInterrupt:
        # 捕获键盘中断(如Ctrl+C),记录服务器已停止的日志
+       logger.info("服务器已停止")
+   except Exception as e:
        # 捕获其他异常,记录错误日志
+       logger.error(f"服务器运行时出错: {str(e)}")
        # 导入traceback模块用于打印详细的异常信息
+       import traceback

        # 打印异常的详细堆栈信息
+       traceback.print_exc()

2.3. pyproject.toml #

pyproject.toml

[project]
name = "mysql-mcp"
version = "0.1.0"
description = "MySQL MCP Server"
readme = "README.md"
requires-python = ">=3.12"
+dependencies = [
+   "mcp[cli]>=1.13.1",
+   "openai>=1.101.0",
+   "pymysql>=1.1.1",
+   "python-dotenv>=1.1.1",
+   "sqlalchemy>=2.0.43",
+]

3. 连接MYSQL数据库 #

3.1. database.py #

database.py

# 导入类型注解相关内容(Any表示任意类型,Dict表示字典,Optional表示可选类型)
from typing import Any, Dict, Optional

# 导入SQLAlchemy的create_engine和text,用于数据库操作
from sqlalchemy import create_engine, text

# 导入日志记录器logger和数据库连接数据类DatabaseConnection
from config import logger
# 导入数据库连接数据类DatabaseConnection
from models import DatabaseConnection, DatabaseSchema


# 定义数据库连接管理器类
class DatabaseManager:
    """数据库连接管理器"""

    # 构造方法,初始化数据库连接对象为None
    def __init__(self):
        # 数据库连接对象,初始为None
        self.db_connection: Optional[DatabaseConnection] = None
        self.schema = DatabaseSchema() 

    # 异步方法,用于连接数据库
    async def connect_database(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """连接数据库"""
        try:
            # 如果已经存在数据库连接,则先关闭
            if self.db_connection:
                self.db_connection.close()

            # 构建数据库连接字符串,格式为mysql+pymysql://user:password@host:port/database
            connection_string = (
                f"mysql+pymysql://{config['user']}:{config['password']}@"
                f"{config['host']}:{config['port']}/{config['database']}"
            )

            # 创建SQLAlchemy引擎对象
            engine = create_engine(
                connection_string,
                echo=False,  # 不输出SQL语句日志
                pool_pre_ping=True,  # 启用连接池健康检查
                pool_recycle=3600,  # 连接回收时间为3600秒
            )

            # 测试数据库连接,执行一条简单的SQL语句
            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))

            # 保存数据库连接信息到db_connection属性
            self.db_connection = DatabaseConnection(
                host=config["host"],
                port=config["port"],
                user=config["user"],
                password=config["password"],
                database=config["database"],
                engine=engine,
            )
            # 创建数据库表
            await self.create_tables()
            # 返回连接成功的信息,包括主机、端口和数据库名
            return {
                "success": True,
                "message": f"成功连接到数据库 {config['database']}",
                "connection_info": {
                    "host": config["host"],
                    "port": config["port"],
                    "database": config["database"],
                },
            }

        # 捕获异常,记录错误日志并返回失败信息
        except Exception as e:
            # 记录数据库连接失败的错误日志
            logger.error(f"数据库连接失败: {str(e)}")
            # 返回失败信息
            return {
                "success": False,
                "message": f"数据库连接失败: {str(e)}",
            }
    # 异步方法:创建数据库表
    async def create_tables(self):
        """创建数据库表"""
        if self.db_connection and self.db_connection.engine:
            self.schema.get_metadata().create_all(self.db_connection.engine)
    # 关闭数据库连接的方法
    def close(self) -> None:
        """关闭数据库连接"""
        # 如果存在数据库连接,则关闭并置为None
        if self.db_connection:
            self.db_connection.close()
            self.db_connection = None

3.2. mcp_tools.py #

mcp_tools.py

# 导入json模块,用于处理JSON数据
import json

# 从config模块导入logger日志记录器
from config import logger

# 从mysql_server模块导入MySQLMCPServer类
from mysql_server import MySQLMCPServer

# 创建MySQLMCPServer的实例,用于后续数据库操作
mysql_server = MySQLMCPServer()


# 定义一个函数,用于获取MySQLMCPServer实例
def get_mysql_server() -> MySQLMCPServer:
    # 返回MySQLMCPServer实例
    """获取MySQL服务器实例"""
    return mysql_server


# 定义一个异步函数,用于连接MySQL数据库
async def connect_database(
    # 数据库主机地址,默认为localhost
    host: str = "localhost",
    # 数据库端口号,默认为3306
    port: int = 3306,
    # 数据库用户名,默认为root
    user: str = "root",
    # 数据库密码,默认为123456
    password: str = "123456",
    # 要连接的数据库名称,默认为school
    database: str = "school",
) -> str:
    # 说明该函数用于连接MySQL数据库
    """连接MySQL数据库"""
    try:
        # 构造数据库连接配置字典
        config = {
            "host": host,
            "port": port,
            "user": user,
            "password": password,
            "database": database,
        }
        # 调用MySQLMCPServer的connect_database方法进行异步连接
        result = await mysql_server.connect_database(config)
        # 将连接结果转换为格式化的JSON字符串并返回
        return json.dumps(result, ensure_ascii=False, indent=2)
    except Exception as e:
        # 如果连接过程中发生异常,构造错误信息字典
        error_result = {"success": False, "message": f"连接失败: {str(e)}"}
        # 记录错误日志
        logger.error(f"连接失败: {str(e)}")
        # 返回错误信息的JSON字符串
        return json.dumps(error_result, ensure_ascii=False, indent=2)

3.3. models.py #

models.py

# 导入类型注解Any和Optional,Any表示任意类型,Optional表示可选类型
from typing import Any, Optional

# 导入dataclass装饰器,用于简化数据类的定义
from dataclasses import dataclass

# 导入SQLAlchemy相关内容,用于定义数据库表结构
from sqlalchemy import (
    MetaData,  # 用于管理数据库元数据
    Table,  # 用于定义数据库表
    Column,  # 用于定义表的字段
    Integer,  # 整数类型
    String,  # 字符串类型
    Date,  # 日期类型
    Enum,  # 枚举类型
    DECIMAL,  # 定点数类型
    TIMESTAMP,  # 时间戳类型
)

# 导入datetime模块中的datetime类,用于处理日期和时间
from datetime import datetime

# 从config模块导入logger日志记录器
from config import logger


# 使用dataclass装饰器定义数据库连接信息的数据类
@dataclass
class DatabaseConnection:
    """数据库连接信息"""

    # 主机名
    host: str
    # 端口号
    port: int
    # 用户名
    user: str
    # 密码
    password: str
    # 数据库名
    database: str
    # SQLAlchemy引擎对象,可选
    engine: Optional[Any] = None
    # 会话对象(暂未使用),可选
    session: Optional[Any] = None

    # 定义关闭数据库连接的方法
    def close(self) -> None:
        """关闭数据库连接"""
        # 如果engine存在,则释放数据库引擎资源
        if self.engine:
            try:
                # 调用dispose方法释放资源
                self.engine.dispose()
                # 记录数据库连接已关闭的信息
                logger.info("数据库连接已关闭")
            except Exception as e:
                # 记录关闭数据库连接失败的错误信息
                logger.error(f"关闭数据库连接失败: {str(e)}")


# 定义数据库表结构管理类
class DatabaseSchema:
    """数据库表结构定义"""

    # 构造方法,初始化元数据并设置表结构
    def __init__(self):
        # 创建SQLAlchemy元数据对象
        self.metadata = MetaData()
        # 调用私有方法设置数据库表结构
        self._setup_tables()

    # 私有方法,定义所有数据库表结构
    def _setup_tables(self) -> None:
        """设置数据库表结构"""
        # 定义学生表
        Table(
            "students",  # 表名
            self.metadata,  # 关联的元数据对象
            Column(
                "student_id", Integer, primary_key=True, autoincrement=True
            ),  # 学生ID,主键,自增
            Column(
                "student_no", String(20), nullable=False, unique=True
            ),  # 学号,唯一且不能为空
            Column("student_name", String(20), nullable=False),  # 学生姓名,不能为空
            Column("gender", Enum("男", "女"), default="男"),  # 性别,枚举类型,默认男
            Column("class_name", String(20), nullable=False),  # 班级名称,不能为空
            Column("birth_date", Date),  # 出生日期
            Column("parent_phone", String(15)),  # 家长电话
            Column(
                "create_time", TIMESTAMP, default=datetime.now
            ),  # 创建时间,默认当前时间
        )

        # 定义科目表
        Table(
            "subjects",  # 表名
            self.metadata,  # 关联的元数据对象
            Column(
                "subject_id", Integer, primary_key=True, autoincrement=True
            ),  # 科目ID,主键,自增
            Column(
                "subject_name", String(20), nullable=False, unique=True
            ),  # 科目名称,唯一且不能为空
            Column("teacher_name", String(20)),  # 任课老师姓名
            Column(
                "create_time", TIMESTAMP, default=datetime.now
            ),  # 创建时间,默认当前时间
        )

        # 定义成绩表
        Table(
            "scores",  # 表名
            self.metadata,  # 关联的元数据对象
            Column(
                "score_id", Integer, primary_key=True, autoincrement=True
            ),  # 成绩ID,主键,自增
            Column("student_id", Integer, nullable=False),  # 学生ID,不能为空
            Column("subject_id", Integer, nullable=False),  # 科目ID,不能为空
            Column("exam_date", Date, nullable=False),  # 考试日期,不能为空
            Column(
                "exam_type",
                Enum("期中", "期末", "单元测试", "月考"),
                default="期末",  # 考试类型,枚举,默认期末
            ),
            Column("score", DECIMAL(5, 2), nullable=False),  # 分数,定点数,不能为空
            Column(
                "grade_level", Enum("优秀", "良好", "及格", "不及格")
            ),  # 等级,枚举类型
            Column(
                "create_time", TIMESTAMP, default=datetime.now
            ),  # 创建时间,默认当前时间
        )

    # 获取元数据对象的方法
    def get_metadata(self):
        """获取元数据对象"""
        return self.metadata

3.4. mysql_server.py #

mysql_server.py

# 导入Any和Dict类型注解,用于类型提示
from typing import Any, Dict
# 从database模块导入DatabaseManager数据库管理器
from database import DatabaseManager

# 定义MySQLMCPServer类,表示MySQL MCP服务器
class MySQLMCPServer:
    """MySQL MCP 服务器"""

    # 构造方法,初始化数据库连接管理器
    def __init__(self):
        # 创建一个DatabaseManager实例,管理数据库连接
        self.db_manager = DatabaseManager()

    # 异步方法,用于连接数据库
    async def connect_database(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """连接数据库"""
        # 调用数据库管理器的connect_database方法进行异步连接,并返回结果
        return await self.db_manager.connect_database(config)

    # 关闭服务器和数据库连接的方法
    def close(self) -> None:
        """关闭服务器和数据库连接"""
        # 调用数据库管理器的close方法,关闭数据库连接
        self.db_manager.close()

3.5. config.py #

config.py

# 导入logging模块,用于日志记录
import logging


# 定义一个函数用于配置日志格式和日志级别
def setup_logging():
    # 设置日志的基本配置,包括日志级别和输出格式
    logging.basicConfig(
        # 设置日志级别为INFO
+       level=logging.INFO,
        # 设置日志输出格式
+       format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
    # 返回一个以当前模块名命名的logger实例
    return logging.getLogger(__name__)


# 获取logger实例,供其他模块使用
logger = setup_logging()

3.6. main.py #

main.py

# 从mcp.server模块导入FastMCP类
from mcp.server import FastMCP

# 从config模块导入logger日志记录器
from config import logger
+from mcp_tools import connect_database

# 创建一个FastMCP服务器实例,指定数据库为"mysql-database"
mcp = FastMCP("mysql-database")


# 使用装饰器注册MCP工具
+@mcp.tool()
# 定义一个异步函数,用于连接数据库
+async def connect_database_tool(
    # 数据库主机地址,默认为localhost
+   host: str = "localhost",
    # 数据库端口号,默认为3306
+   port: int = 3306,
    # 数据库用户名,默认为root
+   user: str = "root",
    # 数据库密码,默认为123456
+   password: str = "123456",
    # 要连接的数据库名称,默认为school
+   database: str = "school",
+) -> str:
    # 函数文档字符串,说明该函数用于连接MySQL数据库
+   """连接MySQL数据库"""
    # 调用connect_database函数,异步连接数据库,并返回结果
+   return await connect_database(host, port, user, password, database)


# 判断当前模块是否为主程序入口
if __name__ == "__main__":
    try:
        # 记录服务启动的日志信息
        logger.info("MySQL MCP服务启动中...")
        # 启动FastMCP服务器
        mcp.run()
    except KeyboardInterrupt:
        # 捕获键盘中断(如Ctrl+C),记录服务器已停止的日志
        logger.info("服务器已停止")
    except Exception as e:
        # 捕获其他异常,记录错误日志
        logger.error(f"服务器运行时出错: {str(e)}")
        # 导入traceback模块用于打印详细的异常信息
        import traceback

        # 打印异常的详细堆栈信息
        traceback.print_exc()

4. 查看数据库信息 #

4.1. database.py #

database.py

# 导入类型注解相关内容(Any表示任意类型,Dict表示字典,Optional表示可选类型)
from typing import Any, Dict, Optional

# 导入SQLAlchemy的create_engine和text,用于数据库操作
from sqlalchemy import create_engine, text

# 导入日志记录器logger和数据库连接数据类DatabaseConnection
from config import logger
from models import DatabaseConnection, DatabaseSchema


# 定义数据库连接管理器类
class DatabaseManager:
    """数据库连接管理器"""

    # 构造方法,初始化数据库连接对象为None
    def __init__(self):
        # 数据库连接对象,初始为None
        self.db_connection: Optional[DatabaseConnection] = None
        self.schema = DatabaseSchema() 
    # 异步方法:创建数据库表
    async def create_tables(self):
        """创建数据库表"""
        if self.db_connection and self.db_connection.engine:
            self.schema.get_metadata().create_all(self.db_connection.engine)
    # 异步方法,用于连接数据库
    async def connect_database(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """连接数据库"""
        try:
            # 如果已经存在数据库连接,则先关闭
            if self.db_connection:
                self.db_connection.close()

            # 构建数据库连接字符串,格式为mysql+pymysql://user:password@host:port/database
            connection_string = (
                f"mysql+pymysql://{config['user']}:{config['password']}@"
                f"{config['host']}:{config['port']}/{config['database']}"
            )

            # 创建SQLAlchemy引擎对象
            engine = create_engine(
                connection_string,
                echo=False,  # 不输出SQL语句日志
                pool_pre_ping=True,  # 启用连接池健康检查
                pool_recycle=3600,  # 连接回收时间为3600秒
            )

            # 测试数据库连接,执行一条简单的SQL语句
            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))

            # 保存数据库连接信息到db_connection属性
            self.db_connection = DatabaseConnection(
                host=config["host"],
                port=config["port"],
                user=config["user"],
                password=config["password"],
                database=config["database"],
                engine=engine,
            )
            # 创建数据库表
            await self.create_tables()
            # 返回连接成功的信息,包括主机、端口和数据库名
            return {
                "success": True,
                "message": f"成功连接到数据库 {config['database']}",
                "connection_info": {
                    "host": config["host"],
                    "port": config["port"],
                    "database": config["database"],
                },
            }

        # 捕获异常,记录错误日志并返回失败信息
        except Exception as e:
            # 记录数据库连接失败的错误日志
            logger.error(f"数据库连接失败: {str(e)}")
            # 返回失败信息
            return {
                "success": False,
                "message": f"数据库连接失败: {str(e)}",
            }

    # 定义获取当前数据库连接信息的方法,返回一个字典
+   def get_connection_info(self) -> Dict[str, Any]:
        # 方法文档字符串,说明功能
+       """获取当前数据库连接信息"""
        # 判断是否存在数据库连接
+       if self.db_connection:
            # 如果已连接,返回连接状态及相关信息
+           return {
+               "connected": True,
+               "host": self.db_connection.host,  # 数据库主机地址
+               "port": self.db_connection.port,  # 数据库端口号
+               "database": self.db_connection.database,  # 数据库名称
+               "user": self.db_connection.user,  # 数据库用户名
+           }
+       else:
            # 如果未连接,返回未连接状态及提示信息
+           return {"connected": False, "message": "数据库未连接"}

    # 关闭数据库连接的方法
    def close(self) -> None:
        """关闭数据库连接"""
        # 如果存在数据库连接,则关闭并置为None
        if self.db_connection:
            self.db_connection.close()
            self.db_connection = None

4.2. main.py #

main.py

# 从mcp.server模块导入FastMCP类
from mcp.server import FastMCP

# 从config模块导入logger日志记录器
from config import logger
+from mcp_tools import connect_database, get_database_info

# 创建一个FastMCP服务器实例,指定数据库为"mysql-database"
mcp = FastMCP("mysql-database")


# 使用装饰器注册MCP工具
@mcp.tool()
# 定义一个异步函数,用于连接数据库
async def connect_database_tool(
    # 数据库主机地址,默认为localhost
    host: str = "localhost",
    # 数据库端口号,默认为3306
    port: int = 3306,
    # 数据库用户名,默认为root
    user: str = "root",
    # 数据库密码,默认为123456
    password: str = "123456",
    # 要连接的数据库名称,默认为school
    database: str = "school",
) -> str:
    # 函数文档字符串,说明该函数用于连接MySQL数据库
    """连接MySQL数据库"""
    # 调用connect_database函数,异步连接数据库,并返回结果
    return await connect_database(host, port, user, password, database)


+@mcp.tool()
+async def get_database_info_tool() -> str:
+   """获取当前数据库连接信息"""
+   return await get_database_info()


# 判断当前模块是否为主程序入口
if __name__ == "__main__":
    try:
        # 记录服务启动的日志信息
        logger.info("MySQL MCP服务启动中...")
        # 启动FastMCP服务器
        mcp.run()
    except KeyboardInterrupt:
        # 捕获键盘中断(如Ctrl+C),记录服务器已停止的日志
        logger.info("服务器已停止")
    except Exception as e:
        # 捕获其他异常,记录错误日志
        logger.error(f"服务器运行时出错: {str(e)}")
        # 导入traceback模块用于打印详细的异常信息
        import traceback

        # 打印异常的详细堆栈信息
        traceback.print_exc()

4.3. mcp_tools.py #

mcp_tools.py

# 导入json模块,用于处理JSON数据
import json

# 从config模块导入logger日志记录器
from config import logger

# 从mysql_server模块导入MySQLMCPServer类
from mysql_server import MySQLMCPServer

# 创建MySQLMCPServer的实例,用于后续数据库操作
mysql_server = MySQLMCPServer()


# 定义一个函数,用于获取MySQLMCPServer实例
def get_mysql_server() -> MySQLMCPServer:
    # 返回MySQLMCPServer实例
    """获取MySQL服务器实例"""
    return mysql_server


# 定义一个异步函数,用于连接MySQL数据库
async def connect_database(
    # 数据库主机地址,默认为localhost
    host: str = "localhost",
    # 数据库端口号,默认为3306
    port: int = 3306,
    # 数据库用户名,默认为root
    user: str = "root",
    # 数据库密码,默认为123456
    password: str = "123456",
    # 要连接的数据库名称,默认为school
    database: str = "school",
) -> str:
    # 说明该函数用于连接MySQL数据库
    """连接MySQL数据库"""
    try:
        # 构造数据库连接配置字典
        config = {
            "host": host,
            "port": port,
            "user": user,
            "password": password,
            "database": database,
        }
        # 调用MySQLMCPServer的connect_database方法进行异步连接
        result = await mysql_server.connect_database(config)
        # 将连接结果转换为格式化的JSON字符串并返回
        return json.dumps(result, ensure_ascii=False, indent=2)
    except Exception as e:
        # 如果连接过程中发生异常,构造错误信息字典
        error_result = {"success": False, "message": f"连接失败: {str(e)}"}
        # 记录错误日志
        logger.error(f"连接失败: {str(e)}")
        # 返回错误信息的JSON字符串
        return json.dumps(error_result, ensure_ascii=False, indent=2)


# 定义MCP工具:获取当前数据库连接信息
+async def get_database_info() -> str:
+   """获取当前数据库连接信息"""
+   try:
        # 获取数据库连接信息
+       info = mysql_server.db_manager.get_connection_info()
        # 返回信息的JSON字符串
+       return json.dumps(info, ensure_ascii=False, indent=2)
+   except Exception as e:
        # 获取信息失败时返回错误信息
+       error_result = {"success": False, "message": f"获取信息失败: {str(e)}"}
+       return json.dumps(error_result, ensure_ascii=False, indent=2)

4.4. mysql_server.py #

mysql_server.py

# 导入Any和Dict类型注解,用于类型提示
from typing import Any, Dict

# 从database模块导入DatabaseManager数据库管理器
from database import DatabaseManager


# 定义MySQLMCPServer类,表示MySQL MCP服务器
class MySQLMCPServer:
    """MySQL MCP 服务器"""

    # 构造方法,初始化数据库连接管理器
    def __init__(self):
        # 创建一个DatabaseManager实例,管理数据库连接
        self.db_manager = DatabaseManager()

    # 异步方法,用于连接数据库
    async def connect_database(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """连接数据库"""
        # 调用数据库管理器的connect_database方法进行异步连接,并返回结果
        return await self.db_manager.connect_database(config)

    # 关闭服务器和数据库连接的方法
    def close(self) -> None:
        """关闭服务器和数据库连接"""
        # 调用数据库管理器的close方法,关闭数据库连接
        self.db_manager.close()

5. 执行SQL语句 #

5.1. llm_client.py #

llm_client.py

# 导入类型注解相关内容
from typing import Optional

# 导入OpenAI库,用于调用LLM
from openai import OpenAI

# 导入配置,包括API密钥、基础URL和日志记录器
from config import OPENAI_API_KEY, OPENAI_BASE_URL, logger

# 创建OpenAI客户端实例,传入API密钥和基础URL
client = OpenAI(
    api_key=OPENAI_API_KEY,
    base_url=OPENAI_BASE_URL,
)


# 定义LLMClient类,用于封装LLM相关操作
class LLMClient:
    """LLM客户端类"""

    # 静态方法:调用LLM API,返回响应内容
    @staticmethod
    def call_llm(query: str) -> Optional[str]:
        """调用LLM API,返回响应内容"""
        try:
            # 调用LLM的chat.completions.create方法,传入模型、消息和超时参数
            response = client.chat.completions.create(
                model="deepseek-chat",
                messages=[{"role": "user", "content": query}],
                timeout=30,  # 添加超时设置
            )
            # 返回LLM的回复内容
            return response.choices[0].message.content
        except Exception as e:
            # 记录错误日志
            logger.error(f"LLM API调用失败: {str(e)}")
            # 返回None表示调用失败
            return None

5.2. sql_builder.py #

sql_builder.py

# 导入配置和LLM客户端
from config import logger
from llm_client import LLMClient


# 定义SQLBuilder类,用于构建SQL语句和分析SQL错误
class SQLBuilder:
    # SQL语句构建器说明文档
    """SQL语句构建器"""

    # 静态方法:分析SQL错误并给出建议
    @staticmethod
    def analyze_sql_error(error_message: str, sql: str) -> str:
        # 方法说明文档
        """使用LLM分析SQL错误并给出建议"""
        try:
            # 构造LLM提示词,要求分析错误并给建议
            prompt = f"""
            请分析以下MySQL错误信息,并给出解决建议:

            错误信息: {error_message}
            SQL语句: {sql}

            请给出:
            1. 错误原因分析
            2. 具体修复建议
            3. 正确SQL示例(如有必要)

            请用中文简明回答。
            """

            # 调用LLM获取建议
            suggestion = LLMClient.call_llm(prompt)
            # 如果LLM返回建议,则去除首尾空白后返回
            if suggestion:
                return suggestion.strip()
            # 如果没有建议,返回默认提示
            return "无法分析错误,请检查SQL语法和数据库连接。"
        # 捕获异常,分析失败时返回默认提示
        except Exception as e:
            # 记录警告日志
            logger.warning(f"错误分析失败: {str(e)}")
            # 返回默认提示
            return "无法分析错误,请检查SQL语法和数据库连接。"

5.3. config.py #

config.py

# 导入logging模块,用于日志记录
import logging

# 导入os模块,用于操作系统相关功能
+import os


# 定义一个函数用于配置日志格式和日志级别
def setup_logging():
    # 设置日志的基本配置,包括日志级别和输出格式
    logging.basicConfig(
        # 设置日志级别为INFO
        level=logging.INFO,
        # 设置日志输出格式
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
    # 返回一个以当前模块名命名的logger实例
    return logging.getLogger(__name__)


# 获取logger实例,供其他模块使用
logger = setup_logging()

# 构建默认数据库连接配置,从环境变量获取各项参数,如果没有设置则使用默认值
+DEFAULT_DB_CONFIG = {
    # 获取数据库主机地址,默认localhost
+   "host": os.getenv("DB_HOST", "localhost"),
    # 获取数据库端口号,默认3306,并转换为整数类型
+   "port": int(os.getenv("DB_PORT", "3306")),
    # 获取数据库用户名,默认root
+   "user": os.getenv("DB_USER", "root"),
    # 获取数据库密码,默认123456
+   "password": os.getenv("DB_PASSWORD", "123456"),
    # 获取数据库名称,默认school
+   "database": os.getenv("DB_NAME", "school"),
+}
# 从环境变量中获取OpenAI API密钥,如果没有设置则使用默认值
+OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "sk-8c78c2e5832b4f3e84ee56b94460dfe9")

# 从环境变量中获取OpenAI Base URL,如果没有设置则使用默认值
+OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.deepseek.com")

5.4. database.py #

database.py

# 导入类型注解相关内容(Any表示任意类型,Dict表示字典,Optional表示可选类型)
from typing import Any, Dict, Optional

# 导入SQLAlchemy的create_engine和text,用于数据库操作
from sqlalchemy import create_engine, text

# 导入日期和时间相关类
+from datetime import datetime, date

# 导入Decimal类,用于高精度浮点数
+from decimal import Decimal

# 导入日志记录器logger和数据库连接数据类DatabaseConnection
from config import logger
from models import DatabaseConnection


# 定义数据库连接管理器类
class DatabaseManager:
    """数据库连接管理器"""

    # 构造方法,初始化数据库连接对象为None
    def __init__(self):
        # 数据库连接对象,初始为None
        self.db_connection: Optional[DatabaseConnection] = None

    # 异步方法,用于连接数据库
    async def connect_database(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """连接数据库"""
        try:
            # 如果已经存在数据库连接,则先关闭
            if self.db_connection:
                self.db_connection.close()

            # 构建数据库连接字符串,格式为mysql+pymysql://user:password@host:port/database
            connection_string = (
                f"mysql+pymysql://{config['user']}:{config['password']}@"
                f"{config['host']}:{config['port']}/{config['database']}"
            )

            # 创建SQLAlchemy引擎对象
            engine = create_engine(
                connection_string,
                echo=False,  # 不输出SQL语句日志
                pool_pre_ping=True,  # 启用连接池健康检查
                pool_recycle=3600,  # 连接回收时间为3600秒
            )

            # 测试数据库连接,执行一条简单的SQL语句
            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))

            # 保存数据库连接信息到db_connection属性
            self.db_connection = DatabaseConnection(
                host=config["host"],
                port=config["port"],
                user=config["user"],
                password=config["password"],
                database=config["database"],
                engine=engine,
            )

            # 返回连接成功的信息,包括主机、端口和数据库名
            return {
                "success": True,
                "message": f"成功连接到数据库 {config['database']}",
                "connection_info": {
                    "host": config["host"],
                    "port": config["port"],
                    "database": config["database"],
                },
            }

        # 捕获异常,记录错误日志并返回失败信息
        except Exception as e:
            # 记录数据库连接失败的错误日志
            logger.error(f"数据库连接失败: {str(e)}")
            # 返回失败信息
            return {
                "success": False,
                "message": f"数据库连接失败: {str(e)}",
            }

    # 定义获取当前数据库连接信息的方法,返回一个字典
    def get_connection_info(self) -> Dict[str, Any]:
        # 方法文档字符串,说明功能
        """获取当前数据库连接信息"""
        # 判断是否存在数据库连接
        if self.db_connection:
            # 如果已连接,返回连接状态及相关信息
            return {
                "connected": True,
                "host": self.db_connection.host,  # 数据库主机地址
                "port": self.db_connection.port,  # 数据库端口号
                "database": self.db_connection.database,  # 数据库名称
                "user": self.db_connection.user,  # 数据库用户名
            }
        else:
            # 如果未连接,返回未连接状态及提示信息
            return {"connected": False, "message": "数据库未连接"}

    # 异步方法:执行SQL语句
+   async def execute_query(self, sql: str) -> Dict[str, Any]:
        # 执行SQL语句
+       """执行SQL语句"""
        # 如果未连接数据库,返回错误信息
+       if not self.db_connection or not self.db_connection.engine:
+           return {
+               "success": False,
+               "message": "数据库未连接,请先连接数据库。",
+           }

+       try:
            # 获取数据库连接
+           with self.db_connection.engine.connect() as conn:
                # 执行SQL语句
+               result = conn.execute(text(sql))

                # 如果是SELECT查询
+               if sql.strip().upper().startswith("SELECT"):
                    # 获取所有结果行
+                   rows = result.fetchall()
                    # 获取列名
+                   columns = result.keys()

                    # 转换为可序列化格式
+                   serializable_rows = []
+                   for row in rows:
                        # 每一行转为字典
+                       row_dict = {}
+                       for i, col in enumerate(columns):
+                           value = row[i]
                            # 日期类型转为字符串
+                           if isinstance(value, (date, datetime)):
+                               value = value.isoformat()
                            # Decimal类型转为float
+                           elif isinstance(value, Decimal):
+                               value = float(value)
                            # 赋值到字典
+                           row_dict[col] = value
                        # 添加到结果列表
+                       serializable_rows.append(row_dict)

                    # 返回查询结果
+                   return {
+                       "success": True,
+                       "message": f"查询成功,共返回{len(serializable_rows)}条记录",
+                       "data": serializable_rows,
+                       "columns": list(columns),
+                       "sql": sql,
+                   }
+               else:
                    # 非查询操作,提交事务
+                   conn.commit()
                    # 返回操作成功信息
+                   return {
+                       "success": True,
+                       "message": "操作执行成功",
+                       "affected_rows": result.rowcount,
+                       "sql": sql,
+                   }

        # 捕获异常
+       except Exception as e:
            # 记录SQL执行失败日志
+           logger.error(f"SQL执行失败: {str(e)}")
            # 返回失败信息
+           return {
+               "success": False,
+               "message": f"SQL执行失败: {str(e)}",
+               "error": str(e),
+           }

    # 关闭数据库连接的方法
    def close(self) -> None:
        """关闭数据库连接"""
        # 如果存在数据库连接,则关闭并置为None
        if self.db_connection:
            self.db_connection.close()
            self.db_connection = None

5.5. main.py #

main.py

# 从mcp.server模块导入FastMCP类
from mcp.server import FastMCP

# 从config模块导入logger日志记录器
from config import logger
+from mcp_tools import connect_database, get_database_info, execute_sql

# 创建一个FastMCP服务器实例,指定数据库为"mysql-database"
mcp = FastMCP("mysql-database")


# 使用装饰器注册MCP工具
@mcp.tool()
# 定义一个异步函数,用于连接数据库
async def connect_database_tool(
    # 数据库主机地址,默认为localhost
    host: str = "localhost",
    # 数据库端口号,默认为3306
    port: int = 3306,
    # 数据库用户名,默认为root
    user: str = "root",
    # 数据库密码,默认为123456
    password: str = "123456",
    # 要连接的数据库名称,默认为school
    database: str = "school",
) -> str:
    # 函数文档字符串,说明该函数用于连接MySQL数据库
    """连接MySQL数据库"""
    # 调用connect_database函数,异步连接数据库,并返回结果
    return await connect_database(host, port, user, password, database)


@mcp.tool()
async def get_database_info_tool() -> str:
    """获取当前数据库连接信息"""
    return await get_database_info()


# 注册MCP工具:执行SQL语句
+@mcp.tool()
+async def execute_sql_tool(sql: str) -> str:
+   """直接执行SQL语句"""
    # 调用execute_sql函数执行SQL语句
+   return await execute_sql(sql)


# 判断当前模块是否为主程序入口
if __name__ == "__main__":
    try:
        # 记录服务启动的日志信息
        logger.info("MySQL MCP服务启动中...")
        # 启动FastMCP服务器
        mcp.run()
    except KeyboardInterrupt:
        # 捕获键盘中断(如Ctrl+C),记录服务器已停止的日志
        logger.info("服务器已停止")
    except Exception as e:
        # 捕获其他异常,记录错误日志
        logger.error(f"服务器运行时出错: {str(e)}")
        # 导入traceback模块用于打印详细的异常信息
        import traceback

        # 打印异常的详细堆栈信息
        traceback.print_exc()

5.6. mcp_tools.py #

mcp_tools.py

# 导入json模块,用于处理JSON数据
import json

# 从config模块导入logger日志记录器
from config import logger

# 从mysql_server模块导入MySQLMCPServer类
from mysql_server import MySQLMCPServer

# 创建MySQLMCPServer的实例,用于后续数据库操作
mysql_server = MySQLMCPServer()


# 定义一个函数,用于获取MySQLMCPServer实例
def get_mysql_server() -> MySQLMCPServer:
    # 返回MySQLMCPServer实例
    """获取MySQL服务器实例"""
    return mysql_server


# 定义一个异步函数,用于连接MySQL数据库
async def connect_database(
    # 数据库主机地址,默认为localhost
    host: str = "localhost",
    # 数据库端口号,默认为3306
    port: int = 3306,
    # 数据库用户名,默认为root
    user: str = "root",
    # 数据库密码,默认为123456
    password: str = "123456",
    # 要连接的数据库名称,默认为school
    database: str = "school",
) -> str:
    # 说明该函数用于连接MySQL数据库
    """连接MySQL数据库"""
    try:
        # 构造数据库连接配置字典
        config = {
            "host": host,
            "port": port,
            "user": user,
            "password": password,
            "database": database,
        }
        # 调用MySQLMCPServer的connect_database方法进行异步连接
        result = await mysql_server.connect_database(config)
        # 将连接结果转换为格式化的JSON字符串并返回
        return json.dumps(result, ensure_ascii=False, indent=2)
    except Exception as e:
        # 如果连接过程中发生异常,构造错误信息字典
        error_result = {"success": False, "message": f"连接失败: {str(e)}"}
        # 记录错误日志
        logger.error(f"连接失败: {str(e)}")
        # 返回错误信息的JSON字符串
        return json.dumps(error_result, ensure_ascii=False, indent=2)


# 定义MCP工具:获取当前数据库连接信息
async def get_database_info() -> str:
    """获取当前数据库连接信息"""
    try:
        # 获取数据库连接信息
        info = mysql_server.db_manager.get_connection_info()
        # 返回信息的JSON字符串
        return json.dumps(info, ensure_ascii=False, indent=2)
    except Exception as e:
        # 获取信息失败时返回错误信息
        error_result = {"success": False, "message": f"获取信息失败: {str(e)}"}
        return json.dumps(error_result, ensure_ascii=False, indent=2)


# 定义异步函数:直接执行SQL语句
+async def execute_sql(sql: str) -> str:
    # 函数文档字符串,说明功能
+   """直接执行SQL语句"""
+   try:
        # 调用MySQLMCPServer的execute_query方法执行SQL
+       result = await mysql_server.execute_query(sql)
        # 将结果转换为JSON字符串并返回
+       return json.dumps(result, ensure_ascii=False, indent=2)
+   except Exception as e:
        # 捕获异常,构造错误信息字典
+       error_result = {"success": False, "message": f"SQL执行失败: {str(e)}"}
        # 返回错误信息的JSON字符串
+       return json.dumps(error_result, ensure_ascii=False, indent=2)

5.7. mysql_server.py #

mysql_server.py

# 导入Any和Dict类型注解,用于类型提示
from typing import Any, Dict

# 从database模块导入DatabaseManager数据库管理器
from database import DatabaseManager
+from config import DEFAULT_DB_CONFIG
+from sql_builder import SQLBuilder


# 定义MySQLMCPServer类,表示MySQL MCP服务器
class MySQLMCPServer:
    """MySQL MCP 服务器"""

    # 构造方法,初始化数据库连接管理器
    def __init__(self):
        # 创建一个DatabaseManager实例,管理数据库连接
        self.db_manager = DatabaseManager()

    # 异步方法,用于连接数据库
    async def connect_database(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """连接数据库"""
        # 调用数据库管理器的connect_database方法进行异步连接,并返回结果
        return await self.db_manager.connect_database(config)

    # 异步方法:执行SQL语句
+   async def execute_query(self, sql: str) -> Dict[str, Any]:
+       """执行SQL语句"""
        # 如果当前没有数据库连接,则先连接默认数据库
+       if not self.db_manager.db_connection:
+           await self.db_manager.connect_database(DEFAULT_DB_CONFIG)

        # 调用数据库管理器执行SQL查询
+       result = await self.db_manager.execute_query(sql)

        # 如果查询失败且包含错误信息,则用LLM分析SQL错误
+       if not result.get("success") and "error" in result:
+           error_analysis = SQLBuilder.analyze_sql_error(result["error"], sql)
            # 将错误分析结果添加到返回结果中
+           result["error_analysis"] = error_analysis

        # 返回查询结果
+       return result

    # 关闭服务器和数据库连接的方法
    def close(self) -> None:
        """关闭服务器和数据库连接"""
        # 调用数据库管理器的close方法,关闭数据库连接
        self.db_manager.close()

6. 自然语言查询 #

6.1. .env #

.env

OPENAI_API_KEY=sk-e9fc8cde99dc47f8970d514dd7d941f2
OPENAI_BASE_URL=https://api.deepseek.com
DB_HOST=localhost
DB_PORT=3306
DB_USER=root
DB_PASSWORD=123456
DB_NAME=school

6.2. query_parser.py #

query_parser.py

# 导入Any, Dict, Optional用于类型注解
from typing import Any, Dict, Optional

# 导入日志记录器logger和LLM客户端
from config import logger
from llm_client import LLMClient


# 定义自然语言查询解析器类
class QueryParser:
    # 类文档字符串,说明该类用于自然语言查询解析
    """自然语言查询解析器"""

    # 静态方法,解析自然语言查询
    @staticmethod
    def parse_natural_language(query: str) -> Optional[Dict[str, Any]]:
        # 方法文档字符串,说明功能
        """解析自然语言查询"""
        try:
            # 构造LLM提示词,要求分析中文查询并返回结构化JSON
            prompt = f"""
            请分析以下中文查询,识别查询意图、涉及的表、条件和操作类型。
            查询内容: {query}

            请以JSON格式返回结果,包含以下字段:
            - operation: 操作类型 (SELECT/INSERT/UPDATE/DELETE)
            - tables: 涉及的表名列表 (students/subjects/scores)
            - conditions: 查询条件列表,每个条件包含 (字段, 操作符, 值)
            - limit: 返回记录数上限(可选)
            - order_by: 排序字段(可选)

            示例输出:
            {{
                "operation": "SELECT",
                "tables": ["students", "scores"],
                "conditions": [["gender", "=", "男"], ["score", ">=", "80"]],
                "limit": 10,
                "order_by": "score DESC"
            }}
            """

            # 调用LLM客户端,传入prompt,获取解析结果
            parsed_result = LLMClient.call_llm_with_json(prompt)
            # 如果LLM返回结果为None,说明解析失败,返回默认解析结果
            if parsed_result is None:
                logger.warning("LLM返回格式无效,使用默认解析")
                return {
                    "operation": "SELECT",
                    "tables": ["students"],
                    "conditions": [],
                    "limit": None,
                    "order_by": None,
                    "original_query": query,
                }

            # 正常返回解析结果,若某字段缺失则使用默认值
            return {
                "operation": parsed_result.get("operation", "SELECT"),
                "tables": parsed_result.get(
                    "tables", ["students", "subjects", "scores"]
                ),
                "conditions": parsed_result.get("conditions", []),
                "limit": parsed_result.get("limit"),
                "order_by": parsed_result.get("order_by"),
                "original_query": query,
            }

        # 捕获异常,解析失败时返回默认解析结果
        except Exception as e:
            logger.warning(f"LLM解析失败,使用默认解析: {str(e)}")
            return {
                "operation": "SELECT",
                "tables": ["students"],
                "conditions": [],
                "limit": None,
                "order_by": None,
                "original_query": query,
            }

6.3. config.py #

config.py

# 导入logging模块,用于日志记录
import logging

# 导入os模块,用于操作系统相关功能
import os
+from dotenv import load_dotenv

# 加载 .env 文件中的环境变量
+load_dotenv()


# 定义一个函数用于配置日志格式和日志级别
def setup_logging():
    # 设置日志的基本配置,包括日志级别和输出格式
    logging.basicConfig(
        # 设置日志级别为INFO
        level=logging.INFO,
        # 设置日志输出格式
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
    # 返回一个以当前模块名命名的logger实例
    return logging.getLogger(__name__)


# 获取logger实例,供其他模块使用
logger = setup_logging()

# 构建默认数据库连接配置,从环境变量获取各项参数,如果没有设置则使用默认值
DEFAULT_DB_CONFIG = {
    # 获取数据库主机地址,默认localhost
    "host": os.getenv("DB_HOST", "localhost"),
    # 获取数据库端口号,默认3306,并转换为整数类型
    "port": int(os.getenv("DB_PORT", "3306")),
    # 获取数据库用户名,默认root
    "user": os.getenv("DB_USER", "root"),
    # 获取数据库密码,默认123456
    "password": os.getenv("DB_PASSWORD", "123456"),
    # 获取数据库名称,默认school
    "database": os.getenv("DB_NAME", "school"),
}
# 从环境变量中获取OpenAI API密钥,如果没有设置则使用默认值
+OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "sk-e9fc8cde99dc47f8970d514dd7d941f2")

# 从环境变量中获取OpenAI Base URL,如果没有设置则使用默认值
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.deepseek.com")

6.4. llm_client.py #

llm_client.py

# 导入类型注解相关内容
+from typing import Any, Dict, Optional

# 导入json模块,用于处理JSON数据
+import json

# 导入OpenAI库,用于调用LLM
from openai import OpenAI

# 导入配置,包括API密钥、基础URL和日志记录器
from config import OPENAI_API_KEY, OPENAI_BASE_URL, logger

# 创建OpenAI客户端实例,传入API密钥和基础URL
client = OpenAI(
+   api_key="sk-e9fc8cde99dc47f8970d514dd7d941f2",
    base_url=OPENAI_BASE_URL,
)


# 定义LLMClient类,用于封装LLM相关操作
class LLMClient:
    """LLM客户端类"""

    # 静态方法:调用LLM API,返回响应内容
    @staticmethod
    def call_llm(query: str) -> Optional[str]:
        """调用LLM API,返回响应内容"""
        try:
            # 调用LLM的chat.completions.create方法,传入模型、消息和超时参数
            response = client.chat.completions.create(
                model="deepseek-chat",
                messages=[{"role": "user", "content": query}],
                timeout=30,  # 添加超时设置
            )
            # 返回LLM的回复内容
            return response.choices[0].message.content
        except Exception as e:
            # 记录错误日志
            logger.error(f"LLM API调用失败: {str(e)}")
            # 返回None表示调用失败
            return None

    # 静态方法:调用LLM并解析JSON响应
+   @staticmethod
+   def call_llm_with_json(query: str) -> Optional[Dict[str, Any]]:
+       """调用LLM并解析JSON响应"""
        # 调用LLM获取内容
+       content = LLMClient.call_llm(query)
        # 如果内容为空,返回None
+       if not content:
+           return None

+       try:
            # 如果内容中包含```json代码块,提取其中的JSON部分
+           if "```json" in content:
+               content = content.split("```json")[1].split("```")[0].strip()
+               return json.loads(content)
            # 如果内容中包含```代码块,提取其中的JSON部分
+           elif "```" in content:
+               content = content.split("```")[1].split("```")[0].strip()
+               return json.loads(content)
            # 否则返回None
+           return None
+       except json.JSONDecodeError as e:
            # 记录JSON解析失败日志
+           logger.error(f"JSON解析失败: {str(e)}")
            # 返回None表示解析失败
+           return None

    # 静态方法:调用LLM并提取SQL语句
+   @staticmethod
+   def call_llm_with_sql(query: str) -> Optional[str]:
+       """调用LLM并提取SQL语句"""
        # 调用LLM获取内容
+       content = LLMClient.call_llm(query)
        # 如果内容为空,返回None
+       if not content:
+           return None

+       try:
            # 如果内容中包含```sql代码块,提取其中的SQL部分
+           if "```sql" in content:
+               content = content.split("```sql")[1].split("```")[0].strip()
+               return content
            # 如果内容中包含```代码块,提取其中的SQL部分
+           elif "```" in content:
+               content = content.split("```")[1].split("```")[0].strip()
+               return content
+           else:
                # 否则直接返回内容
+               return content
+       except Exception as e:
            # 记录SQL提取失败日志
+           logger.error(f"SQL提取失败: {str(e)}")
            # 返回None表示提取失败
+           return None

6.5. main.py #

main.py

# 从mcp.server模块导入FastMCP类
from mcp.server import FastMCP

# 从config模块导入logger日志记录器
from config import logger
+from mcp_tools import (
+   connect_database,
+   get_database_info,
+   execute_sql,
+   natural_language_query,
+)

# 创建一个FastMCP服务器实例,指定数据库为"mysql-database"
mcp = FastMCP("mysql-database")


# 使用装饰器注册MCP工具
@mcp.tool()
# 定义一个异步函数,用于连接数据库
async def connect_database_tool(
    # 数据库主机地址,默认为localhost
    host: str = "localhost",
    # 数据库端口号,默认为3306
    port: int = 3306,
    # 数据库用户名,默认为root
    user: str = "root",
    # 数据库密码,默认为123456
    password: str = "123456",
    # 要连接的数据库名称,默认为school
    database: str = "school",
) -> str:
    # 函数文档字符串,说明该函数用于连接MySQL数据库
    """连接MySQL数据库"""
    # 调用connect_database函数,异步连接数据库,并返回结果
    return await connect_database(host, port, user, password, database)


@mcp.tool()
async def get_database_info_tool() -> str:
    """获取当前数据库连接信息"""
    return await get_database_info()


# 注册MCP工具:执行SQL语句
@mcp.tool()
async def execute_sql_tool(sql: str) -> str:
    """直接执行SQL语句"""
    # 调用execute_sql函数执行SQL语句
    return await execute_sql(sql)


# 注册MCP工具:自然语言查询数据库
+@mcp.tool()
+async def natural_language_query_tool(query: str) -> str:
+   """使用自然语言查询MySQL数据库"""
    # 调用natural_language_query函数执行自然语言查询
+   return await natural_language_query(query)


# 判断当前模块是否为主程序入口
if __name__ == "__main__":
    try:
        # 记录服务启动的日志信息
        logger.info("MySQL MCP服务启动中...")
        # 启动FastMCP服务器
        mcp.run()
    except KeyboardInterrupt:
        # 捕获键盘中断(如Ctrl+C),记录服务器已停止的日志
        logger.info("服务器已停止")
    except Exception as e:
        # 捕获其他异常,记录错误日志
        logger.error(f"服务器运行时出错: {str(e)}")
        # 导入traceback模块用于打印详细的异常信息
        import traceback

        # 打印异常的详细堆栈信息
        traceback.print_exc()

6.6. mcp_tools.py #

mcp_tools.py

# 导入json模块,用于处理JSON数据
import json

# 从config模块导入logger日志记录器
from config import logger

# 从mysql_server模块导入MySQLMCPServer类
from mysql_server import MySQLMCPServer

# 创建MySQLMCPServer的实例,用于后续数据库操作
mysql_server = MySQLMCPServer()


# 定义一个函数,用于获取MySQLMCPServer实例
def get_mysql_server() -> MySQLMCPServer:
    # 返回MySQLMCPServer实例
    """获取MySQL服务器实例"""
    return mysql_server


# 定义一个异步函数,用于连接MySQL数据库
async def connect_database(
    # 数据库主机地址,默认为localhost
    host: str = "localhost",
    # 数据库端口号,默认为3306
    port: int = 3306,
    # 数据库用户名,默认为root
    user: str = "root",
    # 数据库密码,默认为123456
    password: str = "123456",
    # 要连接的数据库名称,默认为school
    database: str = "school",
) -> str:
    # 说明该函数用于连接MySQL数据库
    """连接MySQL数据库"""
    try:
        # 构造数据库连接配置字典
        config = {
            "host": host,
            "port": port,
            "user": user,
            "password": password,
            "database": database,
        }
        # 调用MySQLMCPServer的connect_database方法进行异步连接
        result = await mysql_server.connect_database(config)
        # 将连接结果转换为格式化的JSON字符串并返回
        return json.dumps(result, ensure_ascii=False, indent=2)
    except Exception as e:
        # 如果连接过程中发生异常,构造错误信息字典
        error_result = {"success": False, "message": f"连接失败: {str(e)}"}
        # 记录错误日志
        logger.error(f"连接失败: {str(e)}")
        # 返回错误信息的JSON字符串
        return json.dumps(error_result, ensure_ascii=False, indent=2)


# 定义MCP工具:获取当前数据库连接信息
async def get_database_info() -> str:
    """获取当前数据库连接信息"""
    try:
        # 获取数据库连接信息
        info = mysql_server.db_manager.get_connection_info()
        # 返回信息的JSON字符串
        return json.dumps(info, ensure_ascii=False, indent=2)
    except Exception as e:
        # 获取信息失败时返回错误信息
        error_result = {"success": False, "message": f"获取信息失败: {str(e)}"}
        return json.dumps(error_result, ensure_ascii=False, indent=2)


# 定义异步函数:直接执行SQL语句
async def execute_sql(sql: str) -> str:
    # 函数文档字符串,说明功能
    """直接执行SQL语句"""
    try:
        # 调用MySQLMCPServer的execute_query方法执行SQL
        result = await mysql_server.execute_query(sql)
        # 将结果转换为JSON字符串并返回
        return json.dumps(result, ensure_ascii=False, indent=2)
    except Exception as e:
        # 捕获异常,构造错误信息字典
        error_result = {"success": False, "message": f"SQL执行失败: {str(e)}"}
        # 返回错误信息的JSON字符串
        return json.dumps(error_result, ensure_ascii=False, indent=2)


# 定义异步函数:自然语言查询数据库
+async def natural_language_query(query: str) -> str:
    # 函数文档字符串,说明功能
+   """使用自然语言查询MySQL数据库"""
    # 记录查询日志
+   logger.info(f"MySQL数据库自然语言查询: {query}")
+   try:
        # 调用MySQLMCPServer的natural_language_query方法执行查询
+       result = await mysql_server.natural_language_query(query)
        # 记录查询结果日志
+       logger.info(f"自然语言查询结果: {result}")
        # 将结果转换为JSON字符串并返回
+       return json.dumps(result, ensure_ascii=False, indent=2)
+   except Exception as e:
        # 捕获异常,构造错误信息字典
+       error_result = {"success": False, "message": f"查询失败: {str(e)}"}
        # 返回错误信息的JSON字符串
+       return json.dumps(error_result, ensure_ascii=False, indent=2)

6.7. mysql_server.py #

mysql_server.py

# 导入Any和Dict类型注解,用于类型提示
from typing import Any, Dict

# 从database模块导入DatabaseManager数据库管理器
from database import DatabaseManager

# 导入logger日志记录器和默认数据库配置
+from config import logger, DEFAULT_DB_CONFIG

# 导入SQL语句构建器
from sql_builder import SQLBuilder

# 导入自然语言查询解析器
+from query_parser import QueryParser

# 导入LLM客户端
+from llm_client import LLMClient


# 定义MySQLMCPServer类,表示MySQL MCP服务器
class MySQLMCPServer:
    """MySQL MCP 服务器"""

    # 构造方法,初始化数据库连接管理器
    def __init__(self):
        # 创建一个DatabaseManager实例,管理数据库连接
        self.db_manager = DatabaseManager()

    # 异步方法,用于连接数据库
    async def connect_database(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """连接数据库"""
        # 调用数据库管理器的connect_database方法进行异步连接,并返回结果
        return await self.db_manager.connect_database(config)

    # 异步方法:执行SQL语句
    async def execute_query(self, sql: str) -> Dict[str, Any]:
        """执行SQL语句"""
        # 如果当前没有数据库连接,则先连接默认数据库
        if not self.db_manager.db_connection:
            await self.db_manager.connect_database(DEFAULT_DB_CONFIG)

        # 调用数据库管理器执行SQL查询
        result = await self.db_manager.execute_query(sql)

        # 如果查询失败且包含错误信息,则用LLM分析SQL错误
        if not result.get("success") and "error" in result:
            error_analysis = SQLBuilder.analyze_sql_error(result["error"], sql)
            # 将错误分析结果添加到返回结果中
            result["error_analysis"] = error_analysis

        # 返回查询结果
        return result

    # 异步方法:处理自然语言查询
+   async def natural_language_query(self, query: str) -> Dict[str, Any]:
+       """自然语言查询处理"""
+       try:
            # 使用QueryParser解析自然语言查询
+           parsed = QueryParser.parse_natural_language(query)
            # 如果解析失败,返回错误信息
+           if not parsed:
+               return {
+                   "success": False,
+                   "message": "无法解析查询意图",
+                   "original_query": query,
+               }

            # 记录解析结果日志
+           logger.info(f"解析结果: {parsed}")

            # 使用SQLBuilder根据解析结果生成SQL语句
+           sql = SQLBuilder.build_sql_query(parsed)
            # 如果SQL生成失败,返回错误信息
+           if not sql:
+               return {
+                   "success": False,
+                   "message": "无法生成SQL语句",
+                   "original_query": query,
+                   "parsed_intent": parsed,
+               }

            # 记录生成的SQL日志
+           logger.info(f"生成SQL: {sql}")

            # 执行SQL查询
+           result = await self.execute_query(sql)
            # 记录查询结果日志
+           logger.info(f"查询结果: {result}")

            # 如果查询成功且有数据,生成结果解释
+           if result.get("success") and "data" in result:
+               result["explanation"] = self._explain_query_result(
+                   query, result, parsed
+               )

            # 返回完整的查询流程结果
+           return {
+               "success": True,
+               "original_query": query,
+               "parsed_intent": parsed,
+               "generated_sql": sql,
+               "execution_result": result,
+           }

        # 捕获异常,记录错误日志并返回错误信息
+       except Exception as e:
+           logger.error(f"自然语言查询处理失败: {str(e)}")
+           return {
+               "success": False,
+               "message": f"自然语言查询处理失败: {str(e)}",
+               "original_query": query,
+           }

    # 私有方法:使用LLM对查询结果进行解释
+   def _explain_query_result(
+       self, original_query: str, result: Dict[str, Any], parsed: Dict[str, Any]
+   ) -> str:
+       """使用LLM对查询结果进行解释"""
+       try:
            # 初始化数据摘要
+           data_summary = ""
            # 如果有查询数据
+           if result.get("data"):
                # 统计数据条数
+               data_count = len(result["data"])
+               if data_count > 0:
                    # 取前3条数据作为示例
+                   sample_data = result["data"][:3]
                    # 生成数据摘要字符串
+                   data_summary = f"共查询到{data_count}条记录,示例数据: {json.dumps(sample_data, ensure_ascii=False)}"
+               else:
                    # 没有数据时的摘要
+                   data_summary = "未查询到任何记录"

            # 构造LLM提示词,要求简要解释查询结果
+           prompt = f"""
+           请根据以下信息,用简明中文解释查询结果:

+           原始查询: {original_query}
+           查询意图: {parsed.get('operation', 'SELECT')} 操作,涉及表: {', '.join(parsed.get('tables', []))}
+           查询结果: {data_summary}

+           请给出:
+           1. 查询结果的简要总结
+           2. 若结果为空,可能的原因
+           3. 如有需要,给出进一步查询建议

+           答案不超过100字。
+           """

            # 调用LLMClient获取解释内容
+           explanation = LLMClient.call_llm(prompt)
            # 如果LLM返回了解释,则去除首尾空白后返回
+           if explanation:
+               return explanation.strip()
            # 如果没有解释,返回默认提示
+           return "查询已完成,请查看数据。"
        # 捕获异常,记录警告日志并返回默认提示
+       except Exception as e:
+           logger.warning(f"结果解释失败: {str(e)}")
+           return "查询已完成,请查看数据。"

    # 关闭服务器和数据库连接的方法
    def close(self) -> None:
        """关闭服务器和数据库连接"""
        # 调用数据库管理器的close方法,关闭数据库连接
        self.db_manager.close()

6.8. sql_builder.py #

sql_builder.py

# 导入配置和LLM客户端
from config import logger

# 导入json模块,用于处理JSON数据
+import json

# 导入类型注解相关内容
+from typing import Any, Dict, Optional

from llm_client import LLMClient


# 定义SQLBuilder类,用于构建SQL语句和分析SQL错误
class SQLBuilder:
    # SQL语句构建器说明文档
    """SQL语句构建器"""

    # 静态方法:分析SQL错误并给出建议
    @staticmethod
    def analyze_sql_error(error_message: str, sql: str) -> str:
        # 方法说明文档
        """使用LLM分析SQL错误并给出建议"""
        try:
            # 构造LLM提示词,要求分析错误并给建议
            prompt = f"""
            请分析以下MySQL错误信息,并给出解决建议:

            错误信息: {error_message}
            SQL语句: {sql}

            请给出:
            1. 错误原因分析
            2. 具体修复建议
            3. 正确SQL示例(如有必要)

            请用中文简明回答。
            """

            # 调用LLM获取建议
            suggestion = LLMClient.call_llm(prompt)
            # 如果LLM返回建议,则去除首尾空白后返回
            if suggestion:
                return suggestion.strip()
            # 如果没有建议,返回默认提示
            return "无法分析错误,请检查SQL语法和数据库连接。"
        # 捕获异常,分析失败时返回默认提示
        except Exception as e:
            # 记录警告日志
            logger.warning(f"错误分析失败: {str(e)}")
            # 返回默认提示
            return "无法分析错误,请检查SQL语法和数据库连接。"

    # 静态方法:根据解析结果构建SQL查询语句
+   @staticmethod
+   def build_sql_query(parsed: Dict[str, Any]) -> Optional[str]:
        # 方法说明文档
+       """构建SQL查询语句"""
+       try:
            # 构造LLM提示词,要求根据解析结果生成SQL
+           prompt = f"""
+           请根据以下解析结果生成MySQL SQL语句:

+           解析结果: {json.dumps(parsed, ensure_ascii=False)}

+           数据库表结构:
+           - students: student_id(PK), student_no, student_name, gender, class_name, birth_date, parent_phone, create_time
+           - subjects: subject_id(PK), subject_name, teacher_name, create_time  
+           - scores: score_id(PK), student_id, subject_id, exam_date, exam_type, score, grade_level, create_time

+           要求:
+           1. 如果是SELECT查询,选择合适字段,避免SELECT *
+           2. 多表查询时使用合适的JOIN
+           3. 添加合适的WHERE条件
+           4. 有order_by时加ORDER BY
+           5. 有limit时加LIMIT
+           6. 只返回完整SQL语句,不要有解释

+           示例: SELECT s.student_name, s.class_name, sc.score FROM students s JOIN scores sc ON s.student_id = sc.student_id WHERE sc.score >= 80 ORDER BY sc.score DESC LIMIT 10
+           """

            # 调用LLM获取SQL语句
+           sql = LLMClient.call_llm_with_sql(prompt)
            # 如果生成失败,返回默认SQL
+           if sql is None:
+               logger.warning("LLM生成SQL失败,使用默认SQL")
+               return "SELECT * FROM students LIMIT 10"
            # 返回生成的SQL语句
+           return sql

        # 捕获异常,生成SQL失败时返回默认SQL
+       except Exception as e:
            # 生成SQL失败时记录警告日志
+           logger.warning(f"LLM生成SQL失败,使用默认SQL: {str(e)}")
            # 返回默认SQL
+           return "SELECT * FROM students LIMIT 10"

7. @dataclass #

@dataclass 是 Python 3.7+ 引入的一个装饰器,它位于 dataclasses 模块中。它的主要作用是自动生成数据类的样板代码,大大简化了数据类的定义。 @dataclass 被用来装饰 DatabaseConnection 类:

@dataclass
class DatabaseConnection:
    """数据库连接信息"""
    host: str
    port: int
    user: str
    password: str
    database: str
    engine: Optional[Any] = None
    session: Optional[Any] = None

7.1. 自动生成 __init__ 方法 #

# 使用 @dataclass 后,Python 自动生成:
def __init__(self, host: str, port: int, user: str, password: str, 
             database: str, engine: Optional[Any] = None, 
             session: Optional[Any] = None):
    self.host = host
    self.port = port
    self.user = user
    self.password = password
    self.database = database
    self.engine = engine
    self.session = session

7.2. 自动生成 __repr__ 方法 #

# 自动生成字符串表示,便于调试:
def __repr__(self):
    return f"DatabaseConnection(host='{self.host}', port={self.port}, ...)"

7.3. 自动生成 __eq__ 方法 #

# 自动生成相等性比较方法:
def __eq__(self, other):
    if not isinstance(other, DatabaseConnection):
        return False
    return (self.host == other.host and 
            self.port == other.port and 
            self.user == other.user and 
            # ... 其他字段比较)

7.4. 不使用 @dataclass 的传统写法 #

如果不使用 @dataclass,您需要手动编写这些方法:

class DatabaseConnection:
    def __init__(self, host: str, port: int, user: str, password: str, 
                 database: str, engine: Optional[Any] = None, 
                 session: Optional[Any] = None):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.engine = engine
        self.session = session

    def __repr__(self):
        return f"DatabaseConnection(host='{self.host}', port={self.port}, ...)"

    def __eq__(self, other):
        # 复杂的相等性比较逻辑
        pass

7.5. @dataclass 的优势 #

  1. 代码简洁性:减少了大量样板代码
  2. 类型安全:支持类型注解,便于IDE提供智能提示
  3. 自动生成:自动生成常用的魔术方法
  4. 可定制性:可以通过参数控制生成哪些方法
  5. 维护性:修改字段时,相关方法会自动更新

访问验证

请输入访问令牌

Token不正确,请重新输入