导航菜单

  • 1.什么是MCP
  • 2.MCP架构
  • 3.MCP服务器
  • 4.MCP客户端
  • 5.版本控制
  • 6.连接MCP服务器
  • 7.SDKs
  • 8.Inspector
  • 9.规范
  • 10.架构
  • 11.协议
  • 12.生命周期
  • 13.工具
  • 14.资源
  • 15.提示
  • 16.日志
  • 17.进度
  • 18.传输
  • 19.补全
  • 20.引导
  • 21.采样
  • 22.任务
  • 23.取消
  • 24.Ping
  • 25.根
  • 26.分页
  • 27.授权
  • 28.初始化
  • 29.工具
  • 30.资源
  • 31.结构化输出
  • 32.提示词
  • 33.上下文
  • 34.StreamableHTTP
  • 35.参数补全
  • 36.引导
  • 37.采样
  • 38.LowLevel
  • 39.任务
  • 40.取消
  • 41.ping
  • 42.根
  • 43.分页
  • 44.授权
  • 45.授权
  • Keycloak
  • asyncio
  • contextlib
  • httpx
  • pathlib
  • pydantic
  • queue
  • starlette
  • subprocess
  • threading
  • uvicorn
  • JSON-RPC
  • z
  • 1.1. 1.mcp.py
  • 1.2. session.py
  • 1.3. stdio.py
  • 1.4. fastmcp.py
  • 1.5. types.py

1.1. 1.mcp.py #

11.mcp/1.mcp.py

import sys
from mcp_lite import StdioServerParameters, ClientSession
from mcp_lite.client.stdio import stdio_client
from mcp_lite.server.fastmcp import FastMCP

mcp = FastMCP(name="HelloMCP")


def run_client():
    # python 1.mcp.py serve
    server_params = StdioServerParameters(command="python", args=[__file__, "serve"])
    with stdio_client(server_params) as (read, write):
        # 创建客户端会话对象,绑定读写流
        session = ClientSession(read, write)
        # 初始化对话,完成握手
        result = session.initialize()
        print("result", result)


def run_server():
    print("启动MCP服务器(stdio模式)", file=sys.stderr)
    mcp.run(transport="stdio")


def main():
+   if len(sys.argv) >= 2 and sys.argv[1] == "serve":
        run_server()
    else:
        run_client()


if __name__ == "__main__":
    main()


# TypeError: '_AsyncGeneratorContextManager' object does not support the context manager protocol

1.2. session.py #

11.mcp/mcp_lite/client/session.py

import sys
from mcp_lite.message import SessionMessage
from mcp_lite.types import (
    LATEST_PROTOCOL_VERSION,
    ClientCapabilities,
    InitializeRequestParams,
    InitializeRequest,
    Implementation,
    JSONRPCRequest,
    JSONRPCResponse,
    JSONRPCError,
    InitializeResult,
+   InitializedNotification,    # 导入初始化完成通知
+   JSONRPCNotification
)


# 定义一个MCPError异常类,用于表示协议的相关错误
class MCPError(Exception):
    def __init__(self, code, message, data=None):
        self.code, self.message, self.data = code, message, data
        super().__init__(message)

    @classmethod
    def from_jsonrpc(cls, err):
        return cls(err.error.code, err.error.message, err.error.data)


class ClientSession:
    def __init__(self, read_stream, write_stream):
        # 读写流
        self._read, self._write = read_stream, write_stream
        # 初始化请求ID,计数器默认为0
        self._req_id = 0

    def _request(self, req):
        # 当前的请求ID
        rid = self._req_id
        # 自增请求ID,保证下次用新ID,下次也是唯一
        self._req_id += 1
        # 对请求对象进行序列化,转为dict字典
        d = req.model_dump(by_alias=True, mode="json", exclude_none=True)
        jreq = JSONRPCRequest(
            jsonrpc="2.0", id=rid, method=d["method"], params=d["params"]
        )
        print(
            "[客户端] 请求:",
            jreq.model_dump_json(by_alias=True, exclude_unset=True),
            file=sys.stderr,
        )
        # 将请求对象封装成SessionMessage发送给服务器
        self._write.send(SessionMessage(message=jreq))
        # 发送消息之后还要在这里等待服务器的响应
        while True:
            # 从读取流中读取一条消息
            msg = self._read.get()
            if msg is None:
                raise MCPError(-32000, "连接关闭")
            if not isinstance(msg, SessionMessage):
                continue
            m = msg.message
            # 如果响应是合法的并且ID匹配,则就是找到正确的响应
            if (
                isinstance(m, (JSONRPCResponse, JSONRPCError))
                and getattr(m, "id", None) == rid
            ):
                print(
                    "[客户端]收到响应:",
                    m.model_dump_json(by_alias=True, exclude_unset=True),
                    file=sys.stderr,
                )
                if isinstance(m, JSONRPCError):
                    raise MCPError.from_jsonrpc(m)
                return m.result
    # 定义 _notify 方法,用于发送通知(不期待响应)
