mcp-go 用法详解及源码解析

时间:2025-09-05 10:15:02来源:互联网

下面小编就为大家分享一篇mcp-go 用法详解及源码解析,具有很好的参考价值,希望对大家有所帮助。

这篇文章打算详细地介绍一下mcp,由于之前已经写过大模型开发以及langchain相关的文章,本文就不打算再重复说明了,如果你对大模型开发的概念、整体流程还存在模糊的点,可以参考。

mcp 是什么,为什么需要 mcp ?

mcp的全称为 model context protocol,模型上下文协议,也就是给大模型对话提供上下文,为什么要给对话提供上下文呢?因为大模型本身是封闭的、静态的,基于海量的离线文本数据进行训练,训练完成之后自身所掌握的知识也就固定下来了,在后续实际回答用户问题时,基于用户提出的问题,在自己训练好的数据中拟合出它认为概率最高、最为合理的文本,在这个过程中大模型没有了解外部世界的能力,故需要给大模型提供上下文,这个上下文其实就指和模型外部世界进行的交互。

比如你问大模型现在是几点,其实大模型自身完全没能力知道现在是几点,因为它只能根据训练数据猜接下来说什么合理,但这些数据又没办法告诉它现在是几点,所以只能给它提供一个工具用于查询当前时间,大模型通过工具获取到实际时间后再回复用户,这就是获取到了对话需要的当前时间这个上下文。这里的工具和很多大模型开发框架中的 Tool 是同一个概念,mcp 协议的一项核心能力也是向大模型提供工具,一般会将mcp服务器返回的工具信息包装为开发框架的 Tool 对象从而让框架可以使用。除工具外,mcp 协议还规定了如何给大模型提供资源信息以及提示词模板。

既然大模型开发框架已经可以通过 Tool 对象让大模型完成和外部世界的交互,为什么还需要 mcp 呢?因为mcp制定了一种标准的协议,只要遵守这种协议,使用完全不同技术栈的团队都可以给大模型提供能力,也可以直接无缝集成其他团队已经发布出去的能力。协议是非常重要的,只有先有了一套大家都认可的协议,才能基于这套协议开发给大模型使用的各类工具,整个生态才能越来越繁荣,如果不制定协议就只能各搞各的,彼此不兼容,不利于大模型生态的发展。

故整体而言,mcp协议规范了应用程序向大模型提供上下文的方式,以一种标准化的方式让大模型连接到不同的数据源和工具。当越来越多厂商使用mcp协议提供自家软件的能力时, 大模型将不再仅仅是一个问答机器人,而是充当一个大脑完成思考,然后基于mcp调用各种外部工具完成用户想完成的事情,从而真正进化为一个智能助理。

mcp 的整体工作原理是什么?

mcp协议中主要包含三个角色:

  1. mcp host: 通常是大模型应用或 IDE 插件,它通过 client 与 server 建立连接
  2. mcp client: mcp协议的client端,通过这个client可以调用 mcp server 的接口
  3. mcp server: mcp协议的server端,实际实现各类能力,并通过接口返回给 mcp client 结果

具体针对于开发而言,mcp host 就是指我们开发的大模型应用,一般基于一些框架来开发,比如 langchaingo, eino,这些框架和 mcp client 集成到一起,比如大模型通过框架调用某个 Tool 对象时,Tool 对象内部会使用 mcp client 发起对 mcp server 的调用,从而获得工具的执行结果。

接下来可以看一段示例代码加深对于 mcp 的认识,这里我们基于 github.com/mark3labs/mcp-go 实现,虽然最近mcp官方也推出了go-sdk,不过目前(2025 年 8 月)为止还很不稳定,不能在生产环境使用,估计接下来的很长一段时间 mcp-go 仍然是实际开发的首选,代码如下:

mcp server 部分:

func main() {
    // mcp 服务器的名称为 demo,版本为 1.0
    mcpSrv := server.NewMCPServer("demo", "1.0")

    // 添加一个名为 toUpper 的工具,作用是将输入文本转为大写
    // 此工具只有一个参数:text,含义为需要转换的文本
    tool := mcp.NewTool(
       "toUpper",
       mcp.WithDescription("将输入文本转换为大写"),
       mcp.WithString("text", mcp.Description("需要转换的文本"), mcp.Required()),
    )

    // 指定工具的执行逻辑,核心使用 strings.ToUpper 转为大写
    mcpSrv.AddTool(tool, func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
       text, err := req.RequireString("text")
       if err != nil {
          return nil, err
       }
       textUpper := fmt.Sprintf("文本转为大写后为: %s", strings.ToUpper(text))
       return mcp.NewToolResultText(textUpper), nil
    })

    // 使用 streamable http 协议启动 mcp 服务器
    httpSrv := server.NewStreamableHTTPServer(mcpSrv)
    if err := httpSrv.Start(":8080"); err != nil {
       panic(err)
    }
}

mcp client 部分:

func main() {
    // 创建 streamable http 协议的 mcp 客户端
    mcpClient, err := client.NewStreamableHttpClient("http://lo*calh*os*t:8080/mcp")
    if err != nil {
       panic(err)
    }
    ctx := context.Background()
    // 这里的 Start,主要是为了接收 mcp server 的通知
    if err = mcpClient.Start(ctx); err != nil {
       panic(err)
    }

    // mcp 协议规定,client 必须先发送 initialize 请求以交换信息并协商支持的功能
    initReq := mcp.InitializeRequest{
       Params: mcp.InitializeParams{
          ProtocolVersion: mcp.LATEST_PROTOCOL_VERSION,
          ClientInfo: mcp.Implementation{
             Name:    "http-client",
             Version: "1.0.0",
          },
       },
    }
    initRes, err := mcpClient.Initialize(ctx, initReq)
    if err != nil {
       panic(err)
    }
    slog.Info("Initialized client", "server info", initRes.ServerInfo)

    // 查看 mcp server 支持哪些工具
    toolsReq := mcp.ListToolsRequest{}
    tools, err := mcpClient.ListTools(ctx, toolsReq)
    if err != nil {
       panic(err)
    }
    for _, tool := range tools.Tools {
       slog.Info("Listing tool", "name", tool.Name, "inputSchema", tool.InputSchema)
    }

    // 调用 toUpper 工具,将 abc 转为大写
    callReq := mcp.CallToolRequest{
       Params: mcp.CallToolParams{
          Name: "toUpper",
          Arguments: map[string]any{
             "text": "abc",
          },
       },
    }
    callRes, err := mcpClient.CallTool(ctx, callReq)
    if err != nil {
       panic(err)
    }
    slog.Info("CallTool response", "response", callRes.Content[0].(mcp.TextContent).Text)
}

// output:
// INFO Initialized client "server info"="{Name:demo Version:1.0}"
// INFO Listing tool name=toUpper inputSchema="{Defs:map[] Type:object Properties:map[text:map[description:需要转换的文本 type:string]] Required:[text]}"
// INFO CallTool response response="文本转为大写后为: ABC"

从输出的信息来看,首先正常地完成了初始化操作,并且获取到了 mcp server 的基本信息,名称为 demo,版本为 1.0,这和我们在 mcp server 中设置的是一样的。之后使用 ListTools 方法查看 mcp server 提供了哪些工具,只有一个工具,就是我们添加的 toUpper 工具,最后使用 CallTool 方法远程调用 toUpper 工具,工具的参数一定要按照工具规定的结构传参,即 InputSchema 规定的结构,比如这里就只有一个 text 参数,类型为字符串,将其赋值为 abc,调用的结果为 文本转为大写后为: ABC,也符合预期。

其实这里就是一个远程调用,也就是rpc,目前这些代码和大模型基本没啥关系,mcp 也确实就是基于 JSON-RPC 2.0 设计的,在此之上增加了初始化协商、工具发现、资源、提示词管理等机制,从而能与大模型应用无缝集成。故这里展示的就是 mcp 提供的一种远程调用的能力,那具体该如何让 mcp 和大模型结合到一起呢,可参考如下代码,此处基于 langchaingo 实现:

