导航菜单

  • 1.什么是MCP
  • 2.MCP架构
  • 3.MCP服务器
  • 4.MCP客户端
  • 5.版本控制
  • 6.连接MCP服务器
  • 7.SDKs
  • 8.Inspector
  • 9.规范
  • 10.架构
  • 11.协议
  • 12.生命周期
  • 13.工具
  • 14.资源
  • 15.提示
  • 16.日志
  • 17.进度
  • 18.传输
  • 19.补全
  • 20.引导
  • 21.采样
  • 22.任务
  • 23.取消
  • 24.Ping
  • 25.根
  • 26.分页
  • 27.授权
  • 28.初始化
  • 29.工具
  • 30.资源
  • 31.结构化输出
  • 32.提示词
  • 33.上下文
  • 34.StreamableHTTP
  • 35.参数补全
  • 36.引导
  • 37.采样
  • 38.LowLevel
  • 39.任务
  • 40.取消
  • 41.ping
  • 42.根
  • 43.分页
  • 44.授权
  • 45.FunctionCalling
  • starlette
  • FastAPI
  • Keycloak
  • asyncio
  • contextlib
  • httpx
  • pathlib
  • pydantic
  • queue
  • subprocess
  • threading
  • uvicorn
  • JSON-RPC
  • LiteLLM
  • pydantic-settings
  • ai_agent
  • format
  • diff
  • mcp_server
  • 1. 初始化
  • 2.天气预告
    • 2.1. weather-client.py
    • 2.2. weather-server.py
    • 2.3 执行过程
  • 3.地点搜索
    • 3.1. place-client.py
    • 3.2. place-server.py
    • 3.3 执行过程
  • 4. 路径规划
    • 4.1. direction-client.py
    • 4.2. direction-server.py
    • 4.3 执行过程

1. 初始化 #

uv init 
uv add mcp[cli]
uv add langchain langchain-core langchain-deepseek langchain-openai

2.天气预告 #

本节将基于开源的 MCP(Multi-Component Protocol)体系,构建一个支持天气预报的轻量级服务。

你将看到如何:

  • 通过简单的 Python 代码启动和测试一个本地“天气”MCP服务;
  • 使用标准输入输出流作为组件间通信的桥梁,实现模块化、解耦的微服务风格开发;
  • 利用百度地图接口与大模型服务,提供灵活的智能天气查询能力。

我们将演示如何一键启动本地天气服务容器,自动通过 MCP 协议调用天气获取函数,最终完成“给定目的地的未来3天天气预报”这一任务。

2.1. weather-client.py #

weather-client.py

# 调试用:启动 weather stdio MCP 并调用 get_travel_forecast。
"""调试用:启动 weather stdio MCP 并调用 get_travel_forecast。"""

# 导入异步操作所需的 asyncio 库
import asyncio
# 导入 sys,便于获取 Python 解释器路径
import sys
# 导入 Path,用于方便地处理文件路径
from pathlib import Path

# 导入 MCP 客户端会话和 Stdio 服务器参数配置
from mcp import ClientSession, StdioServerParameters
# 导入 stdio 客户端,用于通过标准输入输出与服务通信
from mcp.client.stdio import stdio_client

# 设定服务器文件为当前目录下的 weather-server.py
SERVER_FILE = Path(__file__).with_name("weather-server.py")
# 设置查询天气的目的地,这里设为“北京”
DESTINATION = "北京"
# 设置要查询的天数,这里设为 3 天
DAYS = 3
# 设置用于查询百度地图天气的 API Key
BAIDU_MAP_AK = "51wJobm0zfEyyZv1FGSAGmvrBAXGrqU9"
# 设置 DeepSeek 的 API Key,供大模型使用
DEEPSEEK_API_KEY = "sk-24088156e9ab48f3adddaf5a9c0c4ede"
# DeepSeek 的 API 基础地址
DEEPSEEK_BASE_URL = "https://api.deepseek.com"
# DeepSeek 所用模型的名称
DEEPSEEK_MODEL = "deepseek-chat"

# 定义一个异步主流程函数,组织并运行天气查询完整流程
async def run():
    # 配置并创建标准输入输出服务器参数,包括要运行的 Python 文件和必要的环境变量
    server = StdioServerParameters(
        command=sys.executable,
        args=[str(SERVER_FILE)],
        env={
            "BAIDU_MAP_AK": BAIDU_MAP_AK,
            "DEEPSEEK_API_KEY": DEEPSEEK_API_KEY,
            "DEEPSEEK_BASE_URL": DEEPSEEK_BASE_URL,
            "DEEPSEEK_MODEL": DEEPSEEK_MODEL,
        },
    )
    # 启动 stdio 客户端,与服务器建立异步通道获取读写对象
    async with stdio_client(server) as (read, write):
        # 用读写通道实例化 MCP 客户端会话
        async with ClientSession(read, write) as session:
            # 初始化 MCP 会话
            await session.initialize()
            # 调用服务器端的 get_travel_forecast 工具,传入目的地和天数参数
            result = await session.call_tool(
                "get_travel_forecast",
                {"destination": DESTINATION, "days": DAYS},
            )
            # 若接口返回错误,则抛出异常并输出错误内容
            if result.isError:
                raise RuntimeError(str(result.content))
            # 正常返回则遍历所有结果,并逐条输出天气预报文本
            for item in result.content:
                text = getattr(item, "text", "")
                if text:
                    print(text)

# 定义入口主函数
def main():
    # 启动异步事件循环,执行主流程
    asyncio.run(run())

# 检查是否为主程序入口,如果是则调用主函数
if __name__ == "__main__":
    main()

2.2. weather-server.py #

weather-server.py

# 天气 MCP 服务器(stdio):目的地 + N 天 -> DeepSeek 参数抽取 -> 百度天气预报。
"""
天气 MCP 服务器(stdio):目的地 + N 天 -> DeepSeek 参数抽取 -> 百度天气预报。
"""