+   def _notify(self, n):
        # 将通知对象序列化为字典,采用别名、JSON模式并排除None字段
+       d = n.model_dump(by_alias=True, mode="json", exclude_none=True)
        # 构造 JSONRPCNotification 消息并通过写流发送出去
+       self._write.send(SessionMessage(message=JSONRPCNotification(
+           jsonrpc="2.0",
+           method=d["method"],
+           params=d.get("params")
+       )))
    # 初始化过程,发送初始化请求,收到响应后再发送初始化完成通知
    def initialize(self):
        # 向服务器发送初始化请求,获得服务返回的初始响应
        response = self._request(
            InitializeRequest(
                params=InitializeRequestParams(
                    protocolVersion=LATEST_PROTOCOL_VERSION,
                    capabilities=ClientCapabilities(),
                    clientInfo=Implementation(name="mcp-client", version="0.1.0"),
                )
            )
        )
         # 发送"初始化完成"通知
+       self._notify(InitializedNotification())
        return InitializeResult.model_validate(response, by_name=False)

1.3. stdio.py #

11.mcp/mcp_lite/client/stdio.py

from pydantic import BaseModel, Field
from pathlib import Path
import sys
import subprocess
import os
import threading
from contextlib import contextmanager
from pathlib import Path
from queue import Queue

+from mcp_lite.message import SessionMessage
+from mcp_lite.types import jsonrpc_message_adapter


class StdioServerParameters(BaseModel):
    # 要执行的命令
    command: str  # python
    # 命令行参数
    args: list = Field(
        default_factory=list
    )  # ['D:\\forever\\rag_code\\11.mcp\\1.mcp.py', 'serve']
    # 额外的环境变量
    env: dict | None = None
    # 工作目录
    cwd: str | None | Path = None
    # 字符编码
    encoding: str = "utf-8"


# 定义环境变量名列表,根据平台不同选择不同的变量集合
_ENV_VARS = (
    # 如果是在Windows平台,使用以下环境变量名
    [
        "APPDATA",
        "HOMEDRIVE",
        "HOMEPATH",
        "LOCALAPPDATA",
        "PATH",
        "PATHEXT",
        "PROCESSOR_ARCHITECTURE",
        "SYSTEMDRIVE",
        "SYSTEMROOT",
        "TEMP",
        "USERNAME",
        "USERPROFILE",
    ]
    # 否则(非Windows),使用以下环境变量名
    if sys.platform == "win32"
    else ["HOME", "LOGNAME", "PATH", "SHELL", "TERM", "USER"]
)


# 获取环境变量,并排除值以"()"开头的变量
def _env():
    # 遍历_ENV_VARS,每个变量k若存在且其值v不是以"()"开头,则加入字典
    return {
        k: v for k in _ENV_VARS if (v := os.environ.get(k)) and not v.startswith("()")
    }


# 定义一个stdio客户端上下文管理器,用于实现通过stdio与子进程通信
@contextmanager
def stdio_client(server, errlog=sys.stderr):
    # 创建线程安全的读取队列
    read_q = Queue()
    # 创建线程安全的写入队列
    write_q = Queue()
    # 如果在windows上执行并且命令为python,则使用当前的python解释器,否则直接使用指定命令
    # Python的sys.platform并不区分32位还是64位,不管你是32还是64,sys.platform 都是 "win32"
    cmd = (
        sys.executable
        if sys.platform == "win32" and server.command == "python"
        else server.command
    )
    env = {**_env(), "PYTHONIOENCODING": "utf-8", **(server.env or {})}
    # 启动一个子进程,并重定向到stdio stdout stderr
    proc = subprocess.Popen(
        [cmd] + server.args,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=errlog,
        env=env,
        cwd=server.cwd,
        text=False,  # 表示使用二进制的模式
    )

    # 定义从子进程stdout中读取数据的线程函数
    def read_stdout():
        try:
            while True:
                # 持续逐行读取子进程中的标准输出
                line = proc.stdout.readline()
                # 如果读到空行,说明输出结束了,退出循环
                if not line:
                    break
                # 用于指定编码格式解码,并去除首尾的字符
                line = line.decode(server.encoding).strip()
                if line:
                    try:
