导航菜单

  • 1.什么是MCP
  • 2.MCP架构
  • 3.MCP服务器
  • 4.MCP客户端
  • 5.版本控制
  • 6.连接MCP服务器
  • 7.SDKs
  • 8.Inspector
  • 9.规范
  • 10.架构
  • 11.协议
  • 12.生命周期
  • 13.工具
  • 14.资源
  • 15.提示
  • 16.日志
  • 17.进度
  • 18.传输
  • 19.补全
  • 20.引导
  • 21.采样
  • 22.任务
  • 23.取消
  • 24.Ping
  • 25.根
  • 26.分页
  • 27.授权
  • 28.初始化
  • 29.工具
  • 30.资源
  • 31.结构化输出
  • 32.提示词
  • 33.上下文
  • 34.StreamableHTTP
  • 35.参数补全
  • 36.引导
  • 37.采样
  • 38.LowLevel
  • 39.任务
  • 40.取消
  • 41.ping
  • 42.根
  • 43.分页
  • 44.授权
  • 45.授权
  • Keycloak
  • asyncio
  • contextlib
  • httpx
  • pathlib
  • pydantic
  • queue
  • starlette
  • subprocess
  • threading
  • uvicorn
  • JSON-RPC
  • z
  • 1. 功能介绍
  • 2. 服务器参数类型
    • 2.1. 1.mcp.py
    • 2.2. init.py
    • 2.3. init.py
    • 2.4. stdio.py
  • 3. 客户端
    • 3.1. 1.mcp.py
    • 3.2. message.py
    • 3.3. types.py
    • 3.4. stdio.py
    • 3.4 工作流程
      • 3.4.1. 数据模型:StdioServerParameters
      • 3.4.2. 环境变量:_env() 和 _ENV_VARS
      • 3.4.3 核心:stdio_client 上下文管理器
      • 3.4.4 启动子进程
      • 3.4.5 读线程 read_stdout
      • 3.4.6 写线程 write_stdin
      • 3.4.7 ReadStream 和 WriteStream
      • 3.4.8 退出时的清理
      • 3.4.9 数据流示意
      • 3.4.10 设计要点
      • 3.4.11 典型用法
  • 4. 初始化
    • 4.1. session.py
    • 4.2. fastmcp.py
    • 4.3. stdio.py
    • 4.4. 1.mcp.py
    • 4.5. init.py
    • 4.6. stdio.py
    • 4.7. types.py
    • 4.8 工作流程
      • 4.8.1 整体流程
      • 4.8.2 session.py客户端会话
      • 4.8.3 _request:发请求并等待响应
      • 4.8.4 _notify:发通知(无响应)
      • 4.8.5 initialize:初始化握手
      • 4.8.6 fastmcp.py服务端
      • 4.8.7 stdio.py(服务端)单进程 stdio 循环
      • 4.8.8 1.mcp.py主入口
      • 4.8.9 stdio.py(客户端)关键修复
      • 4.8.10 types.py 类型与适配器
      • 4.8.11 MCP 模型基类
      • 4.8.12 初始化相关类型
      • 4.8.13 JSON-RPC 类型
      • 4.8.14 TypeAdapter
      • 4.8.15 消息流示意
      • 4.8.16 小结
      • 4.8.17 请求响应对应
        • 4.8.17.1. JSON-RPC 的 id 字段
        • 4.8.17.2. 客户端:请求 ID 生成与匹配
        • 4.8.17.3. 服务端:在响应中回填 id
        • 4.8.17.4. ID 类型统一
        • 4.8.17.5. 总结

1. 功能介绍 #

  • subprocess.Popen
  • 什么是 MCP 规范?
  • 什么是 MCP 架构?
  • 什么是基础协议?
  • 什么是传输?
  • 什么是生命周期?
  • python-sdk

这是一个 MCP(Model Context Protocol)示例,演示如何在同一进程中:

  1. 启动一个 MCP 服务器(子进程)
  2. 作为客户端连接该服务器
  3. 初始化、列出工具、调用工具

通过命令行参数在「服务器模式」和「客户端模式」之间切换。

2. 服务器参数类型 #

StdioServerParameters 是 MCP Lite 客户端用于描述如何启动服务器子进程的参数类型。本质上,它封装了利用 stdio(标准输入输出)与模型进程进行通信时的子进程启动方式,包括:

  • command:要启动的可执行文件(比如 "python")。
  • args:传递给命令的参数列表(如当前脚本文件名和默认参数 "serve")。

使用 StdioServerParameters,可以标准化和简化服务器自动拉起流程,例如:

# 构造参数并启动子进程
params = StdioServerParameters(command="python", args=["my_model_server.py", "serve"])

如此,客户端只需描述如何启动服务器,并不直接关心底层进程/管道管理的细节。这种方式便于自测、本地调试、支持多种模型后端,极大提升开发效率。

npx @modelcontextprotocol/inspector uv --directory D:/forever/docs/mcpsdk2 run 1.mcp.py serve

2.1. 1.mcp.py #

1.mcp.py

# 导入 sys 模块
import sys
from mcp_lite import StdioServerParameters

# 定义客户端启动函数
def run_client():
    # 构造 StdioServerParameters,用 python 启动本文件并传入"serve"参数
    server_params = StdioServerParameters(command="python", args=[__file__, "serve"])
    print(server_params)

# 主入口函数,根据命令行参数决定运行模式
def main():
    # 如果参数含有"serve",以服务器模式运行
    if len(sys.argv) >= 2 and sys.argv[1] == "serve":
        pass
    else:
        # 否则启动客户端测试流程,自动拉起服务端
        print("启动 MCP 客户端测试...")
        print("将自动启动服务器进程并测试连接")
        run_client()

# 判断是否直接运行此文件
if __name__ == "__main__":
    main()

官方代码

# 导入 sys 模块
import sys
from mcp import StdioServerParameters

# 定义客户端启动函数
def run_client():
    # 构造 StdioServerParameters,用 python 启动本文件并传入"serve"参数
    server_params = StdioServerParameters(command="python", args=[__file__, "serve"])
    print(server_params)

# 主入口函数,根据命令行参数决定运行模式
def main():
    # 如果参数含有"serve",以服务器模式运行
    if len(sys.argv) >= 2 and sys.argv[1] == "serve":
        pass
    else:
        # 否则启动客户端测试流程,自动拉起服务端
        print("启动 MCP 客户端测试...")
        print("将自动启动服务器进程并测试连接")
        run_client()

# 判断是否直接运行此文件
if __name__ == "__main__":
    main()

2.2. init.py #

mcp_lite/init.py

from mcp_lite.client.stdio import StdioServerParameters

# 设置当前模块的对外可见成员列表
__all__ = ["StdioServerParameters"]

2.3. init.py #

mcp_lite/client/init.py

2.4. stdio.py #

mcp_lite/client/stdio.py


# 从pydantic模块导入BaseModel和Field,用于数据模型验证
from pydantic import BaseModel, Field
# 从pathlib模块导入Path类,用于路径操作
from pathlib import Path

# 定义StdioServerParameters类,描述启动子进程时的参数
class StdioServerParameters(BaseModel):
    # 要执行的命令(字符串类型)
    command: str
    # 命令行参数,默认为空列表
    args: list = Field(default_factory=list)
    # 额外的环境变量,可以为None
    env: dict | None = None
    # 工作目录,可以为str、Path或None
    cwd: str | Path | None = None
    # 字符编码,默认为utf-8
    encoding: str = "utf-8"

3. 客户端 #

3.1. 1.mcp.py #

1.mcp.py

# 导入 sys 模块
import sys
from mcp_lite import StdioServerParameters
+from mcp_lite.client.stdio import stdio_client
# 定义客户端启动函数
def run_client():
    # 构造 StdioServerParameters,用 python 启动本文件并传入"serve"参数
    server_params = StdioServerParameters(command="python", args=[__file__, "serve"])
     # 使用上下文管理器启动 stdio_client,返回读写流
+   with stdio_client(server_params) as (read, write):
+       print(read, write)

# 主入口函数,根据命令行参数决定运行模式
def main():
    # 如果参数含有"serve",以服务器模式运行
    if len(sys.argv) >= 2 and sys.argv[1] == "serve":
        pass
    else:
        # 否则启动客户端测试流程,自动拉起服务端
        print("启动 MCP 客户端测试...")
        print("将自动启动服务器进程并测试连接")
        run_client()

# 判断是否直接运行此文件
if __name__ == "__main__":
    main()