# 导入 json 库用于处理 JSON 数据
import json
# 导入 logging 库用于日志记录
import logging
# 导入 os 库用于环境变量读取
import os
# 导入 Annotated 类型用于参数注解
from typing import Annotated

# 导入 httpx 异步 HTTP 客户端
import httpx
# 从 langchain_core 库导入 JSON 输出解析器
from langchain_core.output_parsers import JsonOutputParser
# 从 langchain_core 库导入聊天消息模板
from langchain_core.prompts import ChatPromptTemplate
# 导入 DeepSeek 聊天模型类
from langchain_deepseek import ChatDeepSeek
# 导入 FastMCP 快速 MCP 服务框架
from mcp.server.fastmcp import FastMCP
# 从 pydantic 导入字段描述工具
from pydantic import Field

# 配置日志格式和输出级别
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s"
)
# 创建 logger 对象
logger = logging.getLogger("weather-service")
# 初始化 FastMCP 服务,服务名为“天气查询服务”
mcp = FastMCP("天气查询服务")
# 百度天气接口的访问 URL
WEATHER_V1_URL = "https://api.map.baidu.com/weather/v1/"

# 获取百度地图 AK(密钥),不存在时报错
def _ak():
    v = (os.environ.get("BAIDU_MAP_AK") or "").strip()
    if not v:
        raise RuntimeError("缺少环境变量 BAIDU_MAP_AK")
    return v

# 获取 DeepSeek 聊天模型接口
def _deepseek():
    key = (os.environ.get("DEEPSEEK_API_KEY") or "").strip()
    if not key:
        raise RuntimeError("缺少环境变量 DEEPSEEK_API_KEY")
    return ChatDeepSeek(
        api_key=key,
        base_url=(
            os.environ.get("DEEPSEEK_BASE_URL") or "https://api.deepseek.com"
        ).strip(),
        model=(os.environ.get("DEEPSEEK_MODEL") or "deepseek-chat").strip(),
        temperature=0,
    )

# 使用大模型从 destination 文本中抽取百度天气查询参数
async def _extract_params(destination):
    # 日志记录参数抽取开始
    logger.info("开始参数抽取,destination=%s", destination)
    # 创建 JSON 输出解析器
    parser = JsonOutputParser()
    # 创建提问模板用于 LLM 参数抽取
    prompt = ChatPromptTemplate.from_template("""
        从输入中抽取百度天气接口参数,只输出 JSON:
        {{
        "district_id": "",
        "province": "",
        "city": "",
        "district": ""
        }}
        输入:{destination}
        输出格式要求:{format_instructions}
        """.strip())
    # 组成 prompt、llm、parser 串联调用链
    chain = prompt | _deepseek() | parser
    # 传入用户输入和格式需求,异步请求大模型
    data = await chain.ainvoke(
        {
            "destination": destination,
            "format_instructions": parser.get_format_instructions(),
        }
    )
    # 将模型输出整理成结构化参数
    extracted = {
        "district_id": str(data.get("district_id") or "").strip(),
        "province": str(data.get("province") or "").strip(),
        "city": str(data.get("city") or "").strip(),
        "district": str(data.get("district") or "").strip(),
    }
    # 日志记录提取结果
    logger.info("参数抽取完成:%s", json.dumps(extracted, ensure_ascii=False))
    # 返回抽取参数
    return extracted

# 获取天气预报数据(根据从 LLM 得到的地点参数调用百度天气 API)
async def _fetch_forecast(params_from_llm):
    # 准备基础参数
    params = {"data_type": "fc", "output": "json", "ak": _ak()}
    # 按优先级判断地点参数,设置百度接口调用参数
    if params_from_llm["district_id"]:
        params["district_id"] = params_from_llm["district_id"]
    elif params_from_llm["district"]:
        params["district"] = params_from_llm["district"]
        if params_from_llm["province"]:
            params["province"] = params_from_llm["province"]
        if params_from_llm["city"]:
            params["city"] = params_from_llm["city"]
    elif params_from_llm["city"]:
        params["district"] = params_from_llm["city"]
        if params_from_llm["province"]:
            params["province"] = params_from_llm["province"]
    elif params_from_llm["province"]:
        params["district"] = params_from_llm["province"]
    else:
        # 地点参数缺失时报错
        raise RuntimeError("LLM 未提取出有效地点(district_id/district/city/province)")
    # 日志记录 API 调用参数
    logger.info("调用百度天气接口,params=%s", json.dumps(params, ensure_ascii=False))
    # 使用 httpx 异步请求百度天气 API
    async with httpx.AsyncClient(timeout=30.0) as client:
        r = await client.get(WEATHER_V1_URL, params=params)
        r.raise_for_status()
        data = r.json()
    # 判断百度天气接口返回状态,非 0 抛异常
    if data.get("status") != 0:
        raise RuntimeError(
            f"百度天气接口失败: status={data.get('status')} message={data.get('message')}"
        )
    # 日志记录成功
    logger.info("百度天气接口调用成功")
    # 返回结果字段(天气数据)
    return data.get("result") or {}

# 注册 MCP 工具方法,用于获取旅行天气预报
@mcp.tool()
async def get_travel_forecast(
    destination: Annotated[
        str,
        Field(description="旅游目的地,例如:吉林省延边朝鲜族自治州龙井市"),
    ],
    days: Annotated[
        int,
        Field(description="展示未来天气天数,范围 1~15,默认 3"),
    ] = 3,
):
    """天气预告服务器"""
    # days 限制范围在 1~15 天
    days = max(1, min(int(days), 15))
    # 调用大模型参数抽取
    params_from_llm = await _extract_params(destination)
    # 调用百度天气获取结果
    result = await _fetch_forecast(params_from_llm)
    # 取得每日预报列表
    forecasts = list(result.get("forecasts") or [])
    # 日志记录生成天气预报
    logger.info("生成天气预报结果,destination=%s days=%s", destination, days)
    # 构建输出文本行
    lines = [
        f"输入目的地:{destination}",
        f"参数:{json.dumps(params_from_llm, ensure_ascii=False)}",
        f"展示天数:{min(days, len(forecasts))}",
        "",
    ]
    # 遍历每天的预报信息,累加到输出内容
    for day in forecasts[:days]:
        lines.append(f"{day.get('date', '')} {day.get('week', '')}")
        lines.append(
            f"白天:{day.get('text_day', '')},夜间:{day.get('text_night', '')}"
        )
        lines.append(f"温度:{day.get('low', '')}~{day.get('high', '')}℃")
        lines.append("")
    # 返回拼接后的天气预报文本
    return "\n".join(lines)