func main() {
    llm, err := openai.New(
       openai.WithBaseURL("url"),
       openai.WithToken("key"),
       openai.WithModel("qwen-plus"),
    )
    if err != nil {
       log.Fatal(err)
    }

    // 获取mcp服务器包含的工具信息,并包装为 langchaingo 中的 Tool 对象
    tool, err := getMCPTools()
    if err != nil {
       log.Fatal(err)
    }

    // 创建一个 ai agent,并注册了全部的mcp工具
    agent := agents.NewConversationalAgent(llm, tool)
    chain := agents.NewExecutor(
       agent,
       agents.WithMaxIterations(5),
       agents.WithMemory(memory.NewConversationBuffer()),
    )
    ctx := context.Background()
    // 要求大模型将字符串转为大写,此处大模型应该会使用工具
    output, err := chains.Run(ctx, chain, "帮我把这个字符串转为大写:abcDEF")
    if err != nil {
       log.Fatal(err)
    }
    fmt.Println(output)
}

// mcpTool mcp 服务器包含的工具都将被包装为此对象,实现 langchaingo 中的 tools.Tool 接口
type mcpTool struct {
    name        string
    description string
    inputSchema mcp.ToolInputSchema
    client      *client.Client
}

// Name mcp 工具名称
func (t *mcpTool) Name() string {
    return t.name
}

// Description mcp 工具的描述信息,这里要写清楚工具的作用和工具如何传参
func (t *mcpTool) Description() string {
    desc := fmt.Sprintf("此工具作用为 %s n", t.description)
    desc += "工具的输入参数为json对象,具体包含以下字段:n"
    for prop, propDesc := range t.inputSchema.Properties {
       desc += fmt.Sprintf("字段名: %v, 字段描述: %v n", prop, propDesc)
    }
    return desc
}

// Call 实现工具被调用时执行的方法,需要用 mcp client 向 mcp server 发起远程调用
func (t *mcpTool) Call(ctx context.Context, input string) (string, error) {
    // 这里加了一个日志,更加直观地看到 mcp 工具被调用
    log.Printf("mcp tool call: %v, %v", t.Name(), input)
    
    // 指定远程调用的工具名称以及输入参数,参数应当以json格式传递
    req := mcp.CallToolRequest{
       Params: mcp.CallToolParams{
          Name: t.name,
       },
    }
    args := make(map[string]any)
    err := json.Unmarshal([]byte(input), &args)
    if err != nil {
       return "", fmt.Errorf("failed to unmarshal mcp tool input: %w", err)
    }
    req.Params.Arguments = args

    // 执行远程调用方法
    res, err := t.client.CallTool(ctx, req)
    if err != nil {
       return "", fmt.Errorf("failed to call tool: %w", err)
    }

    return res.Content[0].(mcp.TextContent).Text, nil
}


// getMCPTools 获取mcp服务器包含的全部工具
func getMCPTools() ([]tools.Tool, error) {
    mcpClient, err := client.NewStreamableHttpClient("http://lo*calh*os*t:8080/mcp")
    if err != nil {
       return nil, err
    }
    ctx := context.Background()
    if err = mcpClient.Start(ctx); err != nil {
       return nil, err
    }

    // mcp 初始化
    initReq := mcp.InitializeRequest{
       Params: mcp.InitializeParams{
          ProtocolVersion: mcp.LATEST_PROTOCOL_VERSION,
          ClientInfo: mcp.Implementation{
             Name:    "http-client",
             Version: "1.0.0",
          },
       },
    }
    initRes, err := mcpClient.Initialize(ctx, initReq)
    if err != nil {
       return nil, err
    }
    log.Println(initRes.ServerInfo)

    // 获取 mcp 服务器包含的工具列表
    toolsReq := mcp.ListToolsRequest{}
    toolsRes, err := mcpClient.ListTools(ctx, toolsReq)
    if err != nil {
       return nil, err
    }

    // 将获取到的 mcp 工具包装为 mcpTool 对象,从而与 langchaingo 框架集成
    ts := make([]tools.Tool, 0, len(toolsRes.Tools))
    for _, tool := range toolsRes.Tools {
       ts = append(ts, &mcpTool{
          name:        tool.GetName(),
          description: tool.Description,
          inputSchema: tool.InputSchema,
          client:      mcpClient,
       })
    }
    return ts, nil
}

// output:
// {demo 1.0}
// mcp tool call: toUpper, {"text": "abcDEF"}
// 文本转为大写后为: ABCDEFF

这里的核心是 getMCPTools 函数,通过 mcp client 的 ListTools 方法获取到 mcp server 支持的工具列表之后,将其转换为 mcpTool 对象,mcpTool 对象实现了 langchaingo中的 tools.Tool 接口,这里需要注意的是一定要在 Description 方法中写明白工具的作用以及工具调用的详细参数,此方法返回的描述文案最终会加到发给大模型的提示词中,以便于大模型知道哪些工具可以调用,各工具如何进行传参。从日志来看,此工具确实被调用了,工具传参也是正确的(mcp tool call: toUpper, {"text": "abcDEF"}),最终给用户的回复也是正常的。关于 langchaingo 具体是如何让大模型判断需要使用工具并实际调起工具的,可以参考,这里写的很清楚了。

通过以上这两段代码我们对于大模型调用 mcp 工具有了直观的认识,接下来我们看下 mcp 协议的细节以及相应的代码实现。

协议细节及源码解读:

如前面示例代码展示,mcp 本质上是大模型通过 mcp client 远程调用 mcp server 上的工具来完成特定操作。mcp 使用 JSON-RPC 2.0 构建这种远程调用,因为 JSON-RPC 足够轻量且与传输层无关,可通过 HTTP 或 stdio 通信,满足本地部署或远程部署需求。JSON-RPC 通过固定的 JSON 结构表征客户端想调用的方法及其参数:

请求结构:

{
  jsonrpc: "2.0";
  id: string | number;
  method: string;
  params?: {
    [key: string]: unknown;
  };
}

响应结构:

{
  jsonrpc: "2.0";
  id: string | number;
  result?: {
    [key: string]: unknown;
  }
  error?: {
    code: number;
    message: string;
    data?: unknown;
  }
}
  • method 指定要调用的方法
  • params 指定方法参数
  • id 用于对应请求与响应
  • 成功返回 result,失败返回 error,包含 codemessage

在规定好请求和响应的结构之后,接下来需要一个传输层在客户端和服务端之间传递 JSON-RPC 消息,mcp 目前支持两种:stdioStreamable HTTP,之前还有一种基于 sse 的方式,已经废弃了,就不介绍了。

  • Stdio:mcp server 与大模型应用在同一台机器时使用,性能更好,安全性更高,通信通过子进程 stdin/stdout,消息以换行符分隔
  • Streamable HTTP:用于远程部署场景,基于 HTTP 协议,可返回 一次性 JSON流式事件(text/event-stream) ,客户端需要同时处理普通响应和流式响应。

接下来看下两种传输层的源码:

stdio 作为传输层:

如果 mcp server 和大模型应用部署到同一台机器一般会使用 stdio 作为传输层,性能更好,也更为安全,这种方式也很常见,比如我们希望 claude 访问自己电脑的文件系统,可以做如下配置:

{
  "mcpServers": {
    "filesystem": {
      "command": "npx",
      "args": [
        "-y",
        "@modelcontextprotocol/server-filesystem",
        "/Users/username/Desktop",
        "/path/to/other/allowed/dir"
      ]
    }
  }
}