stdio_client 来自官方 MCP Python SDK(mcp.client.stdio),它是一个异步上下文管理器,返回的是 _AsyncGeneratorContextManager,只能配合 async with 使用。

而 stdio_client 只实现了 __aenter__ / __aexit__,没有实现 __enter__ / __exit__,所以不能用在普通的 with 里,会报:

TypeError: '_AsyncGeneratorContextManager' object does not support the context manager protocol

官方代码

# 导入异步相关模块
import asyncio
# 导入系统模块,处理命令行参数等
import sys
# 从 mcp 包导入 StdioServerParameters,构建进程参数
from mcp import StdioServerParameters
# 从 mcp.client.stdio 导入 stdio_client,用于启动子进程并异步通信
from mcp.client.stdio import stdio_client

# 定义异步的客户端运行函数
async def run_client():
    # 创建 StdioServerParameters,command 用 python 执行当前文件并传入 'serve'
    server_params = StdioServerParameters(command="python", args=[__file__, "serve"])
    # 通过 async with 管理 stdio_client 的异步上下文,获得读写流
    async with stdio_client(server_params) as (read, write):  # ✅ 使用 async with
        # 打印读写流对象,简单测试
        print(read, write)

# 定义主入口函数
def main():
    # 如果命令行参数包含 'serve',说明以服务器模式运行,此处暂未实现
    if len(sys.argv) >= 2 and sys.argv[1] == "serve":
        pass
    else:
        # 否则以客户端模式运行,打印提示
        print("启动 MCP 客户端测试...")
        print("将自动启动服务器进程并测试连接")
        # 使用 asyncio.run 运行异步的 run_client 函数
        asyncio.run(run_client())  # 用 asyncio.run 运行异步函数

# 判断是否直接以主程序运行本文件
if __name__ == "__main__":
    # 入口调用 main()
    main()

3.2. message.py #

mcp_lite/message.py

# 定义一个名为 SessionMessage 的类
class SessionMessage:
    # 定义初始化方法,接收一个 message 参数
    def __init__(self, message):
        # 将传入的 message 参数赋给实例属性 self.message
        self.message = message

3.3. types.py #

mcp_lite/types.py

# 从 pydantic 导入 BaseModel、ConfigDict、Field、TypeAdapter
from pydantic import BaseModel, ConfigDict, Field, TypeAdapter
# 导入 Any 和 Literal 类型注解
from typing import Any, Literal
# 定义 RequestId 类型,既可以是 int 也可以是 str
RequestId = int | str

# 定义 JSONRPC 请求的数据结构
class JSONRPCRequest(BaseModel):
    # jsonrpc 协议版本,固定为 "2.0"
    jsonrpc: Literal["2.0"] = "2.0"
    # 请求 ID,可以为 int 或 str 类型
    id: RequestId = None
    # 方法名称,字符串类型
    method: str = ""
    # 方法参数,为一个字典或 None
    params: dict[str, Any] | None = None

# 定义 JSONRPC 通知的数据结构(没有 id 字段)
class JSONRPCNotification(BaseModel):
    # jsonrpc 协议版本,固定为 "2.0"
    jsonrpc: Literal["2.0"] = "2.0"
    # 通知的方法名称
    method: str = ""
    # 通知参数,可以为字典或者 None
    params: dict[str, Any] | None = None

# 定义 JSONRPC 响应的数据结构
class JSONRPCResponse(BaseModel):
    # jsonrpc 协议版本,固定为 "2.0"
    jsonrpc: Literal["2.0"] = "2.0"
    # 响应的 ID,需要与请求 ID 匹配
    id: RequestId = None
    # 响应结果,可以为字典或 None
    result: dict[str, Any] = None

# 定义错误的数据结构
class ErrorData(BaseModel):
    # 错误码,默认值为 0
    code: int = 0
    # 错误信息,默认值为空字符串
    message: str = ""
    # 附加的错误数据,可以为任意类型或 None
    data: Any = None

# 定义 JSONRPC 错误消息的数据结构
class JSONRPCError(BaseModel):
    # jsonrpc 协议版本,固定为 "2.0"
    jsonrpc: Literal["2.0"] = "2.0"
    # 错误对应的请求 ID,可以为 None
    id: RequestId | None = None
    # 错误的详细信息,类型为 ErrorData
    error: ErrorData = None
# 定义所有 JSONRPC 消息的联合类型
JSONRPCMessage = JSONRPCRequest | JSONRPCNotification | JSONRPCResponse | JSONRPCError
# 定义 JSONRPC 消息适配器,用于类型自动推断和校验
jsonrpc_message_adapter = TypeAdapter(JSONRPCMessage)

3.4. stdio.py #

mcp_lite/client/stdio.py

# 从pydantic模块导入BaseModel和Field,用于数据模型验证
from pydantic import BaseModel, Field
# 从pathlib模块导入Path类,用于路径操作
from pathlib import Path
# 导入subprocess模块,用于创建和管理子进程
+import subprocess
# 导入os模块,用于与操作系统交互
+import os
# 导入sys模块,访问与Python解释器相关的变量和函数
+import sys
# 导入threading模块,支持多线程
+import threading
# 从contextlib中导入contextmanager装饰器,用于实现上下文管理器
+from contextlib import contextmanager
# 从pathlib模块导入Path类,用于路径操作
+from pathlib import Path
# 从queue模块导入Queue类,实现线程安全的队列
+from queue import Queue
# 从mcp_lite.message模块导入SessionMessage类
+from mcp_lite.message import SessionMessage
# 从mcp_lite.types模块导入jsonrpc_message_adapter
+from mcp_lite.types import jsonrpc_message_adapter
# 定义StdioServerParameters类,描述启动子进程时的参数
class StdioServerParameters(BaseModel):
    # 要执行的命令(字符串类型)
    command: str
    # 命令行参数,默认为空列表
    args: list = Field(default_factory=list)
    # 额外的环境变量,可以为None
    env: dict | None = None
    # 工作目录,可以为str、Path或None
    cwd: str | Path | None = None
    # 字符编码,默认为utf-8
    encoding: str = "utf-8"

# 定义环境变量名列表,根据平台不同选择不同的变量集合
+_ENV_VARS = (
    # 如果是在Windows平台,使用以下环境变量名
+   ["APPDATA", "HOMEDRIVE", "HOMEPATH", "LOCALAPPDATA", "PATH", "PATHEXT",
+    "PROCESSOR_ARCHITECTURE", "SYSTEMDRIVE", "SYSTEMROOT", "TEMP", "USERNAME", "USERPROFILE"]
    # 否则(非Windows),使用以下环境变量名
+   if sys.platform == "win32"
+   else ["HOME", "LOGNAME", "PATH", "SHELL", "TERM", "USER"]
+)

# 获取环境变量,并排除值以"()"开头的变量
+def _env():
    # 遍历_ENV_VARS,每个变量k若存在且其值v不是以"()"开头,则加入字典
+   return {k: v for k in _ENV_VARS if (v := os.environ.get(k)) and not v.startswith("()")}

# 定义stdio_client上下文管理器,实现通过stdio与子进程通信
+@contextmanager
+def stdio_client(server, errlog=sys.stderr):
    # 创建线程安全的读取队列
+   read_q = Queue()
    # 创建线程安全的写入队列
+   write_q = Queue()
    # 如果在Windows上且server.command为"python",则用当前Python解释器,否则用指定命令
+   cmd = sys.executable if sys.platform == "win32" and server.command == "python" else server.command
    # 合并环境变量,自定义env覆盖默认环境;强制子进程使用 UTF-8 避免 Windows 下中文解码错误
+   env = {**_env(), "PYTHONIOENCODING": "utf-8", **(server.env or {})}

    # 启动子进程,并重定向其stdin、stdout和stderr
+   proc = subprocess.Popen(
+       [cmd] + server.args,#cmd是python解释器,server.args是当前脚本文件名和默认参数 "serve"
+       stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=errlog,#stdin是标准输入,stdout是标准输出,stderr是标准错误
+       env=env, cwd=server.cwd, text=False,#env是环境变量,cwd是工作目录,text=False是二进制模式
+   )#启动子进程,并重定向其stdin、stdout和stderr

    # 定义从子进程stdout读取数据的线程函数
+   def read_stdout():
+       try:
+           while True:
                # 持续逐行读取子进程的标准输出
+               line = proc.stdout.readline()
                # 如果读到空行说明输出结束,退出循环
+               if not line:
+                   break
                # 按指定编码解码,并去除首尾空白字符
+               line = line.decode(server.encoding).strip()
                # 只处理非空行