# 主程序入口,启动 MCP 服务(stdio 模式)
def main():
    mcp.run(transport="stdio")

# 如果当前模块为主程序,则执行 main()
if __name__ == "__main__":
    main()

2.3 执行过程 #

sequenceDiagram autonumber participant Client as MCP客户端 participant Server as weather-server.py participant DS as DeepSeek API participant BD as 百度天气API Client->>Server: 启动服务进程 Server->>Server: main() -> mcp.run(transport="stdio") Client->>Server: 调用 get_travel_forecast(destination, days) Server->>Server: days 约束到 [1,15] Server->>Server: _extract_params(destination) Server->>Server: _deepseek() 读取 DEEPSEEK_* 环境变量 Server->>DS: chain.ainvoke(prompt+format_instructions) DS-->>Server: 返回 JSON 参数(district_id/province/city/district) Server->>Server: _fetch_forecast(params_from_llm) Server->>Server: _ak() 读取 BAIDU_MAP_AK Server->>Server: 按优先级组装百度请求参数 Server->>BD: GET /weather/v1 (data_type=fc, output=json, ...) BD-->>Server: 天气 JSON(status/result/forecasts) alt status != 0 或 HTTP错误 Server-->>Client: 抛出 RuntimeError else 成功 Server->>Server: 组装多行天气文本 Server-->>Client: 返回 forecast 文本 end

3.地点搜索 #

本节我们将扩展基于MCP协议的能力,介绍“地点搜索”服务的架构与使用方法。
通过MCP,地点搜索可以作为一个独立、解耦的微服务模块运行,分别通过HTTP(SSE流式)协议实现异步数据交互。
主流程通常包括以下环节:

  • 客户端通过SSE方式连接本地place服务容器(即启动place MCP服务器),并传递查询关键词、分页等参数;
  • 服务器端解析请求参数,读取百度地图和DeepSeek相关环境变量,实现灵活的地图/LLM能力组合;
  • 服务器内部自动调用百度地图地点搜索API,将返回的JSON结构化解析,按需裁剪、重组为MCP标准数据格式;
  • 结果通过SSE通道按块逐步推送到客户端,适合大规模结果集或低延迟场景;
  • 客户端可实时获取、展示景点、地标等地点信息,也可结合更多模块进行多步联动(如路线规划、天气查询等)。

3.1. place-client.py #

place-client.py

# 调试用的描述字符串,说明本文件功能:通过SSE方式连接place MCP服务器并调用search_place工具
"""调试用:连接 place SSE MCP 并调用 search_place。"""

# 导入异步操作相关库
import asyncio
# 导入日志处理相关库
import logging

# 从mcp包中导入客户端会话类
from mcp import ClientSession
# 从mcp客户端SSE模块导入sse客户端工厂函数
from mcp.client.sse import sse_client

# 配置日志输出格式与等级为INFO
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")
# 获取名为"place-client"的日志记录器
logger = logging.getLogger("place-client")

# 定义SSE服务URL(本地服务器地址)
SSE_URL = "http://127.0.0.1:8001/sse"
# 设置百度地图AK密钥
BAIDU_MAP_AK = "e0QsxCTdlt6qPNoQQNJwa89qoJ4OXieX"
# 设置DeepSeek的API Key
DEEPSEEK_API_KEY = "sk-24088156e9ab48f3adddaf5a9c0c4ede"


# 定义异步主操作函数
async def run():
    # 日志提示正在连接SSE服务
    logger.info("连接 SSE 服务:%s", SSE_URL)
    # 构造请求头,带有百度和DeepSeek的API密钥
    headers = {"BAIDU_MAP_AK": BAIDU_MAP_AK, "DEEPSEEK_API_KEY": DEEPSEEK_API_KEY}
    # 以异步方式连接SSE客户端,获取读写对象
    async with sse_client(SSE_URL, headers=headers) as (read, write):
        # 以ClientSession包装读写对象,建立MCP协议会话
        async with ClientSession(read, write) as session:
            # 初始化MCP会话
            await session.initialize()
            # 日志记录会话初始化成功
            logger.info("MCP 会话初始化成功")

            # 日志提示即将查询景点
            logger.info("调用工具 search_place(景点)")
            # 向服务端调用search_place,参数为“北京市 景点”、分页等
            scenic = await session.call_tool(
                "search_place",
                {"user_input": "北京市 景点", "page_size": 5, "page_num": 0, "with_detail": False},
            )
            # 若返回出错,抛出异常输出错误内容
            if scenic.isError:
                raise RuntimeError(str(scenic.content))
            # 日志记录查询景点成功
            logger.info("景点查询成功")
            # 遍历返回的景点内容,逐条提取并输出结果内容
            for item in scenic.content:
                text = getattr(item, "text", "")
                if text:
                    logger.info("景点结果:\n%s", text)

            # 日志提示即将查询酒店
            logger.info("调用工具 search_place(酒店)")
            # 向服务端调用search_place,参数为“北京市 酒店”、分页等
            hotel = await session.call_tool(
                "search_place",
                {"user_input": "北京市 酒店", "page_size": 5, "page_num": 0, "with_detail": True},
            )
            # 若返回出错,抛出异常输出错误内容
            if hotel.isError:
                raise RuntimeError(str(hotel.content))
            # 日志记录查询酒店成功
            logger.info("酒店查询成功")
            # 遍历返回的酒店内容,逐条提取并输出结果内容
            for item in hotel.content:
                text = getattr(item, "text", "")
                if text:
                    logger.info("酒店结果:\n%s", text)


