1. 初始化 #
uv init
uv add mcp[cli]
uv add langchain langchain-core langchain-deepseek langchain-openai2.天气预告 #
本节将基于开源的 MCP(Multi-Component Protocol)体系,构建一个支持天气预报的轻量级服务。
你将看到如何:
- 通过简单的 Python 代码启动和测试一个本地“天气”MCP服务;
- 使用标准输入输出流作为组件间通信的桥梁,实现模块化、解耦的微服务风格开发;
- 利用百度地图接口与大模型服务,提供灵活的智能天气查询能力。
我们将演示如何一键启动本地天气服务容器,自动通过 MCP 协议调用天气获取函数,最终完成“给定目的地的未来3天天气预报”这一任务。
2.1. weather-client.py #
weather-client.py
# 调试用:启动 weather stdio MCP 并调用 get_travel_forecast。
"""调试用:启动 weather stdio MCP 并调用 get_travel_forecast。"""
# 导入异步操作所需的 asyncio 库
import asyncio
# 导入 sys,便于获取 Python 解释器路径
import sys
# 导入 Path,用于方便地处理文件路径
from pathlib import Path
# 导入 MCP 客户端会话和 Stdio 服务器参数配置
from mcp import ClientSession, StdioServerParameters
# 导入 stdio 客户端,用于通过标准输入输出与服务通信
from mcp.client.stdio import stdio_client
# 设定服务器文件为当前目录下的 weather-server.py
SERVER_FILE = Path(__file__).with_name("weather-server.py")
# 设置查询天气的目的地,这里设为“北京”
DESTINATION = "北京"
# 设置要查询的天数,这里设为 3 天
DAYS = 3
# 设置用于查询百度地图天气的 API Key
BAIDU_MAP_AK = "51wJobm0zfEyyZv1FGSAGmvrBAXGrqU9"
# 设置 DeepSeek 的 API Key,供大模型使用
DEEPSEEK_API_KEY = "sk-24088156e9ab48f3adddaf5a9c0c4ede"
# DeepSeek 的 API 基础地址
DEEPSEEK_BASE_URL = "https://api.deepseek.com"
# DeepSeek 所用模型的名称
DEEPSEEK_MODEL = "deepseek-chat"
# 定义一个异步主流程函数,组织并运行天气查询完整流程
async def run():
# 配置并创建标准输入输出服务器参数,包括要运行的 Python 文件和必要的环境变量
server = StdioServerParameters(
command=sys.executable,
args=[str(SERVER_FILE)],
env={
"BAIDU_MAP_AK": BAIDU_MAP_AK,
"DEEPSEEK_API_KEY": DEEPSEEK_API_KEY,
"DEEPSEEK_BASE_URL": DEEPSEEK_BASE_URL,
"DEEPSEEK_MODEL": DEEPSEEK_MODEL,
},
)
# 启动 stdio 客户端,与服务器建立异步通道获取读写对象
async with stdio_client(server) as (read, write):
# 用读写通道实例化 MCP 客户端会话
async with ClientSession(read, write) as session:
# 初始化 MCP 会话
await session.initialize()
# 调用服务器端的 get_travel_forecast 工具,传入目的地和天数参数
result = await session.call_tool(
"get_travel_forecast",
{"destination": DESTINATION, "days": DAYS},
)
# 若接口返回错误,则抛出异常并输出错误内容
if result.isError:
raise RuntimeError(str(result.content))
# 正常返回则遍历所有结果,并逐条输出天气预报文本
for item in result.content:
text = getattr(item, "text", "")
if text:
print(text)
# 定义入口主函数
def main():
# 启动异步事件循环,执行主流程
asyncio.run(run())
# 检查是否为主程序入口,如果是则调用主函数
if __name__ == "__main__":
main()
2.2. weather-server.py #
weather-server.py
# 天气 MCP 服务器(stdio):目的地 + N 天 -> DeepSeek 参数抽取 -> 百度天气预报。
"""
天气 MCP 服务器(stdio):目的地 + N 天 -> DeepSeek 参数抽取 -> 百度天气预报。
"""
# 导入 json 库用于处理 JSON 数据
import json
# 导入 logging 库用于日志记录
import logging
# 导入 os 库用于环境变量读取
import os
# 导入 Annotated 类型用于参数注解
from typing import Annotated
# 导入 httpx 异步 HTTP 客户端
import httpx
# 从 langchain_core 库导入 JSON 输出解析器
from langchain_core.output_parsers import JsonOutputParser
# 从 langchain_core 库导入聊天消息模板
from langchain_core.prompts import ChatPromptTemplate
# 导入 DeepSeek 聊天模型类
from langchain_deepseek import ChatDeepSeek
# 导入 FastMCP 快速 MCP 服务框架
from mcp.server.fastmcp import FastMCP
# 从 pydantic 导入字段描述工具
from pydantic import Field
# 配置日志格式和输出级别
logging.basicConfig(
level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s"
)
# 创建 logger 对象
logger = logging.getLogger("weather-service")
# 初始化 FastMCP 服务,服务名为“天气查询服务”
mcp = FastMCP("天气查询服务")
# 百度天气接口的访问 URL
WEATHER_V1_URL = "https://api.map.baidu.com/weather/v1/"
# 获取百度地图 AK(密钥),不存在时报错
def _ak():
v = (os.environ.get("BAIDU_MAP_AK") or "").strip()
if not v:
raise RuntimeError("缺少环境变量 BAIDU_MAP_AK")
return v
# 获取 DeepSeek 聊天模型接口
def _deepseek():
key = (os.environ.get("DEEPSEEK_API_KEY") or "").strip()
if not key:
raise RuntimeError("缺少环境变量 DEEPSEEK_API_KEY")
return ChatDeepSeek(
api_key=key,
base_url=(
os.environ.get("DEEPSEEK_BASE_URL") or "https://api.deepseek.com"
).strip(),
model=(os.environ.get("DEEPSEEK_MODEL") or "deepseek-chat").strip(),
temperature=0,
)
# 使用大模型从 destination 文本中抽取百度天气查询参数
async def _extract_params(destination):
# 日志记录参数抽取开始
logger.info("开始参数抽取,destination=%s", destination)
# 创建 JSON 输出解析器
parser = JsonOutputParser()
# 创建提问模板用于 LLM 参数抽取
prompt = ChatPromptTemplate.from_template("""
从输入中抽取百度天气接口参数,只输出 JSON:
{{
"district_id": "",
"province": "",
"city": "",
"district": ""
}}
输入:{destination}
输出格式要求:{format_instructions}
""".strip())
# 组成 prompt、llm、parser 串联调用链
chain = prompt | _deepseek() | parser
# 传入用户输入和格式需求,异步请求大模型
data = await chain.ainvoke(
{
"destination": destination,
"format_instructions": parser.get_format_instructions(),
}
)
# 将模型输出整理成结构化参数
extracted = {
"district_id": str(data.get("district_id") or "").strip(),
"province": str(data.get("province") or "").strip(),
"city": str(data.get("city") or "").strip(),
"district": str(data.get("district") or "").strip(),
}
# 日志记录提取结果
logger.info("参数抽取完成:%s", json.dumps(extracted, ensure_ascii=False))
# 返回抽取参数
return extracted
# 获取天气预报数据(根据从 LLM 得到的地点参数调用百度天气 API)
async def _fetch_forecast(params_from_llm):
# 准备基础参数
params = {"data_type": "fc", "output": "json", "ak": _ak()}
# 按优先级判断地点参数,设置百度接口调用参数
if params_from_llm["district_id"]:
params["district_id"] = params_from_llm["district_id"]
elif params_from_llm["district"]:
params["district"] = params_from_llm["district"]
if params_from_llm["province"]:
params["province"] = params_from_llm["province"]
if params_from_llm["city"]:
params["city"] = params_from_llm["city"]
elif params_from_llm["city"]:
params["district"] = params_from_llm["city"]
if params_from_llm["province"]:
params["province"] = params_from_llm["province"]
elif params_from_llm["province"]:
params["district"] = params_from_llm["province"]
else:
# 地点参数缺失时报错
raise RuntimeError("LLM 未提取出有效地点(district_id/district/city/province)")
# 日志记录 API 调用参数
logger.info("调用百度天气接口,params=%s", json.dumps(params, ensure_ascii=False))
# 使用 httpx 异步请求百度天气 API
async with httpx.AsyncClient(timeout=30.0) as client:
r = await client.get(WEATHER_V1_URL, params=params)
r.raise_for_status()
data = r.json()
# 判断百度天气接口返回状态,非 0 抛异常
if data.get("status") != 0:
raise RuntimeError(
f"百度天气接口失败: status={data.get('status')} message={data.get('message')}"
)
# 日志记录成功
logger.info("百度天气接口调用成功")
# 返回结果字段(天气数据)
return data.get("result") or {}
# 注册 MCP 工具方法,用于获取旅行天气预报
@mcp.tool()
async def get_travel_forecast(
destination: Annotated[
str,
Field(description="旅游目的地,例如:吉林省延边朝鲜族自治州龙井市"),
],
days: Annotated[
int,
Field(description="展示未来天气天数,范围 1~15,默认 3"),
] = 3,
):
"""天气预告服务器"""
# days 限制范围在 1~15 天
days = max(1, min(int(days), 15))
# 调用大模型参数抽取
params_from_llm = await _extract_params(destination)
# 调用百度天气获取结果
result = await _fetch_forecast(params_from_llm)
# 取得每日预报列表
forecasts = list(result.get("forecasts") or [])
# 日志记录生成天气预报
logger.info("生成天气预报结果,destination=%s days=%s", destination, days)
# 构建输出文本行
lines = [
f"输入目的地:{destination}",
f"参数:{json.dumps(params_from_llm, ensure_ascii=False)}",
f"展示天数:{min(days, len(forecasts))}",
"",
]
# 遍历每天的预报信息,累加到输出内容
for day in forecasts[:days]:
lines.append(f"{day.get('date', '')} {day.get('week', '')}")
lines.append(
f"白天:{day.get('text_day', '')},夜间:{day.get('text_night', '')}"
)
lines.append(f"温度:{day.get('low', '')}~{day.get('high', '')}℃")
lines.append("")
# 返回拼接后的天气预报文本
return "\n".join(lines)
# 主程序入口,启动 MCP 服务(stdio 模式)
def main():
mcp.run(transport="stdio")
# 如果当前模块为主程序,则执行 main()
if __name__ == "__main__":
main()
2.3 执行过程 #
sequenceDiagram
autonumber
participant Client as MCP客户端
participant Server as weather-server.py
participant DS as DeepSeek API
participant BD as 百度天气API
Client->>Server: 启动服务进程
Server->>Server: main() -> mcp.run(transport="stdio")
Client->>Server: 调用 get_travel_forecast(destination, days)
Server->>Server: days 约束到 [1,15]
Server->>Server: _extract_params(destination)
Server->>Server: _deepseek() 读取 DEEPSEEK_* 环境变量
Server->>DS: chain.ainvoke(prompt+format_instructions)
DS-->>Server: 返回 JSON 参数(district_id/province/city/district)
Server->>Server: _fetch_forecast(params_from_llm)
Server->>Server: _ak() 读取 BAIDU_MAP_AK
Server->>Server: 按优先级组装百度请求参数
Server->>BD: GET /weather/v1 (data_type=fc, output=json, ...)
BD-->>Server: 天气 JSON(status/result/forecasts)
alt status != 0 或 HTTP错误
Server-->>Client: 抛出 RuntimeError
else 成功
Server->>Server: 组装多行天气文本
Server-->>Client: 返回 forecast 文本
end
3.地点搜索 #
本节我们将扩展基于MCP协议的能力,介绍“地点搜索”服务的架构与使用方法。
通过MCP,地点搜索可以作为一个独立、解耦的微服务模块运行,分别通过HTTP(SSE流式)协议实现异步数据交互。
主流程通常包括以下环节:
- 客户端通过SSE方式连接本地place服务容器(即启动place MCP服务器),并传递查询关键词、分页等参数;
- 服务器端解析请求参数,读取百度地图和DeepSeek相关环境变量,实现灵活的地图/LLM能力组合;
- 服务器内部自动调用百度地图地点搜索API,将返回的JSON结构化解析,按需裁剪、重组为MCP标准数据格式;
- 结果通过SSE通道按块逐步推送到客户端,适合大规模结果集或低延迟场景;
- 客户端可实时获取、展示景点、地标等地点信息,也可结合更多模块进行多步联动(如路线规划、天气查询等)。
3.1. place-client.py #
place-client.py
# 调试用的描述字符串,说明本文件功能:通过SSE方式连接place MCP服务器并调用search_place工具
"""调试用:连接 place SSE MCP 并调用 search_place。"""
# 导入异步操作相关库
import asyncio
# 导入日志处理相关库
import logging
# 从mcp包中导入客户端会话类
from mcp import ClientSession
# 从mcp客户端SSE模块导入sse客户端工厂函数
from mcp.client.sse import sse_client
# 配置日志输出格式与等级为INFO
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")
# 获取名为"place-client"的日志记录器
logger = logging.getLogger("place-client")
# 定义SSE服务URL(本地服务器地址)
SSE_URL = "http://127.0.0.1:8001/sse"
# 设置百度地图AK密钥
BAIDU_MAP_AK = "e0QsxCTdlt6qPNoQQNJwa89qoJ4OXieX"
# 设置DeepSeek的API Key
DEEPSEEK_API_KEY = "sk-24088156e9ab48f3adddaf5a9c0c4ede"
# 定义异步主操作函数
async def run():
# 日志提示正在连接SSE服务
logger.info("连接 SSE 服务:%s", SSE_URL)
# 构造请求头,带有百度和DeepSeek的API密钥
headers = {"BAIDU_MAP_AK": BAIDU_MAP_AK, "DEEPSEEK_API_KEY": DEEPSEEK_API_KEY}
# 以异步方式连接SSE客户端,获取读写对象
async with sse_client(SSE_URL, headers=headers) as (read, write):
# 以ClientSession包装读写对象,建立MCP协议会话
async with ClientSession(read, write) as session:
# 初始化MCP会话
await session.initialize()
# 日志记录会话初始化成功
logger.info("MCP 会话初始化成功")
# 日志提示即将查询景点
logger.info("调用工具 search_place(景点)")
# 向服务端调用search_place,参数为“北京市 景点”、分页等
scenic = await session.call_tool(
"search_place",
{"user_input": "北京市 景点", "page_size": 5, "page_num": 0, "with_detail": False},
)
# 若返回出错,抛出异常输出错误内容
if scenic.isError:
raise RuntimeError(str(scenic.content))
# 日志记录查询景点成功
logger.info("景点查询成功")
# 遍历返回的景点内容,逐条提取并输出结果内容
for item in scenic.content:
text = getattr(item, "text", "")
if text:
logger.info("景点结果:\n%s", text)
# 日志提示即将查询酒店
logger.info("调用工具 search_place(酒店)")
# 向服务端调用search_place,参数为“北京市 酒店”、分页等
hotel = await session.call_tool(
"search_place",
{"user_input": "北京市 酒店", "page_size": 5, "page_num": 0, "with_detail": True},
)
# 若返回出错,抛出异常输出错误内容
if hotel.isError:
raise RuntimeError(str(hotel.content))
# 日志记录查询酒店成功
logger.info("酒店查询成功")
# 遍历返回的酒店内容,逐条提取并输出结果内容
for item in hotel.content:
text = getattr(item, "text", "")
if text:
logger.info("酒店结果:\n%s", text)
# 定义主函数入口
def main():
# 启动异步主流程
asyncio.run(run())
# 如果本文件直接运行,则执行main入口函数
if __name__ == "__main__":
main()
3.2. place-server.py #
place-server.py
# 地点检索 MCP 服务器(SSE)
"""地点检索 MCP 服务器(SSE)。"""
# 导入标准库模块
import json # 导入json处理模块
import logging # 导入日志模块
import os # 导入操作系统相关模块
from typing import Annotated # 导入类型注解模块
# 导入第三方模块
import httpx # 导入http异步请求库
from langchain_core.output_parsers import JsonOutputParser # 导入LangChain的JSON输出解析器
from langchain_core.prompts import ChatPromptTemplate # 导入LangChain的Prompt模板
from langchain_deepseek import ChatDeepSeek # 导入DeepSeek聊天模块
from mcp.server.fastmcp import Context, FastMCP # 导入FastMCP框架相关类
from pydantic import Field # 导入Pydantic的Field用于参数描述
# 配置日志格式和日志级别为INFO
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")
# 获取名为"place-service"的日志记录器
logger = logging.getLogger("place-service")
# 设置主机地址
HOST = "127.0.0.1"
# 设置服务端口
PORT = 8001
# 设置SSE路径
SSE_PATH = "/sse"
# 初始化一个FastMCP实例,名称为"地点检索服务"
mcp = FastMCP("地点检索服务", host=HOST, port=PORT, sse_path=SSE_PATH)
# 定义百度地图检索相关API地址
PLACE_SEARCH_URL = "https://api.map.baidu.com/place/v2/search" # 地点检索接口
PLACE_DETAIL_URL = "https://api.map.baidu.com/place/v2/detail" # 地点详情接口
PLACE_SUGGESTION_URL = "https://api.map.baidu.com/place/v2/suggestion" # 地点建议接口
# 从请求上下文中获取指定请求头的值
def _header_value(ctx, key):
request = None # 初始化request为None
# 如果上下文和request_context都存在,则获取request对象
if ctx and ctx.request_context:
request = getattr(ctx.request_context, "request", None)
# 若找不到request则抛出异常
if request is None:
raise RuntimeError(f"缺少请求上下文,无法读取请求头 {key}")
# 请求头支持大小写
raw = request.headers.get(key) or request.headers.get(key.lower()) or ""
v = str(raw).strip() # 清理空格
# 若值为空则抛出异常
if not v:
raise RuntimeError(f"缺少请求头 {key}")
return v # 返回请求头的值
# 从上下文中获取百度AK
def _ak(ctx):
return _header_value(ctx, "BAIDU_MAP_AK")
# 构造DeepSeek聊天对象
def _deepseek(ctx):
# 获取DeepSeek API KEY
key = _header_value(ctx, "DEEPSEEK_API_KEY")
# 创建ChatDeepSeek对象,支持自定义模型和Base URL
return ChatDeepSeek(
api_key=key,
base_url=(os.environ.get("DEEPSEEK_BASE_URL") or "https://api.deepseek.com").strip(),
model=(os.environ.get("DEEPSEEK_MODEL") or "deepseek-chat").strip(),
temperature=0,
)
# 利用LLM自动抽取地点检索参数
async def _extract_place_args(user_input, ctx):
# 记录日志,标记开始参数抽取
logger.info("开始抽取地点检索参数,input=%s", user_input)
# 创建JSON格式解析器
parser = JsonOutputParser()
# 创建用于抽取的Prompt模板
prompt = ChatPromptTemplate.from_template(
"""
从用户输入中抽取百度地点检索参数,只输出 JSON:
{{
"region": "",
"query": "",
"tag": "",
"city_limit": false
}}
规则:
1) region 必须是地区名(市/州/区县),如“延边朝鲜族自治州”。
2) query 填检索词,如“景点”“酒店”“火锅”等。
3) tag 可选(如“旅游景点”“酒店”),无法确定可空字符串。
4) city_limit 仅当用户强调“仅本地区”时设为 true。
输入:{user_input}
输出格式要求:{format_instructions}
""".strip()
)
# 组合Prompt、LLM和解析器,构造Chain链式调用
chain = prompt | _deepseek(ctx) | parser
# 异步执行链式调用,获得LLM抽取的参数
data = await chain.ainvoke(
{"user_input": user_input, "format_instructions": parser.get_format_instructions()}
)
# 整理参数,去除两端空白,保证类型安全
extracted = {
"region": str(data.get("region") or "").strip(),
"query": str(data.get("query") or "").strip(),
"tag": str(data.get("tag") or "").strip(),
"city_limit": bool(data.get("city_limit") or False),
}
# 打印参数抽取结果日志
logger.info("参数抽取完成:%s", json.dumps(extracted, ensure_ascii=False))
return extracted # 返回抽取到的参数字典
# 根据参数请求百度地点检索服务
async def _search_places(params_from_llm, page_size, page_num, ctx):
# 组装百度API请求参数
req = {
"query": params_from_llm["query"] or "景点", # 未识别query默认为景点
"region": params_from_llm["region"],
"tag": params_from_llm["tag"],
"city_limit": "true" if params_from_llm["city_limit"] else "false",
"output": "json",
"scope": "2",
"page_size": str(page_size),
"page_num": str(page_num),
"ak": _ak(ctx),
}
# 若缺少region,无法查询,报错
if not req["region"]:
raise RuntimeError("LLM 未提取出 region,无法检索地点")
# 打印请求参数日志
logger.info("调用地点检索接口,params=%s", json.dumps(req, ensure_ascii=False))
# 使用httpx发送异步GET请求
async with httpx.AsyncClient(timeout=30.0) as client:
r = await client.get(PLACE_SEARCH_URL, params=req)
r.raise_for_status()
data = r.json()
# 若接口返回状态非0,则抛出异常
if data.get("status") != 0:
raise RuntimeError(f"地点检索失败: status={data.get('status')} message={data.get('message')}")
return data # 返回百度API响应结果
# 通过UID获取百度地点详情
async def _detail_by_uid(uid, ctx):
# 组装请求参数
req = {"uid": uid, "scope": "2", "output": "json", "ak": _ak(ctx)}
# 使用httpx发送异步请求
async with httpx.AsyncClient(timeout=30.0) as client:
r = await client.get(PLACE_DETAIL_URL, params=req)
r.raise_for_status()
data = r.json()
# 如果状态不等于0,返回空字典
if data.get("status") != 0:
return {}
# 返回地点详细信息的result部分
return data.get("result") or {}
# 获得关键词/区域自动补全建议
async def _suggest_region_keyword(region, query, ctx):
# 构造建议接口请求参数
req = {
"query": query or region,
"region": region or "全国",
"city_limit": "false",
"output": "json",
"ak": _ak(ctx),
}
# 使用httpx异步请求suggestion接口
async with httpx.AsyncClient(timeout=30.0) as client:
r = await client.get(PLACE_SUGGESTION_URL, params=req)
r.raise_for_status()
data = r.json()
# 状态失败返回空列表
if data.get("status") != 0:
return []
# 返回建议列表
return list(data.get("results") or [])
# 定义MCP服务的search_place工具
@mcp.tool()
async def search_place(
user_input: Annotated[
str,
Field(description="用户输入的地点检索需求,例如:北京 景点"),
],
page_size: Annotated[
int,
Field(description="每页数量,范围 1~20,默认 5"),
] = 5,
page_num: Annotated[
int,
Field(description="页码,从 0 开始,默认 0"),
] = 0,
with_detail: Annotated[
bool,
Field(description="是否补充地点详情信息(电话、人均、营业时间)"),
] = False,
ctx: Context = None,
):
# 工具函数说明
"""地点检索 MCP 服务。"""
# 限定page_size取值范围在1~20之间
page_size = max(1, min(int(page_size), 20))
# 限定page_num为非负整数
page_num = max(0, int(page_num))
# 将with_detail类型转换为bool
with_detail = bool(with_detail)
# 第一步:通过LLM抽取检索参数
params_from_llm = await _extract_place_args(user_input, ctx)
# 第二步:通过API获取检索结果
data = await _search_places(params_from_llm, page_size, page_num, ctx)
# 第三步:得到检索结果与总数
results = list(data.get("results") or [])
total = int(data.get("total") or 0)
# 第四步:构造返回文本的前缀内容
lines = [
f"输入:{user_input}",
f"参数:{json.dumps(params_from_llm, ensure_ascii=False)}",
f"总数:{total},当前页数量:{len(results)}",
"",
]
# 如果无结果,获取推荐候选词
if not results:
suggestions = await _suggest_region_keyword(params_from_llm["region"], params_from_llm["query"], ctx)
lines.append("未检索到结果。可参考候选词:")
# 枚举前5个候选词输出
for i, it in enumerate(suggestions[:5], 1):
lines.append(f"{i}. {it.get('name', '')} {it.get('city', '')}{it.get('district', '')}")
return "\n".join(lines) # 返回结果字符串
# 对返回的每条结果进行格式化输出
for i, item in enumerate(results, 1):
lines.append(f"{i}. {item.get('name', '')}")
lines.append(f" 地址:{item.get('address', '未知')}")
lines.append(f" 区域:{item.get('province', '')}{item.get('city', '')}{item.get('area', '')}")
lines.append(f" 评分:{item.get('detail_info', {}).get('overall_rating', '无')}")
lines.append(f" 类型:{item.get('detail_info', {}).get('tag', '无')}")
# 如需补充详细信息
if with_detail:
uid = str(item.get("uid") or "").strip()
# 存在UID才查详情
if uid:
detail = await _detail_by_uid(uid, ctx)
d = detail.get("detail_info") or {}
lines.append(f" 电话:{detail.get('telephone', '无')}")
lines.append(f" 人均:{d.get('price', '无')}")
lines.append(f" 营业时间:{d.get('shop_hours', '无')}")
lines.append("")
# 合并全部文本并返回
return "\n".join(lines)
# 主程序入口
def main():
# 打印服务启动日志
logger.info("启动 Place MCP SSE 服务: http://%s:%s%s", HOST, PORT, SSE_PATH)
# 启动MCP服务,使用SSE协议
mcp.run(transport="sse")
# 如果是主程序入口,则执行main函数
if __name__ == "__main__":
main()
3.3 执行过程 #
sequenceDiagram
autonumber
participant Client as MCP客户端
participant Server as place-server.py
participant DS as DeepSeek API
participant BS as 百度 Place Search
participant BD as 百度 Place Detail
participant BG as 百度 Suggestion
Client->>Server: 启动服务进程
Server->>Server: main() -> mcp.run(transport="sse")
Client->>Server: 调用 search_place(user_input, page_size, page_num, with_detail, headers)
Server->>Server: 参数裁剪与类型规范化
Server->>Server: 从 ctx.headers 读取 DEEPSEEK_API_KEY / BAIDU_MAP_AK
Server->>DS: _extract_place_args(user_input)
DS-->>Server: {region, query, tag, city_limit}
Server->>BS: _search_places(params, page_size, page_num)
BS-->>Server: results, total, status
alt results 为空
Server->>BG: _suggest_region_keyword(region, query)
BG-->>Server: suggestions[]
Server-->>Client: 返回“无结果 + 候选词”文本
else results 非空
loop 每个 result
alt with_detail == true 且存在 uid
Server->>BD: _detail_by_uid(uid)
BD-->>Server: detail_info
end
end
Server-->>Client: 返回格式化地点列表文本
end
4. 路径规划 #
在本部分,我们将介绍路径规划的整体架构和工作流程。路径规划模块负责根据用户输入的起点、终点(可选途经点)和交通方式(如驾车、步行、骑行、公交等),请求第三方地图 API 获取推荐路线,并将结果结构化后返回。
整体流程如下:
- MCP 服务器通过对应的 API(如
plan_route)接收来自客户端的路径规划请求。 - 服务器会从请求体或 headers 中提取必要参数(如起点、终点、交通工具类型、API Key 等),并进行参数校验。
- 根据请求的交通方式和参数,服务器调用百度地图/高德等第三方的路径规划 API,并对返回结果进行容错、重试和数据解析。
- 若参数有误或第三方接口返回失败,服务器会格式化错误提示返回给客户端。
- 若成功获取路径结果,服务器会将路线(包括距离、预计耗时、路线步骤等)结构化封装后返回给客户端。
- 客户端收到格式化结果后可进行展示或进一步操作。
4.1. direction-client.py #
direction-client.py
# 调试用描述:连接 direction Streamable HTTP MCP 并调用 plan_route。
"""调试用:连接 direction Streamable HTTP MCP 并调用 plan_route。"""
# 导入异步相关库
import asyncio
# 导入日志模块
import logging
# 导入 httpx 库用于 HTTP 通信
import httpx
# 从 mcp 包中导入客户端会话类
from mcp import ClientSession
# 从 mcp.client.streamable_http 模块导入客户端工厂
from mcp.client.streamable_http import streamable_http_client
# 配置日志的输出格式和等级为 INFO
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")
# 获取名为 "direction-client" 的日志记录器
logger = logging.getLogger("direction-client")
# 定义 MCP 服务的 HTTP URL
HTTP_URL = "http://127.0.0.1:8002/mcp"
# 设置百度地图 AK 密钥
BAIDU_MAP_AK = "e0QsxCTdlt6qPNoQQNJwa89qoJ4OXieX"
# 设置 DeepSeek API Key
DEEPSEEK_API_KEY = "sk-24088156e9ab48f3adddaf5a9c0c4ede"
# 定义主异步运行函数
async def run():
# 日志输出:正在连接 Streamable HTTP 服务
logger.info("连接 Streamable HTTP 服务:%s", HTTP_URL)
# 构造请求头,携带百度地图和 DeepSeek 的 API 密钥
headers = {"BAIDU_MAP_AK": BAIDU_MAP_AK, "DEEPSEEK_API_KEY": DEEPSEEK_API_KEY}
# 创建 httpx 的异步客户端,请求超时时间为 30 秒
async with httpx.AsyncClient(headers=headers, timeout=30.0) as http_client:
# 使用工厂方法连接至 Streamable HTTP MCP,并获取读写对象
async with streamable_http_client(HTTP_URL, http_client=http_client) as (read, write, _):
# 创建 MCP 会话,包装读写对象
async with ClientSession(read, write) as session:
# 初始化 MCP 会话
await session.initialize()
# 日志输出会话初始化成功
logger.info("MCP 会话初始化成功")
# 日志输出:调用 plan_route(驾车)
logger.info("调用 plan_route(驾车)")
# 调用工具 plan_route,查询从北京市海淀区到天津市滨海新区的驾车路线,尽量不走高速
driving = await session.call_tool(
"plan_route",
{"user_input": "从北京市海淀区到天津市滨海新区驾车,尽量不走高速", "max_steps": 6},
)
# 遍历驾车路线的结果内容
for item in driving.content:
# 获取结果的 text 字段
text = getattr(item, "text", "")
# 若存在文本内容,日志输出驾车结果
if text:
logger.info("驾车结果:\n%s", text)
# 日志输出:调用 plan_route(公共交通)
logger.info("调用 plan_route(公共交通)")
# 调用工具 plan_route,查询从北京市朝阳区到上海市浦东新区的公共交通路线,优先选择火车
transit = await session.call_tool(
"plan_route",
{"user_input": "从北京市朝阳区到上海市浦东新区公共交通,优先火车", "max_steps": 6},
)
# 遍历公共交通路线的结果内容
for item in transit.content:
# 获取结果的 text 字段
text = getattr(item, "text", "")
# 若存在文本内容,日志输出公共交通结果
if text:
logger.info("公共交通结果:\n%s", text)
# 定义主入口函数
def main():
# 运行主异步函数
asyncio.run(run())
# 判断当前模块是否为主程序入口
if __name__ == "__main__":
# 如果是主程序则执行 main 函数
main()
4.2. direction-server.py #
direction-server.py
# 路线规划 MCP 服务器(Streamable HTTP)。
"""路线规划 MCP 服务器(Streamable HTTP)。"""
# 导入标准库
import json # 导入 JSON 模块,用于序列化和反序列化
import logging # 导入日志模块,用于日志输出
from typing import Annotated # 导入类型注解
# 导入第三方库和自定义依赖
import httpx # 导入 httpx,用于异步 HTTP 请求
from langchain_core.output_parsers import JsonOutputParser # 导入 LangChain 的 JSON 输出解析器
from langchain_core.prompts import ChatPromptTemplate # 导入 LangChain 的聊天提示模板
from langchain_deepseek import ChatDeepSeek # 导入 DeepSeek 聊天模型
from mcp.server.fastmcp import Context, FastMCP # 导入自定义的 MCP 相关依赖
from pydantic import Field # 导入 Pydantic 的 Field 用于模型字段描述
# 配置日志格式和等级
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")
# 获取 direction-service 的日志记录器
logger = logging.getLogger("direction-service")
# 定义主机地址和服务端口
HOST = "127.0.0.1"
PORT = 8002
# 定义 HTTP 路径
HTTP_PATH = "/mcp"
# 实例化 MCP 服务
mcp = FastMCP("路线规划服务", host=HOST, port=PORT, streamable_http_path=HTTP_PATH)
# 百度地理编码 API 地址
GEOCODE_URL = "https://api.map.baidu.com/geocoding/v3/"
# 百度路线规划 API 基础地址
ROUTE_V2_BASE = "https://api.map.baidu.com/direction/v2"
# DeepSeek 相关常量
DEEPSEEK_BASE_URL = "https://api.deepseek.com"
DEEPSEEK_MODEL = "deepseek-chat"
# 从请求上下文中读取请求头
def _header_value(ctx, key):
request = None # 初始化请求对象为 None
# 获取请求对象
if ctx and ctx.request_context:
request = getattr(ctx.request_context, "request", None)
# 如果请求不存在则报错
if request is None:
raise RuntimeError(f"缺少请求上下文,无法读取请求头 {key}")
# 获取请求头内容,支持大小写
raw = request.headers.get(key) or request.headers.get(key.lower()) or ""
v = str(raw).strip() # 去除空格转换为字符串
# 若头部内容为空则报错
if not v:
raise RuntimeError(f"缺少请求头 {key}")
return v # 返回头部的值
# 获取百度地图 AK
def _ak(ctx):
return _header_value(ctx, "BAIDU_MAP_AK") # 调用 _header_value 获取百度地图 AK
# 构建 DeepSeek 对象
def _deepseek(ctx):
key = _header_value(ctx, "DEEPSEEK_API_KEY") # 获取 DeepSeek 的 API KEY
return ChatDeepSeek(
api_key=key,
base_url=DEEPSEEK_BASE_URL,
model=DEEPSEEK_MODEL,
temperature=0,
) # 返回 DeepSeek 聊天对象
# 利用 LLM 从用户输入中抽取路线参数
async def _extract_route_args(user_input, ctx):
# 日志:开始抽取参数
logger.info("开始抽取路线参数,input=%s", user_input)
# 实例化 JSON 输出解析器
parser = JsonOutputParser()
# 构建 LLM 提示模板
prompt = ChatPromptTemplate.from_template(
"""
从输入中抽取百度路线规划参数,只输出 JSON:
{{
"origin": "",
"destination": "",
"mode": "driving",
"tactics": ""
}}
约束:
1) mode 只能是 driving / transit。
2) 如果用户提到“公共交通/地铁/公交/高铁/动车/火车/飞机”,mode 设为 transit。
3) tactics 可选(仅 driving 生效,如 “avoid_highway”),否则空字符串。
输入:{user_input}
输出格式要求:{format_instructions}
""".strip()
)
# 串联提示、模型、JSON 解析器
chain = prompt | _deepseek(ctx) | parser
# 异步推理
data = await chain.ainvoke(
{"user_input": user_input, "format_instructions": parser.get_format_instructions()}
)
# 国际化 mode 字段,保证仅支持 driving/transit
mode = str(data.get("mode") or "driving").strip().lower()
if mode not in {"driving", "transit"}:
mode = "driving"
# 组织抽取结果为字典
extracted = {
"origin": str(data.get("origin") or "").strip(),
"destination": str(data.get("destination") or "").strip(),
"mode": mode,
"tactics": str(data.get("tactics") or "").strip(),
}
# 输出参数抽取日志
logger.info("参数抽取完成:%s", json.dumps(extracted, ensure_ascii=False))
return extracted # 返回参数抽取结果
# 地址地理编码(调用百度 API,返回经纬度及格式化地址)
async def _geocode(address, ctx):
req = {"address": address, "output": "json", "ak": _ak(ctx)} # 构造请求参数
# 新建 AsyncClient 请求百度地理编码接口
async with httpx.AsyncClient(timeout=20.0) as client:
r = await client.get(GEOCODE_URL, params=req) # 发送 GET 请求
r.raise_for_status() # 检查响应码
data = r.json() # 解析响应 JSON
# 若API状态非0抛异常
if data.get("status") != 0:
raise RuntimeError(f"地理编码失败: {address}, status={data.get('status')}")
# 获取地理编码结果
result = data.get("result") or {}
loc = result.get("location") or {}
lng, lat = loc.get("lng"), loc.get("lat") # 提取经纬度
# 若坐标不存在抛异常
if lng is None or lat is None:
raise RuntimeError(f"地理编码失败: {address}, 无坐标")
# 返回“纬度,经度”和格式化地址
return f"{lat},{lng}", result.get("formatted_address") or address
# 秒转化为“xx小时xx分钟”或“xx分钟”
def _seconds_to_hhmm(sec):
sec = int(sec or 0) # 转换为整数,若为 None 置为 0
h = sec // 3600 # 小时数
m = (sec % 3600) // 60 # 分钟数
if h > 0:
return f"{h}小时{m}分钟" # 返回“xh ym”
return f"{m}分钟" # 返回“ym”
# 解析单步路线描述
def _step_text(step):
# 依次获取多种描述字段
text = str(
step.get("instruction")
or step.get("instructions")
or step.get("html_instructions")
or ""
).strip()
# 若存在纯文本描述,移除<b>标签后直接返回
if text:
return text.replace("<b>", "").replace("</b>", "")
# 否则拼接路名、距离和时长
road = str(step.get("road_name") or "道路").strip()
dist = int(step.get("distance") or 0)
dur = int(step.get("duration") or 0)
if dist > 0:
return f"沿{road}行驶约{dist}米,预计{_seconds_to_hhmm(dur)}"
return f"沿{road}行驶"
# 查询路线(调用百度路线 API)
async def _query_route(route_args, ctx):
# 获取起点和终点的坐标及格式化地址
origin_ll, origin_fmt = await _geocode(route_args["origin"], ctx)
dest_ll, dest_fmt = await _geocode(route_args["destination"], ctx)
# 获取出行方式
mode = route_args["mode"]
# 构造 API 端点
endpoint = f"{ROUTE_V2_BASE}/{mode}"
# 组装接口请求参数
req = {
"origin": origin_ll,
"destination": dest_ll,
"ak": _ak(ctx),
}
# 如为驾车则加入策略参数
if mode == "driving":
req["tactics"] = "11" if route_args["tactics"] == "avoid_highway" else "0"
# 日志:调用路线接口
logger.info("调用路线接口,mode=%s params=%s", mode, json.dumps(req, ensure_ascii=False))
# 新建 AsyncClient 请求路线接口
async with httpx.AsyncClient(timeout=30.0) as client:
r = await client.get(endpoint, params=req) # 向百度接口发送请求
r.raise_for_status() # 检查 HTTP 状态码
data = r.json() # 解析 JSON 响应
# 若API返回失败则抛异常
if data.get("status") != 0:
raise RuntimeError(f"路线规划失败: status={data.get('status')} message={data.get('message')}")
# 获取路线结果
result = data.get("result") or {}
routes = list(result.get("routes") or [])
# 若无可用路线则抛异常
if not routes:
raise RuntimeError("路线规划失败: 无可用路线")
# 选择第一条最佳路线
best = routes[0]
# 返回所需结构体
return {
"origin": origin_fmt,
"destination": dest_fmt,
"mode": mode,
"distance": int(best.get("distance") or 0),
"duration": int(best.get("duration") or 0),
"steps": best.get("steps") or [],
}
# MCP 工具:主路线规划接口
@mcp.tool()
async def plan_route(
user_input: Annotated[str, Field(description="用户输入的路线规划需求,例如:从北京市海淀区到天津市滨海新区驾车,尽量不走高速")],
max_steps: Annotated[int, Field(description="最大步数,范围 1~20,默认 6")] = 6,
ctx: Annotated[Context, Field(description="上下文")] = None,
):
# 路线规划服务说明
"""路线规划服务。"""
# 抽取参数
route_args = await _extract_route_args(user_input, ctx)
# 查询路线
data = await _query_route(route_args, ctx)
# 步数限幅在1~20
max_steps = max(1, min(int(max_steps), 20))
# 组织输出文本(前缀:输入、参数、出发地、目的地、方式、里程、时长)
lines = [
f"输入:{user_input}",
f"参数:{json.dumps(route_args, ensure_ascii=False)}",
f"出发地:{data['origin']}",
f"目的地:{data['destination']}",
f"方式:{data['mode']}",
f"总里程:{round(data['distance'] / 1000, 2)} 公里",
f"预计时长:{_seconds_to_hhmm(data['duration'])}",
"",
"导航摘要:",
]
# 计数器
shown = 0
# 遍历每一步提示
for step in data["steps"]:
if shown >= max_steps:
break
# 若为多子步骤则递归处理
if isinstance(step, list):
for s in step:
if shown >= max_steps:
break
lines.append(f"- {_step_text(s)}")
shown += 1
else:
lines.append(f"- {_step_text(step)}")
shown += 1
# 返回合并行(文本)
return "\n".join(lines)
# 主函数
def main():
# 日志:启动服务
logger.info("启动 Direction MCP Streamable HTTP 服务: http://%s:%s%s", HOST, PORT, HTTP_PATH)
# 运行 MCP 服务
mcp.run(transport="streamable-http")
# 程序入口
if __name__ == "__main__":
main()
4.3 执行过程 #
sequenceDiagram
autonumber
participant Client as MCP客户端
participant Server as direction-server.py
participant DS as DeepSeek API
participant Geo as 百度地理编码
participant Route as 百度路线规划
Client->>Server: 启动服务
Server->>Server: main() -> mcp.run(streamable-http)
Client->>Server: plan_route(user_input, max_steps, headers)
Server->>Server: 读取请求头(DEEPSEEK_API_KEY, BAIDU_MAP_AK)
Server->>DS: _extract_route_args(user_input)
DS-->>Server: {origin, destination, mode, tactics}
Server->>Server: 规范 mode ∈ {driving, transit}
Server->>Geo: _geocode(origin)
Geo-->>Server: origin(lat,lng), formatted_address
Server->>Geo: _geocode(destination)
Geo-->>Server: destination(lat,lng), formatted_address
Server->>Route: _query_route(mode, origin_ll, dest_ll, tactics, ak)
Route-->>Server: routes[]
alt status!=0 或 routes为空
Server-->>Client: RuntimeError
else 成功
Server->>Server: 截取 max_steps 并格式化导航摘要
Server-->>Client: 返回文本路线结果
end