+               if line:
+                   try:
                        # 将JSON RPC消息解析为SessionMessage对象并放入读取队列
+                       read_q.put(SessionMessage(message=jsonrpc_message_adapter.validate_json(line, by_name=False)))
+                   except Exception:
                        # 解析异常时忽略该消息
+                       pass
+       finally:
            # 读取结束在队列插入None,通知主线程
+           read_q.put(None)

    # 定义向子进程stdin写入数据的线程函数
+   def write_stdin():
+       try:
            # 不断尝试取出待写入的消息(阻塞)
+           while (m := write_q.get()) is not None:
                # 将消息对象序列化为json字符串,追加换行并编码,写入stdin
+               s = (m.message.model_dump_json(by_alias=True, exclude_unset=True) + "\n").encode(server.encoding)
+               proc.stdin.write(s)
+               proc.stdin.flush()
+       except (BrokenPipeError, ConnectionResetError):
            # 若出现BrokenPipe或ConnectionReset,直接退出写线程
+           pass

    # 创建并启动读取线程,设置为守护线程
+   rt = threading.Thread(target=read_stdout, daemon=True)
    # 创建并启动写入线程,设置为守护线程
+   wt = threading.Thread(target=write_stdin, daemon=True)
+   rt.start()
+   wt.start()

    # 定义读取流的类
+   class ReadStream:
        # 从读取队列获取数据(阻塞),返回一条消息
+       def get(self):
+           return read_q.get()

    # 定义写入流的类
+   class WriteStream:
        # 将消息放入写入队列,等待写线程处理
+       def send(self, msg):
+           write_q.put(msg)

+   try:
        # 让调用者获得读取流和写入流对象
+       yield ReadStream(), WriteStream()
+   finally:
        # 1. 通知写入线程停止
+       write_q.put(None)
        # 2. 等待写线程结束(给它时间处理完队列里的消息并退出)
+       wt.join(timeout=3)
+       try:
            # 关闭子进程的stdin管道
+           proc.stdin.close()
+       except Exception:
            # 如果关闭时出现异常则跳过
+           pass
        # 等待子进程退出,最多2秒
+       proc.wait(timeout=2)
        # 如果进程还未退出,强制杀死进程并等待其退出
+       if proc.poll() is None:
+           proc.kill()
+           proc.wait()

3.4 工作流程 #

stdio_client实现了一个 通过标准输入/输出(stdio)与 MCP 子进程通信 的客户端。它启动一个子进程,用 stdin/stdout 收发 JSON-RPC 消息,并通过两个线程分别负责读和写。

sequenceDiagram participant 调用方 participant stdio_client participant write_q as write_q participant 写线程 participant proc as proc.stdin participant proc_out as proc.stdout participant 读线程 participant read_q as read_q Note over 调用方,read_q: 写入路径 调用方->>stdio_client: WriteStream.send(msg) stdio_client->>write_q: 入队 write_q->>写线程: 出队 写线程->>proc: 写入 Note over 调用方,read_q: 读取路径 proc_out->>读线程: 读取 读线程->>read_q: 入队 read_q->>stdio_client: 出队 stdio_client-->>调用方: ReadStream.get()

3.4.1. 数据模型:StdioServerParameters #

# 定义 StdioServerParameters 类,继承自 BaseModel
class StdioServerParameters(BaseModel):
    # 指定要执行的命令(字符串类型)
    command: str
    # 指定命令行参数,默认为空列表
    args: list = Field(default_factory=list)
    # 指定额外的环境变量,可以为 None,类型为字典或 None
    env: dict | None = None
    # 指定工作目录,可以为 str、Path 或 None
    cwd: str | Path | None = None
    # 指定编码方式,默认为 utf-8
    encoding: str = "utf-8"

用 Pydantic 定义启动子进程所需的参数:

  • command:要执行的命令(如 "python")
  • args:命令行参数(如 ["1.mcp.py", "serve"])
  • env:额外环境变量,会与默认环境合并
  • cwd:工作目录
  • encoding:stdin/stdout 的编码,默认 utf-8

3.4.2. 环境变量:_env() 和 _ENV_VARS #

# 根据平台(Windows或非Windows)定义要继承的环境变量名字列表
_ENV_VARS = (
    # 如果是Windows系统,则使用以下环境变量名
    ["APPDATA", "HOMEDRIVE", "HOMEPATH", "LOCALAPPDATA", "PATH", "PATHEXT",
     "PROCESSOR_ARCHITECTURE", "SYSTEMDRIVE", "SYSTEMROOT", "TEMP", "USERNAME", "USERPROFILE"]
    # 否则(非Windows,如Linux/Mac),使用下面的环境变量名
    if sys.platform == "win32"
    else ["HOME", "LOGNAME", "PATH", "SHELL", "TERM", "USER"]
)

# 定义 _env() 函数,返回过滤后的环境变量字典
def _env():
    # 遍历_ENV_VARS,只收集值非空且不以"()"开头的变量
    return {k: v for k in _ENV_VARS if (v := os.environ.get(k)) and not v.startswith("()")}
  • _ENV_VARS:按平台选择要继承的环境变量名
  • _env():从当前进程环境里取出这些变量,过滤掉值为空或 "()" 开头的,返回一个字典
  • 通常,以 "()" 开头的环境变量值来源于 Shell 函数定义。

这样子进程会继承精简后的环境,避免把当前进程的过多环境带进去。

3.4.3 核心:stdio_client 上下文管理器 #

# 装饰器,声明这是一个上下文管理器
@contextmanager
# 定义stdio_client函数,接收服务参数和可选的错误日志输出
def stdio_client(server, errlog=sys.stderr):

stdio_client 是一个上下文管理器,用于:

  1. 启动子进程
  2. 创建读写线程
  3. 向调用方提供 ReadStream 和 WriteStream
  4. 退出时关闭管道并结束子进程

3.4.4 启动子进程 #

# 如果在Windows平台且server.command为"python",则用当前python解释器,否则用指定命令
cmd = sys.executable if sys.platform == "win32" and server.command == "python" else server.command
# 合并默认环境变量和server.env(后者可以覆盖前者)
env = {**_env(), **(server.env or {})}
# 创建子进程,指定stdin、stdout为管道,stderr为errlog,环境变量env,工作目录cwd,二进制模式
proc = subprocess.Popen(
    [cmd] + server.args,
    stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=errlog,
    env=env, cwd=server.cwd, text=False,
)
  • 命令:在 Windows 且 command == "python" 时,用 sys.executable 保证用当前 Python;否则用 server.command
  • 环境:_env() 与 server.env 合并,后者覆盖前者
  • 管道:stdin、stdout 用管道,stderr 输出到 errlog(默认 sys.stderr)
  • text=False:以二进制模式读写,便于控制编码

3.4.5 读线程 read_stdout #

# 定义读取子进程 stdout 的函数
def read_stdout():
    try:
        # 持续循环,保持读取
        while True:
            # 逐行读取子进程的标准输出
            line = proc.stdout.readline()
            # 如果读到空字节(EOF),跳出循环
            if not line:
                break
            # 按指定编码解码,并去除行首尾空白字符
            line = line.decode(server.encoding).strip()
            # 如果这一行不是空字符串
            if line:
                try:
                    # 尝试将这一行解析为 JSON-RPC 消息对象,封装成 SessionMessage 后放入读取队列
                    read_q.put(SessionMessage(message=jsonrpc_message_adapter.validate_json(line, by_name=False)))
                except Exception:
                    # 如果解析失败,忽略本行
                    pass
    finally:
        # 循环结束时,向读队列放入 None,作为读取结束信号
        read_q.put(None)

流程:

  1. 用 readline() 按行读子进程 stdout
  2. 读到空字节表示 EOF,退出循环
  3. 按 server.encoding 解码并 strip
  4. 非空行用 jsonrpc_message_adapter.validate_json() 解析为 JSON-RPC 消息
  5. 解析成功则封装为 SessionMessage 放入 read_q
  6. 解析失败则忽略
  7. 结束时往 read_q 放入 None,作为“读取结束”信号

3.4.6 写线程 write_stdin #

# 定义写入子进程stdin的线程函数
def write_stdin():
    try:
        # 不断从写队列中取出消息对象(阻塞等待)
        while (m := write_q.get()) is not None:
            # 将消息对象序列化为JSON字符串,结尾加换行符,并按指定编码转换为字节串
            s = (m.message.model_dump_json(by_alias=True, exclude_unset=True) + "\n").encode(server.encoding)
            # 将编码后的字节串写入子进程的stdin
            proc.stdin.write(s)
            # 立即刷新buffer,确保数据及时发送到子进程
            proc.stdin.flush()
    # 捕获管道断开或连接重置(子进程可能提前退出),静默忽略异常直接退出线程
    except (BrokenPipeError, ConnectionResetError):
        pass