# 定义主函数入口
def main():
    # 启动异步主流程
    asyncio.run(run())


# 如果本文件直接运行,则执行main入口函数
if __name__ == "__main__":
    main()

3.2. place-server.py #

place-server.py

# 地点检索 MCP 服务器(SSE)
"""地点检索 MCP 服务器(SSE)。"""

# 导入标准库模块
import json  # 导入json处理模块
import logging  # 导入日志模块
import os  # 导入操作系统相关模块
from typing import Annotated  # 导入类型注解模块

# 导入第三方模块
import httpx  # 导入http异步请求库
from langchain_core.output_parsers import JsonOutputParser  # 导入LangChain的JSON输出解析器
from langchain_core.prompts import ChatPromptTemplate  # 导入LangChain的Prompt模板
from langchain_deepseek import ChatDeepSeek  # 导入DeepSeek聊天模块
from mcp.server.fastmcp import Context, FastMCP  # 导入FastMCP框架相关类
from pydantic import Field  # 导入Pydantic的Field用于参数描述

# 配置日志格式和日志级别为INFO
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")
# 获取名为"place-service"的日志记录器
logger = logging.getLogger("place-service")

# 设置主机地址
HOST = "127.0.0.1"
# 设置服务端口
PORT = 8001
# 设置SSE路径
SSE_PATH = "/sse"
# 初始化一个FastMCP实例,名称为"地点检索服务"
mcp = FastMCP("地点检索服务", host=HOST, port=PORT, sse_path=SSE_PATH)

# 定义百度地图检索相关API地址
PLACE_SEARCH_URL = "https://api.map.baidu.com/place/v2/search"  # 地点检索接口
PLACE_DETAIL_URL = "https://api.map.baidu.com/place/v2/detail"  # 地点详情接口
PLACE_SUGGESTION_URL = "https://api.map.baidu.com/place/v2/suggestion"  # 地点建议接口


# 从请求上下文中获取指定请求头的值
def _header_value(ctx, key):
    request = None  # 初始化request为None
    # 如果上下文和request_context都存在,则获取request对象
    if ctx and ctx.request_context:
        request = getattr(ctx.request_context, "request", None)
    # 若找不到request则抛出异常
    if request is None:
        raise RuntimeError(f"缺少请求上下文,无法读取请求头 {key}")
    # 请求头支持大小写
    raw = request.headers.get(key) or request.headers.get(key.lower()) or ""
    v = str(raw).strip()  # 清理空格
    # 若值为空则抛出异常
    if not v:
        raise RuntimeError(f"缺少请求头 {key}")
    return v  # 返回请求头的值

# 从上下文中获取百度AK
def _ak(ctx):
    return _header_value(ctx, "BAIDU_MAP_AK")

# 构造DeepSeek聊天对象
def _deepseek(ctx):
    # 获取DeepSeek API KEY
    key = _header_value(ctx, "DEEPSEEK_API_KEY")
    # 创建ChatDeepSeek对象,支持自定义模型和Base URL
    return ChatDeepSeek(
        api_key=key,
        base_url=(os.environ.get("DEEPSEEK_BASE_URL") or "https://api.deepseek.com").strip(),
        model=(os.environ.get("DEEPSEEK_MODEL") or "deepseek-chat").strip(),
        temperature=0,
    )

# 利用LLM自动抽取地点检索参数
async def _extract_place_args(user_input, ctx):
    # 记录日志,标记开始参数抽取
    logger.info("开始抽取地点检索参数,input=%s", user_input)
    # 创建JSON格式解析器
    parser = JsonOutputParser()
    # 创建用于抽取的Prompt模板
    prompt = ChatPromptTemplate.from_template(
        """
        从用户输入中抽取百度地点检索参数,只输出 JSON:
        {{
        "region": "",
        "query": "",
        "tag": "",
        "city_limit": false
        }}
        规则:
        1) region 必须是地区名(市/州/区县),如“延边朝鲜族自治州”。
        2) query 填检索词,如“景点”“酒店”“火锅”等。
        3) tag 可选(如“旅游景点”“酒店”),无法确定可空字符串。
        4) city_limit 仅当用户强调“仅本地区”时设为 true。
        输入:{user_input}
        输出格式要求:{format_instructions}
        """.strip()
    )
    # 组合Prompt、LLM和解析器,构造Chain链式调用
    chain = prompt | _deepseek(ctx) | parser
    # 异步执行链式调用,获得LLM抽取的参数
    data = await chain.ainvoke(
        {"user_input": user_input, "format_instructions": parser.get_format_instructions()}
    )
    # 整理参数,去除两端空白,保证类型安全
    extracted = {
        "region": str(data.get("region") or "").strip(),
        "query": str(data.get("query") or "").strip(),
        "tag": str(data.get("tag") or "").strip(),
        "city_limit": bool(data.get("city_limit") or False),
    }
    # 打印参数抽取结果日志
    logger.info("参数抽取完成:%s", json.dumps(extracted, ensure_ascii=False))
    return extracted  # 返回抽取到的参数字典