这种就是通过 stdio 进行传输,将 mcp server 程序直接放到本地某个目录,然后在 claude 等大模型应用的mcp配置文件中指定可执行程序的路径(这里的 mcp server 是 ts 写的,故这里用 npx 启动),实际使用时,大模型应用会将 mcp server 作为子进程启动,通信时,向子进程的 stdin 写入请求,从 stdout 读取响应,消息之间由 n 进行分隔,可以看一些 mcp server 源码加深一下理解:

func (s *StdioServer) processInputStream(ctx context.Context, reader *bufio.Reader, stdout io.Writer) error {
    for {
       if err := ctx.Err(); err != nil {
          return err
       }

       // 自 stdin 逐行读取请求,不同请求之间是按换行符区分的
       line, err := s.readNextLine(ctx, reader)
       if err != nil {
          if err == io.EOF {
             return nil
          }
          s.errLogger.Printf("Error reading input: %v", err)
          return err
       }

       // 处理消息
       if err := s.processMessage(ctx, line, stdout); err != nil {
          if err == io.EOF {
             return nil
          }
          s.errLogger.Printf("Error handling message: %v", err)
          return err
       }
    }
}

func (s *StdioServer) processMessage(
    ctx context.Context,
    line string,
    writer io.Writer,
) error {
    // If line is empty, likely due to ctx cancellation
    if len(line) == 0 {
       return nil
    }

    // 先将消息解码到 json.RawMessage 之上
    var rawMessage json.RawMessage
    if err := json.Unmarshal([]byte(line), &rawMessage); err != nil {
       response := createErrorResponse(nil, mcp.PARSE_ERROR, "Parse error")
       return s.writeResponse(response, writer)
    }

    ...

    // 略过一些代码,最终会把消息交给 MCPServer.HandleMessage 方法,
    // HandleMessage 就是与传输层无关的了,统一处理所有收到的rpc调用
    response := s.server.HandleMessage(ctx, rawMessage)

    // Only write response if there is one (not for notifications)
    if response != nil {
       if err := s.writeResponse(response, writer); err != nil {
          return fmt.Errorf("failed to write response: %w", err)
       }
    }

    return nil
}

这里删除了一部分逻辑,只保留了核心处理流程,从 stdio 使用 readNextLine 逐行读取消息,将每行消息解析到 json.RawMessage 之上,之后使用 MCPServer.HandleMessage 方法处理收到的 rpc 请求,HandleMessage 方法与传输层无关,后面统一介绍。

Streamable HTTP 作为传输层:

如果大模型应用和 mcp server 无法部署到一台机器之上,则需要通过 Streamable HTTP 协议进行远程调用,Streamable HTTP 可以认为是基于 HTTP 协议的,按照同样的协议解析请求并返回响应,只是响应的 content-type 可能是 application/json,也可能是 text/event-stream,客户端需要同时处理好收到普通 json 响应与流式响应的情况。具体针对到 mcp 协议而言,首先需要规定一个 endpoint 作为传输的入口点,比如默认就是 /mcp,所以示例代码创建 client 的逻辑为 mcpClient, err := client.NewStreamableHttpClient("http://lo*calh*os*t:8080/mcp"),就是使用 /mcp 作为 endpoint,对于 server 而言,/mcp 需要同时支持 POST 与 GET 方法进行请求,POST 方法用于接收 client 请求返回响应,GET 方法主要是为了打开一个 sse 流,让服务端能主动给客户端推送消息,接下来先看下 streamable http 协议下的 mcp client 是如何发送请求的,以 CallTool 方法为例:

func (c *Client) CallTool(
    ctx context.Context,
    request mcp.CallToolRequest,
) (*mcp.CallToolResult, error) {
    response, err := c.sendRequest(ctx, "tools/call", request.Params)
    if err != nil {
       return nil, err
    }

    return mcp.ParseCallToolResult(response)
}

func (c *Client) sendRequest(
    ctx context.Context,
    method string,
    params any,
) (*json.RawMessage, error) {
    if !c.initialized && method != "initialize" {
       return nil, fmt.Errorf("client not initialized")
    }

    id := c.requestID.Add(1)

    request := transport.JSONRPCRequest{
       JSONRPC: mcp.JSONRPC_VERSION,
       ID:      mcp.NewRequestId(id),
       Method:  method,
       Params:  params,
    }

    response, err := c.transport.SendRequest(ctx, request)
    if err != nil {
       return nil, transport.NewError(err)
    }

    if response.Error != nil {
       return nil, errors.New(response.Error.Message)
    }

    return &response.Result, nil
}

可以看到最终调用了当前传输层的 SendRequest 方法,以下为 StreamableHTTP 对象的实现:

func (c *StreamableHTTP) SendRequest(
    ctx context.Context,
    request JSONRPCRequest,
) (*JSONRPCResponse, error) {
    // 请求体使用 json 进行序列化
    requestBody, err := json.Marshal(request)
    if err != nil {
       return nil, fmt.Errorf("failed to marshal request: %w", err)
    }

    ctx, cancel := c.contextAwareOfClientClose(ctx)
    defer cancel()

    // 发送 http 请求,sendHTTP 内部实现的并不复杂,核心是使用 http.Client 发送 http 请求
    // 请求体的 content-type 为 application/json
    resp, err := c.sendHTTP(ctx, http.MethodPost, bytes.NewReader(requestBody), "application/json, text/event-stream")
    if err != nil {
       if errors.Is(err, ErrSessionTerminated) && request.Method == string(mcp.MethodInitialize) {
          // If the request is initialize, should not return a SessionTerminated error
          // It should be a genuine endpoint-routing issue.
          // ( Fall through to return StatusCode checking. )
       } else {
          return nil, fmt.Errorf("failed to send request: %w", err)
       }
    }

    ...

    // 处理返回值,需处理两种不同的 content-type
    mediaType, _, _ := mime.ParseMediaType(resp.Header.Get("Content-Type"))
    switch mediaType {
    case "application/json":
       // Single response
       var response JSONRPCResponse
       if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
          return nil, fmt.Errorf("failed to decode response: %w", err)
       }

       // should not be a notification
       if response.ID.IsNil() {
          return nil, fmt.Errorf("response should contain RPC id: %v", response)
       }

       return &response, nil

    case "text/event-stream":
       // Server is using SSE for streaming responses
       return c.handleSSEResponse(ctx, resp.Body, false)

    default:
       return nil, fmt.Errorf("unexpected content type: %s", resp.Header.Get("Content-Type"))
    }
}

mcp client 进行远程调用的时候,最终会使用 StreamableHTTP.SendRequest 方法,忽略掉中间的一些细节,这里核心就是发送了 http 请求,并处理返回的响应,需要同时支持好 application/json 以及 text/event-stream 类型的响应,handleSSEResponse 里其实还有很多内容,比如接收 mcp server 推送的通知、处理 mcp server 发送的采样请求,不过这里只是为了熟悉传输层,就不过多介绍了。

接下来看下 mcp server 是如何处理 Streamable HTTP 请求的,以下为 server 的 Start 方法:

// Start 启动 http 服务器
func (s *StreamableHTTPServer) Start(addr string) error {
    s.mu.Lock()
    if s.httpServer == nil {
       // 默认使用 http 标准库的 ServeMux 对象作为 http.Handler 处理 http 请求
       mux := http.NewServeMux()

       // 在 endpoint 处注册了一个处理器,endpoint 默认为 /mcp
       mux.Handle(s.endpointPath, s)

       s.httpServer = &http.Server{
          Addr:    addr,
          Handler: mux,
       }
    } else {
       if s.httpServer.Addr == "" {
          s.httpServer.Addr = addr
       } else if s.httpServer.Addr != addr {
          return fmt.Errorf("conflicting listen address: WithStreamableHTTPServer(%q) vs Start(%q)", s.httpServer.Addr, addr)
       }
    }
    srv := s.httpServer
    s.mu.Unlock()

    return srv.ListenAndServe()
}