流程:

  1. 从 write_q 阻塞取消息,取到 None 表示结束
  2. 将 m.message 序列化为 JSON 字符串,末尾加换行,再按 server.encoding 编码
  3. 写入 proc.stdin 并 flush
  4. 遇到 BrokenPipeError 或 ConnectionResetError 时静默退出(子进程可能已关闭)

3.4.7 ReadStream 和 WriteStream #

# 定义读取流类
class ReadStream:
    # 从读取队列获取一条消息(阻塞),如果队列为空会等待
    def get(self):
        return read_q.get()

# 定义写入流类
class WriteStream:
    # 将消息放入写入队列,由写线程发送到子进程
    def send(self, msg):
        write_q.put(msg)
  • ReadStream.get():从 read_q 阻塞取一条消息(或 None)
  • WriteStream.send(msg):把消息放入 write_q,由写线程发送

调用方只需使用这两个接口,不必直接操作队列和线程。

3.4.8 退出时的清理 #

# 尝试执行主逻辑,提供读写流给调用方
try:
    yield ReadStream(), WriteStream()
# 无论主逻辑如何结束,最终都要执行资源清理
finally:
    # 通知写线程停止(放入哨兵 None)
    write_q.put(None)
    try:
        # 关闭子进程的标准输入,促使子进程感知 EOF
        proc.stdin.close()
    except Exception:
        # 忽略关闭时发生的任何异常,保证流程继续
        pass
    # 等待子进程在2秒内正常退出
    proc.wait(timeout=2)
    # 如果2秒后进程还未退出,则强制杀掉
    if proc.poll() is None:
        proc.kill()
        proc.wait()
  1. write_q.put(None):通知写线程停止
  2. proc.stdin.close():关闭 stdin,子进程读到 EOF 后应退出
  3. proc.wait(timeout=2):最多等 2 秒
  4. 若仍未退出,proc.kill() 强制结束,再 wait() 回收

3.4.9 数据流示意 #

调用方                     stdio_client
  │                              │
  │  WriteStream.send(msg)       │
  │ ─────────────────────────►  write_q  ──► 写线程 ──► proc.stdin
  │                              │
  │  ReadStream.get()            │
  │ ◄─────────────────────────  read_q   ◄── 读线程 ◄── proc.stdout
  │                              │

3.4.10 设计要点 #

点 说明
双线程 读、写分离,避免阻塞;读线程负责解析 JSON-RPC
队列 Queue 线程安全,主逻辑与 I/O 解耦
守护线程 读写线程设为 daemon,主进程退出时自动结束
哨兵 None read_q 和 write_q 用 None 表示结束
超时与强制结束 2 秒内未退出则 kill,防止子进程卡死

3.4.11 典型用法 #

# 使用stdio_client上下文管理器启动子进程,并获取读写流对象
with stdio_client(StdioServerParameters(command="python", args=["1.mcp.py", "serve"])) as (read_stream, write_stream):
    # 通过写流发送请求消息
    write_stream.send(some_request_message)
    # 通过读流获取响应消息
    response = read_stream.get()

实现了一个基于 stdio 的 MCP 客户端,通过子进程的 stdin/stdout 收发 JSON-RPC 消息,并用队列和线程完成异步读写。

4. 初始化 #

介绍初始化过程的完整细节,包括协议初始化的步骤、消息体设计、会话流程以及相关安全性要点。MCP 客户端与服务端约定,启动后需首先完成初始化(Initialize),以便双方协商协议版本与能力,确认后才能进行后续交互。这一阶段尤为重要,可以避免后续通信因能力不兼容、协议冲突等导致的异常。

初始化通常遵循如下流程:

  1. 客户端发起 Initialize 请求
    客户端构造一个 InitializeRequest,其中包含自身支持的协议版本、功能(capabilities)、实现信息(如产品名、版本等)等。该请求通过 JSON-RPC 格式发送给服务端。

  2. 服务端校验并回复
    服务端收到请求后,校验客户端协议版本是否支持,与自身能力进行对照。若无误,返回 InitializeResult,其中说明实际使用的协议版本、服务端特性与约束。如果发生错误,则以 JSON-RPC 错误格式告知。

  3. 客户端收到回复,若正常则发送 Initialized 通知
    客户端收到确认后,再发送 InitializedNotification,以通知服务端初始化阶段正式结束,接下来能正常发送其它业务请求。

  4. 异常处理与兼容协议
    若任何一方发现版本不兼容,或缺少必要能力,应及时终止会话并清晰反馈错误。

初始化过程示例消息(伪代码):

// InitializeRequest
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "initialize",
  "params": {
    "protocolVersion": "0.1.0",
    "implementation": { "name": "mcp_lite", "version": "0.1.0" },
    "capabilities": { "uri": true, ... }
  }
}

// InitializeResult
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "protocolVersion": "0.1.0",
    "capabilities": { "uri": true, ... }
  }
}

// InitializedNotification
{
  "jsonrpc": "2.0",
  "method": "initialized",
  "params": {}
}

要点:

  • 初始化请求和响应参数详见 types.py 的 InitializeRequestParams、InitializeResult 等结构体定义。
  • 只有初始化完成,双方才能安全进行后续通信。
  • 若初始化阶段出现异常(如版本不兼容),应明确抛出 MCPError,使调用者及时响应。

4.1. session.py #

mcp_lite/client/session.py

import sys
# 导入 SessionMessage 类
from mcp_lite.message import SessionMessage
# 从 mcp_lite.types 模块导入相关类型和常量
from mcp_lite.types import (
      InitializeRequestParams,    # 导入初始化请求参数结构体
      InitializeRequest,          # 导入初始化请求结构体
      LATEST_PROTOCOL_VERSION,    # 导入协议最新版本号
      ClientCapabilities,         # 导入客户端能力描述
      Implementation,             # 导入实现信息结构体
      InitializedNotification,    # 导入初始化完成通知
      InitializeResult,           # 导入初始化请求返回结构体
      JSONRPCRequest,             # 导入 JSONRPC 请求类型
      JSONRPCResponse,            # 导入 JSONRPC 响应类型
      JSONRPCError,               # 导入 JSONRPC 错误类型
      JSONRPCNotification,        # 导入 JSONRPC 通知类型
)
# 定义 MCPError 异常类,用于表示协议相关错误
class MCPError(Exception):
    # 初始化方法,接收错误码、错误信息和可选的数据
    def __init__(self, code, message, data=None):
        # 将错误码、错误信息、附加数据赋值为实例属性
        self.code, self.message, self.data = code, message, data
        # 调用父类 Exception 的初始化,只传递错误信息
        super().__init__(message)

    # 类方法,利用 JSONRPCError 对象创建 MCPError 实例
    @classmethod
    def from_jsonrpc(cls, err):
        # 从 JSONRPCError.error 对象中取出 code/message/data 创建 MCPError
        return cls(err.error.code, err.error.message, err.error.data)