# 根据参数请求百度地点检索服务
async def _search_places(params_from_llm, page_size, page_num, ctx):
    # 组装百度API请求参数
    req = {
        "query": params_from_llm["query"] or "景点",  # 未识别query默认为景点
        "region": params_from_llm["region"],
        "tag": params_from_llm["tag"],
        "city_limit": "true" if params_from_llm["city_limit"] else "false",
        "output": "json",
        "scope": "2",
        "page_size": str(page_size),
        "page_num": str(page_num),
        "ak": _ak(ctx),
    }
    # 若缺少region,无法查询,报错
    if not req["region"]:
        raise RuntimeError("LLM 未提取出 region,无法检索地点")

    # 打印请求参数日志
    logger.info("调用地点检索接口,params=%s", json.dumps(req, ensure_ascii=False))
    # 使用httpx发送异步GET请求
    async with httpx.AsyncClient(timeout=30.0) as client:
        r = await client.get(PLACE_SEARCH_URL, params=req)
        r.raise_for_status()
        data = r.json()
    # 若接口返回状态非0,则抛出异常
    if data.get("status") != 0:
        raise RuntimeError(f"地点检索失败: status={data.get('status')} message={data.get('message')}")
    return data  # 返回百度API响应结果

# 通过UID获取百度地点详情
async def _detail_by_uid(uid, ctx):
    # 组装请求参数
    req = {"uid": uid, "scope": "2", "output": "json", "ak": _ak(ctx)}
    # 使用httpx发送异步请求
    async with httpx.AsyncClient(timeout=30.0) as client:
        r = await client.get(PLACE_DETAIL_URL, params=req)
        r.raise_for_status()
        data = r.json()
    # 如果状态不等于0,返回空字典
    if data.get("status") != 0:
        return {}
    # 返回地点详细信息的result部分
    return data.get("result") or {}

# 获得关键词/区域自动补全建议
async def _suggest_region_keyword(region, query, ctx):
    # 构造建议接口请求参数
    req = {
        "query": query or region,
        "region": region or "全国",
        "city_limit": "false",
        "output": "json",
        "ak": _ak(ctx),
    }
    # 使用httpx异步请求suggestion接口
    async with httpx.AsyncClient(timeout=30.0) as client:
        r = await client.get(PLACE_SUGGESTION_URL, params=req)
        r.raise_for_status()
        data = r.json()
    # 状态失败返回空列表
    if data.get("status") != 0:
        return []
    # 返回建议列表
    return list(data.get("results") or [])

# 定义MCP服务的search_place工具
@mcp.tool()
async def search_place(
    user_input: Annotated[
        str,
        Field(description="用户输入的地点检索需求,例如:北京 景点"),
    ],
    page_size: Annotated[
        int,
        Field(description="每页数量,范围 1~20,默认 5"),
    ] = 5,
    page_num: Annotated[
        int,
        Field(description="页码,从 0 开始,默认 0"),
    ] = 0,
    with_detail: Annotated[
        bool,
        Field(description="是否补充地点详情信息(电话、人均、营业时间)"),
    ] = False,
    ctx: Context = None,
):
    # 工具函数说明
    """地点检索 MCP 服务。"""
    # 限定page_size取值范围在1~20之间
    page_size = max(1, min(int(page_size), 20))
    # 限定page_num为非负整数
    page_num = max(0, int(page_num))
    # 将with_detail类型转换为bool
    with_detail = bool(with_detail)

    # 第一步:通过LLM抽取检索参数
    params_from_llm = await _extract_place_args(user_input, ctx)
    # 第二步:通过API获取检索结果
    data = await _search_places(params_from_llm, page_size, page_num, ctx)
    # 第三步:得到检索结果与总数
    results = list(data.get("results") or [])
    total = int(data.get("total") or 0)

    # 第四步:构造返回文本的前缀内容
    lines = [
        f"输入:{user_input}",
        f"参数:{json.dumps(params_from_llm, ensure_ascii=False)}",
        f"总数:{total},当前页数量:{len(results)}",
        "",
    ]

    # 如果无结果,获取推荐候选词
    if not results:
        suggestions = await _suggest_region_keyword(params_from_llm["region"], params_from_llm["query"], ctx)
        lines.append("未检索到结果。可参考候选词:")
        # 枚举前5个候选词输出
        for i, it in enumerate(suggestions[:5], 1):
            lines.append(f"{i}. {it.get('name', '')} {it.get('city', '')}{it.get('district', '')}")
        return "\n".join(lines)  # 返回结果字符串

    # 对返回的每条结果进行格式化输出
    for i, item in enumerate(results, 1):
        lines.append(f"{i}. {item.get('name', '')}")
        lines.append(f"   地址:{item.get('address', '未知')}")
        lines.append(f"   区域:{item.get('province', '')}{item.get('city', '')}{item.get('area', '')}")
        lines.append(f"   评分:{item.get('detail_info', {}).get('overall_rating', '无')}")
        lines.append(f"   类型:{item.get('detail_info', {}).get('tag', '无')}")
        # 如需补充详细信息
        if with_detail:
            uid = str(item.get("uid") or "").strip()
            # 存在UID才查详情
            if uid:
                detail = await _detail_by_uid(uid, ctx)
                d = detail.get("detail_info") or {}
                lines.append(f"   电话:{detail.get('telephone', '无')}")
                lines.append(f"   人均:{d.get('price', '无')}")
                lines.append(f"   营业时间:{d.get('shop_hours', '无')}")
        lines.append("")

    # 合并全部文本并返回
    return "\n".join(lines)

# 主程序入口
def main():
    # 打印服务启动日志
    logger.info("启动 Place MCP SSE 服务: http://%s:%s%s", HOST, PORT, SSE_PATH)
    # 启动MCP服务,使用SSE协议
    mcp.run(transport="sse")

# 如果是主程序入口,则执行main函数
if __name__ == "__main__":
    main()

3.3 执行过程 #

