1.1. 1.mcp.py #
11.mcp/1.mcp.py
import sys
from mcp_lite import StdioServerParameters, ClientSession
from mcp_lite.client.stdio import stdio_client
from mcp_lite.server.fastmcp import FastMCP
mcp = FastMCP(name="HelloMCP")
def run_client():
# python 1.mcp.py serve
server_params = StdioServerParameters(command="python", args=[__file__, "serve"])
with stdio_client(server_params) as (read, write):
# 创建客户端会话对象,绑定读写流
session = ClientSession(read, write)
# 初始化对话,完成握手
result = session.initialize()
print("result", result)
def run_server():
print("启动MCP服务器(stdio模式)", file=sys.stderr)
mcp.run(transport="stdio")
def main():
+ if len(sys.argv) >= 2 and sys.argv[1] == "serve":
run_server()
else:
run_client()
if __name__ == "__main__":
main()
# TypeError: '_AsyncGeneratorContextManager' object does not support the context manager protocol
1.2. session.py #
11.mcp/mcp_lite/client/session.py
import sys
from mcp_lite.message import SessionMessage
from mcp_lite.types import (
LATEST_PROTOCOL_VERSION,
ClientCapabilities,
InitializeRequestParams,
InitializeRequest,
Implementation,
JSONRPCRequest,
JSONRPCResponse,
JSONRPCError,
InitializeResult,
+ InitializedNotification, # 导入初始化完成通知
+ JSONRPCNotification
)
# 定义一个MCPError异常类,用于表示协议的相关错误
class MCPError(Exception):
def __init__(self, code, message, data=None):
self.code, self.message, self.data = code, message, data
super().__init__(message)
@classmethod
def from_jsonrpc(cls, err):
return cls(err.error.code, err.error.message, err.error.data)
class ClientSession:
def __init__(self, read_stream, write_stream):
# 读写流
self._read, self._write = read_stream, write_stream
# 初始化请求ID,计数器默认为0
self._req_id = 0
def _request(self, req):
# 当前的请求ID
rid = self._req_id
# 自增请求ID,保证下次用新ID,下次也是唯一
self._req_id += 1
# 对请求对象进行序列化,转为dict字典
d = req.model_dump(by_alias=True, mode="json", exclude_none=True)
jreq = JSONRPCRequest(
jsonrpc="2.0", id=rid, method=d["method"], params=d["params"]
)
print(
"[客户端] 请求:",
jreq.model_dump_json(by_alias=True, exclude_unset=True),
file=sys.stderr,
)
# 将请求对象封装成SessionMessage发送给服务器
self._write.send(SessionMessage(message=jreq))
# 发送消息之后还要在这里等待服务器的响应
while True:
# 从读取流中读取一条消息
msg = self._read.get()
if msg is None:
raise MCPError(-32000, "连接关闭")
if not isinstance(msg, SessionMessage):
continue
m = msg.message
# 如果响应是合法的并且ID匹配,则就是找到正确的响应
if (
isinstance(m, (JSONRPCResponse, JSONRPCError))
and getattr(m, "id", None) == rid
):
print(
"[客户端]收到响应:",
m.model_dump_json(by_alias=True, exclude_unset=True),
file=sys.stderr,
)
if isinstance(m, JSONRPCError):
raise MCPError.from_jsonrpc(m)
return m.result
# 定义 _notify 方法,用于发送通知(不期待响应)
+ def _notify(self, n):
# 将通知对象序列化为字典,采用别名、JSON模式并排除None字段
+ d = n.model_dump(by_alias=True, mode="json", exclude_none=True)
# 构造 JSONRPCNotification 消息并通过写流发送出去
+ self._write.send(SessionMessage(message=JSONRPCNotification(
+ jsonrpc="2.0",
+ method=d["method"],
+ params=d.get("params")
+ )))
# 初始化过程,发送初始化请求,收到响应后再发送初始化完成通知
def initialize(self):
# 向服务器发送初始化请求,获得服务返回的初始响应
response = self._request(
InitializeRequest(
params=InitializeRequestParams(
protocolVersion=LATEST_PROTOCOL_VERSION,
capabilities=ClientCapabilities(),
clientInfo=Implementation(name="mcp-client", version="0.1.0"),
)
)
)
# 发送"初始化完成"通知
+ self._notify(InitializedNotification())
return InitializeResult.model_validate(response, by_name=False)
1.3. stdio.py #
11.mcp/mcp_lite/client/stdio.py
from pydantic import BaseModel, Field
from pathlib import Path
import sys
import subprocess
import os
import threading
from contextlib import contextmanager
from pathlib import Path
from queue import Queue
+from mcp_lite.message import SessionMessage
+from mcp_lite.types import jsonrpc_message_adapter
class StdioServerParameters(BaseModel):
# 要执行的命令
command: str # python
# 命令行参数
args: list = Field(
default_factory=list
) # ['D:\\forever\\rag_code\\11.mcp\\1.mcp.py', 'serve']
# 额外的环境变量
env: dict | None = None
# 工作目录
cwd: str | None | Path = None
# 字符编码
encoding: str = "utf-8"
# 定义环境变量名列表,根据平台不同选择不同的变量集合
_ENV_VARS = (
# 如果是在Windows平台,使用以下环境变量名
[
"APPDATA",
"HOMEDRIVE",
"HOMEPATH",
"LOCALAPPDATA",
"PATH",
"PATHEXT",
"PROCESSOR_ARCHITECTURE",
"SYSTEMDRIVE",
"SYSTEMROOT",
"TEMP",
"USERNAME",
"USERPROFILE",
]
# 否则(非Windows),使用以下环境变量名
if sys.platform == "win32"
else ["HOME", "LOGNAME", "PATH", "SHELL", "TERM", "USER"]
)
# 获取环境变量,并排除值以"()"开头的变量
def _env():
# 遍历_ENV_VARS,每个变量k若存在且其值v不是以"()"开头,则加入字典
return {
k: v for k in _ENV_VARS if (v := os.environ.get(k)) and not v.startswith("()")
}
# 定义一个stdio客户端上下文管理器,用于实现通过stdio与子进程通信
@contextmanager
def stdio_client(server, errlog=sys.stderr):
# 创建线程安全的读取队列
read_q = Queue()
# 创建线程安全的写入队列
write_q = Queue()
# 如果在windows上执行并且命令为python,则使用当前的python解释器,否则直接使用指定命令
# Python的sys.platform并不区分32位还是64位,不管你是32还是64,sys.platform 都是 "win32"
cmd = (
sys.executable
if sys.platform == "win32" and server.command == "python"
else server.command
)
env = {**_env(), "PYTHONIOENCODING": "utf-8", **(server.env or {})}
# 启动一个子进程,并重定向到stdio stdout stderr
proc = subprocess.Popen(
[cmd] + server.args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=errlog,
env=env,
cwd=server.cwd,
text=False, # 表示使用二进制的模式
)
# 定义从子进程stdout中读取数据的线程函数
def read_stdout():
try:
while True:
# 持续逐行读取子进程中的标准输出
line = proc.stdout.readline()
# 如果读到空行,说明输出结束了,退出循环
if not line:
break
# 用于指定编码格式解码,并去除首尾的字符
line = line.decode(server.encoding).strip()
if line:
try:
+ message = jsonrpc_message_adapter.validate_json(line, by_name=False)
+ msg = SessionMessage(message=message)
read_q.put(msg)
except Exception:
pass
finally:
read_q.put(None)
# 定义向子进程stdin写数据的线程 函数
def write_stdin():
try:
# 不断的尝试取出待写入的消息(阻塞)
while (m := write_q.get()) is not None:
# 将消息对象序列化成JSON字符串,追加换行并编码,写入子进程的stdin
msg = (
m.message.model_dump_json(by_alias=True, exclude_unset=True) + "\n"
).encode(server.encoding)
# 写入子进程的stdin
proc.stdin.write(msg)
# 强制刷新过去,不要缓存,立刻生效
proc.stdin.flush()
except (BrokenPipeError, ConnectionResetError):
pass
# 创建并并启动读取线程,设置为守护线程
rt = threading.Thread(target=read_stdout, daemon=True)
# 创建并启动写入线程,设置为守护线程
wt = threading.Thread(target=write_stdin, daemon=True)
rt.start()
wt.start()
# 定义读取流的类
class ReadStream:
def get(self):
# 从读取队列中获取数据(阻塞) 返回一条消息
return read_q.get()
# 定义写入流的类
class WriteStream:
# 将消息放入队列中,等待写进程处理
def send(self, msg):
write_q.put(msg)
try:
yield ReadStream(), WriteStream()
finally:
# 1.放入一个None通知写入线程停止
write_q.put(None)
# 2.等待写线程结束(给它点时间处理完队列里剩余的消息并退出)
wt.join(timeout=3)
# 3.关闭子进程的stdin管道 1. 关闭对服务器子进程的输入流
proc.stdin.close()
# 4.等待子进程退出 2. 等待退出,或超时后发 SIGTERM
# proc.wait(timeout=2)
# 5.如果进程没有正常退出,则强制杀死进程并等待其退出
if proc.poll() is None:
proc.kill()
proc.wait()
1.4. fastmcp.py #
11.mcp/mcp_lite/server/fastmcp.py
from mcp_lite.server import stdio
import sys
from mcp_lite.message import SessionMessage
from mcp_lite.types import (
JSONRPCRequest,
JSONRPCError,
ErrorData,
LATEST_PROTOCOL_VERSION,
ServerCapabilities,
Implementation,
InitializeResult,
JSONRPCResponse,
JSONRPCError,
)
class FastMCP:
def __init__(self, name):
self.name = name
# 才是MCP服务真正处理业务逻辑地方
def _handle(self, req):
method, params, rid = req.method, req.params or {}, req.id
if method == "initialize":
r = InitializeResult(
protocolVersion=LATEST_PROTOCOL_VERSION,
capabilities=ServerCapabilities(),
clientInfo=Implementation(name="mcp-server", version="0.1.0"),
)
return JSONRPCResponse(
+ jsonrpc="2.0", id=rid, result=r.model_dump(by_alias=True, exclude_unset=True)
)
# notifications/initialized 是通知,不需要响应
+ if method == "notifications/initialized":
+ return None
return JSONRPCError(
jsonrpc="2.0",
id=rid,
error=ErrorData(code=-32601, message=f"方法未找到:{method}"),
)
# 处理客户端发送来的请求消息,返回响应消息
def _handle_msg(self, msg):
# 判断消息类型是不是SessionMessage,不是则忽略
if not isinstance(msg, SessionMessage):
return None
m = msg.message
# 判断消息类型是不是JSONRPCRequest,不是则忽略
if not isinstance(m, JSONRPCRequest):
return None
print(
"[服务器]接到请求:",
m.model_dump_json(by_alias=True, exclude_unset=True),
file=sys.stderr,
)
try:
# 这里进行真正的业务处理方法,传入 JSONRPCRequest 而非 SessionMessage
+ resp = self._handle(m)
+ if resp is not None:
+ print(
+ "[服务器]响应",
+ resp.model_dump_json(by_alias=True, exclude_unset=True),
+ file=sys.stderr,
+ )
return resp
except Exception as e:
err = JSONRPCError(
jsonrpc="2.0", id=m.id, error=ErrorData(code=-32603, message=str(e))
)
print(
"[服务器]响应错误",
err.model_dump_json(by_alias=True, exclude_unset=True),
file=sys.stderr,
)
return err
# 运行MCP服务器的方法,默认采用stdio传输方式
+ def run(self, transport="stdio"):
# 启动stdio server,当前对象的_handle_msg作为消息处理回调函数
stdio.stdio_server(self._handle_msg)
1.5. types.py #
11.mcp/mcp_lite/types.py
from pydantic import BaseModel, ConfigDict, Field, TypeAdapter
from typing import Any, Literal
# to_camel是pydantic提供一个工具函数,用于将字段名转换为驼峰命名法 user_name => userName, first_name =firstName
from pydantic.alias_generators import to_camel
# 请求的ID 可以是整数,也可能是字符串
RequestID = int | str
# 当前协议最新的版本号
LATEST_PROTOCOL_VERSION = "2025-11-05"
# 作用是将python的蛇形命名转为驼峰命名
class MCPModel(BaseModel):
model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)
class ClientCapabilities(MCPModel):
# 可选字段,客户端实验性能力的扩展
experimental: dict[str, dict[str, Any]] | None = None
class ServerCapabilities(MCPModel):
# 可选字段,服务器实验性能力的扩展
experimental: dict[str, dict[str, Any]] | None = None
# 定义服务器或客户端的实现信息的结构体
class Implementation(MCPModel):
# 名称
name: str = ""
# 标题
title: str | None = None
# 版本号
version: str = ""
# 可选的描述
description: str | None = None
class JSONRPCRequest(BaseModel):
# jsonrpc协议版本 固定为2.0
jsonrpc: Literal["2.0"] = "2.0"
# 请求ID
id: RequestID = None
# 请求的方法
method: str = ""
# 方法的参数,类型为一个字典或者为None
params: dict[str, Any] | None = None
class JSONRPCNotification(BaseModel):
# jsonrpc协议版本 固定为2.0
jsonrpc: Literal["2.0"] = "2.0"
# 请求的方法
method: str = ""
# 方法的参数,类型为一个字典或者为None
params: dict[str, Any] | None = None
class JSONRPCResponse(BaseModel):
# jsonrpc协议版本 固定为2.cd .
jsonrpc: Literal["2.0"] = "2.0"
# 请求ID
id: RequestID = None
# 响应的结果,可以为字典,也可能为None
result: dict[str, Any] | None = None
# 错误对象的数据结构
class ErrorData(BaseModel):
# 错误码 默认值为0
code: int = 0
# 错误消息 默认值为空字符串
message: str = ""
# 附加的错误数据,可以为任意类型或者为None
data: Any = None
# 错误消息的数据结构
class JSONRPCError(BaseModel):
# jsonrpc协议版本 固定为2.0
jsonrpc: Literal["2.0"] = "2.0"
# 请求ID
id: RequestID | None = None
# 响应的结果,可以为字典,也可能为None
error: ErrorData | None = None
# 定义所有的JSONRPC消息的联合类型
JSONRPCMessage = JSONRPCRequest | JSONRPCResponse | JSONRPCNotification | JSONRPCError
# 定义JSONRPC消息适配器,用于类型自动推断和校验
jsonrpc_message_adapter = TypeAdapter(JSONRPCMessage)
# 定义初始化请求参数结构体
class InitializeRequestParams(MCPModel):
# 协议的版本号
protocolVersion: str = ""
# 客户端能提供的能力
capabilities: ClientCapabilities = None
# 可选参数 代表客户端的信息
clientInfo: Implementation = None
# 定义 初始化请求结构体
class InitializeRequest(MCPModel):
# 方法名 固定
method: Literal["initialize"] = "initialize"
# 可选参数
params: InitializeRequestParams = None
class InitializeResult(MCPModel):
# 协议的版本号
protocolVersion: str = ""
# 服务器能提供的能力
capabilities: ServerCapabilities = None
# 可选参数 代表服务器的信息
serverInfo: Implementation = None
# 可选的服务器的说明信息
instructions: str | None = None
# 定义客户端初始化完成通知结构体
+class InitializedNotification(MCPModel):
# 方法名,固定为 "notifications/initialized"
+ method: Literal["notifications/initialized"] = "notifications/initialized"
# 可选字段:通知参数,可以为字典或 None
+ params: dict[str, Any] | None = None