# 定义 ClientSession 类,用于描述客户端会话
class ClientSession:
    # 构造方法,接收读取流和写入流
    def __init__(self, read_stream, write_stream):
        # 赋值读取流和写入流
        self._read, self._write = read_stream, write_stream
        # 初始化请求 id 计数器为 0
        self._req_id = 0
    # 内部方法,发送请求并同步等待响应
    def _request(self, req):
        # 当前请求 id
        rid = self._req_id
        # 自增请求 id,确保下次唯一
        self._req_id += 1
        # 对请求对象进行序列化,转为 dict
        d = req.model_dump(by_alias=True, mode="json", exclude_none=True)
        # 构建 JSONRPCRequest,再封装入 SessionMessage,通过写入流发送
        jreq = JSONRPCRequest(jsonrpc="2.0", id=rid, method=d["method"], params=d.get("params"))
        # 打印请求信息到标准错误流
        print("[Client] Request:", jreq.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
        # 将请求对象封装成 SessionMessage 后发送
        self._write.send(SessionMessage(message=jreq))
        # 循环等待响应
        while True:
            # 从读取流取出一条消息
            msg = self._read.get()
            # 若取到 None,说明连接断开,抛出异常
            if msg is None:
                raise MCPError(-32000, "Connection closed")
            # 若消息类型不是 SessionMessage,忽略继续等
            if not isinstance(msg, SessionMessage):
                continue
            # 取消息内容
            m = msg.message
            # 若为 Response 或 Error,且 id 匹配
            if isinstance(m, (JSONRPCResponse, JSONRPCError)) and getattr(m, "id", None) == rid:
                print("[Client] Response:", m.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
                # 若为错误,抛出 MCPError 异常
                if isinstance(m, JSONRPCError):
                    raise MCPError.from_jsonrpc(m)
                # 为正常响应则返回结果数据
                return m.result
    # 内部方法,发送通知消息
    def _notify(self, n):
        # 通知对象序列化为 dict
        d = n.model_dump(by_alias=True, mode="json", exclude_none=True)
        # 封装为 JSONRPCNotification,再封装成 SessionMessage 后发出
        self._write.send(SessionMessage(
            message=JSONRPCNotification(
                jsonrpc="2.0",
                method=d["method"],
                params=d.get("params"),
            )
        ))            
    # 初始化过程,发送初始化请求,收到响应后再发初始化完成通知
    def initialize(self):
        # 构造初始化请求,并发送等待返回
        r = self._request(InitializeRequest(
            params=InitializeRequestParams(
                protocol_version=LATEST_PROTOCOL_VERSION,         # 协议版本号
                capabilities=ClientCapabilities(),                # 客户端能力
                client_info=Implementation(name="mcp_lite", version="0.1.0"),  # 客户端信息
            )
        ))
        # 发送"初始化完成"通知
        self._notify(InitializedNotification())
        # 用 InitializeResult 验证响应并返回
        return InitializeResult.model_validate(r, by_name=False)

4.2. fastmcp.py #

mcp_lite/server/fastmcp.py

# 导入 sys 模块,用于标准输入输出操作
import sys
# 从 mcp_lite.message 模块导入 SessionMessage 类
from mcp_lite.message import SessionMessage
# 从 mcp_lite.server 模块导入 stdio 子模块
from mcp_lite.server import stdio
# 从 mcp_lite.types 模块导入所有相关类型和类
from mcp_lite.types import (
     JSONRPCRequest,            # JSONRPC 请求类型
     JSONRPCError,              # JSONRPC 错误类型
     ErrorData,                 # 错误数据类型
     InitializeResult,          # 初始化响应类型
     LATEST_PROTOCOL_VERSION,   # 最新协议版本常量
     ToolsCapability,           # 工具能力类型
     ServerCapabilities,        # 服务器功能特性类型
     Implementation,            # 实现信息类型
     JSONRPCResponse,           # JSONRPC 响应类型
)
# 主服务类 FastMCP
class FastMCP:
    # 初始化方法,支持自定义服务器名称
    def __init__(self, name="mcp-server"):
        # 保存服务器名称
        self.name = name
     # 实际 JSONRPC 方法分发与业务处理
    def _handle(self, req):
        # 拆分 method、params、id
        method, params, rid = req.method, req.params or {}, req.id

        # 处理初始化请求
        if method == "initialize":
            # 组装初始化响应,包括协议版本、能力与服务信息
            r = InitializeResult(
                protocol_version=LATEST_PROTOCOL_VERSION,
                capabilities=ServerCapabilities(tools=ToolsCapability()),
                server_info=Implementation(name=self.name, version="0.1.0"),
            )
            # 返回 JSONRPCResponse 包装的结果
            return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_none=True))
        # 不支持的方法,返回 "方法未找到" 错误(code -32601)
        return JSONRPCError(jsonrpc="2.0", id=rid, error=ErrorData(code=-32601, message=f"Method not found: {method}"))    
    # 处理 SessionMessage 消息,主要路由 JSONRPCRequest
    def _handle_msg(self, msg):
        # 判断消息类型不是 SessionMessage 则忽略
        if not isinstance(msg, SessionMessage):
            return None
        # 获取消息体
        m = msg.message
        # 只处理 JSONRPCRequest 类型
        if not isinstance(m, JSONRPCRequest):
            return None
        print("[Server] Request:", m.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)    
        try:
            # 调用实际业务处理方法
            resp = self._handle(m)
            print("[Server] Response:", resp.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
            return resp
        except Exception as e:
            # 捕获异常,返回标准 JSONRPCError(code -32603,内部错误)
            err = JSONRPCError(jsonrpc="2.0", id=m.id, error=ErrorData(code=-32603, message=str(e)))
            print("[Server] Response (error):", err.model_dump_json(by_alias=True, exclude_unset=True), file=sys.stderr)
            return err
    # 运行服务方法,默认采用 stdio 传输方式
    def run(self, transport="stdio"):
        # 仅支持 stdio,其他方式抛出异常
        if transport != "stdio":
            raise ValueError(f"unsupported transport: {transport}")
        # 启动 stdio server,当前对象 _handle_msg 作为消息处理回调
        stdio.stdio_server(self._handle_msg)    

4.3. stdio.py #

mcp_lite/server/stdio.py

# 导入 sys 模块,用于标准输入输出操作
import sys

# 从 mcp_lite.message 模块导入 SessionMessage 类
from mcp_lite.message import SessionMessage
# 从 mcp_lite.types 模块导入 jsonrpc_message_adapter 适配器
from mcp_lite.types import jsonrpc_message_adapter

# 定义 stdio_server 函数,参数 handler 是处理消息的回调函数
def stdio_server(handler):
    # 进入一个无限循环,持续处理标准输入的数据
    while True:
        # 从标准输入读取一行
        line = sys.stdin.readline()
        # 如果读取到空字符串,说明 EOF,跳出循环
        if not line:
            break
        # 去除读取行首尾的空白字符
        line = line.strip()
        # 如果去除空白后字符串仍为空,继续读取下一行
        if not line:
            continue
        try:
            # 使用 jsonrpc_message_adapter 对输入字符串进行解析为消息对象
            msg = jsonrpc_message_adapter.validate_json(line, by_name=False)
            # 使用解析后的消息包装成 SessionMessage 再传递给 handler 处理,获取响应
            response = handler(SessionMessage(message=msg))
            # 判断 handler 是否返回了响应结果
            if response is not None:
                # 如果响应对象有 message 属性,则直接使用;否则把响应包装成 SessionMessage
                r = response if hasattr(response, "message") else SessionMessage(message=response)
                # 将响应的 message 属性序列化为 JSON 字符串,写入标准输出,并换行
                sys.stdout.write(r.message.model_dump_json(by_alias=True, exclude_unset=True) + "\n")
                # 立即刷新标准输出缓冲区,确保数据被及时输出
                sys.stdout.flush()
        except Exception:
            print("[Server] Error:", sys.exc_info(), file=sys.stderr)
            # 捕获所有异常,忽略错误继续处理下一行
            pass

4.4. 1.mcp.py #

1.mcp.py

# 导入 sys 模块
import sys
# 导入 mcp_lite 模块
+from mcp_lite import ClientSession,StdioServerParameters
# 导入 mcp_lite.client.stdio 模块
from mcp_lite.client.stdio import stdio_client
# 导入 mcp_lite.server.fastmcp 模块
+from mcp_lite.server.fastmcp import FastMCP
# 创建一个名为 "Hello-MCP" 的 FastMCP 服务器对象
+mcp = FastMCP(name="Hello-MCP")
# 定义客户端启动函数
def run_client():
    # 构造 StdioServerParameters,用 python 启动本文件并传入"serve"参数
    server_params = StdioServerParameters(command="python", args=[__file__, "serve"])
     # 使用上下文管理器启动 stdio_client,返回读写流
    with stdio_client(server_params) as (read, write):
        # 创建客户端会话对象,绑定读写流
+       session = ClientSession(read, write)
         # 初始化会话,完成握手
+       session.initialize()

# 定义以 stdio 运行的服务器端函数
+def run_server_stdio():
    # 在标准错误流打印启动提示
+   print("启动 MCP 服务器(stdio 模式)...", file=sys.stderr)
    # 以 stdio 方式运行 mcp 服务器
+   mcp.run(transport="stdio")

# 主入口函数,根据命令行参数决定运行模式
def main():
    # 如果参数含有"serve",以服务器模式运行
    if len(sys.argv) >= 2 and sys.argv[1] == "serve":
+       run_server_stdio()
    else:
        # 否则启动客户端测试流程,自动拉起服务端
        print("启动 MCP 客户端测试...")
        print("将自动启动服务器进程并测试连接")
        run_client()

# 判断是否直接运行此文件
if __name__ == "__main__":
    main()

官方代码

# 导入 sys 模块
import sys
#from mcp_lite import ClientSession,StdioServerParameters
from mcp import ClientSession,StdioServerParameters
#from mcp_lite.client.stdio import stdio_client
from mcp.client.stdio import stdio_client
#from mcp_lite.server.fastmcp import FastMCP
from mcp.server.fastmcp import FastMCP
# 导入 asyncio 模块,用于异步编程
import asyncio
mcp = FastMCP(name="Hello-MCP")
# 定义客户端启动函数
async def run_client():
    # 构造 StdioServerParameters,用 python 启动本文件并传入"serve"参数
    server_params = StdioServerParameters(command="python", args=[__file__, "serve"])
    async with stdio_client(server_params) as (read, write):
          # 创建客户端会话对象,绑定读写流
       session = ClientSession(read, write)
         # 初始化会话,完成握手
       await session.initialize()
# 定义以 stdio 运行的服务器端函数
def run_server_stdio():
    # 在标准错误流打印启动提示
   print("启动 MCP 服务器(stdio 模式)...", file=sys.stderr)
    # 以 stdio 方式运行 mcp 服务器
   mcp.run(transport="stdio")
# 主入口函数,根据命令行参数决定运行模式
def main():
    # 如果参数含有"serve",以服务器模式运行
    if len(sys.argv) >= 2 and sys.argv[1] == "serve":
      run_server_stdio()
    else:
        # 否则启动客户端测试流程,自动拉起服务端
        print("启动 MCP 客户端测试...")
        print("将自动启动服务器进程并测试连接")
        asyncio.run(run_client())

# 判断是否直接运行此文件
if __name__ == "__main__":
    main()

4.5. init.py #

mcp_lite/init.py

from mcp_lite.client.stdio import StdioServerParameters
# 从 mcp_lite.client.session 导入 ClientSession 类
+from mcp_lite.client.session import ClientSession
# 设置当前模块的对外可见成员列表
+__all__ = ["StdioServerParameters", "ClientSession"]

4.6. stdio.py #

mcp_lite/client/stdio.py


# 从pydantic模块导入BaseModel和Field,用于数据模型验证
from pydantic import BaseModel, Field
# 从pathlib模块导入Path类,用于路径操作
from pathlib import Path
# 导入subprocess模块,用于创建和管理子进程
import subprocess
# 导入os模块,用于与操作系统交互
import os
# 导入sys模块,访问与Python解释器相关的变量和函数
import sys
# 导入threading模块,支持多线程
import threading
# 从contextlib中导入contextmanager装饰器,用于实现上下文管理器
from contextlib import contextmanager
# 从pathlib模块导入Path类,用于路径操作
from pathlib import Path
# 从queue模块导入Queue类,实现线程安全的队列
from queue import Queue
# 从mcp_lite.message模块导入SessionMessage类
from mcp_lite.message import SessionMessage
# 从mcp_lite.types模块导入jsonrpc_message_adapter
from mcp_lite.types import jsonrpc_message_adapter
# 定义StdioServerParameters类,描述启动子进程时的参数
class StdioServerParameters(BaseModel):
    # 要执行的命令(字符串类型)
    command: str
    # 命令行参数,默认为空列表
    args: list = Field(default_factory=list)
    # 额外的环境变量,可以为None
    env: dict | None = None
    # 工作目录,可以为str、Path或None
    cwd: str | Path | None = None
    # 字符编码,默认为utf-8
    encoding: str = "utf-8"

# 定义环境变量名列表,根据平台不同选择不同的变量集合
_ENV_VARS = (
    # 如果是在Windows平台,使用以下环境变量名
    ["APPDATA", "HOMEDRIVE", "HOMEPATH", "LOCALAPPDATA", "PATH", "PATHEXT",
     "PROCESSOR_ARCHITECTURE", "SYSTEMDRIVE", "SYSTEMROOT", "TEMP", "USERNAME", "USERPROFILE"]
    # 否则(非Windows),使用以下环境变量名
    if sys.platform == "win32"
    else ["HOME", "LOGNAME", "PATH", "SHELL", "TERM", "USER"]
)

# 获取环境变量,并排除值以"()"开头的变量
def _env():
    # 遍历_ENV_VARS,每个变量k若存在且其值v不是以"()"开头,则加入字典
    return {k: v for k in _ENV_VARS if (v := os.environ.get(k)) and not v.startswith("()")}

# 定义stdio_client上下文管理器,实现通过stdio与子进程通信
@contextmanager
def stdio_client(server, errlog=sys.stderr):
    # 创建线程安全的读取队列
    read_q = Queue()
    # 创建线程安全的写入队列
    write_q = Queue()
    # 如果在Windows上且server.command为"python",则用当前Python解释器,否则用指定命令
    cmd = sys.executable if sys.platform == "win32" and server.command == "python" else server.command
    # 合并环境变量,自定义env覆盖默认环境
    env = {**_env(), **(server.env or {})}

    # 启动子进程,并重定向其stdin、stdout和stderr
    proc = subprocess.Popen(
        [cmd] + server.args,#cmd是python解释器,server.args是当前脚本文件名和默认参数 "serve"
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=errlog,#stdin是标准输入,stdout是标准输出,stderr是标准错误
        env=env, cwd=server.cwd, text=False,#env是环境变量,cwd是工作目录,text=False是二进制模式
    )#启动子进程,并重定向其stdin、stdout和stderr

    # 定义从子进程stdout读取数据的线程函数
    def read_stdout():
        try:
            while True:
                # 持续逐行读取子进程的标准输出
                line = proc.stdout.readline()
                # 如果读到空行说明输出结束,退出循环
                if not line:
                    break
                # 按指定编码解码,并去除首尾空白字符
                line = line.decode(server.encoding).strip()
                # 只处理非空行
                if line:
                    try:
                        # 将JSON RPC消息解析为SessionMessage对象并放入读取队列
                        read_q.put(SessionMessage(message=jsonrpc_message_adapter.validate_json(line, by_name=False)))
                    except Exception:
                        # 解析异常时忽略该消息
                        pass
        finally:
            # 读取结束在队列插入None,通知主线程
            read_q.put(None)

    # 定义向子进程stdin写入数据的线程函数
    def write_stdin():
        try:
            # 不断尝试取出待写入的消息(阻塞)
            while (m := write_q.get()) is not None:
                # 将消息对象序列化为json字符串,追加换行并编码,写入stdin
                s = (m.message.model_dump_json(by_alias=True, exclude_unset=True) + "\n").encode(server.encoding)
                proc.stdin.write(s)
                proc.stdin.flush()
        except (BrokenPipeError, ConnectionResetError):
            # 若出现BrokenPipe或ConnectionReset,直接退出写线程
            pass

    # 创建并启动读取线程,设置为守护线程
    rt = threading.Thread(target=read_stdout, daemon=True)
    # 创建并启动写入线程,设置为守护线程
    wt = threading.Thread(target=write_stdin, daemon=True)
    rt.start()
    wt.start()

    # 定义读取流的类
    class ReadStream:
        # 从读取队列获取数据(阻塞),返回一条消息
        def get(self):
            return read_q.get()

    # 定义写入流的类
    class WriteStream:
        # 将消息放入写入队列,等待写线程处理
        def send(self, msg):
            write_q.put(msg)

    try:
        # 让调用者获得读取流和写入流对象
        yield ReadStream(), WriteStream()
    finally:
        # 1. 通知写线程停止
        write_q.put(None)
        # 2. 等待写线程结束(给它时间处理完队列里的消息并退出)
+       wt.join(timeout=3)
        # 3. 再关闭 stdin
        try:
            proc.stdin.close()
        except Exception:
            pass
        # 等待子进程退出,最多2秒
        proc.wait(timeout=2)
        # 如果进程还未退出,强制杀死进程并等待其退出
        if proc.poll() is None:
            proc.kill()
            proc.wait()

4.7. types.py #

mcp_lite/types.py


# 导入 Any 和 Literal 类型注解
from typing import Any, Literal
# 从 pydantic 导入 BaseModel、ConfigDict、Field、TypeAdapter
+from pydantic import BaseModel, ConfigDict, Field, TypeAdapter
# 导入 pydantic 的 to_camel 驼峰命名生成器
+from pydantic.alias_generators import to_camel
# 定义 RequestId 类型,既可以是 int 也可以是 str
RequestId = int | str
# 定义 MCP 的基础模型类,支持驼峰命名和按名称填充
+class MCPModel(BaseModel):
    # 指定 Pydantic 模型配置:使用驼峰形式命名和按名称填充
+   model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)
# 定义客户端能力结构体
+class ClientCapabilities(MCPModel):
    # 可选字段:客户端实验性能力扩展,键为 str,值为字典