sequenceDiagram autonumber participant Client as MCP客户端 participant Server as place-server.py participant DS as DeepSeek API participant BS as 百度 Place Search participant BD as 百度 Place Detail participant BG as 百度 Suggestion Client->>Server: 启动服务进程 Server->>Server: main() -> mcp.run(transport="sse") Client->>Server: 调用 search_place(user_input, page_size, page_num, with_detail, headers) Server->>Server: 参数裁剪与类型规范化 Server->>Server: 从 ctx.headers 读取 DEEPSEEK_API_KEY / BAIDU_MAP_AK Server->>DS: _extract_place_args(user_input) DS-->>Server: {region, query, tag, city_limit} Server->>BS: _search_places(params, page_size, page_num) BS-->>Server: results, total, status alt results 为空 Server->>BG: _suggest_region_keyword(region, query) BG-->>Server: suggestions[] Server-->>Client: 返回“无结果 + 候选词”文本 else results 非空 loop 每个 result alt with_detail == true 且存在 uid Server->>BD: _detail_by_uid(uid) BD-->>Server: detail_info end end Server-->>Client: 返回格式化地点列表文本 end

4. 路径规划 #

在本部分,我们将介绍路径规划的整体架构和工作流程。路径规划模块负责根据用户输入的起点、终点(可选途经点)和交通方式(如驾车、步行、骑行、公交等),请求第三方地图 API 获取推荐路线,并将结果结构化后返回。

整体流程如下:

  1. MCP 服务器通过对应的 API(如 plan_route)接收来自客户端的路径规划请求。
  2. 服务器会从请求体或 headers 中提取必要参数(如起点、终点、交通工具类型、API Key 等),并进行参数校验。
  3. 根据请求的交通方式和参数,服务器调用百度地图/高德等第三方的路径规划 API,并对返回结果进行容错、重试和数据解析。
  4. 若参数有误或第三方接口返回失败,服务器会格式化错误提示返回给客户端。
  5. 若成功获取路径结果,服务器会将路线(包括距离、预计耗时、路线步骤等)结构化封装后返回给客户端。
  6. 客户端收到格式化结果后可进行展示或进一步操作。

4.1. direction-client.py #

direction-client.py

# 调试用描述:连接 direction Streamable HTTP MCP 并调用 plan_route。
"""调试用:连接 direction Streamable HTTP MCP 并调用 plan_route。"""

# 导入异步相关库
import asyncio
# 导入日志模块
import logging

# 导入 httpx 库用于 HTTP 通信
import httpx
# 从 mcp 包中导入客户端会话类
from mcp import ClientSession
# 从 mcp.client.streamable_http 模块导入客户端工厂
from mcp.client.streamable_http import streamable_http_client

# 配置日志的输出格式和等级为 INFO
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")
# 获取名为 "direction-client" 的日志记录器
logger = logging.getLogger("direction-client")

# 定义 MCP 服务的 HTTP URL
HTTP_URL = "http://127.0.0.1:8002/mcp"
# 设置百度地图 AK 密钥
BAIDU_MAP_AK = "e0QsxCTdlt6qPNoQQNJwa89qoJ4OXieX"
# 设置 DeepSeek API Key
DEEPSEEK_API_KEY = "sk-24088156e9ab48f3adddaf5a9c0c4ede"


# 定义主异步运行函数
async def run():
    # 日志输出:正在连接 Streamable HTTP 服务
    logger.info("连接 Streamable HTTP 服务:%s", HTTP_URL)
    # 构造请求头,携带百度地图和 DeepSeek 的 API 密钥
    headers = {"BAIDU_MAP_AK": BAIDU_MAP_AK, "DEEPSEEK_API_KEY": DEEPSEEK_API_KEY}
    # 创建 httpx 的异步客户端,请求超时时间为 30 秒
    async with httpx.AsyncClient(headers=headers, timeout=30.0) as http_client:
        # 使用工厂方法连接至 Streamable HTTP MCP,并获取读写对象
        async with streamable_http_client(HTTP_URL, http_client=http_client) as (read, write, _):
            # 创建 MCP 会话,包装读写对象
            async with ClientSession(read, write) as session:
                # 初始化 MCP 会话
                await session.initialize()
                # 日志输出会话初始化成功
                logger.info("MCP 会话初始化成功")

                # 日志输出:调用 plan_route(驾车)
                logger.info("调用 plan_route(驾车)")
                # 调用工具 plan_route,查询从北京市海淀区到天津市滨海新区的驾车路线,尽量不走高速
                driving = await session.call_tool(
                    "plan_route",
                    {"user_input": "从北京市海淀区到天津市滨海新区驾车,尽量不走高速", "max_steps": 6},
                )
                # 遍历驾车路线的结果内容
                for item in driving.content:
                    # 获取结果的 text 字段
                    text = getattr(item, "text", "")
                    # 若存在文本内容,日志输出驾车结果
                    if text:
                        logger.info("驾车结果:\n%s", text)

                # 日志输出:调用 plan_route(公共交通)
                logger.info("调用 plan_route(公共交通)")
                # 调用工具 plan_route,查询从北京市朝阳区到上海市浦东新区的公共交通路线,优先选择火车
                transit = await session.call_tool(
                    "plan_route",
                    {"user_input": "从北京市朝阳区到上海市浦东新区公共交通,优先火车", "max_steps": 6},
                )
                # 遍历公共交通路线的结果内容
                for item in transit.content:
                    # 获取结果的 text 字段
                    text = getattr(item, "text", "")
                    # 若存在文本内容,日志输出公共交通结果
                    if text:
                        logger.info("公共交通结果:\n%s", text)


# 定义主入口函数
def main():
    # 运行主异步函数
    asyncio.run(run())


# 判断当前模块是否为主程序入口
if __name__ == "__main__":
    # 如果是主程序则执行 main 函数
    main()

4.2. direction-server.py #

direction-server.py

# 路线规划 MCP 服务器(Streamable HTTP)。
"""路线规划 MCP 服务器(Streamable HTTP)。"""

# 导入标准库
import json  # 导入 JSON 模块,用于序列化和反序列化
import logging  # 导入日志模块,用于日志输出
from typing import Annotated  # 导入类型注解