mux.Handle(s.endpointPath, s) 这里将 s 注册为了 /mcp 的处理器,说明是将 StreamableHTTPServer 自身作为访问 /mcp 的处理器了,故 /mcp 请求会由 StreamableHTTPServer.ServeHTTP 方法处理,代码如下:

// ServeHTTP implements the http.Handler interface.
func (s *StreamableHTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case http.MethodPost:
       s.handlePost(w, r)
    case http.MethodGet:
       s.handleGet(w, r)
    case http.MethodDelete:
       s.handleDelete(w, r)
    default:
       http.NotFound(w, r)
    }
}

这里先关注下请求方法为 POST 的逻辑:

func (s *StreamableHTTPServer) handlePost(w http.ResponseWriter, r *http.Request) {
    // 请求的 content-type 必须是 application/json
    contentType := r.Header.Get("Content-Type")
    mediaType, _, err := mime.ParseMediaType(contentType)
    if err != nil || mediaType != "application/json" {
       http.Error(w, "Invalid content type: must be 'application/json'", http.StatusBadRequest)
       return
    }

    ...

    // 此处略去了很多代码,仅从接收 mcp client 请求来看,最终会走到此处,
    // 将请求传给 MCPServer.HandleMessage 方法,
    // 和刚刚使用 stdio 作为传输层一样,使用同一个方法统一处理所有 rpc 请求
    response := s.server.HandleMessage(ctx, rawData)
    if response == nil {
       // For notifications, just send 202 Accepted with no body
       w.WriteHeader(http.StatusAccepted)
       return
    }

    // 基于判断考虑是要直接返回 application/json 类型的响应,还是流式返回 sse 类型的响应
    // 使用 sse 返回一般都是 mcp server 主动向 client 发送通知
    if session.upgradeToSSE.Load() {
       if !upgradedHeader {
          w.Header().Set("Content-Type", "text/event-stream")
          w.Header().Set("Connection", "keep-alive")
          w.Header().Set("Cache-Control", "no-cache")
          w.WriteHeader(http.StatusOK)
          upgradedHeader = true
       }
       if err := writeSSEEvent(w, response); err != nil {
          s.logger.Errorf("Failed to write final SSE response event: %v", err)
       }
    } else {
       w.Header().Set("Content-Type", "application/json")
       if isInitializeRequest && sessionID != "" {
          // send the session ID back to the client
          w.Header().Set(HeaderKeySessionID, sessionID)
       }
       w.WriteHeader(http.StatusOK)
       err := json.NewEncoder(w).Encode(response)
       if err != nil {
          s.logger.Errorf("Failed to write response: %v", err)
       }
    }
}

从上述代码中应该能比较清晰地了解 Streamable HTTP 作为传输层是如何传输 JSON-RPC 的了,其实和纯粹基于 http 协议实现的远程接口调用是类似的。

生命周期及支持的方法:

由上述可知,mcp 使用 JSON-RPC 2.0 来规定请求和响应的消息结构,使用 stdio 或 Streamable HTTP 来传输这些消息,最终使大模型通过 mcp client 完成了远程调用,获得 mcp server 所提供的各种能力。那 mcp server 到底是如何处理这些请求呢?从上面的代码中可以看到无论传输层使用哪种方式,最终都是交由 MCPServer.HandleMessage 方法来处理请求并返回响应的,可以看下此处的源码加深对于 mcp server 的理解:

// HandleMessage 处理 JSON-RPC 请求并返回相应的响应
func (s *MCPServer) HandleMessage(
    ctx context.Context,
    message json.RawMessage,
) mcp.JSONRPCMessage {
    // Add server to context
    ctx = context.WithValue(ctx, serverKey{}, s)
    var err *requestError

    // 这里首先考虑了要处理的消息不是 mcp client 的请求,而是 mcp client 的响应,
    // 因为 mcp server 可以主动向 mcp client 发送请求,
    // 比如采样请求或者 ping 请求,对应的响应也是在此函数处理的,
    // 故首先考虑了这种特殊情况,先基于 result 的通用结构反序列化消息

    var baseMessage struct {
       JSONRPC string        `json:"jsonrpc"`
       Method  mcp.MCPMethod `json:"method"`
       ID      any           `json:"id,omitempty"`
       Result  any           `json:"result,omitempty"`
    }

    if err := json.Unmarshal(message, &baseMessage); err != nil {
       return createErrorResponse(
          nil,
          mcp.PARSE_ERROR,
          "Failed to parse message",
       )
    }

    // JSON-RPC 必须使用 2.0 版本
    if baseMessage.JSONRPC != mcp.JSONRPC_VERSION {
       return createErrorResponse(
          baseMessage.ID,
          mcp.INVALID_REQUEST,
          "Invalid JSON-RPC version",
       )
    }

    // 不存在 id 时,说明是客户端主动发送的通知,
    // 比如客户端初始化完成的通知消息 initialized notification
    if baseMessage.ID == nil {
       var notification mcp.JSONRPCNotification
       if err := json.Unmarshal(message, &notification); err != nil {
          return createErrorResponse(
             nil,
             mcp.PARSE_ERROR,
             "Failed to parse notification",
          )
       }
       s.handleNotification(ctx, notification)
       return nil // Return nil for notifications
    }

    // 如果存在 result,在此处处理,但是这里什么也没做,
    // 这是因为这里只差 ping 一种情况没有处理,
    // 客户端针对于 ping 的响应本身就是空,所以什么也不用做。
    // 那如果是服务端采样请求的响应呢?
    // 那个响应已经在传输层的代码处理过了,即 StreamableHTTPServer.handleSamplingResponse
    // 故这里就都不用考虑了
    if baseMessage.Result != nil {
       // this is a response to a request sent by the server (e.g. from a ping
       // sent due to WithKeepAlive option)
       return nil
    }

    // 走到这里说明 mcp client 发送的消息不是响应,而是请求,
    // 进入请求处理部分,接下来的代码就比较符合后端日常处理请求返回响应的逻辑了

    handleErr := s.hooks.onRequestInitialization(ctx, baseMessage.ID, message)
    if handleErr != nil {
       return createErrorResponse(
          baseMessage.ID,
          mcp.INVALID_REQUEST,
          handleErr.Error(),
       )
    }

    // Get request header from ctx
    h := ctx.Value(requestHeader)
    headers, ok := h.(http.Header)

    if headers == nil || !ok {
       headers = make(http.Header)
    }

    // 按照不同的 method 处理请求
    switch baseMessage.Method {
    case mcp.MethodInitialize:
       var request mcp.InitializeRequest
       var result *mcp.InitializeResult
       if unmarshalErr := json.Unmarshal(message, &request); unmarshalErr != nil {
          err = &requestError{
             id:   baseMessage.ID,
             code: mcp.INVALID_REQUEST,
             err:  &UnparsableMessageError{message: message, err: unmarshalErr, method: baseMessage.Method},
          }
       } else {
          request.Header = headers
          s.hooks.beforeInitialize(ctx, baseMessage.ID, &request)
          result, err = s.handleInitialize(ctx, baseMessage.ID, request)
       }
       if err != nil {
          s.hooks.onError(ctx, baseMessage.ID, baseMessage.Method, &request, err)
          return err.ToJSONRPCError()
       }
       s.hooks.afterInitialize(ctx, baseMessage.ID, &request, result)
       return createResponse(baseMessage.ID, *result)

    // 略去处理更多 method 的逻辑
    ...


    default:
       return createErrorResponse(
          baseMessage.ID,
          mcp.METHOD_NOT_FOUND,
          fmt.Sprintf("Method %s not found", baseMessage.Method),
       )
    }
}