+                       message = jsonrpc_message_adapter.validate_json(line, by_name=False)
+                       msg = SessionMessage(message=message)
                        read_q.put(msg)
                    except Exception:
                        pass

        finally:
            read_q.put(None)

    # 定义向子进程stdin写数据的线程 函数
    def write_stdin():
        try:
            # 不断的尝试取出待写入的消息(阻塞)
            while (m := write_q.get()) is not None:
                # 将消息对象序列化成JSON字符串,追加换行并编码,写入子进程的stdin
                msg = (
                    m.message.model_dump_json(by_alias=True, exclude_unset=True) + "\n"
                ).encode(server.encoding)
                # 写入子进程的stdin
                proc.stdin.write(msg)
                # 强制刷新过去,不要缓存,立刻生效
                proc.stdin.flush()
        except (BrokenPipeError, ConnectionResetError):
            pass

    # 创建并并启动读取线程,设置为守护线程
    rt = threading.Thread(target=read_stdout, daemon=True)
    # 创建并启动写入线程,设置为守护线程
    wt = threading.Thread(target=write_stdin, daemon=True)
    rt.start()
    wt.start()

    # 定义读取流的类
    class ReadStream:
        def get(self):
            # 从读取队列中获取数据(阻塞) 返回一条消息
            return read_q.get()

    # 定义写入流的类
    class WriteStream:
        # 将消息放入队列中,等待写进程处理
        def send(self, msg):
            write_q.put(msg)

    try:
        yield ReadStream(), WriteStream()
    finally:
        # 1.放入一个None通知写入线程停止
        write_q.put(None)
        # 2.等待写线程结束(给它点时间处理完队列里剩余的消息并退出)
        wt.join(timeout=3)
        # 3.关闭子进程的stdin管道 1. 关闭对服务器子进程的输入流
        proc.stdin.close()
        # 4.等待子进程退出 2. 等待退出,或超时后发 SIGTERM
        # proc.wait(timeout=2)
        # 5.如果进程没有正常退出,则强制杀死进程并等待其退出
        if proc.poll() is None:
            proc.kill()
            proc.wait()

1.4. fastmcp.py #

11.mcp/mcp_lite/server/fastmcp.py

from mcp_lite.server import stdio
import sys
from mcp_lite.message import SessionMessage
from mcp_lite.types import (
    JSONRPCRequest,
    JSONRPCError,
    ErrorData,
    LATEST_PROTOCOL_VERSION,
    ServerCapabilities,
    Implementation,
    InitializeResult,
    JSONRPCResponse,
    JSONRPCError,
)


class FastMCP:
    def __init__(self, name):
        self.name = name

    # 才是MCP服务真正处理业务逻辑地方
    def _handle(self, req):
        method, params, rid = req.method, req.params or {}, req.id
        if method == "initialize":
            r = InitializeResult(
                protocolVersion=LATEST_PROTOCOL_VERSION,
                capabilities=ServerCapabilities(),
                clientInfo=Implementation(name="mcp-server", version="0.1.0"),
            )
            return JSONRPCResponse(
+               jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_unset=True)
            )
        # notifications/initialized 是通知,不需要响应
+       if method == "notifications/initialized":
+           return None
        return JSONRPCError(
            jsonrpc="2.0",
            id=rid,
            error=ErrorData(code=-32601, message=f"方法未找到:{method}"),
        )

    # 处理客户端发送来的请求消息,返回响应消息
    def _handle_msg(self, msg):
        # 判断消息类型是不是SessionMessage,不是则忽略
        if not isinstance(msg, SessionMessage):
            return None
        m = msg.message
        # 判断消息类型是不是JSONRPCRequest,不是则忽略
        if not isinstance(m, JSONRPCRequest):
            return None
        print(
            "[服务器]接到请求:",
            m.model_dump_json(by_alias=True, exclude_unset=True),
            file=sys.stderr,
        )
        try:
            # 这里进行真正的业务处理方法,传入 JSONRPCRequest 而非 SessionMessage
+           resp = self._handle(m)
+           if resp is not None:
+               print(
+                   "[服务器]响应",
+                   resp.model_dump_json(by_alias=True, exclude_unset=True),
+                   file=sys.stderr,
+               )
            return resp
        except Exception as e:
            err = JSONRPCError(
                jsonrpc="2.0", id=m.id, error=ErrorData(code=-32603, message=str(e))
            )
            print(
                "[服务器]响应错误",
                err.model_dump_json(by_alias=True, exclude_unset=True),
                file=sys.stderr,
            )
            return err

    # 运行MCP服务器的方法,默认采用stdio传输方式