# 导入第三方库和自定义依赖
import httpx  # 导入 httpx,用于异步 HTTP 请求
from langchain_core.output_parsers import JsonOutputParser  # 导入 LangChain 的 JSON 输出解析器
from langchain_core.prompts import ChatPromptTemplate  # 导入 LangChain 的聊天提示模板
from langchain_deepseek import ChatDeepSeek  # 导入 DeepSeek 聊天模型
from mcp.server.fastmcp import Context, FastMCP  # 导入自定义的 MCP 相关依赖
from pydantic import Field  # 导入 Pydantic 的 Field 用于模型字段描述

# 配置日志格式和等级
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")
# 获取 direction-service 的日志记录器
logger = logging.getLogger("direction-service")

# 定义主机地址和服务端口
HOST = "127.0.0.1"
PORT = 8002
# 定义 HTTP 路径
HTTP_PATH = "/mcp"
# 实例化 MCP 服务
mcp = FastMCP("路线规划服务", host=HOST, port=PORT, streamable_http_path=HTTP_PATH)

# 百度地理编码 API 地址
GEOCODE_URL = "https://api.map.baidu.com/geocoding/v3/"
# 百度路线规划 API 基础地址
ROUTE_V2_BASE = "https://api.map.baidu.com/direction/v2"
# DeepSeek 相关常量
DEEPSEEK_BASE_URL = "https://api.deepseek.com"
DEEPSEEK_MODEL = "deepseek-chat"


# 从请求上下文中读取请求头
def _header_value(ctx, key):
    request = None  # 初始化请求对象为 None
    # 获取请求对象
    if ctx and ctx.request_context:
        request = getattr(ctx.request_context, "request", None)
    # 如果请求不存在则报错
    if request is None:
        raise RuntimeError(f"缺少请求上下文,无法读取请求头 {key}")
    # 获取请求头内容,支持大小写
    raw = request.headers.get(key) or request.headers.get(key.lower()) or ""
    v = str(raw).strip()  # 去除空格转换为字符串
    # 若头部内容为空则报错
    if not v:
        raise RuntimeError(f"缺少请求头 {key}")
    return v  # 返回头部的值


# 获取百度地图 AK
def _ak(ctx):
    return _header_value(ctx, "BAIDU_MAP_AK")  # 调用 _header_value 获取百度地图 AK


# 构建 DeepSeek 对象
def _deepseek(ctx):
    key = _header_value(ctx, "DEEPSEEK_API_KEY")  # 获取 DeepSeek 的 API KEY
    return ChatDeepSeek(
        api_key=key,
        base_url=DEEPSEEK_BASE_URL,
        model=DEEPSEEK_MODEL,
        temperature=0,
    )  # 返回 DeepSeek 聊天对象

# 利用 LLM 从用户输入中抽取路线参数
async def _extract_route_args(user_input, ctx):
    # 日志:开始抽取参数
    logger.info("开始抽取路线参数,input=%s", user_input)
    # 实例化 JSON 输出解析器
    parser = JsonOutputParser()
    # 构建 LLM 提示模板
    prompt = ChatPromptTemplate.from_template(
        """
        从输入中抽取百度路线规划参数,只输出 JSON:
        {{
        "origin": "",
        "destination": "",
        "mode": "driving",
        "tactics": ""
        }}
        约束:
        1) mode 只能是 driving / transit。
        2) 如果用户提到“公共交通/地铁/公交/高铁/动车/火车/飞机”,mode 设为 transit。
        3) tactics 可选(仅 driving 生效,如 “avoid_highway”),否则空字符串。
        输入:{user_input}
        输出格式要求:{format_instructions}
        """.strip()
    )
    # 串联提示、模型、JSON 解析器
    chain = prompt | _deepseek(ctx) | parser
    # 异步推理
    data = await chain.ainvoke(
        {"user_input": user_input, "format_instructions": parser.get_format_instructions()}
    )
    # 国际化 mode 字段,保证仅支持 driving/transit
    mode = str(data.get("mode") or "driving").strip().lower()
    if mode not in {"driving", "transit"}:
        mode = "driving"
    # 组织抽取结果为字典
    extracted = {
        "origin": str(data.get("origin") or "").strip(),
        "destination": str(data.get("destination") or "").strip(),
        "mode": mode,
        "tactics": str(data.get("tactics") or "").strip(),
    }
    # 输出参数抽取日志
    logger.info("参数抽取完成:%s", json.dumps(extracted, ensure_ascii=False))
    return extracted  # 返回参数抽取结果

# 地址地理编码(调用百度 API,返回经纬度及格式化地址)
async def _geocode(address, ctx):
    req = {"address": address, "output": "json", "ak": _ak(ctx)}  # 构造请求参数
    # 新建 AsyncClient 请求百度地理编码接口
    async with httpx.AsyncClient(timeout=20.0) as client:
        r = await client.get(GEOCODE_URL, params=req)  # 发送 GET 请求
        r.raise_for_status()  # 检查响应码
        data = r.json()  # 解析响应 JSON
    # 若API状态非0抛异常
    if data.get("status") != 0:
        raise RuntimeError(f"地理编码失败: {address}, status={data.get('status')}")
    # 获取地理编码结果
    result = data.get("result") or {}
    loc = result.get("location") or {}
    lng, lat = loc.get("lng"), loc.get("lat")  # 提取经纬度
    # 若坐标不存在抛异常
    if lng is None or lat is None:
        raise RuntimeError(f"地理编码失败: {address}, 无坐标")
    # 返回“纬度,经度”和格式化地址
    return f"{lat},{lng}", result.get("formatted_address") or address

# 秒转化为“xx小时xx分钟”或“xx分钟”
def _seconds_to_hhmm(sec):
    sec = int(sec or 0)  # 转换为整数,若为 None 置为 0
    h = sec // 3600  # 小时数
    m = (sec % 3600) // 60  # 分钟数
    if h > 0:
        return f"{h}小时{m}分钟"  # 返回“xh ym”
    return f"{m}分钟"  # 返回“ym”

