- 1. 功能介绍
- 2. 服务器参数类型
- 3. 客户端
- 4. 初始化
- 4.1. session.py
- 4.2. fastmcp.py
- 4.3. stdio.py
- 4.4. 1.mcp.py
- 4.5. init.py
- 4.6. stdio.py
- 4.7. types.py
- 4.8 工作流程
- 4.8.1 整体流程
- 4.8.2 session.py客户端会话
- 4.8.3 _request:发请求并等待响应
- 4.8.4 _notify:发通知(无响应)
- 4.8.5 initialize:初始化握手
- 4.8.6 fastmcp.py服务端
- 4.8.7 stdio.py(服务端)单进程 stdio 循环
- 4.8.8 1.mcp.py主入口
- 4.8.9 stdio.py(客户端)关键修复
- 4.8.10 types.py 类型与适配器
- 4.8.11 MCP 模型基类
- 4.8.12 初始化相关类型
- 4.8.13 JSON-RPC 类型
- 4.8.14 TypeAdapter
- 4.8.15 消息流示意
- 4.8.16 小结
- 4.8.17 请求响应对应
1. 功能介绍 #
这是一个 MCP(Model Context Protocol)示例,演示如何在同一进程中:
- 启动一个 MCP 服务器(子进程)
- 作为客户端连接该服务器
- 初始化、列出工具、调用工具
通过命令行参数在「服务器模式」和「客户端模式」之间切换。
2. 服务器参数类型 #
StdioServerParameters 是 MCP Lite 客户端用于描述如何启动服务器子进程的参数类型。本质上,它封装了利用 stdio(标准输入输出)与模型进程进行通信时的子进程启动方式,包括:
- command:要启动的可执行文件(比如 "python")。
- args:传递给命令的参数列表(如当前脚本文件名和默认参数 "serve")。
使用 StdioServerParameters,可以标准化和简化服务器自动拉起流程,例如:
# 构造参数并启动子进程
params = StdioServerParameters(command="python", args=["my_model_server.py", "serve"])如此,客户端只需描述如何启动服务器,并不直接关心底层进程/管道管理的细节。这种方式便于自测、本地调试、支持多种模型后端,极大提升开发效率。
npx @modelcontextprotocol/inspector uv --directory D:/forever/docs/mcpsdk2 run 1.mcp.py serve2.1. 1.mcp.py #
1.mcp.py
# 导入 sys 模块
import sys
from mcp_lite import StdioServerParameters
# 定义客户端启动函数
def run_client():
# 构造 StdioServerParameters,用 python 启动本文件并传入"serve"参数
server_params = StdioServerParameters(command="python", args=[__file__, "serve"])
print(server_params)
# 主入口函数,根据命令行参数决定运行模式
def main():
# 如果参数含有"serve",以服务器模式运行
if len(sys.argv) >= 2 and sys.argv[1] == "serve":
pass
else:
# 否则启动客户端测试流程,自动拉起服务端
print("启动 MCP 客户端测试...")
print("将自动启动服务器进程并测试连接")
run_client()
# 判断是否直接运行此文件
if __name__ == "__main__":
main()
官方代码
# 导入 sys 模块
import sys
from mcp import StdioServerParameters
# 定义客户端启动函数
def run_client():
# 构造 StdioServerParameters,用 python 启动本文件并传入"serve"参数
server_params = StdioServerParameters(command="python", args=[__file__, "serve"])
print(server_params)
# 主入口函数,根据命令行参数决定运行模式
def main():
# 如果参数含有"serve",以服务器模式运行
if len(sys.argv) >= 2 and sys.argv[1] == "serve":
pass
else:
# 否则启动客户端测试流程,自动拉起服务端
print("启动 MCP 客户端测试...")
print("将自动启动服务器进程并测试连接")
run_client()
# 判断是否直接运行此文件
if __name__ == "__main__":
main()2.2. init.py #
mcp_lite/init.py
from mcp_lite.client.stdio import StdioServerParameters
# 设置当前模块的对外可见成员列表
__all__ = ["StdioServerParameters"]
2.3. init.py #
mcp_lite/client/init.py
2.4. stdio.py #
mcp_lite/client/stdio.py
# 从pydantic模块导入BaseModel和Field,用于数据模型验证
from pydantic import BaseModel, Field
# 从pathlib模块导入Path类,用于路径操作
from pathlib import Path
# 定义StdioServerParameters类,描述启动子进程时的参数
class StdioServerParameters(BaseModel):
# 要执行的命令(字符串类型)
command: str
# 命令行参数,默认为空列表
args: list = Field(default_factory=list)
# 额外的环境变量,可以为None
env: dict | None = None
# 工作目录,可以为str、Path或None
cwd: str | Path | None = None
# 字符编码,默认为utf-8
encoding: str = "utf-8"3. 客户端 #
3.1. 1.mcp.py #
1.mcp.py
# 导入 sys 模块
import sys
from mcp_lite import StdioServerParameters
+from mcp_lite.client.stdio import stdio_client
# 定义客户端启动函数
def run_client():
# 构造 StdioServerParameters,用 python 启动本文件并传入"serve"参数
server_params = StdioServerParameters(command="python", args=[__file__, "serve"])
# 使用上下文管理器启动 stdio_client,返回读写流
+ with stdio_client(server_params) as (read, write):
+ print(read, write)
# 主入口函数,根据命令行参数决定运行模式
def main():
# 如果参数含有"serve",以服务器模式运行
if len(sys.argv) >= 2 and sys.argv[1] == "serve":
pass
else:
# 否则启动客户端测试流程,自动拉起服务端
print("启动 MCP 客户端测试...")
print("将自动启动服务器进程并测试连接")
run_client()
# 判断是否直接运行此文件
if __name__ == "__main__":
main()stdio_client 来自官方 MCP Python SDK(mcp.client.stdio),它是一个异步上下文管理器,返回的是 _AsyncGeneratorContextManager,只能配合 async with 使用。
而 stdio_client 只实现了 __aenter__ / __aexit__,没有实现 __enter__ / __exit__,所以不能用在普通的 with 里,会报:
TypeError: '_AsyncGeneratorContextManager' object does not support the context manager protocol
官方代码
# 导入异步相关模块
import asyncio
# 导入系统模块,处理命令行参数等
import sys
# 从 mcp 包导入 StdioServerParameters,构建进程参数
from mcp import StdioServerParameters
# 从 mcp.client.stdio 导入 stdio_client,用于启动子进程并异步通信
from mcp.client.stdio import stdio_client
# 定义异步的客户端运行函数
async def run_client():
# 创建 StdioServerParameters,command 用 python 执行当前文件并传入 'serve'
server_params = StdioServerParameters(command="python", args=[__file__, "serve"])
# 通过 async with 管理 stdio_client 的异步上下文,获得读写流
async with stdio_client(server_params) as (read, write): # ✅ 使用 async with
# 打印读写流对象,简单测试
print(read, write)
# 定义主入口函数
def main():
# 如果命令行参数包含 'serve',说明以服务器模式运行,此处暂未实现
if len(sys.argv) >= 2 and sys.argv[1] == "serve":
pass
else:
# 否则以客户端模式运行,打印提示
print("启动 MCP 客户端测试...")
print("将自动启动服务器进程并测试连接")
# 使用 asyncio.run 运行异步的 run_client 函数
asyncio.run(run_client()) # 用 asyncio.run 运行异步函数
# 判断是否直接以主程序运行本文件
if __name__ == "__main__":
# 入口调用 main()
main()3.2. message.py #
mcp_lite/message.py
# 定义一个名为 SessionMessage 的类
class SessionMessage:
# 定义初始化方法,接收一个 message 参数
def __init__(self, message):
# 将传入的 message 参数赋给实例属性 self.message
self.message = message
3.3. types.py #
mcp_lite/types.py
# 从 pydantic 导入 BaseModel、ConfigDict、Field、TypeAdapter
from pydantic import BaseModel, ConfigDict, Field, TypeAdapter
# 导入 Any 和 Literal 类型注解
from typing import Any, Literal
# 定义 RequestId 类型,既可以是 int 也可以是 str
RequestId = int | str
# 定义 JSONRPC 请求的数据结构
class JSONRPCRequest(BaseModel):
# jsonrpc 协议版本,固定为 "2.0"
jsonrpc: Literal["2.0"] = "2.0"
# 请求 ID,可以为 int 或 str 类型
id: RequestId = None
# 方法名称,字符串类型
method: str = ""
# 方法参数,为一个字典或 None
params: dict[str, Any] | None = None
# 定义 JSONRPC 通知的数据结构(没有 id 字段)
class JSONRPCNotification(BaseModel):
# jsonrpc 协议版本,固定为 "2.0"
jsonrpc: Literal["2.0"] = "2.0"
# 通知的方法名称
method: str = ""
# 通知参数,可以为字典或者 None
params: dict[str, Any] | None = None
# 定义 JSONRPC 响应的数据结构
class JSONRPCResponse(BaseModel):
# jsonrpc 协议版本,固定为 "2.0"
jsonrpc: Literal["2.0"] = "2.0"
# 响应的 ID,需要与请求 ID 匹配
id: RequestId = None
# 响应结果,可以为字典或 None
result: dict[str, Any] = None
# 定义错误的数据结构
class ErrorData(BaseModel):
# 错误码,默认值为 0
code: int = 0
# 错误信息,默认值为空字符串
message: str = ""
# 附加的错误数据,可以为任意类型或 None
data: Any = None
# 定义 JSONRPC 错误消息的数据结构
class JSONRPCError(BaseModel):
# jsonrpc 协议版本,固定为 "2.0"
jsonrpc: Literal["2.0"] = "2.0"
# 错误对应的请求 ID,可以为 None
id: RequestId | None = None
# 错误的详细信息,类型为 ErrorData
error: ErrorData = None
# 定义所有 JSONRPC 消息的联合类型
JSONRPCMessage = JSONRPCRequest | JSONRPCNotification | JSONRPCResponse | JSONRPCError
# 定义 JSONRPC 消息适配器,用于类型自动推断和校验
jsonrpc_message_adapter = TypeAdapter(JSONRPCMessage)3.4. stdio.py #
mcp_lite/client/stdio.py
# 从pydantic模块导入BaseModel和Field,用于数据模型验证
from pydantic import BaseModel, Field
# 从pathlib模块导入Path类,用于路径操作
from pathlib import Path
# 导入subprocess模块,用于创建和管理子进程
+import subprocess
# 导入os模块,用于与操作系统交互
+import os
# 导入sys模块,访问与Python解释器相关的变量和函数
+import sys
# 导入threading模块,支持多线程
+import threading
# 从contextlib中导入contextmanager装饰器,用于实现上下文管理器
+from contextlib import contextmanager
# 从pathlib模块导入Path类,用于路径操作
+from pathlib import Path
# 从queue模块导入Queue类,实现线程安全的队列
+from queue import Queue
# 从mcp_lite.message模块导入SessionMessage类
+from mcp_lite.message import SessionMessage
# 从mcp_lite.types模块导入jsonrpc_message_adapter
+from mcp_lite.types import jsonrpc_message_adapter
# 定义StdioServerParameters类,描述启动子进程时的参数
class StdioServerParameters(BaseModel):
# 要执行的命令(字符串类型)
command: str
# 命令行参数,默认为空列表
args: list = Field(default_factory=list)
# 额外的环境变量,可以为None
env: dict | None = None
# 工作目录,可以为str、Path或None
cwd: str | Path | None = None
# 字符编码,默认为utf-8
encoding: str = "utf-8"
# 定义环境变量名列表,根据平台不同选择不同的变量集合
+_ENV_VARS = (
# 如果是在Windows平台,使用以下环境变量名
+ ["APPDATA", "HOMEDRIVE", "HOMEPATH", "LOCALAPPDATA", "PATH", "PATHEXT",
+ "PROCESSOR_ARCHITECTURE", "SYSTEMDRIVE", "SYSTEMROOT", "TEMP", "USERNAME", "USERPROFILE"]
# 否则(非Windows),使用以下环境变量名
+ if sys.platform == "win32"
+ else ["HOME", "LOGNAME", "PATH", "SHELL", "TERM", "USER"]
+)
# 获取环境变量,并排除值以"()"开头的变量
+def _env():
# 遍历_ENV_VARS,每个变量k若存在且其值v不是以"()"开头,则加入字典
+ return {k: v for k in _ENV_VARS if (v := os.environ.get(k)) and not v.startswith("()")}
# 定义stdio_client上下文管理器,实现通过stdio与子进程通信
+@contextmanager
+def stdio_client(server, errlog=sys.stderr):
# 创建线程安全的读取队列
+ read_q = Queue()
# 创建线程安全的写入队列
+ write_q = Queue()
# 如果在Windows上且server.command为"python",则用当前Python解释器,否则用指定命令
+ cmd = sys.executable if sys.platform == "win32" and server.command == "python" else server.command
# 合并环境变量,自定义env覆盖默认环境;强制子进程使用 UTF-8 避免 Windows 下中文解码错误
+ env = {**_env(), "PYTHONIOENCODING": "utf-8", **(server.env or {})}
# 启动子进程,并重定向其stdin、stdout和stderr
+ proc = subprocess.Popen(
+ [cmd] + server.args,#cmd是python解释器,server.args是当前脚本文件名和默认参数 "serve"
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=errlog,#stdin是标准输入,stdout是标准输出,stderr是标准错误
+ env=env, cwd=server.cwd, text=False,#env是环境变量,cwd是工作目录,text=False是二进制模式
+ )#启动子进程,并重定向其stdin、stdout和stderr
# 定义从子进程stdout读取数据的线程函数
+ def read_stdout():
+ try:
+ while True:
# 持续逐行读取子进程的标准输出
+ line = proc.stdout.readline()
# 如果读到空行说明输出结束,退出循环
+ if not line:
+ break
# 按指定编码解码,并去除首尾空白字符
+ line = line.decode(server.encoding).strip()
# 只处理非空行
+ if line:
+ try:
# 将JSON RPC消息解析为SessionMessage对象并放入读取队列
+ read_q.put(SessionMessage(message=jsonrpc_message_adapter.validate_json(line, by_name=False)))
+ except Exception:
# 解析异常时忽略该消息
+ pass
+ finally:
# 读取结束在队列插入None,通知主线程
+ read_q.put(None)
# 定义向子进程stdin写入数据的线程函数
+ def write_stdin():
+ try:
# 不断尝试取出待写入的消息(阻塞)
+ while (m := write_q.get()) is not None:
# 将消息对象序列化为json字符串,追加换行并编码,写入stdin
+ s = (m.message.model_dump_json(by_alias=True, exclude_unset=True) + "\n").encode(server.encoding)
+ proc.stdin.write(s)
+ proc.stdin.flush()
+ except (BrokenPipeError, ConnectionResetError):
# 若出现BrokenPipe或ConnectionReset,直接退出写线程
+ pass
# 创建并启动读取线程,设置为守护线程
+ rt = threading.Thread(target=read_stdout, daemon=True)
# 创建并启动写入线程,设置为守护线程
+ wt = threading.Thread(target=write_stdin, daemon=True)
+ rt.start()
+ wt.start()
# 定义读取流的类
+ class ReadStream:
# 从读取队列获取数据(阻塞),返回一条消息
+ def get(self):
+ return read_q.get()
# 定义写入流的类
+ class WriteStream:
# 将消息放入写入队列,等待写线程处理
+ def send(self, msg):
+ write_q.put(msg)
+ try:
# 让调用者获得读取流和写入流对象
+ yield ReadStream(), WriteStream()
+ finally:
# 1. 通知写入线程停止
+ write_q.put(None)
# 2. 等待写线程结束(给它时间处理完队列里的消息并退出)
+ wt.join(timeout=3)
+ try:
# 关闭子进程的stdin管道
+ proc.stdin.close()
+ except Exception:
# 如果关闭时出现异常则跳过
+ pass
# 等待子进程退出,最多2秒
+ proc.wait(timeout=2)
# 如果进程还未退出,强制杀死进程并等待其退出
+ if proc.poll() is None:
+ proc.kill()
+ proc.wait()
3.4 工作流程 #
stdio_client实现了一个 通过标准输入/输出(stdio)与 MCP 子进程通信 的客户端。它启动一个子进程,用 stdin/stdout 收发 JSON-RPC 消息,并通过两个线程分别负责读和写。
3.4.1. 数据模型:StdioServerParameters #
# 定义 StdioServerParameters 类,继承自 BaseModel
class StdioServerParameters(BaseModel):
# 指定要执行的命令(字符串类型)
command: str
# 指定命令行参数,默认为空列表
args: list = Field(default_factory=list)
# 指定额外的环境变量,可以为 None,类型为字典或 None
env: dict | None = None
# 指定工作目录,可以为 str、Path 或 None
cwd: str | Path | None = None
# 指定编码方式,默认为 utf-8
encoding: str = "utf-8"用 Pydantic 定义启动子进程所需的参数:
command:要执行的命令(如"python")args:命令行参数(如["1.mcp.py", "serve"])env:额外环境变量,会与默认环境合并cwd:工作目录encoding:stdin/stdout 的编码,默认utf-8
3.4.2. 环境变量:_env() 和 _ENV_VARS #
# 根据平台(Windows或非Windows)定义要继承的环境变量名字列表
_ENV_VARS = (
# 如果是Windows系统,则使用以下环境变量名
["APPDATA", "HOMEDRIVE", "HOMEPATH", "LOCALAPPDATA", "PATH", "PATHEXT",
"PROCESSOR_ARCHITECTURE", "SYSTEMDRIVE", "SYSTEMROOT", "TEMP", "USERNAME", "USERPROFILE"]
# 否则(非Windows,如Linux/Mac),使用下面的环境变量名
if sys.platform == "win32"
else ["HOME", "LOGNAME", "PATH", "SHELL", "TERM", "USER"]
)
# 定义 _env() 函数,返回过滤后的环境变量字典
def _env():
# 遍历_ENV_VARS,只收集值非空且不以"()"开头的变量
return {k: v for k in _ENV_VARS if (v := os.environ.get(k)) and not v.startswith("()")}_ENV_VARS:按平台选择要继承的环境变量名_env():从当前进程环境里取出这些变量,过滤掉值为空或"()"开头的,返回一个字典- 通常,以
"()"开头的环境变量值来源于 Shell 函数定义。
这样子进程会继承精简后的环境,避免把当前进程的过多环境带进去。
3.4.3 核心:stdio_client 上下文管理器 #
# 装饰器,声明这是一个上下文管理器
@contextmanager
# 定义stdio_client函数,接收服务参数和可选的错误日志输出
def stdio_client(server, errlog=sys.stderr):stdio_client 是一个上下文管理器,用于:
- 启动子进程
- 创建读写线程
- 向调用方提供
ReadStream和WriteStream - 退出时关闭管道并结束子进程
3.4.4 启动子进程 #
# 如果在Windows平台且server.command为"python",则用当前python解释器,否则用指定命令
cmd = sys.executable if sys.platform == "win32" and server.command == "python" else server.command
# 合并默认环境变量和server.env(后者可以覆盖前者)
env = {**_env(), **(server.env or {})}
# 创建子进程,指定stdin、stdout为管道,stderr为errlog,环境变量env,工作目录cwd,二进制模式
proc = subprocess.Popen(
[cmd] + server.args,
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=errlog,
env=env, cwd=server.cwd, text=False,
)- 命令:在 Windows 且
command == "python"时,用sys.executable保证用当前 Python;否则用server.command - 环境:
_env()与server.env合并,后者覆盖前者 - 管道:
stdin、stdout用管道,stderr输出到errlog(默认sys.stderr) text=False:以二进制模式读写,便于控制编码
3.4.5 读线程 read_stdout #
# 定义读取子进程 stdout 的函数
def read_stdout():
try:
# 持续循环,保持读取
while True:
# 逐行读取子进程的标准输出
line = proc.stdout.readline()
# 如果读到空字节(EOF),跳出循环
if not line:
break
# 按指定编码解码,并去除行首尾空白字符
line = line.decode(server.encoding).strip()
# 如果这一行不是空字符串
if line:
try:
# 尝试将这一行解析为 JSON-RPC 消息对象,封装成 SessionMessage 后放入读取队列
read_q.put(SessionMessage(message=jsonrpc_message_adapter.validate_json(line, by_name=False)))
except Exception:
# 如果解析失败,忽略本行
pass
finally:
# 循环结束时,向读队列放入 None,作为读取结束信号
read_q.put(None)流程:
- 用
readline()按行读子进程 stdout - 读到空字节表示 EOF,退出循环
- 按
server.encoding解码并 strip - 非空行用
jsonrpc_message_adapter.validate_json()解析为 JSON-RPC 消息 - 解析成功则封装为
SessionMessage放入read_q - 解析失败则忽略
- 结束时往
read_q放入None,作为“读取结束”信号
3.4.6 写线程 write_stdin #
# 定义写入子进程stdin的线程函数
def write_stdin():
try:
# 不断从写队列中取出消息对象(阻塞等待)
while (m := write_q.get()) is not None:
# 将消息对象序列化为JSON字符串,结尾加换行符,并按指定编码转换为字节串
s = (m.message.model_dump_json(by_alias=True, exclude_unset=True) + "\n").encode(server.encoding)
# 将编码后的字节串写入子进程的stdin
proc.stdin.write(s)
# 立即刷新buffer,确保数据及时发送到子进程
proc.stdin.flush()
# 捕获管道断开或连接重置(子进程可能提前退出),静默忽略异常直接退出线程
except (BrokenPipeError, ConnectionResetError):
pass流程:
- 从
write_q阻塞取消息,取到None表示结束 - 将
m.message序列化为 JSON 字符串,末尾加换行,再按server.encoding编码 - 写入
proc.stdin并flush - 遇到
BrokenPipeError或ConnectionResetError时静默退出(子进程可能已关闭)
3.4.7 ReadStream 和 WriteStream #
# 定义读取流类
class ReadStream:
# 从读取队列获取一条消息(阻塞),如果队列为空会等待
def get(self):
return read_q.get()
# 定义写入流类
class WriteStream:
# 将消息放入写入队列,由写线程发送到子进程
def send(self, msg):
write_q.put(msg)ReadStream.get():从read_q阻塞取一条消息(或None)WriteStream.send(msg):把消息放入write_q,由写线程发送
调用方只需使用这两个接口,不必直接操作队列和线程。
3.4.8 退出时的清理 #
# 尝试执行主逻辑,提供读写流给调用方
try:
yield ReadStream(), WriteStream()
# 无论主逻辑如何结束,最终都要执行资源清理
finally:
# 通知写线程停止(放入哨兵 None)
write_q.put(None)
try:
# 关闭子进程的标准输入,促使子进程感知 EOF
proc.stdin.close()
except Exception:
# 忽略关闭时发生的任何异常,保证流程继续
pass
# 等待子进程在2秒内正常退出
proc.wait(timeout=2)
# 如果2秒后进程还未退出,则强制杀掉
if proc.poll() is None:
proc.kill()
proc.wait()write_q.put(None):通知写线程停止proc.stdin.close():关闭 stdin,子进程读到 EOF 后应退出proc.wait(timeout=2):最多等 2 秒- 若仍未退出,
proc.kill()强制结束,再wait()回收
3.4.9 数据流示意 #
调用方 stdio_client
│ │
│ WriteStream.send(msg) │
│ ─────────────────────────► write_q ──► 写线程 ──► proc.stdin
│ │
│ ReadStream.get() │
│ ◄───────────────────────── read_q ◄── 读线程 ◄── proc.stdout
│ │3.4.10 设计要点 #
| 点 | 说明 |
|---|---|
| 双线程 | 读、写分离,避免阻塞;读线程负责解析 JSON-RPC |
| 队列 | Queue 线程安全,主逻辑与 I/O 解耦 |
| 守护线程 | 读写线程设为 daemon,主进程退出时自动结束 |
哨兵 None |
read_q 和 write_q 用 None 表示结束 |
| 超时与强制结束 | 2 秒内未退出则 kill,防止子进程卡死 |
3.4.11 典型用法 #
# 使用stdio_client上下文管理器启动子进程,并获取读写流对象
with stdio_client(StdioServerParameters(command="python", args=["1.mcp.py", "serve"])) as (read_stream, write_stream):
# 通过写流发送请求消息
write_stream.send(some_request_message)
# 通过读流获取响应消息
response = read_stream.get()实现了一个基于 stdio 的 MCP 客户端,通过子进程的 stdin/stdout 收发 JSON-RPC 消息,并用队列和线程完成异步读写。
4. 初始化 #
介绍初始化过程的完整细节,包括协议初始化的步骤、消息体设计、会话流程以及相关安全性要点。MCP 客户端与服务端约定,启动后需首先完成初始化(Initialize),以便双方协商协议版本与能力,确认后才能进行后续交互。这一阶段尤为重要,可以避免后续通信因能力不兼容、协议冲突等导致的异常。
初始化通常遵循如下流程:
客户端发起 Initialize 请求
客户端构造一个InitializeRequest,其中包含自身支持的协议版本、功能(capabilities)、实现信息(如产品名、版本等)等。该请求通过 JSON-RPC 格式发送给服务端。服务端校验并回复
服务端收到请求后,校验客户端协议版本是否支持,与自身能力进行对照。若无误,返回InitializeResult,其中说明实际使用的协议版本、服务端特性与约束。如果发生错误,则以 JSON-RPC 错误格式告知。客户端收到回复,若正常则发送 Initialized 通知
客户端收到确认后,再发送InitializedNotification,以通知服务端初始化阶段正式结束,接下来能正常发送其它业务请求。异常处理与兼容协议
若任何一方发现版本不兼容,或缺少必要能力,应及时终止会话并清晰反馈错误。
初始化过程示例消息(伪代码):
// InitializeRequest
{
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "0.1.0",
"implementation": { "name": "mcp_lite", "version": "0.1.0" },
"capabilities": { "uri": true, ... }
}
}
// InitializeResult
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"protocolVersion": "0.1.0",
"capabilities": { "uri": true, ... }
}
}
// InitializedNotification
{
"jsonrpc": "2.0",
"method": "initialized",
"params": {}
}要点:
- 初始化请求和响应参数详见
types.py的InitializeRequestParams、InitializeResult等结构体定义。 - 只有初始化完成,双方才能安全进行后续通信。
- 若初始化阶段出现异常(如版本不兼容),应明确抛出
MCPError,使调用者及时响应。
4.1. session.py #
mcp_lite/client/session.py
import sys
# 导入 SessionMessage 类
from mcp_lite.message import SessionMessage
# 从 mcp_lite.types 模块导入相关类型和常量
from mcp_lite.types import (
InitializeRequestParams, # 导入初始化请求参数结构体
InitializeRequest, # 导入初始化请求结构体
LATEST_PROTOCOL_VERSION, # 导入协议最新版本号
ClientCapabilities, # 导入客户端能力描述
Implementation, # 导入实现信息结构体
InitializedNotification, # 导入初始化完成通知
InitializeResult, # 导入初始化请求返回结构体
JSONRPCRequest, # 导入 JSONRPC 请求类型
JSONRPCResponse, # 导入 JSONRPC 响应类型
JSONRPCError, # 导入 JSONRPC 错误类型
JSONRPCNotification, # 导入 JSONRPC 通知类型
)
# 定义 MCPError 异常类,用于表示协议相关错误
class MCPError(Exception):
# 初始化方法,接收错误码、错误信息和可选的数据
def __init__(self, code, message, data=None):
# 将错误码、错误信息、附加数据赋值为实例属性
self.code, self.message, self.data = code, message, data
# 调用父类 Exception 的初始化,只传递错误信息
super().__init__(message)
# 类方法,利用 JSONRPCError 对象创建 MCPError 实例
@classmethod
def from_jsonrpc(cls, err):
# 从 JSONRPCError.error 对象中取出 code/message/data 创建 MCPError
return cls(err.error.code, err.error.message, err.error.data)
# 定义 ClientSession 类,用于描述客户端会话
class ClientSession:
# 构造方法,接收读取流和写入流
def __init__(self, read_stream, write_stream):
# 赋值读取流和写入流
self._read, self._write = read_stream, write_stream
# 初始化请求 id 计数器为 0
self._req_id = 0
# 内部方法,发送请求并同步等待响应
def _request(self, req):
# 当前请求 id
rid = self._req_id
# 自增请求 id,确保下次唯一
self._req_id += 1
# 对请求对象进行序列化,转为 dict
d = req.model_dump(by_alias=True, mode="json", exclude_none=True)
# 构建 JSONRPCRequest,再封装入 SessionMessage,通过写入流发送
jreq = JSONRPCRequest(jsonrpc="2.0", id=rid, method=d["method"], params=d.get("params"))
# 打印请求信息到标准错误流
print("[Client] Request:", jreq.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
# 将请求对象封装成 SessionMessage 后发送
self._write.send(SessionMessage(message=jreq))
# 循环等待响应
while True:
# 从读取流取出一条消息
msg = self._read.get()
# 若取到 None,说明连接断开,抛出异常
if msg is None:
raise MCPError(-32000, "Connection closed")
# 若消息类型不是 SessionMessage,忽略继续等
if not isinstance(msg, SessionMessage):
continue
# 取消息内容
m = msg.message
# 若为 Response 或 Error,且 id 匹配
if isinstance(m, (JSONRPCResponse, JSONRPCError)) and getattr(m, "id", None) == rid:
print("[Client] Response:", m.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
# 若为错误,抛出 MCPError 异常
if isinstance(m, JSONRPCError):
raise MCPError.from_jsonrpc(m)
# 为正常响应则返回结果数据
return m.result
# 内部方法,发送通知消息
def _notify(self, n):
# 通知对象序列化为 dict
d = n.model_dump(by_alias=True, mode="json", exclude_none=True)
# 封装为 JSONRPCNotification,再封装成 SessionMessage 后发出
self._write.send(SessionMessage(
message=JSONRPCNotification(
jsonrpc="2.0",
method=d["method"],
params=d.get("params"),
)
))
# 初始化过程,发送初始化请求,收到响应后再发初始化完成通知
def initialize(self):
# 构造初始化请求,并发送等待返回
r = self._request(InitializeRequest(
params=InitializeRequestParams(
protocol_version=LATEST_PROTOCOL_VERSION, # 协议版本号
capabilities=ClientCapabilities(), # 客户端能力
client_info=Implementation(name="mcp_lite", version="0.1.0"), # 客户端信息
)
))
# 发送"初始化完成"通知
self._notify(InitializedNotification())
# 用 InitializeResult 验证响应并返回
return InitializeResult.model_validate(r, by_name=False)4.2. fastmcp.py #
mcp_lite/server/fastmcp.py
# 导入 sys 模块,用于标准输入输出操作
import sys
# 从 mcp_lite.message 模块导入 SessionMessage 类
from mcp_lite.message import SessionMessage
# 从 mcp_lite.server 模块导入 stdio 子模块
from mcp_lite.server import stdio
# 从 mcp_lite.types 模块导入所有相关类型和类
from mcp_lite.types import (
JSONRPCRequest, # JSONRPC 请求类型
JSONRPCError, # JSONRPC 错误类型
ErrorData, # 错误数据类型
InitializeResult, # 初始化响应类型
LATEST_PROTOCOL_VERSION, # 最新协议版本常量
ToolsCapability, # 工具能力类型
ServerCapabilities, # 服务器功能特性类型
Implementation, # 实现信息类型
JSONRPCResponse, # JSONRPC 响应类型
)
# 主服务类 FastMCP
class FastMCP:
# 初始化方法,支持自定义服务器名称
def __init__(self, name="mcp-server"):
# 保存服务器名称
self.name = name
# 实际 JSONRPC 方法分发与业务处理
def _handle(self, req):
# 拆分 method、params、id
method, params, rid = req.method, req.params or {}, req.id
# 处理初始化请求
if method == "initialize":
# 组装初始化响应,包括协议版本、能力与服务信息
r = InitializeResult(
protocol_version=LATEST_PROTOCOL_VERSION,
capabilities=ServerCapabilities(tools=ToolsCapability()),
server_info=Implementation(name=self.name, version="0.1.0"),
)
# 返回 JSONRPCResponse 包装的结果
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
# 不支持的方法,返回 "方法未找到" 错误(code -32601)
return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32601, message=f"Method not found: {method}"))
# 处理 SessionMessage 消息,主要路由 JSONRPCRequest
def _handle_msg(self, msg):
# 判断消息类型不是 SessionMessage 则忽略
if not isinstance(msg, SessionMessage):
return None
# 获取消息体
m = msg.message
# 只处理 JSONRPCRequest 类型
if not isinstance(m, JSONRPCRequest):
return None
print("[Server] Request:", m.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
try:
# 调用实际业务处理方法
resp = self._handle(m)
print("[Server] Response:", resp.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
return resp
except Exception as e:
# 捕获异常,返回标准 JSONRPCError(code -32603,内部错误)
err = JSONRPCError(jsonrpc="2.0", id=m.id, error=ErrorData(code=-32603, message=str(e)))
print("[Server] Response (error):", err.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
return err
# 运行服务方法,默认采用 stdio 传输方式
def run(self, transport="stdio"):
# 仅支持 stdio,其他方式抛出异常
if transport != "stdio":
raise ValueError(f"unsupported transport: {transport}")
# 启动 stdio server,当前对象 _handle_msg 作为消息处理回调
stdio.stdio_server(self._handle_msg) 4.3. stdio.py #
mcp_lite/server/stdio.py
# 导入 sys 模块,用于标准输入输出操作
import sys
# 从 mcp_lite.message 模块导入 SessionMessage 类
from mcp_lite.message import SessionMessage
# 从 mcp_lite.types 模块导入 jsonrpc_message_adapter 适配器
from mcp_lite.types import jsonrpc_message_adapter
# 定义 stdio_server 函数,参数 handler 是处理消息的回调函数
def stdio_server(handler):
# 进入一个无限循环,持续处理标准输入的数据
while True:
# 从标准输入读取一行
line = sys.stdin.readline()
# 如果读取到空字符串,说明 EOF,跳出循环
if not line:
break
# 去除读取行首尾的空白字符
line = line.strip()
# 如果去除空白后字符串仍为空,继续读取下一行
if not line:
continue
try:
# 使用 jsonrpc_message_adapter 对输入字符串进行解析为消息对象
msg = jsonrpc_message_adapter.validate_json(line, by_name=False)
# 使用解析后的消息包装成 SessionMessage 再传递给 handler 处理,获取响应
response = handler(SessionMessage(message=msg))
# 判断 handler 是否返回了响应结果
if response is not None:
# 如果响应对象有 message 属性,则直接使用;否则把响应包装成 SessionMessage
r = response if hasattr(response, "message") else SessionMessage(message=response)
# 将响应的 message 属性序列化为 JSON 字符串,写入标准输出,并换行
sys.stdout.write(r.message.model_dump_json(by_alias=True, exclude_unset=True) + "\n")
# 立即刷新标准输出缓冲区,确保数据被及时输出
sys.stdout.flush()
except Exception:
print("[Server] Error:", sys.exc_info(), file=sys.stderr)
# 捕获所有异常,忽略错误继续处理下一行
pass4.4. 1.mcp.py #
1.mcp.py
# 导入 sys 模块
import sys
# 导入 mcp_lite 模块
+from mcp_lite import ClientSession,StdioServerParameters
# 导入 mcp_lite.client.stdio 模块
from mcp_lite.client.stdio import stdio_client
# 导入 mcp_lite.server.fastmcp 模块
+from mcp_lite.server.fastmcp import FastMCP
# 创建一个名为 "Hello-MCP" 的 FastMCP 服务器对象
+mcp = FastMCP(name="Hello-MCP")
# 定义客户端启动函数
def run_client():
# 构造 StdioServerParameters,用 python 启动本文件并传入"serve"参数
server_params = StdioServerParameters(command="python", args=[__file__, "serve"])
# 使用上下文管理器启动 stdio_client,返回读写流
with stdio_client(server_params) as (read, write):
# 创建客户端会话对象,绑定读写流
+ session = ClientSession(read, write)
# 初始化会话,完成握手
+ session.initialize()
# 定义以 stdio 运行的服务器端函数
+def run_server_stdio():
# 在标准错误流打印启动提示
+ print("启动 MCP 服务器(stdio 模式)...", file=sys.stderr)
# 以 stdio 方式运行 mcp 服务器
+ mcp.run(transport="stdio")
# 主入口函数,根据命令行参数决定运行模式
def main():
# 如果参数含有"serve",以服务器模式运行
if len(sys.argv) >= 2 and sys.argv[1] == "serve":
+ run_server_stdio()
else:
# 否则启动客户端测试流程,自动拉起服务端
print("启动 MCP 客户端测试...")
print("将自动启动服务器进程并测试连接")
run_client()
# 判断是否直接运行此文件
if __name__ == "__main__":
main()
官方代码
# 导入 sys 模块
import sys
#from mcp_lite import ClientSession,StdioServerParameters
from mcp import ClientSession,StdioServerParameters
#from mcp_lite.client.stdio import stdio_client
from mcp.client.stdio import stdio_client
#from mcp_lite.server.fastmcp import FastMCP
from mcp.server.fastmcp import FastMCP
# 导入 asyncio 模块,用于异步编程
import asyncio
mcp = FastMCP(name="Hello-MCP")
# 定义客户端启动函数
async def run_client():
# 构造 StdioServerParameters,用 python 启动本文件并传入"serve"参数
server_params = StdioServerParameters(command="python", args=[__file__, "serve"])
async with stdio_client(server_params) as (read, write):
# 创建客户端会话对象,绑定读写流
session = ClientSession(read, write)
# 初始化会话,完成握手
await session.initialize()
# 定义以 stdio 运行的服务器端函数
def run_server_stdio():
# 在标准错误流打印启动提示
print("启动 MCP 服务器(stdio 模式)...", file=sys.stderr)
# 以 stdio 方式运行 mcp 服务器
mcp.run(transport="stdio")
# 主入口函数,根据命令行参数决定运行模式
def main():
# 如果参数含有"serve",以服务器模式运行
if len(sys.argv) >= 2 and sys.argv[1] == "serve":
run_server_stdio()
else:
# 否则启动客户端测试流程,自动拉起服务端
print("启动 MCP 客户端测试...")
print("将自动启动服务器进程并测试连接")
asyncio.run(run_client())
# 判断是否直接运行此文件
if __name__ == "__main__":
main()
4.5. init.py #
mcp_lite/init.py
from mcp_lite.client.stdio import StdioServerParameters
# 从 mcp_lite.client.session 导入 ClientSession 类
+from mcp_lite.client.session import ClientSession
# 设置当前模块的对外可见成员列表
+__all__ = ["StdioServerParameters", "ClientSession"]
4.6. stdio.py #
mcp_lite/client/stdio.py
# 从pydantic模块导入BaseModel和Field,用于数据模型验证
from pydantic import BaseModel, Field
# 从pathlib模块导入Path类,用于路径操作
from pathlib import Path
# 导入subprocess模块,用于创建和管理子进程
import subprocess
# 导入os模块,用于与操作系统交互
import os
# 导入sys模块,访问与Python解释器相关的变量和函数
import sys
# 导入threading模块,支持多线程
import threading
# 从contextlib中导入contextmanager装饰器,用于实现上下文管理器
from contextlib import contextmanager
# 从pathlib模块导入Path类,用于路径操作
from pathlib import Path
# 从queue模块导入Queue类,实现线程安全的队列
from queue import Queue
# 从mcp_lite.message模块导入SessionMessage类
from mcp_lite.message import SessionMessage
# 从mcp_lite.types模块导入jsonrpc_message_adapter
from mcp_lite.types import jsonrpc_message_adapter
# 定义StdioServerParameters类,描述启动子进程时的参数
class StdioServerParameters(BaseModel):
# 要执行的命令(字符串类型)
command: str
# 命令行参数,默认为空列表
args: list = Field(default_factory=list)
# 额外的环境变量,可以为None
env: dict | None = None
# 工作目录,可以为str、Path或None
cwd: str | Path | None = None
# 字符编码,默认为utf-8
encoding: str = "utf-8"
# 定义环境变量名列表,根据平台不同选择不同的变量集合
_ENV_VARS = (
# 如果是在Windows平台,使用以下环境变量名
["APPDATA", "HOMEDRIVE", "HOMEPATH", "LOCALAPPDATA", "PATH", "PATHEXT",
"PROCESSOR_ARCHITECTURE", "SYSTEMDRIVE", "SYSTEMROOT", "TEMP", "USERNAME", "USERPROFILE"]
# 否则(非Windows),使用以下环境变量名
if sys.platform == "win32"
else ["HOME", "LOGNAME", "PATH", "SHELL", "TERM", "USER"]
)
# 获取环境变量,并排除值以"()"开头的变量
def _env():
# 遍历_ENV_VARS,每个变量k若存在且其值v不是以"()"开头,则加入字典
return {k: v for k in _ENV_VARS if (v := os.environ.get(k)) and not v.startswith("()")}
# 定义stdio_client上下文管理器,实现通过stdio与子进程通信
@contextmanager
def stdio_client(server, errlog=sys.stderr):
# 创建线程安全的读取队列
read_q = Queue()
# 创建线程安全的写入队列
write_q = Queue()
# 如果在Windows上且server.command为"python",则用当前Python解释器,否则用指定命令
cmd = sys.executable if sys.platform == "win32" and server.command == "python" else server.command
# 合并环境变量,自定义env覆盖默认环境
env = {**_env(), **(server.env or {})}
# 启动子进程,并重定向其stdin、stdout和stderr
proc = subprocess.Popen(
[cmd] + server.args,#cmd是python解释器,server.args是当前脚本文件名和默认参数 "serve"
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=errlog,#stdin是标准输入,stdout是标准输出,stderr是标准错误
env=env, cwd=server.cwd, text=False,#env是环境变量,cwd是工作目录,text=False是二进制模式
)#启动子进程,并重定向其stdin、stdout和stderr
# 定义从子进程stdout读取数据的线程函数
def read_stdout():
try:
while True:
# 持续逐行读取子进程的标准输出
line = proc.stdout.readline()
# 如果读到空行说明输出结束,退出循环
if not line:
break
# 按指定编码解码,并去除首尾空白字符
line = line.decode(server.encoding).strip()
# 只处理非空行
if line:
try:
# 将JSON RPC消息解析为SessionMessage对象并放入读取队列
read_q.put(SessionMessage(message=jsonrpc_message_adapter.validate_json(line, by_name=False)))
except Exception:
# 解析异常时忽略该消息
pass
finally:
# 读取结束在队列插入None,通知主线程
read_q.put(None)
# 定义向子进程stdin写入数据的线程函数
def write_stdin():
try:
# 不断尝试取出待写入的消息(阻塞)
while (m := write_q.get()) is not None:
# 将消息对象序列化为json字符串,追加换行并编码,写入stdin
s = (m.message.model_dump_json(by_alias=True, exclude_unset=True) + "\n").encode(server.encoding)
proc.stdin.write(s)
proc.stdin.flush()
except (BrokenPipeError, ConnectionResetError):
# 若出现BrokenPipe或ConnectionReset,直接退出写线程
pass
# 创建并启动读取线程,设置为守护线程
rt = threading.Thread(target=read_stdout, daemon=True)
# 创建并启动写入线程,设置为守护线程
wt = threading.Thread(target=write_stdin, daemon=True)
rt.start()
wt.start()
# 定义读取流的类
class ReadStream:
# 从读取队列获取数据(阻塞),返回一条消息
def get(self):
return read_q.get()
# 定义写入流的类
class WriteStream:
# 将消息放入写入队列,等待写线程处理
def send(self, msg):
write_q.put(msg)
try:
# 让调用者获得读取流和写入流对象
yield ReadStream(), WriteStream()
finally:
# 1. 通知写线程停止
write_q.put(None)
# 2. 等待写线程结束(给它时间处理完队列里的消息并退出)
+ wt.join(timeout=3)
# 3. 再关闭 stdin
try:
proc.stdin.close()
except Exception:
pass
# 等待子进程退出,最多2秒
proc.wait(timeout=2)
# 如果进程还未退出,强制杀死进程并等待其退出
if proc.poll() is None:
proc.kill()
proc.wait()
4.7. types.py #
mcp_lite/types.py
# 导入 Any 和 Literal 类型注解
from typing import Any, Literal
# 从 pydantic 导入 BaseModel、ConfigDict、Field、TypeAdapter
+from pydantic import BaseModel, ConfigDict, Field, TypeAdapter
# 导入 pydantic 的 to_camel 驼峰命名生成器
+from pydantic.alias_generators import to_camel
# 定义 RequestId 类型,既可以是 int 也可以是 str
RequestId = int | str
# 定义 MCP 的基础模型类,支持驼峰命名和按名称填充
+class MCPModel(BaseModel):
# 指定 Pydantic 模型配置:使用驼峰形式命名和按名称填充
+ model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)
# 定义客户端能力结构体
+class ClientCapabilities(MCPModel):
# 可选字段:客户端实验性能力扩展,键为 str,值为字典
+ experimental: dict[str, dict[str, Any]] | None = None
# 定义服务器或客户端的实现信息结构体
+class Implementation(MCPModel):
# 实现的名称
+ name: str = ""
# 实现的版本号
+ version: str = ""
# 可选字段:人类可读的标题
+ title: str | None = None
# 可选字段:实现的描述
+ description: str | None = None
# 定义初始化请求参数结构体
+class InitializeRequestParams(MCPModel):
# 协议版本号
+ protocol_version: str = ""
# 可选字段:客户端能力描述
+ capabilities: ClientCapabilities = None
# 可选字段:客户端实现信息
+ client_info: Implementation = None
# 定义初始化请求结构体
+class InitializeRequest(MCPModel):
# 方法名,固定为 "initialize"
+ method: Literal["initialize"] = "initialize"
# 可选字段:参数,类型为 InitializeRequestParams
+ params: InitializeRequestParams = None
# 当前协议的最新版本号
+LATEST_PROTOCOL_VERSION = "2024-11-05"
# 定义工具相关能力结构体
+class ToolsCapability(MCPModel):
# 工具列表是否发生变化,可选布尔类型
+ list_changed: bool | None = None
# 定义服务端能力描述结构体
+class ServerCapabilities(MCPModel):
# 可选字段:实验性能力扩展
+ experimental: dict[str, dict[str, Any]] | None = None
# 可选字段:工具能力
+ tools: ToolsCapability | None = None
# 定义初始化响应结构体
+class InitializeResult(MCPModel):
# 协议版本号
+ protocol_version: str = ""
# 可选字段:服务端能力描述
+ capabilities: ServerCapabilities = None
# 可选字段:服务端实现信息
+ server_info: Implementation = None
# 可选字段:初始化说明信息
+ instructions: str | None = None
# 定义客户端初始化完成通知结构体
+class InitializedNotification(MCPModel):
# 方法名,固定为 "notifications/initialized"
+ method: Literal["notifications/initialized"] = "notifications/initialized"
# 可选字段:通知参数,可以为字典或 None
+ params: dict[str, Any] | None = None
# 定义 JSONRPC 请求的数据结构
class JSONRPCRequest(BaseModel):
# jsonrpc 协议版本,固定为 "2.0"
jsonrpc: Literal["2.0"] = "2.0"
# 请求 ID,可以为 int 或 str 类型
id: RequestId = None
# 方法名称,字符串类型
method: str = ""
# 方法参数,为一个字典或 None
params: dict[str, Any] | None = None
# 定义 JSONRPC 响应的数据结构
class JSONRPCResponse(BaseModel):
# jsonrpc 协议版本,固定为 "2.0"
jsonrpc: Literal["2.0"] = "2.0"
# 响应的 ID,需要与请求 ID 匹配
id: RequestId = None
# 响应结果,可以为字典或 None
result: dict[str, Any] = None
# 定义错误的数据结构
class ErrorData(BaseModel):
# 错误码,默认值为 0
code: int = 0
# 错误信息,默认值为空字符串
message: str = ""
# 附加的错误数据,可以为任意类型或 None
data: Any = None
# 定义 JSONRPC 错误消息的数据结构
class JSONRPCError(BaseModel):
# jsonrpc 协议版本,固定为 "2.0"
jsonrpc: Literal["2.0"] = "2.0"
# 错误对应的请求 ID,可以为 None
id: RequestId | None = None
# 错误的详细信息,类型为 ErrorData
error: ErrorData = None
# 定义 JSONRPC 通知的数据结构(没有 id 字段)
+class JSONRPCNotification(BaseModel):
# jsonrpc 协议版本,固定为 "2.0"
+ jsonrpc: Literal["2.0"] = "2.0"
# 通知的方法名称
+ method: str = ""
# 通知参数,可以为字典或者 None
+ params: dict[str, Any] | None = None
# 定义所有 JSONRPC 消息的联合类型
JSONRPCMessage = JSONRPCRequest | JSONRPCNotification | JSONRPCResponse | JSONRPCError
# 定义 JSONRPC 消息适配器,用于类型自动推断和校验
jsonrpc_message_adapter = TypeAdapter(JSONRPCMessage)4.8 工作流程 #
4.8.1 整体流程 #
初始化是 MCP 客户端和服务端建立会话前的握手:
- 客户端发
InitializeRequest - 服务端返回
InitializeResult - 客户端发
InitializedNotification - 之后才能正常收发业务请求
4.8.2 session.py客户端会话 #
# 定义客户端会话类
class ClientSession:
# 构造函数,接收读写流
def __init__(self, read_stream, write_stream):
# 保存读流和写流对象
self._read, self._write = read_stream, write_stream
# 初始化请求ID计数器,从0开始
self._req_id = 0_read/_write:读写流,由stdio_client提供_req_id:请求 ID 计数器,用于 JSON-RPC 的id字段
4.8.3 _request:发请求并等待响应 #
# 定义 _request 方法,用于发送请求并等待响应
def _request(self, req):
# 使用当前请求ID
rid = self._req_id
# 请求ID递增,为下次请求准备
self._req_id += 1
# 将请求对象序列化为字典,采用别名、JSON模式并排除None字段
d = req.model_dump(by_alias=True, mode="json", exclude_none=True)
# 构造 JSONRPCRequest 对象
jreq = JSONRPCRequest(jsonrpc="2.0", id=rid, method=d["method"], params=d.get("params"))
# 将请求通过写流发出
self._write.send(SessionMessage(message=jreq))
# 不断轮询等待响应消息
while True:
# 从读流获取一条消息
msg = self._read.get()
# 如果收到None,说明连接被关闭,抛出异常
if msg is None:
raise MCPError(-32000, "Connection closed")
# 只处理 SessionMessage 类型的消息,其它跳过
if not isinstance(msg, SessionMessage):
continue
# 取出实际消息内容
m = msg.message
# 如果收到响应或错误,且id匹配,则进入处理
if isinstance(m, (JSONRPCResponse, JSONRPCError)) and getattr(m, "id", None) == rid:
# 如果是错误类型,抛出相应的异常
if isinstance(m, JSONRPCError):
raise MCPError.from_jsonrpc(m)
# 如果是正常响应,返回其result字段
return m.result- 把请求转成
JSONRPCRequest,通过_write.send()发送 - 循环从
_read.get()取消息,直到收到id == rid的 Response 或 Error - 收到 Error 时抛出
MCPError,否则返回m.result
4.8.4 _notify:发通知(无响应) #
# 定义 _notify 方法,用于发送通知(不期待响应)
def _notify(self, n):
# 将通知对象序列化为字典,采用别名、JSON模式并排除None字段
d = n.model_dump(by_alias=True, mode="json", exclude_none=True)
# 构造 JSONRPCNotification 消息并通过写流发送出去
self._write.send(SessionMessage(message=JSONRPCNotification(
jsonrpc="2.0",
method=d["method"],
params=d.get("params")
)))- 通知没有
id,服务端不返回 JSON-RPC 响应
4.8.5 initialize:初始化握手 #
# 定义 initialize 方法,用于执行客户端与服务端的初始化握手流程
def initialize(self):
# 发送 InitializeRequest 初始化请求,并等待响应,结果保存在 r 中
r = self._request(InitializeRequest(...))
# 发送 InitializedNotification 通知,告知服务端已初始化
self._notify(InitializedNotification())
# 使用 InitializeResult 对收到的响应结果进行校验和转换,并返回
return InitializeResult.model_validate(r, by_name=False)- 先发
InitializeRequest并拿到InitializeResult - 再发
InitializedNotification - 最后用
InitializeResult.model_validate校验并返回
4.8.6 fastmcp.py服务端 #
# 定义 FastMCP 类,作为 MCP 服务端核心逻辑
class FastMCP:
# _handle 方法处理传入的请求 req
def _handle(self, req):
# 从请求中获取方法名、参数字典和请求ID
method, params, rid = req.method, req.params or {}, req.id
# 如果方法名为 "initialize"
if method == "initialize":
# 构造 InitializeResult 响应对象
r = InitializeResult(...)
# 返回 JSONRPCResponse,包含对应的请求ID和序列化后的结果
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(...))
# 如果方法未找到,返回 JSONRPCError 错误对象
return JSONRPCError(...) # 方法未找到- 只处理
initialize方法,返回InitializeResult - 其他方法返回 JSON-RPC 错误
# 定义 run 方法,默认使用 "stdio" 作为传输方式
def run(self, transport="stdio"):
# 调用 stdio 的 stdio_server,以当前实例的 _handle_msg 作为处理函数
stdio.stdio_server(self._handle_msg)- 通过
stdio_server从 stdin 读请求,调用_handle_msg处理,把响应写到 stdout
4.8.7 stdio.py(服务端)单进程 stdio 循环 #
# 定义 stdio_server 函数,接收一个消息处理 handler
def stdio_server(handler):
# 循环不断从标准输入读取消息
while True:
# 读取一行输入
line = sys.stdin.readline()
# 如果读到空字符串,说明输入结束,跳出循环
if not line:
break
# 去除两端空白字符
line = line.strip()
# 如果行为空,则跳过当前循环
if not line:
continue
# 使用 jsonrpc_message_adapter 解析 JSON-RPC 消息
msg = jsonrpc_message_adapter.validate_json(line, by_name=False)
# 用 SessionMessage 包装后交给 handler 处理,获得响应
response = handler(SessionMessage(message=msg))
# 如果有响应对象
if response is not None:
# 如果响应带有 message 属性,则使用,否则用 SessionMessage 包装
r = response if hasattr(response, "message") else SessionMessage(message=response)
# 将响应的 message 序列化为 JSON 并写到标准输出,并追加换行符
sys.stdout.write(r.message.model_dump_json(...) + "\n")
# 立即刷新缓冲区
sys.stdout.flush()- 按行从 stdin 读 JSON-RPC 消息
- 用
jsonrpc_message_adapter解析 - 交给
handler处理,得到响应后写回 stdout
4.8.8 1.mcp.py主入口 #
+from mcp_lite import ClientSession, StdioServerParameters
+from mcp_lite.server.fastmcp import FastMCP
+mcp = FastMCP(name="Hello-MCP")- 引入
ClientSession、StdioServerParameters、FastMCP,并创建 MCP 服务实例
+ session = ClientSession(read, write)
+ session.initialize()- 在
with stdio_client(...)内创建ClientSession并调用initialize()完成握手
+def run_server_stdio():
+ mcp.run(transport="stdio")- 服务端模式:用 stdio 运行 MCP 服务
+ if len(sys.argv) >= 2 and sys.argv[1] == "serve":
+ run_server_stdio()- 通过
serve参数区分客户端 / 服务端模式
4.8.9 stdio.py(客户端)关键修复 #
finally:
write_q.put(None)
+ wt.join(timeout=3)
try:
proc.stdin.close()write_q.put(None):通知写线程停止wt.join(timeout=3):等待写线程结束,最多 3 秒- 然后再
proc.stdin.close()
原因:initialize() 会发 InitializedNotification,写线程可能还在往 proc.stdin 写。如果主线程先关闭 stdin,写线程会抛出 ValueError: write to closed file。先 wt.join() 再关闭 stdin,可以避免这个竞态。
4.8.10 types.py 类型与适配器 #
4.8.11 MCP 模型基类 #
class MCPModel(BaseModel):
model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)- 使用驼峰命名(
protocolVersion等) populate_by_name=True:支持用 snake_case 或 camelCase 填充
4.8.12 初始化相关类型 #
| 类型 | 作用 |
|---|---|
InitializeRequestParams |
初始化请求参数(协议版本、能力、客户端信息) |
InitializeRequest |
method="initialize" 的请求 |
InitializeResult |
初始化响应(协议版本、服务端能力、服务端信息) |
InitializedNotification |
method="notifications/initialized" 的通知 |
4.8.13 JSON-RPC 类型 #
JSONRPCRequest:请求(有id)JSONRPCResponse:成功响应JSONRPCError:错误响应JSONRPCNotification:通知(无id)
4.8.14 TypeAdapter #
# 定义 JSONRPC 消息的联合类型,可以是请求、通知、响应或错误(类型联合)
JSONRPCMessage = JSONRPCRequest | JSONRPCNotification | JSONRPCResponse | JSONRPCError
# 创建 TypeAdapter,用于自动推断和校验 JSONRPC 消息类型
jsonrpc_message_adapter = TypeAdapter(JSONRPCMessage)jsonrpc_message_adapter.validate_json(line):把一行 JSON 解析成上述四种消息之一,并做校验
4.8.15 消息流示意 #
客户端 服务端
│ │
│ InitializeRequest │
│ ─────────────────────────────────►│
│ │ _handle("initialize")
│ InitializeResult │
│ ◄─────────────────────────────────│
│ │
│ InitializedNotification │
│ ─────────────────────────────────►│
│ │ (无响应)
│ │
│ initialize() 返回 │
│ 退出 with,清理 stdio_client │4.8.16 小结 #
| 文件 | 职责 |
|---|---|
session.py |
客户端会话:发请求、收响应、发通知、完成初始化 |
fastmcp.py |
服务端:处理 initialize 等 JSON-RPC 方法 |
server/stdio.py |
服务端 stdio:读 stdin、解析、调用 handler、写 stdout |
client/stdio.py |
客户端 stdio:启动子进程、读写线程、队列、wt.join 修复 |
types.py |
MCP 与 JSON-RPC 的类型定义和 TypeAdapter |
1.mcp.py |
主入口:按参数选择客户端或服务端模式 |
核心改动是:在 client/stdio.py 的 finally 中增加 wt.join(timeout=3),在关闭 proc.stdin 前等待写线程结束,避免 ValueError: write to closed file。
4.8.17 请求响应对应 #
请求和响应的对应关系是这样保证的:
4.8.17.1. JSON-RPC 的 id 字段 #
每个请求都带一个 id(int 或 str),响应会原样回填同一个 id,用于一一对应:
# mcp_lite/types.py
class JSONRPCRequest(BaseModel):
jsonrpc: Literal["2.0"] = "2.0"
id: RequestId = None # 请求 ID
method: str = ""4.8.17.2. 客户端:请求 ID 生成与匹配 #
mcp_lite 客户端(同步):
# mcp_lite/client/session.py
def _request(self, req):
rid = self._req_id
self._req_id += 1 # 自增保证唯一
jreq = JSONRPCRequest(jsonrpc="2.0", id=rid, method=..., params=...)
self._write.send(SessionMessage(message=jreq))
while True:
msg = self._read.get()
# 只处理 id 匹配的响应
if isinstance(m, (JSONRPCResponse, JSONRPCError)) and getattr(m, "id", None) == rid:
return m.result # 或抛出错误4.8.17.3. 服务端:在响应中回填 id #
# mcp_lite/server/fastmcp.py
def _handle(self, req):
method, params, rid = req.method, req.params or {}, req.id
# ... 处理逻辑 ...
return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(...))4.8.17.4. ID 类型统一 #
python-sdk 会做 ID 类型归一化,保证字符串形式的 ID 也能正确匹配:
def _normalize_request_id(self, response_id: RequestId) -> RequestId:
if isinstance(response_id, str):
try:
return int(response_id) # 字符串转整数以匹配客户端
except ValueError:
...
return response_id4.8.17.5. 总结 #
| 机制 | 作用 |
|---|---|
| 请求 ID 自增 | 保证每个请求有唯一标识 |
| 响应回填 id | 服务端在响应中返回相同 id |
| 按 id 匹配 | 客户端用 id 把响应路由到对应请求 |
| response_streams 映射 | 异步场景下用 request_id 映射到对应的 Promise/流 |
| ID 归一化 | 统一 int / str 表示,避免匹配失败 |
整体上,这是标准的 JSON-RPC 2.0 请求-响应配对方式:请求带 id,响应带相同 id,客户端据此完成一一对应。