HandleMessage 方法后面处理请求并返回响应的代码是很好理解的,针对于不同 method 进入不同的处理分支并返回响应,switch 之前的逻辑会显得有点奇怪,因为mcp支持服务端主动向客户端发送请求并接收响应,故 HandleMessage 最上面先判断了一下得到的消息是不是客户端返回的响应,那 mcp server 到底是如何向 mcp client 主动发送请求或者通知呢?其实是在 StreamableHTTPServer.handleGet 方法,我们刚刚一直在看 handlePost 后续的流程,其实客户端在初始化完成之后会使用 GET 方法访问 /mcp,代码如下,这里是 client 调用的 Start 方法:

// Start initiates the HTTP connection to the server.
func (c *StreamableHTTP) Start(ctx context.Context) error {
    // For Streamable HTTP, we don't need to establish a persistent connection by default
    if c.getListeningEnabled {
       go func() {
          select {
          case <-c.initialized:
             ctx, cancel := c.contextAwareOfClientClose(ctx)
             defer cancel()
             c.listenForever(ctx)
          case <-c.closed:
             return
          }
       }()
    }

    return nil
}

listenForever 会调用 createGETConnectionToServer

func (c *StreamableHTTP) createGETConnectionToServer(ctx context.Context) error {
    // 发送 GET 请求
    resp, err := c.sendHTTP(ctx, http.MethodGet, nil, "text/event-stream")
    if err != nil {
       return fmt.Errorf("failed to send request: %w", err)
    }
    defer resp.Body.Close()

    // Check if we got an error response
    if resp.StatusCode == http.StatusMethodNotAllowed {
       return ErrGetMethodNotAllowed
    }

    if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
       body, _ := io.ReadAll(resp.Body)
       return fmt.Errorf("request failed with status %d: %s", resp.StatusCode, body)
    }

    // handle SSE response
    contentType := resp.Header.Get("Content-Type")
    if contentType != "text/event-stream" {
       return fmt.Errorf("unexpected content type: %s", contentType)
    }

    // 处理 sse 响应
    _, err = c.handleSSEResponse(ctx, resp.Body, true)
    if err != nil {
       return fmt.Errorf("failed to handle SSE response: %w", err)
    }

    return nil
}

可以看到这段逻辑就会用 GET 方法请求 /mcp,之后就一直监听 mcp server 返回的数据,此时要求响应体 content-type 必须为 text/event-stream,这里相当于把 HTTP 请求所使用的那个连接变成了一个长连接,从这个长连接监听服务端主动推送的消息,故要记住,mcp client 使用 POST 方法访问 /mcp 时是为了进行远程调用,使用 GET 方法访问 /mcp 时是为了打开一个 sse 流接收 mcp server 主动推送的消息。不过我们主要还是关注 mcp client 主动调用 mcp server 这种最为普遍的场景,所以让我们回到 MCPServer.HandleMessage 方法,刚刚提到此方法会在 switch 中处理不同 method 的请求,以下就是 method 的全部枚举:

const (
    // MethodInitialize initiates connection and negotiates protocol capabilities.
    // https://*m*odel*contextprotocol.io/specification/2024-11-05/basic/lifecycle/#initialization
    MethodInitialize MCPMethod = "initialize"

    // MethodPing verifies connection liveness between client and server.
    // https://*mo*delc*ontextprotocol.io/specification/2024-11-05/basic/utilities/ping/
    MethodPing MCPMethod = "ping"

    // MethodResourcesList lists all available server resources.
    // https://m**o*delcontextprotocol.io/specification/2024-11-05/server/resources/
    MethodResourcesList MCPMethod = "resources/list"

    // MethodResourcesTemplatesList provides URI templates for constructing resource URIs.
    // https://m**o*delcontextprotocol.io/specification/2024-11-05/server/resources/
    MethodResourcesTemplatesList MCPMethod = "resources/templates/list"

    // MethodResourcesRead retrieves content of a specific resource by URI.
    // https://m**o*delcontextprotocol.io/specification/2024-11-05/server/resources/
    MethodResourcesRead MCPMethod = "resources/read"

    // MethodPromptsList lists all available prompt templates.
    // https://**modelcon*textprotocol.io/specification/2024-11-05/server/prompts/
    MethodPromptsList MCPMethod = "prompts/list"

    // MethodPromptsGet retrieves a specific prompt template with filled parameters.
    // https://**modelcon*textprotocol.io/specification/2024-11-05/server/prompts/
    MethodPromptsGet MCPMethod = "prompts/get"

    // MethodToolsList lists all available executable tools.
    // https://mode*lco**ntextprotocol.io/specification/2024-11-05/server/tools/
    MethodToolsList MCPMethod = "tools/list"

    // MethodToolsCall invokes a specific tool with provided parameters.
    // https://mode*lco**ntextprotocol.io/specification/2024-11-05/server/tools/
    MethodToolsCall MCPMethod = "tools/call"

    // MethodSetLogLevel configures the minimum log level for client
    // https://*mo*d*elcontextprotocol.io/specification/2025-03-26/server/utilities/logging
    MethodSetLogLevel MCPMethod = "logging/setLevel"

    // MethodNotificationResourcesListChanged notifies when the list of available resources changes.
    // https://*modelc*on*textprotocol.io/specification/2025-03-26/server/resources#list-changed-notification
    MethodNotificationResourcesListChanged = "notifications/resources/list_changed"

    MethodNotificationResourceUpdated = "notifications/resources/updated"

    // MethodNotificationPromptsListChanged notifies when the list of available prompt templates changes.
    // https://**m*odelcontextprotocol.io/specification/2025-03-26/server/prompts#list-changed-notification
    MethodNotificationPromptsListChanged = "notifications/prompts/list_changed"

    // MethodNotificationToolsListChanged notifies when the list of available tools changes.
    // https://spec.modelcontext*p*r*otocol.io/specification/2024-11-05/server/tools/list_changed/
    MethodNotificationToolsListChanged = "notifications/tools/list_changed"
)

这些方法可以认为就是 mcp 所能提供的全部能力了,方法整体可以分为以下几类:

生命周期相关:

initialize 是 mcp client 必须最先调用的方法,此方法用于协商 server 和 client 双方的版本和支持的能力,比如客户端是否支持采样请求、服务端是否支持推送工具列表变化的通知,此方法调用完成之后,mcp client 会再发送一个通知 notifications/initialized 用以表明客户端已准备好开始工作。

ping 方法用于验证对方是否存活,mcp server 和 mcp client 都可以向对方发送 ping 请求,若对方存活,将会返回一个空的响应,如果长时间对方都没有响应,则可以认为对方已经出现问题了。

资源相关:

资源就是 mcp server 共享给 mcp client 的一些数据,比如一个文件,每个资源都有一个 URI 标识,比如:file:///project/src/main.rs,有了资源之后, mcp server 就需要暴露出一些方法用于 mcp client 访问这些资源,具体包括如下方法:

resources/list 返回 mcp server 包含的资源列表,支持分页返回,具体如下:

{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "resources": [
      {
        "uri": "file:///project/src/main.rs",
        "name": "main.rs",
        "title": "Rust Software Application Main File",
        "description": "Primary application entry point",
        "mimeType": "text/x-rust"
      }
    ],
    "nextCursor": "next-page-cursor"
  }
}

resources/read 通过资源的 URI 读取资源的内容,具体如下:

mcp client 请求:

{
  "jsonrpc": "2.0",
  "id": 2,
  "method": "resources/read",
  "params": {
    "uri": "file:///project/src/main.rs"
  }
}

mcp server 响应:

{
  "jsonrpc": "2.0",
  "id": 2,
  "result": {
    "contents": [
      {
        "uri": "file:///project/src/main.rs",
        "mimeType": "text/x-rust",
        "text": "fn main() {n    println!("Hello world!");n}"
      }
    ]
  }
}