+   experimental: dict[str, dict[str, Any]] | None = None
# 定义服务器或客户端的实现信息结构体
+class Implementation(MCPModel):
    # 实现的名称
+   name: str = ""
    # 实现的版本号
+   version: str = ""
    # 可选字段:人类可读的标题
+   title: str | None = None
    # 可选字段:实现的描述
+   description: str | None = None    
# 定义初始化请求参数结构体
+class InitializeRequestParams(MCPModel):
    # 协议版本号
+   protocol_version: str = ""
    # 可选字段:客户端能力描述
+   capabilities: ClientCapabilities = None
    # 可选字段:客户端实现信息
+   client_info: Implementation = None
# 定义初始化请求结构体
+class InitializeRequest(MCPModel):
    # 方法名,固定为 "initialize"
+   method: Literal["initialize"] = "initialize"
    # 可选字段:参数,类型为 InitializeRequestParams
+   params: InitializeRequestParams = None    
# 当前协议的最新版本号
+LATEST_PROTOCOL_VERSION = "2024-11-05"
# 定义工具相关能力结构体
+class ToolsCapability(MCPModel):
    # 工具列表是否发生变化,可选布尔类型
+   list_changed: bool | None = None
# 定义服务端能力描述结构体
+class ServerCapabilities(MCPModel):
    # 可选字段:实验性能力扩展