+   def run(self, transport="stdio"):
        # 启动stdio server,当前对象的_handle_msg作为消息处理回调函数
        stdio.stdio_server(self._handle_msg)

1.5. types.py #

11.mcp/mcp_lite/types.py

from pydantic import BaseModel, ConfigDict, Field, TypeAdapter
from typing import Any, Literal

# to_camel是pydantic提供一个工具函数,用于将字段名转换为驼峰命名法 user_name => userName, first_name =firstName
from pydantic.alias_generators import to_camel

# 请求的ID 可以是整数,也可能是字符串
RequestID = int | str
# 当前协议最新的版本号
LATEST_PROTOCOL_VERSION = "2025-11-05"


# 作用是将python的蛇形命名转为驼峰命名
class MCPModel(BaseModel):
    model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)


class ClientCapabilities(MCPModel):
    # 可选字段,客户端实验性能力的扩展
    experimental: dict[str, dict[str, Any]] | None = None


class ServerCapabilities(MCPModel):
    # 可选字段,服务器实验性能力的扩展
    experimental: dict[str, dict[str, Any]] | None = None


# 定义服务器或客户端的实现信息的结构体
class Implementation(MCPModel):
    # 名称
    name: str = ""
    # 标题
    title: str | None = None
    # 版本号
    version: str = ""
    # 可选的描述
    description: str | None = None


class JSONRPCRequest(BaseModel):
    # jsonrpc协议版本 固定为2.0
    jsonrpc: Literal["2.0"] = "2.0"
    # 请求ID
    id: RequestID = None
    # 请求的方法
    method: str = ""
    # 方法的参数,类型为一个字典或者为None
    params: dict[str, Any] | None = None


class JSONRPCNotification(BaseModel):
    # jsonrpc协议版本 固定为2.0
    jsonrpc: Literal["2.0"] = "2.0"
    # 请求的方法
    method: str = ""
    # 方法的参数,类型为一个字典或者为None
    params: dict[str, Any] | None = None


class JSONRPCResponse(BaseModel):
    # jsonrpc协议版本 固定为2.cd .
    jsonrpc: Literal["2.0"] = "2.0"
    # 请求ID
    id: RequestID = None
    # 响应的结果,可以为字典,也可能为None
    result: dict[str, Any] | None = None


# 错误对象的数据结构
class ErrorData(BaseModel):
    # 错误码 默认值为0
    code: int = 0
    # 错误消息 默认值为空字符串
    message: str = ""
    # 附加的错误数据,可以为任意类型或者为None
    data: Any = None


# 错误消息的数据结构
class JSONRPCError(BaseModel):
    # jsonrpc协议版本 固定为2.0
    jsonrpc: Literal["2.0"] = "2.0"
    # 请求ID
    id: RequestID | None = None
    # 响应的结果,可以为字典,也可能为None
    error: ErrorData | None = None


# 定义所有的JSONRPC消息的联合类型
JSONRPCMessage = JSONRPCRequest | JSONRPCResponse | JSONRPCNotification | JSONRPCError
# 定义JSONRPC消息适配器,用于类型自动推断和校验
jsonrpc_message_adapter = TypeAdapter(JSONRPCMessage)


# 定义初始化请求参数结构体
class InitializeRequestParams(MCPModel):
    # 协议的版本号
    protocolVersion: str = ""
    # 客户端能提供的能力
    capabilities: ClientCapabilities = None
    # 可选参数 代表客户端的信息
    clientInfo: Implementation = None


# 定义 初始化请求结构体
class InitializeRequest(MCPModel):
    # 方法名 固定
    method: Literal["initialize"] = "initialize"
    # 可选参数
    params: InitializeRequestParams = None


class InitializeResult(MCPModel):
    # 协议的版本号
    protocolVersion: str = ""
    # 服务器能提供的能力
    capabilities: ServerCapabilities = None
    # 可选参数 代表服务器的信息
    serverInfo: Implementation = None
    # 可选的服务器的说明信息

    instructions: str | None = None

 # 定义客户端初始化完成通知结构体
+class InitializedNotification(MCPModel):
    # 方法名,固定为 "notifications/initialized"
+  method: Literal["notifications/initialized"] = "notifications/initialized"
    # 可选字段:通知参数,可以为字典或 None
+  params: dict[str, Any] | None = None
← 上一节 uvicorn
下一节 没有下一节 →

访问验证

请输入访问令牌

Token不正确,请重新输入