resources/templates/list 提供可参数化的资源(类似 file:///{path} 占位符)列表,具体如下:

{
  "jsonrpc": "2.0",
  "id": 3,
  "result": {
    "resourceTemplates": [
      {
        "uriTemplate": "file:///{path}",
        "name": "Project Files",
        "description": "Access files in the project directory",
        "mimeType": "application/octet-stream"
      }
    ]
  }
}

resources/subscribe 用于监听单个资源是否变化,当资源发生变化时,服务端会推送 notifications/resources/updated;如果资源列表有变化,比如新增了资源,则推送 notifications/resources/list_changed。值得注意的是,虽然 mcp 的资源接口支持二进制数据(会用 base64 编码),但不要直接把大文件 base64 后发给大模型——通用大语言模型根本无法理解或分析这些原始二进制数据,而且这样会浪费大量 token。更合理的做法是由专门程序处理二进制数据,大模型上下文中只保留资源的 URI,需要时再让工具获取 URI 对应的具体数据。

提示词相关:

这里所说的提示词就是发给大模型的提示词,只是 mcp 使用一种标准化的方式将提示词模板都管理起来了,提示词是非常重要的,如果我们不微调大模型,就只能通过调整提示词来影响大模型的回复,故使用 mcp 集中管理这些提示词,而不要直接在代码里面写死提示词。

prompts/list 方法会返回 mcp server 包含的全部提示词模板列表,具体如下:

{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "prompts": [
      {
        "name": "code_review",
        "title": "Request Code Review",
        "description": "Asks the LLM to analyze code quality and suggest improvements",
        "arguments": [
          {
            "name": "code",
            "description": "The code to review",
            "required": true
          }
        ]
      }
    ],
    "nextCursor": "next-page-cursor"
  }
}

包含这个模板的作用以及需要给这个模板传递的参数。

使用 prompts/get 方法并传入模板参数,则可以得到渲染后的完整提示词,将这个提示词发给大模型就可以得到大模型的回复了,具体如下:

{
  "jsonrpc": "2.0",
  "id": 2,
  "method": "prompts/get",
  "params": {
    "name": "code_review",
    "arguments": {
      "code": "def hello():n    print('world')"
    }
  }
}

{
  "jsonrpc": "2.0",
  "id": 2,
  "result": {
    "description": "Code review prompt",
    "messages": [
      {
        "role": "user",
        "content": {
          "type": "text",
          "text": "Please review this Python code:ndef hello():n    print('world')"
        }
      }
    ]
  }
}

可以看到这里使用 code_review 模板,并传入 code 参数,最终得到了一段完整的 review 代码的提示词。

mcp server 的提示词列表发生变化,可以使用 notifications/prompts/list_changed 方法推送提示词列表变化通知。

工具相关:

mcp 最典型的、最常用的功能就是提供工具,我们之前的示例代码也都是基于提供工具进行演示的,这里就不再解释了,就是大模型通过 mcp client 调用 mcp server 提供的工具来完成一些特定的功能,工具部分具体包含以下方法:

tools/list 获取 mcp server 包含的工具列表,支持分页查询(使用的是游标分页),返回工具的名称、描述、参数列表,示例如下:

{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "tools": [
      {
        "name": "get_weather",
        "title": "Weather Information Provider",
        "description": "Get current weather information for a location",
        "inputSchema": {
          "type": "object",
          "properties": {
            "location": {
              "type": "string",
              "description": "City name or zip code"
            }
          },
          "required": ["location"]
        }
      }
    ],
    "nextCursor": "next-page-cursor"
  }
}

claude 展示某个 mcp server 提供了哪些工具时应该就是调用的这个方法。

tools/call 具体调用某个工具,这里就是一个典型的 rpc 调用了,通过 tools/list 方法中获得的工具名称以及工具参数定义调用工具,示例如下:

{
  "jsonrpc": "2.0",
  "id": 2,
  "method": "tools/call",
  "params": {
    "name": "get_weather",
    "arguments": {
      "location": "New York"
    }
  }
}

调用之后可以得到此工具的调用结果:

{
  "jsonrpc": "2.0",
  "id": 2,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "Current weather in New York:nTemperature: 72°FnConditions: Partly cloudy"
      }
    ],
    "isError": false
  }
}

工具列表发生变化时,可以使用 notifications/tools/list_changed 推送工具列表变化通知。

上面简单介绍了一下 mcp 中规定的各个 method 的作用,如果想看更多细节,可以直接在上述定义的常量上面的链接中查看,如 https://mode*lco**ntextprotocol.io/specification/2024-11-05/server/tools/,都是官方文档中的内容。我们接下来看段源码,因为通过 mcp 让大模型调用工具是最常见的用法,我们就看一下 tools/call 是如何实现的:

func (s *MCPServer) handleToolCall(
    ctx context.Context,
    id any,
    request mcp.CallToolRequest,
) (*mcp.CallToolResult, *requestError) {
    // 略去了一些代码
    ...

    if !ok {
       s.toolsMu.RLock()
       // 根据请求参数的名称自 tools 属性中取出要执行的工具
       tool, ok = s.tools[request.Params.Name]
       s.toolsMu.RUnlock()
    }

    if !ok {
       return nil, &requestError{
          id:   id,
          code: mcp.INVALID_PARAMS,
          err:  fmt.Errorf("tool '%s' not found: %w", request.Params.Name, ErrToolNotFound),
       }
    }

    finalHandler := tool.Handler

    s.middlewareMu.RLock()
    mw := s.toolHandlerMiddlewares

    // 构建一个工具调用的中间件结构,由最后一个中间件开始有内到外包裹 tool.Handler
    for i := len(mw) - 1; i >= 0; i-- {
       finalHandler = mw[i](finalHandler)
    }
    s.middlewareMu.RUnlock()

    // 执行工具得到响应
    result, err := finalHandler(ctx, request)
    if err != nil {
       return nil, &requestError{
          id:   id,
          code: mcp.INTERNAL_ERROR,
          err:  err,
       }
    }

    return result, nil
}

在 mcp server 添加一个新的工具时,就是添加到这里的 s.tools 属性之中

func (s *MCPServer) AddTool(tool mcp.Tool, handler ToolHandlerFunc) {
    s.AddTools(ServerTool{Tool: tool, Handler: handler})
}


func (s *MCPServer) AddTools(tools ...ServerTool) {
    s.implicitlyRegisterToolCapabilities()

    s.toolsMu.Lock()
    for _, entry := range tools {
       s.tools[entry.Tool.Name] = entry
    }
    s.toolsMu.Unlock()

    // When the list of available tools changes, servers that declared the listChanged capability SHOULD send a notification.
    if s.capabilities.tools.listChanged {
       // Send notification to all initialized sessions
       s.SendNotificationToAllClients(mcp.MethodNotificationToolsListChanged, nil)
    }
}

这里就只放一段 handleToolCall 的源码,其他分支逻辑都差不多,就是针对于不同的 method 调用不同的 handle 方法,如果想查看其他的 handle 方法,直接看 MCPServer.HandleMessage 方法就可以了。

授权认证:

在前面的章节中,我们已经完整梳理了 mcp client 与 mcp server 的通信全流程。接下来,需要重点关注授权,这是 mcp 协议必须解决的核心问题。

假设我们希望一个大模型帮助管理我们的 github 仓库,首先就必须对大模型进行授权。否则,大模型不可能在没有用户明确许可的情况下访问敏感数据,而 github 的 mcp server 也需要确保用户完成授权后,才能从 access_token 中获取用户的个人信息,从而返回用户的代码数据。随着 mcp 协议的推广,越来越多的软件厂商将提供 mcp 接口,这些接口都必须有严谨的授权逻辑,以保证用户数据的隐私和安全。

目前,mcp 官方对于使用 stdio 作为传输层的授权机制尚未给出明确规定。因为 mcp server 本身就是运行在本地的,只要用户从官方渠道安装,一般不会存在安全问题。而针对 Streamable HTTP 作为传输层的场景,官方推荐使用 oauth2 作为授权协议,oauth2 已经经过大量实践验证,非常成熟,并且与当前场景高度契合。 通过 oauth2,用户就可以在不需要向大模型暴露账号密码的前提下,为应用授权访问特定资源。