+   experimental: dict[str, dict[str, Any]] | None = None
    # 可选字段:工具能力
+   tools: ToolsCapability | None = None
# 定义初始化响应结构体
+class InitializeResult(MCPModel):
    # 协议版本号
+   protocol_version: str = ""
    # 可选字段:服务端能力描述
+   capabilities: ServerCapabilities = None
    # 可选字段:服务端实现信息
+   server_info: Implementation = None
    # 可选字段:初始化说明信息
+   instructions: str | None = None
# 定义客户端初始化完成通知结构体
+class InitializedNotification(MCPModel):
    # 方法名,固定为 "notifications/initialized"
+   method: Literal["notifications/initialized"] = "notifications/initialized"
    # 可选字段:通知参数,可以为字典或 None
+   params: dict[str, Any] | None = None    
# 定义 JSONRPC 请求的数据结构
class JSONRPCRequest(BaseModel):
    # jsonrpc 协议版本,固定为 "2.0"
    jsonrpc: Literal["2.0"] = "2.0"
    # 请求 ID,可以为 int 或 str 类型
    id: RequestId = None
    # 方法名称,字符串类型
    method: str = ""
    # 方法参数,为一个字典或 None
    params: dict[str, Any] | None = None    

# 定义 JSONRPC 响应的数据结构
class JSONRPCResponse(BaseModel):
    # jsonrpc 协议版本,固定为 "2.0"
    jsonrpc: Literal["2.0"] = "2.0"
    # 响应的 ID,需要与请求 ID 匹配
    id: RequestId = None
    # 响应结果,可以为字典或 None
    result: dict[str, Any] = None   
# 定义错误的数据结构
class ErrorData(BaseModel):
    # 错误码,默认值为 0
    code: int = 0
    # 错误信息,默认值为空字符串
    message: str = ""
    # 附加的错误数据,可以为任意类型或 None
    data: Any = None     
# 定义 JSONRPC 错误消息的数据结构
class JSONRPCError(BaseModel):
    # jsonrpc 协议版本,固定为 "2.0"
    jsonrpc: Literal["2.0"] = "2.0"
    # 错误对应的请求 ID,可以为 None
    id: RequestId | None = None
    # 错误的详细信息,类型为 ErrorData
    error: ErrorData = None    
# 定义 JSONRPC 通知的数据结构(没有 id 字段)
+class JSONRPCNotification(BaseModel):
    # jsonrpc 协议版本,固定为 "2.0"
+   jsonrpc: Literal["2.0"] = "2.0"
    # 通知的方法名称
+   method: str = ""
    # 通知参数,可以为字典或者 None
+   params: dict[str, Any] | None = None    

# 定义所有 JSONRPC 消息的联合类型
JSONRPCMessage = JSONRPCRequest | JSONRPCNotification | JSONRPCResponse | JSONRPCError

# 定义 JSONRPC 消息适配器,用于类型自动推断和校验
jsonrpc_message_adapter = TypeAdapter(JSONRPCMessage)

4.8 工作流程 #

sequenceDiagram participant 客户端 participant 服务端 客户端->>服务端: InitializeRequest Note right of 服务端: _handle("initialize") 服务端-->>客户端: InitializeResult 客户端->>服务端: InitializedNotification Note right of 服务端: (无响应) Note over 客户端: initialize() 返回 Note over 客户端: 退出 with,清理 stdio_client

4.8.1 整体流程 #

初始化是 MCP 客户端和服务端建立会话前的握手:

  1. 客户端发 InitializeRequest
  2. 服务端返回 InitializeResult
  3. 客户端发 InitializedNotification
  4. 之后才能正常收发业务请求

4.8.2 session.py客户端会话 #

# 定义客户端会话类
class ClientSession:
    # 构造函数,接收读写流
    def __init__(self, read_stream, write_stream):
        # 保存读流和写流对象
        self._read, self._write = read_stream, write_stream
        # 初始化请求ID计数器,从0开始
        self._req_id = 0
  • _read / _write:读写流,由 stdio_client 提供
  • _req_id:请求 ID 计数器,用于 JSON-RPC 的 id 字段

4.8.3 _request:发请求并等待响应 #

# 定义 _request 方法,用于发送请求并等待响应
def _request(self, req):
    # 使用当前请求ID
    rid = self._req_id
    # 请求ID递增,为下次请求准备
    self._req_id += 1
    # 将请求对象序列化为字典,采用别名、JSON模式并排除None字段
    d = req.model_dump(by_alias=True, mode="json", exclude_none=True)
    # 构造 JSONRPCRequest 对象
    jreq = JSONRPCRequest(jsonrpc="2.0", id=rid, method=d["method"], params=d.get("params"))
    # 将请求通过写流发出
    self._write.send(SessionMessage(message=jreq))
    # 不断轮询等待响应消息
    while True:
        # 从读流获取一条消息
        msg = self._read.get()
        # 如果收到None,说明连接被关闭,抛出异常
        if msg is None:
            raise MCPError(-32000, "Connection closed")
        # 只处理 SessionMessage 类型的消息,其它跳过
        if not isinstance(msg, SessionMessage):
            continue
        # 取出实际消息内容
        m = msg.message
        # 如果收到响应或错误,且id匹配,则进入处理
        if isinstance(m, (JSONRPCResponse, JSONRPCError)) and getattr(m, "id", None) == rid:
            # 如果是错误类型,抛出相应的异常
            if isinstance(m, JSONRPCError):
                raise MCPError.from_jsonrpc(m)
            # 如果是正常响应,返回其result字段
            return m.result
  • 把请求转成 JSONRPCRequest,通过 _write.send() 发送
  • 循环从 _read.get() 取消息,直到收到 id == rid 的 Response 或 Error
  • 收到 Error 时抛出 MCPError,否则返回 m.result

4.8.4 _notify:发通知(无响应) #

# 定义 _notify 方法,用于发送通知(不期待响应)
def _notify(self, n):
    # 将通知对象序列化为字典,采用别名、JSON模式并排除None字段
    d = n.model_dump(by_alias=True, mode="json", exclude_none=True)
    # 构造 JSONRPCNotification 消息并通过写流发送出去
    self._write.send(SessionMessage(message=JSONRPCNotification(
        jsonrpc="2.0",
        method=d["method"],
        params=d.get("params")
    )))
  • 通知没有 id,服务端不返回 JSON-RPC 响应

4.8.5 initialize:初始化握手 #

# 定义 initialize 方法,用于执行客户端与服务端的初始化握手流程
def initialize(self):
    # 发送 InitializeRequest 初始化请求,并等待响应,结果保存在 r 中
    r = self._request(InitializeRequest(...))
    # 发送 InitializedNotification 通知,告知服务端已初始化
    self._notify(InitializedNotification())
    # 使用 InitializeResult 对收到的响应结果进行校验和转换,并返回
    return InitializeResult.model_validate(r, by_name=False)
  • 先发 InitializeRequest 并拿到 InitializeResult
  • 再发 InitializedNotification
  • 最后用 InitializeResult.model_validate 校验并返回

4.8.6 fastmcp.py服务端 #