# 解析单步路线描述
def _step_text(step):
    # 依次获取多种描述字段
    text = str(
        step.get("instruction")
        or step.get("instructions")
        or step.get("html_instructions")
        or ""
    ).strip()
    # 若存在纯文本描述,移除<b>标签后直接返回
    if text:
        return text.replace("<b>", "").replace("</b>", "")
    # 否则拼接路名、距离和时长
    road = str(step.get("road_name") or "道路").strip()
    dist = int(step.get("distance") or 0)
    dur = int(step.get("duration") or 0)
    if dist > 0:
        return f"沿{road}行驶约{dist}米,预计{_seconds_to_hhmm(dur)}"
    return f"沿{road}行驶"

# 查询路线(调用百度路线 API)
async def _query_route(route_args, ctx):
    # 获取起点和终点的坐标及格式化地址
    origin_ll, origin_fmt = await _geocode(route_args["origin"], ctx)
    dest_ll, dest_fmt = await _geocode(route_args["destination"], ctx)
    # 获取出行方式
    mode = route_args["mode"]
    # 构造 API 端点
    endpoint = f"{ROUTE_V2_BASE}/{mode}"

    # 组装接口请求参数
    req = {
        "origin": origin_ll,
        "destination": dest_ll,
        "ak": _ak(ctx),
    }
    # 如为驾车则加入策略参数
    if mode == "driving":
        req["tactics"] = "11" if route_args["tactics"] == "avoid_highway" else "0"

    # 日志:调用路线接口
    logger.info("调用路线接口,mode=%s params=%s", mode, json.dumps(req, ensure_ascii=False))
    # 新建 AsyncClient 请求路线接口
    async with httpx.AsyncClient(timeout=30.0) as client:
        r = await client.get(endpoint, params=req)  # 向百度接口发送请求
        r.raise_for_status()  # 检查 HTTP 状态码
        data = r.json()  # 解析 JSON 响应
    # 若API返回失败则抛异常
    if data.get("status") != 0:
        raise RuntimeError(f"路线规划失败: status={data.get('status')} message={data.get('message')}")
    # 获取路线结果
    result = data.get("result") or {}
    routes = list(result.get("routes") or [])
    # 若无可用路线则抛异常
    if not routes:
        raise RuntimeError("路线规划失败: 无可用路线")
    # 选择第一条最佳路线
    best = routes[0]
    # 返回所需结构体
    return {
        "origin": origin_fmt,
        "destination": dest_fmt,
        "mode": mode,
        "distance": int(best.get("distance") or 0),
        "duration": int(best.get("duration") or 0),
        "steps": best.get("steps") or [],
    }

# MCP 工具:主路线规划接口
@mcp.tool()
async def plan_route(
    user_input: Annotated[str, Field(description="用户输入的路线规划需求,例如:从北京市海淀区到天津市滨海新区驾车,尽量不走高速")],
    max_steps: Annotated[int, Field(description="最大步数,范围 1~20,默认 6")] = 6,
    ctx: Annotated[Context, Field(description="上下文")] = None,
):
    # 路线规划服务说明
    """路线规划服务。"""
    # 抽取参数
    route_args = await _extract_route_args(user_input, ctx)
    # 查询路线
    data = await _query_route(route_args, ctx)
    # 步数限幅在1~20
    max_steps = max(1, min(int(max_steps), 20))

    # 组织输出文本(前缀:输入、参数、出发地、目的地、方式、里程、时长)
    lines = [
        f"输入:{user_input}",
        f"参数:{json.dumps(route_args, ensure_ascii=False)}",
        f"出发地:{data['origin']}",
        f"目的地:{data['destination']}",
        f"方式:{data['mode']}",
        f"总里程:{round(data['distance'] / 1000, 2)} 公里",
        f"预计时长:{_seconds_to_hhmm(data['duration'])}",
        "",
        "导航摘要:",
    ]
    # 计数器
    shown = 0
    # 遍历每一步提示
    for step in data["steps"]:
        if shown >= max_steps:
            break
        # 若为多子步骤则递归处理
        if isinstance(step, list):
            for s in step:
                if shown >= max_steps:
                    break
                lines.append(f"- {_step_text(s)}")
                shown += 1
        else:
            lines.append(f"- {_step_text(step)}")
            shown += 1

    # 返回合并行(文本)
    return "\n".join(lines)

# 主函数
def main():
    # 日志:启动服务
    logger.info("启动 Direction MCP Streamable HTTP 服务: http://%s:%s%s", HOST, PORT, HTTP_PATH)
    # 运行 MCP 服务
    mcp.run(transport="streamable-http")

# 程序入口
if __name__ == "__main__":
    main()

4.3 执行过程 #

sequenceDiagram autonumber participant Client as MCP客户端 participant Server as direction-server.py participant DS as DeepSeek API participant Geo as 百度地理编码 participant Route as 百度路线规划 Client->>Server: 启动服务 Server->>Server: main() -> mcp.run(streamable-http) Client->>Server: plan_route(user_input, max_steps, headers) Server->>Server: 读取请求头(DEEPSEEK_API_KEY, BAIDU_MAP_AK) Server->>DS: _extract_route_args(user_input) DS-->>Server: {origin, destination, mode, tactics} Server->>Server: 规范 mode ∈ {driving, transit} Server->>Geo: _geocode(origin) Geo-->>Server: origin(lat,lng), formatted_address Server->>Geo: _geocode(destination) Geo-->>Server: destination(lat,lng), formatted_address Server->>Route: _query_route(mode, origin_ll, dest_ll, tactics, ak) Route-->>Server: routes[] alt status!=0 或 routes为空 Server-->>Client: RuntimeError else 成功 Server->>Server: 截取 max_steps 并格式化导航摘要 Server-->>Client: 返回文本路线结果 end
← 上一节 LiteLLM 下一节 pathlib →

访问验证

请输入访问令牌

Token不正确,请重新输入