关于 oauth2 的详细细节,不是本文的重点,可以参考 ,这篇文章详细地说明了 oauth2 授权的整体流程。 对 oauth2 的授权流程都搞清楚之后,我们基于 mcp-go 实际写一段示例代码,授权服务器就直接使用 gitee 了,不用自己搭建省事一些,这里是 gitee 的 , 按照文档自己创建一个应用,应用中包含 Client ID、Client Secret,同时添加好应用回调地址,这些都准备好之后,可以看下以下的示例代码,这段代码对 mcp-go examples 中 oauth 的示例代码做了简化,去掉了动态注册 client 的逻辑,便于快速理解整体流程:

mcp server 部分就还用上面那段 server 的代码就行,这里主要是为了演示,并没有实际验证 token:

func main() {
    // mcp 服务器的名称为 demo,版本为 1.0
    mcpSrv := server.NewMCPServer("demo", "1.0")

    // 添加一个名为 toUpper 的工具,作用是将输入文本转为大写
    // 此工具只有一个参数:text,含义为需要转换的文本
    tool := mcp.NewTool(
       "toUpper",
       mcp.WithDescription("将输入文本转换为大写"),
       mcp.WithString("text", mcp.Description("需要转换的文本"), mcp.Required()),
    )

    // 指定工具的执行逻辑,核心使用 strings.ToUpper 转为大写
    mcpSrv.AddTool(tool, func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
       text, err := req.RequireString("text")
       if err != nil {
          return nil, err
       }
       textUpper := fmt.Sprintf("文本转为大写后为: %s", strings.ToUpper(text))
       return mcp.NewToolResultText(textUpper), nil
    })

    // 使用 streamable http 协议启动 mcp 服务器
    httpSrv := server.NewStreamableHTTPServer(mcpSrv)
    if err := httpSrv.Start(":8080"); err != nil {
       panic(err)
    }
}

mcp client 部分在原有的基础上增加了 oauth 授权相关,代码相对多了一些:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "os/exec"
    "runtime"

    "github.com/mark3labs/mcp-go/client"
    "github.com/mark3labs/mcp-go/client/transport"
    "github.com/mark3labs/mcp-go/mcp"
)

func main() {
    // oauth 配置
    oauthConfig := client.OAuthConfig{
       ClientID:              "gitee 应用中的 Client ID",
       ClientSecret:          "gitee 应用中的 Client Secret",
       RedirectURI:           "http://localho**st*:8085/oauth/callback",
       TokenStore:            client.NewMemoryTokenStore(),
       AuthServerMetadataURL: "http://loc*alho*st*:8085/oauth/metadata",
    }

    // 创建基于 oauth 授权的 mcp client
    mcpClient, err := client.NewOAuthStreamableHttpClient("http://lo*calh*os*t:8080/mcp", oauthConfig)
    if err != nil {
       log.Fatal(err)
    }
    ctx := context.Background()
    if err = mcpClient.Start(ctx); err != nil {
       maybeAuthorize(err)
       if err = mcpClient.Start(ctx); err != nil {
          log.Fatal(err)
       }
    }
    defer mcpClient.Close()

    initReq := mcp.InitializeRequest{
       Params: mcp.InitializeParams{
          ProtocolVersion: mcp.LATEST_PROTOCOL_VERSION,
          ClientInfo: mcp.Implementation{
             Name:    "http-client",
             Version: "1.0.0",
          },
       },
    }
    initRes, err := mcpClient.Initialize(ctx, initReq)
    if err != nil {
       log.Println("首次初始化失败: ", err)
       maybeAuthorize(err)
       if initRes, err = mcpClient.Initialize(ctx, initReq); err != nil {
          log.Fatal(err)
       }
    }
    log.Println("初始化成功, ", "server info: ", initRes.ServerInfo)
}

func maybeAuthorize(err error) {
    // 判断 err 是否为需要进行授权认证
    if client.IsOAuthAuthorizationRequiredError(err) {
       log.Println("需要进行 oauth 授权,开始执行授权流程")

       // 自 err 中获取 oauth handler,这个 handler 其实是在 
       // NewOAuthStreamableHttpClient 中通过 OAuthConfig 创建的
       oauthHandler := client.GetOAuthHandler(err)

       // 本地启动一个 http 服务器,用以接收 oauth 授权之后的回调,
       // 同时这里接口也返回了 gitee 授权页面的 url 和获取 token 的 url
       callbackChan := make(chan map[string]string)
       server := startCallbackServer(callbackChan)
       defer server.Close()

       // 获取授权页面 url,用户需要在这个页面进行授权操作
       authURL, err := oauthHandler.GetAuthorizationURL(context.Background(), "uuid", "")
       if err != nil {
          log.Fatalf("Failed to get authorization URL: %v", err)
       }
       log.Println("获取到授权页面url:", authURL)

       // 在浏览器中打开授权页面的 url
       log.Printf("打开浏览器: %sn", authURL)
       openBrowser(authURL)

       log.Println("已在浏览器打开授权url,等待用户完成授权...")
       params := <-callbackChan

       // 授权完成之后,授权页面会主动跳转至回调 url,并在 query 中添加授权码 code
       code := params["code"]
       if code == "" {
          log.Fatalf("No authorization code received")
       }
       log.Println("用户完成授权,已得到授权码: ", code)

       log.Println("使用授权码获取并保存token")
       err = oauthHandler.ProcessAuthorizationResponse(context.Background(), code, "uuid", "")
       if err != nil {
          log.Fatalf("Failed to process authorization response: %v", err)
       }
       log.Println("授权成功")
    }
}

func startCallbackServer(callbackChan chan<- map[string]string) *http.Server {
    server := &http.Server{
       Addr: ":8085",
    }

    // 此接口用于接收 gitee 的回调,这个接口需要添加到 gitee 授权应用的回调地址中
    http.HandleFunc("/oauth/callback", func(w http.ResponseWriter, r *http.Request) {
       // 解析 query 参数,用户授权完成后,在跳转至本页面时会在 query 参数中添加授权码 code
       params := make(map[string]string)
       for key, values := range r.URL.Query() {
          if len(values) > 0 {
             params[key] = values[0]
          }
       }

       callbackChan <- params

       // 以下为一个简单的授权成功页面
       w.Header().Set("Content-Type", "text/html")
       _, err := w.Write([]byte(`
          <html>
             <body>
                <h1>Authorization Successful</h1>
                <p>You can now close this window and return to the application.</p>

             </body>
          </html>
       `))
       if err != nil {
          log.Printf("Error writing response: %v", err)
       }
    })

    // 此接口是为了获取授权服务器的信息,比如这里返回了 gitee 授权页面和获取 token 的 url
    // 这样 oauthHandler.GetAuthorizationURL 才能获取到授权页面的 url
    http.HandleFunc("/oauth/metadata", func(w http.ResponseWriter, r *http.Request) {
       w.Header().Set("Content-Type", "application/json")
       metadata := transport.AuthServerMetadata{
          Issuer:                "https://gite*e*.*com",
          AuthorizationEndpoint: "https://gite*e*.*com/oauth/authorize",
          TokenEndpoint:         "https://gite*e*.*com/oauth/token",
       }
       data, _ := json.Marshal(metadata)
       w.Write(data)
    })

    go func() {
       if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
          log.Printf("HTTP server error: %v", err)
       }
    }()

    return server
}