# 定义 FastMCP 类,作为 MCP 服务端核心逻辑
class FastMCP:
    # _handle 方法处理传入的请求 req
    def _handle(self, req):
        # 从请求中获取方法名、参数字典和请求ID
        method, params, rid = req.method, req.params or {}, req.id
        # 如果方法名为 "initialize"
        if method == "initialize":
            # 构造 InitializeResult 响应对象
            r = InitializeResult(...)
            # 返回 JSONRPCResponse,包含对应的请求ID和序列化后的结果
            return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(...))
        # 如果方法未找到,返回 JSONRPCError 错误对象
        return JSONRPCError(...)  # 方法未找到
  • 只处理 initialize 方法,返回 InitializeResult
  • 其他方法返回 JSON-RPC 错误
# 定义 run 方法,默认使用 "stdio" 作为传输方式
def run(self, transport="stdio"):
    # 调用 stdio 的 stdio_server,以当前实例的 _handle_msg 作为处理函数
    stdio.stdio_server(self._handle_msg)
  • 通过 stdio_server 从 stdin 读请求,调用 _handle_msg 处理,把响应写到 stdout

4.8.7 stdio.py(服务端)单进程 stdio 循环 #

# 定义 stdio_server 函数,接收一个消息处理 handler
def stdio_server(handler):
    # 循环不断从标准输入读取消息
    while True:
        # 读取一行输入
        line = sys.stdin.readline()
        # 如果读到空字符串,说明输入结束,跳出循环
        if not line:
            break
        # 去除两端空白字符
        line = line.strip()
        # 如果行为空,则跳过当前循环
        if not line:
            continue
        # 使用 jsonrpc_message_adapter 解析 JSON-RPC 消息
        msg = jsonrpc_message_adapter.validate_json(line, by_name=False)
        # 用 SessionMessage 包装后交给 handler 处理,获得响应
        response = handler(SessionMessage(message=msg))
        # 如果有响应对象
        if response is not None:
            # 如果响应带有 message 属性,则使用,否则用 SessionMessage 包装
            r = response if hasattr(response, "message") else SessionMessage(message=response)
            # 将响应的 message 序列化为 JSON 并写到标准输出,并追加换行符
            sys.stdout.write(r.message.model_dump_json(...) + "\n")
            # 立即刷新缓冲区
            sys.stdout.flush()
  • 按行从 stdin 读 JSON-RPC 消息
  • 用 jsonrpc_message_adapter 解析
  • 交给 handler 处理,得到响应后写回 stdout

4.8.8 1.mcp.py主入口 #

+from mcp_lite import ClientSession, StdioServerParameters
+from mcp_lite.server.fastmcp import FastMCP
+mcp = FastMCP(name="Hello-MCP")
  • 引入 ClientSession、StdioServerParameters、FastMCP,并创建 MCP 服务实例
+       session = ClientSession(read, write)
+       session.initialize()
  • 在 with stdio_client(...) 内创建 ClientSession 并调用 initialize() 完成握手
+def run_server_stdio():
+   mcp.run(transport="stdio")
  • 服务端模式:用 stdio 运行 MCP 服务
+   if len(sys.argv) >= 2 and sys.argv[1] == "serve":
+       run_server_stdio()
  • 通过 serve 参数区分客户端 / 服务端模式

4.8.9 stdio.py(客户端)关键修复 #

    finally:
        write_q.put(None)
+       wt.join(timeout=3)
        try:
            proc.stdin.close()
  • write_q.put(None):通知写线程停止
  • wt.join(timeout=3):等待写线程结束,最多 3 秒
  • 然后再 proc.stdin.close()

原因:initialize() 会发 InitializedNotification,写线程可能还在往 proc.stdin 写。如果主线程先关闭 stdin,写线程会抛出 ValueError: write to closed file。先 wt.join() 再关闭 stdin,可以避免这个竞态。

4.8.10 types.py 类型与适配器 #

4.8.11 MCP 模型基类 #

class MCPModel(BaseModel):
    model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)
  • 使用驼峰命名(protocolVersion 等)
  • populate_by_name=True:支持用 snake_case 或 camelCase 填充

4.8.12 初始化相关类型 #

类型 作用
InitializeRequestParams 初始化请求参数(协议版本、能力、客户端信息)
InitializeRequest method="initialize" 的请求
InitializeResult 初始化响应(协议版本、服务端能力、服务端信息)
InitializedNotification method="notifications/initialized" 的通知

4.8.13 JSON-RPC 类型 #

  • JSONRPCRequest:请求(有 id)
  • JSONRPCResponse:成功响应
  • JSONRPCError:错误响应
  • JSONRPCNotification:通知(无 id)

4.8.14 TypeAdapter #

# 定义 JSONRPC 消息的联合类型,可以是请求、通知、响应或错误(类型联合)
JSONRPCMessage = JSONRPCRequest | JSONRPCNotification | JSONRPCResponse | JSONRPCError
# 创建 TypeAdapter,用于自动推断和校验 JSONRPC 消息类型
jsonrpc_message_adapter = TypeAdapter(JSONRPCMessage)
  • jsonrpc_message_adapter.validate_json(line):把一行 JSON 解析成上述四种消息之一,并做校验

4.8.15 消息流示意 #

客户端                              服务端
  │                                    │
  │  InitializeRequest                 │
  │ ─────────────────────────────────►│
  │                                    │  _handle("initialize")
  │  InitializeResult                  │
  │ ◄─────────────────────────────────│
  │                                    │
  │  InitializedNotification           │
  │ ─────────────────────────────────►│
  │                                    │  (无响应)
  │                                    │
  │  initialize() 返回                 │
  │  退出 with,清理 stdio_client      │

4.8.16 小结 #

文件 职责
session.py 客户端会话:发请求、收响应、发通知、完成初始化
fastmcp.py 服务端:处理 initialize 等 JSON-RPC 方法
server/stdio.py 服务端 stdio:读 stdin、解析、调用 handler、写 stdout
client/stdio.py 客户端 stdio:启动子进程、读写线程、队列、wt.join 修复
types.py MCP 与 JSON-RPC 的类型定义和 TypeAdapter
1.mcp.py 主入口:按参数选择客户端或服务端模式

核心改动是:在 client/stdio.py 的 finally 中增加 wt.join(timeout=3),在关闭 proc.stdin 前等待写线程结束,避免 ValueError: write to closed file。

4.8.17 请求响应对应 #

请求和响应的对应关系是这样保证的:

4.8.17.1. JSON-RPC 的 id 字段 #

每个请求都带一个 id(int 或 str),响应会原样回填同一个 id,用于一一对应:

# mcp_lite/types.py
class JSONRPCRequest(BaseModel):
    jsonrpc: Literal["2.0"] = "2.0"
    id: RequestId = None   # 请求 ID
    method: str = ""
4.8.17.2. 客户端:请求 ID 生成与匹配 #

mcp_lite 客户端(同步):

# mcp_lite/client/session.py
def _request(self, req):
    rid = self._req_id
    self._req_id += 1  # 自增保证唯一
    jreq = JSONRPCRequest(jsonrpc="2.0", id=rid, method=..., params=...)
    self._write.send(SessionMessage(message=jreq))

    while True:
        msg = self._read.get()
        # 只处理 id 匹配的响应
        if isinstance(m, (JSONRPCResponse, JSONRPCError)) and getattr(m, "id", None) == rid:
            return m.result  # 或抛出错误
4.8.17.3. 服务端:在响应中回填 id #
# mcp_lite/server/fastmcp.py
def _handle(self, req):
    method, params, rid = req.method, req.params or {}, req.id
    # ... 处理逻辑 ...
    return JSONRPCResponse(jsonrpc="2.0", id=rid, result=r.model_dump(...))
4.8.17.4. ID 类型统一 #

python-sdk 会做 ID 类型归一化,保证字符串形式的 ID 也能正确匹配:

def _normalize_request_id(self, response_id: RequestId) -> RequestId:
    if isinstance(response_id, str):
        try:
            return int(response_id)  # 字符串转整数以匹配客户端
        except ValueError:
            ...
    return response_id
4.8.17.5. 总结 #
机制 作用
请求 ID 自增 保证每个请求有唯一标识
响应回填 id 服务端在响应中返回相同 id
按 id 匹配 客户端用 id 把响应路由到对应请求
response_streams 映射 异步场景下用 request_id 映射到对应的 Promise/流
ID 归一化 统一 int / str 表示,避免匹配失败

整体上,这是标准的 JSON-RPC 2.0 请求-响应配对方式:请求带 id,响应带相同 id,客户端据此完成一一对应。

← 上一节 27.授权 下一节 29.工具 →

访问验证

请输入访问令牌

Token不正确,请重新输入