func openBrowser(url string) {
    var err error

    switch runtime.GOOS {
    case "linux":
       err = exec.Command("xdg-open", url).Start()
    case "windows":
       err = exec.Command("rundll32", "url.dll,FileProtocolHandler", url).Start()
    case "darwin":
       err = exec.Command("open", url).Start()
    default:
       err = fmt.Errorf("unsupported platform")
    }

    if err != nil {
       log.Printf("Failed to open browser: %v", err)
       fmt.Printf("Please open the following URL in your browser: %sn", url)
    }
}


// output:
// main.go:53: 首次初始化失败:  transport error: failed to send request: no valid token available, authorization required
// main.go:65: 需要进行 oauth 授权,开始执行授权流程
// main.go:80: 获取到授权页面url: https://gite*e*.*com/oauth/authorize?client_id=xxx&redirect_uri=http%3A%2F%2Flocalhost%3A8085%2Foauth%2Fcallback&response_type=code&state=uuid
// main.go:83: 打开浏览器: https://gite*e*.*com/oauth/authorize?client_id=xxx&redirect_uri=http%3A%2F%2Flocalhost%3A8085%2Foauth%2Fcallback&response_type=code&state=uuid
// main.go:86: 已在浏览器打开授权url,等待用户完成授权...
// main.go:93: 用户完成授权,已得到授权码:  f4ecc77f8c973327820fa7ef65d43b7cb645d71a576f08a5a80df3faac583cda
// main.go:95: 使用授权码获取并保存token
// main.go:100: 授权成功
// main.go:59: 初始化成功,  server info:  {demo 1.0}

首先我们需要通过 NewOAuthStreamableHttpClient 来创建带有 oauth 授权功能的 mcp client,这里通过 OAuthConfig 指定了 gitee 中 Client ID、Client Secret、回调地址,gitee 的授权页面 url 以及获取 token 的 url 则是通过 AuthServerMetadataURL 参数指定的接口返回的。在首次调用 Initialize 方法时,mcp client 存在 oauth handler 但是却无法获取 token ,故会返回 err 要求进行 oauth 授权:

    // Add OAuth authorization if configured
    if c.oauthHandler != nil {
       authHeader, err := c.oauthHandler.GetAuthorizationHeader(ctx)
       if err != nil {
          // If we get an authorization error, return a specific error that can be handled by the client
          if err.Error() == "no valid token available, authorization required" {
             return nil, &OAuthAuthorizationRequiredError{
                Handler: c.oauthHandler,
             }
          }
          return nil, fmt.Errorf("failed to get authorization header: %w", err)
       }
       req.Header.Set("Authorization", authHeader)
    }

这是 StreamableHTTP.sendHTTP 方法中的一部分,可以看到在获取包含 token 的 header失败时返回了 OAuthAuthorizationRequiredError 错误,表明需要进行 oauth 认证。之后在 maybeAuthorize 中,通过 client.IsOAuthAuthorizationRequiredError(err) 判断是不是这个错误,如果是这个错误则进入 oauth 认证的流程。

之后通过 oauthHandler.GetAuthorizationURL 获取授权页面的 url,我们指定了 AuthServerMetadataURLhttp://loc*alho*st*:8085/oauth/metadata,故会访问我们自己本地启动的 http 服务器的 /oauth/metadata 接口,这个接口返回了 gitee 的授权页面 url 还有通过授权码获取 token 的 url,之后我们用浏览器打开了这个授权页面的 url,这时浏览器里应该是登录 gitee 并授权的页面,点击授权完成之后,页面会自动跳转到我们指定的回调地址,也就是 http://localho**st*:8085/oauth/callback,此时页面应该是如下所示的一个简单的授权成功页面:

image.png

同时我们的程序就可以拿到回调 url query 参数中包含的授权码 code 了,之后调用 oauthHandler.ProcessAuthorizationResponse 就可以使用这个 code 换取 access_token 了, 我们可以大体看下这个方法:

func (h *OAuthHandler) ProcessAuthorizationResponse(ctx context.Context, code, state, codeVerifier string) error {
    ...

    metadata, err := h.getServerMetadata(ctx)
    if err != nil {
       return fmt.Errorf("failed to get server metadata: %w", err)
    }

    // 发送获取 token 的请求
    data := url.Values{}
    data.Set("grant_type", "authorization_code")

    // 这里的 code 就是刚刚通过授权页面得到的
    data.Set("code", code)
    data.Set("client_id", h.config.ClientID)
    data.Set("redirect_uri", h.config.RedirectURI)

    if h.config.ClientSecret != "" {
       data.Set("client_secret", h.config.ClientSecret)
    }

    if h.config.PKCEEnabled && codeVerifier != "" {
       data.Set("code_verifier", codeVerifier)
    }

    req, err := http.NewRequestWithContext(
       ctx,
       http.MethodPost,
       metadata.TokenEndpoint,
       strings.NewReader(data.Encode()),
    )
    if err != nil {
       return fmt.Errorf("failed to create token request: %w", err)
    }

    req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
    req.Header.Set("Accept", "application/json")

    resp, err := h.httpClient.Do(req)
    if err != nil {
       return fmt.Errorf("failed to send token request: %w", err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
       body, _ := io.ReadAll(resp.Body)
       return extractOAuthError(body, resp.StatusCode, "token request failed")
    }

    var tokenResp Token
    if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
       return fmt.Errorf("failed to decode token response: %w", err)
    }

    // Set expiration time
    if tokenResp.ExpiresIn > 0 {
       tokenResp.ExpiresAt = time.Now().Add(time.Duration(tokenResp.ExpiresIn) * time.Second)
    }

    // 将 token 保存到 token store
    if err := h.config.TokenStore.SaveToken(&tokenResp); err != nil {
       return fmt.Errorf("failed to save token: %w", err)
    }

    return nil
}

这个函数最终会将获取到的 token 保存到 token store,目前的 token store 就是我们在 OAuthConfig 中指定的 client.NewMemoryTokenStore()

之后再次调用 Initialize 方法还是会执行到 StreamableHTTP.sendHTTP 方法,只是此时 GetAuthorizationHeader 是可以执行成功的,核心逻辑如下:

func (h *OAuthHandler) getValidToken(ctx context.Context) (*Token, error) {
    token, err := h.config.TokenStore.GetToken()
    if err == nil && !token.IsExpired() && token.AccessToken != "" {
       return token, nil
    }

    // 尝试使用 refresh token 刷新 access token
    if err == nil && token.RefreshToken != "" {
       newToken, err := h.refreshToken(ctx, token.RefreshToken)
       if err == nil {
          return newToken, nil
       }
       // If refresh fails, continue to authorization flow
    }
    return nil, ErrOAuthAuthorizationRequired
}

可以看到主要就是从 token store 中获取 access_token,如果 access_token 已经过期了就使用 refresh_token 获取新的 access_token,如果 token 刷新也失败了,就只能返回 ErrOAuthAuthorizationRequired 错误,让用户重新进行授权了。

这次成功获取到 token 之后,就可以顺利执行 Initialize 方法了,最终执行成功,打印出了 mcp server 的信息。

到这里,mcp 的来龙去脉就大致清晰了:它是什么,为什么要有它,怎么跑起来,以及里面的各种细节和源码解析。可以看到,mcp 的核心目标就是让大模型不再是“闭门造车”,而是真正能够方便地与外部世界打交道,同时还要保证用户数据的隐私和安全。

无论是 stdio 这种简洁直接的方式,还是 Streamable HTTP 这种更适合云端场景的方式,mcp 都给了我们一个统一的协议框架。未来,随着越来越多的厂商支持 mcp,大模型将可以天然地接入各种上下文和工具,帮我们处理更复杂的任务。所以,mcp 不是“可有可无的规范”,而是大模型生态走向成熟的关键一步,我很期待有一天,只要跟手机里的智能助理说一声,它就能帮我处理生活中的大部分事情,我相信那一天一定会到来的。

本站部分内容转载自互联网,如果有网站内容侵犯了您的权益,可直接联系我们删除,感谢支持!