专业编程基础技术教程

网站首页 > 基础教程 正文

多种编译语言手搓MCP服务

ccvgpt 2025-05-23 15:41:19 基础教程 14 ℃

使用Python3.12创建你的第一个mcp服务

第一步 安装构建环境

安装python3.12

多种编译语言手搓MCP服务

第二步 创建python 虚拟环境(避免依赖包相互影响)

安装虚拟环境工具 uv: pip install uv

进入实操

# 创建目录
uv init mymcp
# 进入目录
cd mymcp
#创建虚拟python环境  # 指定Python版本,注意需要对应版本的Python已经安装
#uv venv -p 3.12
# --python 同 -p
#uv venv --python 3.12
# 激活虚拟环境
# Unix
source .venv/bin/activate
# Windows
.venv\Scripts\activate
# 安装依赖包
uv add mcp[cli] httpx -i https://pypi.tuna.tsinghua.edu.cn/simple
# 创建一个py文件,名字自取(new-item 是 PowerShell 命令;在 Unix 下可改用 touch weather.py)
new-item weather.py

weather.py

from typing import Any
import httpx
from mcp.server.fastmcp import FastMCP

# Create the FastMCP server instance under the name "weather"; the
# @mcp.tool() decorators further down attach tools to this object.
mcp = FastMCP("weather")

# Base URL that every NWS API path built in this module is appended to.
NWS_API_BASE = "https://api.weather.gov"
# Value sent as the User-Agent header on each outgoing request.
USER_AGENT = "weather-app/1.0"
async def make_nws_request(url: str) -> dict[str, Any] | None:
    """Fetch ``url`` and return its decoded JSON body, or ``None`` on failure.

    The request carries the module's User-Agent plus a GeoJSON ``Accept``
    header and uses a 30-second timeout.

    Args:
        url: Absolute URL to request.

    Returns:
        The parsed JSON payload. ``None`` when sending the request, the HTTP
        status check, or JSON decoding raises any ``Exception``.
    """
    request_headers = {"User-Agent": USER_AGENT, "Accept": "application/geo+json"}
    async with httpx.AsyncClient() as session:
        try:
            reply = await session.get(url, headers=request_headers, timeout=30.0)
            reply.raise_for_status()
            payload = reply.json()
        except Exception:
            # Deliberate best-effort: callers only distinguish "data" from
            # "no data", so every failure collapses to None.
            payload = None
    return payload

def format_alert(feature: dict) -> str:
    """Format an alert feature into a readable multi-line string.

    Args:
        feature: Mapping with a ``"properties"`` mapping that may contain the
            keys ``event``, ``areaDesc``, ``severity``, ``description`` and
            ``instruction``.

    Returns:
        A block of five labelled lines, starting and ending with a newline.
        A field that is missing *or* explicitly ``None`` is replaced by its
        placeholder text.

    Raises:
        KeyError: If ``feature`` has no ``"properties"`` key.
    """
    props = feature["properties"]

    def field(key: str, fallback: str) -> Any:
        # dict.get(key, default) only applies the default when the key is
        # absent; a JSON null arrives as None and would print as "None".
        value = props.get(key)
        return fallback if value is None else value

    return f"""
Event: {field('event', 'Unknown')}
Area: {field('areaDesc', 'Unknown')}
Severity: {field('severity', 'Unknown')}
Description: {field('description', 'No description available')}
Instructions: {field('instruction', 'No specific instructions provided')}
"""
@mcp.tool()
async def get_alerts(state: str) -> str:
    """Get weather alerts for a US state.

    Args:
        state: Two-letter US state code (e.g. CA, NY)
    """
    # NOTE(review): the docstring above is kept verbatim — the tool decorator
    # presumably publishes it as the tool description; confirm before rewording.
    payload = await make_nws_request(f"{NWS_API_BASE}/alerts/active/area/{state}")

    # A failed request (None / empty payload) and a payload without a
    # "features" key are reported with the same message.
    if not payload or "features" not in payload:
        return "Unable to fetch alerts or no alerts found."

    features = payload["features"]
    if features:
        # One formatted section per alert, separated by a horizontal rule.
        return "\n---\n".join(format_alert(item) for item in features)
    return "No active alerts for this state."

@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
    """Get weather forecast for a location.

    Args:
        latitude: Latitude of the location
        longitude: Longitude of the location
    """
    # NOTE(review): the docstring above is kept verbatim — the tool decorator
    # presumably publishes it as the tool description; confirm before rewording.

    # Step 1: the points endpoint tells us which forecast URL serves this
    # coordinate pair.
    grid_info = await make_nws_request(f"{NWS_API_BASE}/points/{latitude},{longitude}")
    if not grid_info:
        return "Unable to fetch forecast data for this location."

    # Step 2: follow the forecast URL advertised in the points response.
    forecast_payload = await make_nws_request(grid_info["properties"]["forecast"])
    if not forecast_payload:
        return "Unable to fetch detailed forecast."

    # Step 3: render at most the first five forecast periods.
    upcoming = forecast_payload["properties"]["periods"][:5]
    sections = [
        f"""
{entry['name']}:
Temperature: {entry['temperature']}°{entry['temperatureUnit']}
Wind: {entry['windSpeed']} {entry['windDirection']}
Forecast: {entry['detailedForecast']}
"""
        for entry in upcoming
    ]
    return "\n---\n".join(sections)

if __name__ == "__main__":
    # Only when executed as a script (not when imported): start the server
    # using the stdio transport.
    mcp.run(transport='stdio')

本地调试下

mcp dev weather.py

打开浏览器 http://localhost:6274


可以验证一下工具是否正常工作。

添加mcp服务到你的cursor 或者Claude desktop(注意:请把下面配置中的路径替换为你的项目目录的绝对路径,本教程中即前面创建的 mymcp 目录)

{
    "mcpServers": {
        "weather": {
            "command": "uv",
            "args": [
                "--directory",
                "/ABSOLUTE/PATH/TO/PARENT/FOLDER/weather",
                "run",
                "weather.py"
            ]
        }
    }
}

如果你是在wsl内部开发的程序,需要使用下面方式添加到cursor或者其他ai工具

{
    "mcpServers": { 
            "weather": {
                        "command": "wsl.exe",
                        "args": [
                    "/home/你的用户/.local/bin/uv",
                    "--directory",
                    "/ABSOLUTE/PATH/TO/PARENT/FOLDER/weather",
                    "run",
                    "weather.py"
                  ]
        }
    }
}


使用Nodejs20创建你的第一个mcp服务

第一步 安装构建环境

安装 Node.js:前往官网下载页(https://nodejs.org/en/download)下载 Node.js,按步骤操作即可

第二步,初始化node项目

创建目录 mymcp,进入该目录后执行:

 npm init -y # 初始化项目 生成package.json 等文件
# 安装必要的依赖
npm install @modelcontextprotocol/sdk
npm install zod
# 创建js文件 server.js(代码使用了 ES Module 的 import 语法和顶层 await,需要在 package.json 中加入 "type": "module",或者把文件命名为 server.mjs)

server.js

import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";

// Build the MCP server instance that the tool and resource below register on.
const server = new McpServer({ name: "MyMCP", version: "1.0.0" });

// Tool "add": takes two numbers `a` and `b` (validated by zod) and replies
// with their sum rendered as text.
const addInputShape = { a: z.number(), b: z.number() };
const handleAdd = async ({ a, b }) => {
  const sum = a + b;
  return { content: [{ type: "text", text: String(sum) }] };
};
server.tool("add", addInputShape, handleAdd);

// Resource "greeting": URIs of the form greeting://{name} resolve to a
// greeting that embeds the {name} segment.
const greetingTemplate = new ResourceTemplate("greeting://{name}", { list: undefined });
const readGreeting = async (uri, { name }) => {
  const entry = { uri: uri.href, text: `Hello, ${name}!` };
  return { contents: [entry] };
};
server.resource("greeting", greetingTemplate, readGreeting);

// Connect over stdio: messages are received on stdin and sent on stdout.
const transport = new StdioServerTransport();
await server.connect(transport);

运行测试

npx @modelcontextprotocol/inspector node server.js


添加mcp服务到你的cursor 或者Claude desktop

{
    "mcpServers": {
        "nodeserver": {
            "command": "node",
            "args": [
                "/ABSOLUTE/PATH/TO/PARENT/FOLDER/server.js"
            ]
        }
    }
}

使用Golang1.23创建你的第一个mcp服务

第一步 安装构建环境

安装golang1.23(为什么不选1.24?个人建议:选择语言和依赖的版本时,最好选比最新版早一个版本的,避免成为新版本的小白鼠;如果遇到致命问题,再升级到最新版)

下载地址:https://go.dev/dl/

创建目录 go-mcp-postgres

创建main.go

package main

import (
	"context"
	"database/sql"
	"errors"
	"flag"
	"fmt"
	"log"
	"os"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/joho/godotenv"
	_ "github.com/lib/pq"
	"github.com/mark3labs/mcp-go/mcp"
	"github.com/mark3labs/mcp-go/server"
)

var (
	// db is the package-wide database handle; it is assigned by
	// initConnectionPool and used by every tool handler.
	db         *sql.DB
	// selectStmt matches, case-insensitively, text that begins with optional
	// whitespace followed by SELECT. readQueryToolHandler uses it as a gate.
	selectStmt = regexp.MustCompile(`(?i)^\s*SELECT`)
)

// init loads a .env file from the working directory when os.Stat can find
// one, and terminates the process if the file exists but cannot be loaded.
func init() {
	if _, statErr := os.Stat(".env"); statErr != nil {
		// os.Stat failed (e.g. the file does not exist): nothing to load.
		return
	}
	if loadErr := godotenv.Load(); loadErr != nil {
		log.Fatal("Error loading .env file:", loadErr)
	}
}

// Connection settings. main binds each of these to the command-line flag of
// the same name (all default to the empty string) and copies them into a
// PDBCONNECTION value.
var (
	host     string
	port     string
	name     string
	user     string
	password string
	sslmode  string
)

// main parses the PostgreSQL connection flags, opens the connection pool,
// registers the three tools on a new MCP server and serves it over stdio.
func main() {
	// Bind every command-line flag to its package-level variable; each
	// defaults to the empty string.
	flagTable := []struct {
		dest  *string
		name  string
		usage string
	}{
		{&host, "host", "POSTGRES HOST"},
		{&port, "port", "POSTGRES PORT"},
		{&name, "name", "POSTGRES NAME"},
		{&user, "user", "POSTGRES USER"},
		{&password, "password", "POSTGRES PASSWORD"},
		{&sslmode, "sslmode", "POSTGRES SSLMODE"},
	}
	for _, entry := range flagTable {
		flag.StringVar(entry.dest, entry.name, "", entry.usage)
	}
	flag.Parse()

	// Initialise the database connection pool from the parsed flags.
	settings := PDBCONNECTION{
		Host:     host,
		Port:     port,
		Name:     name,
		User:     user,
		Password: password,
		SSLMODE:  sslmode,
	}
	if err := initConnectionPool(settings); err != nil {
		log.Fatal("Database connection failed:", err)
	}
	defer db.Close()

	mcpServer := server.NewMCPServer(
		"postgresql-mcp-server ",
		"1.0.0",
		server.WithResourceCapabilities(true, true),
		server.WithPromptCapabilities(true),
		server.WithLogging(),
	)

	// Pair each tool definition with the handler that serves it.
	mcpServer.AddTool(createReadQueryTool(), readQueryToolHandler)
	mcpServer.AddTool(createListTablesTool(), listTableToolHandler)
	mcpServer.AddTool(createDescribeTableTool(), describeTableToolHandler)

	// A serving error is logged rather than fatal, so the deferred
	// db.Close() above still runs.
	if serveErr := server.ServeStdio(mcpServer); serveErr != nil {
		log.Printf("Server error: %v\n", serveErr)
	}
}

// createDescribeTableTool builds the "postgres_describe_table" tool
// definition, which declares one required string argument, "table_name".
func createDescribeTableTool() mcp.Tool {
	summary := mcp.WithDescription("Describe a table in the postgres database, query the table structure in the postgres database, 描述一个表在postgres数据库中,查询一个表的结构在postgres数据库中")
	tableNameArg := mcp.WithString("table_name",
		mcp.Required(),
		mcp.Description("The table name to describe, 要描述的表名"),
	)
	return mcp.NewTool("postgres_describe_table", summary, tableNameArg)
}

// createReadQueryTool builds the "postgres_execute_query" tool definition,
// which declares one required string argument, "query".
func createReadQueryTool() mcp.Tool {
	summary := mcp.WithDescription("Execute a SELECT query on the postgres database, 执行一个SELECT查询在postgres数据库上")
	queryArg := mcp.WithString("query",
		mcp.Required(),
		mcp.Description("SELECT SQL query to execute, 执行一个SELECT查询"),
	)
	return mcp.NewTool("postgres_execute_query", summary, queryArg)
}

// createListTablesTool builds the "postgres_list_tables" tool definition,
// which declares one string argument, "table_name", that is not marked as
// required.
func createListTablesTool() mcp.Tool {
	summary := mcp.WithDescription("List all user tables in the database, 列出数据库中的所有用户表")
	tableNameArg := mcp.WithString("table_name",
		mcp.Description("Optional table name to filter tables, 可选的表名来过滤表"),
	)
	return mcp.NewTool("postgres_list_tables", summary, tableNameArg)
}

// PDBCONNECTION carries the PostgreSQL connection settings that
// initConnectionPool turns into a connection string.
type PDBCONNECTION struct {
	Host     string // value for the "host" connection parameter
	Port     string // decimal port number as text; parsed with strconv.Atoi
	Name     string // value for the "dbname" connection parameter
	User     string // value for the "user" connection parameter
	Password string // value for the "password" connection parameter
	SSLMODE  string // value for the "sslmode" connection parameter
}

// quoteDSNValue wraps value in single quotes for use in a key=value
// connection string, backslash-escaping embedded backslashes and single
// quotes, so values that are empty or contain spaces or quotes stay intact.
func quoteDSNValue(value string) string {
	// Escape backslashes first so the backslashes added for quotes are not
	// themselves doubled.
	escaped := strings.ReplaceAll(value, `\`, `\\`)
	escaped = strings.ReplaceAll(escaped, `'`, `\'`)
	return "'" + escaped + "'"
}

// initConnectionPool opens the "postgres" database handle described by
// dbconfig, applies the pool limits, verifies connectivity with Ping and, on
// success, stores the handle in the package-level db variable.
//
// It returns an error when dbconfig.Port is not a decimal integer, when
// sql.Open fails, or when the Ping fails. On a Ping failure the handle is
// closed and db is left unchanged, so no half-initialised pool leaks.
func initConnectionPool(dbconfig PDBCONNECTION) error {
	port, err := strconv.Atoi(dbconfig.Port)
	if err != nil {
		return fmt.Errorf("invalid DB_PORT: %w", err)
	}

	// Every string value is quoted: an unquoted password containing a space
	// (or any empty value) would otherwise corrupt the key=value parsing.
	connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
		quoteDSNValue(dbconfig.Host),
		port,
		quoteDSNValue(dbconfig.User),
		quoteDSNValue(dbconfig.Password),
		quoteDSNValue(dbconfig.Name),
		quoteDSNValue(dbconfig.SSLMODE),
	)

	pool, err := sql.Open("postgres", connStr)
	if err != nil {
		return fmt.Errorf("database connection failed: %w", err)
	}

	pool.SetMaxOpenConns(25)
	pool.SetMaxIdleConns(5)
	pool.SetConnMaxLifetime(5 * time.Minute)
	pool.SetConnMaxIdleTime(2 * time.Minute)

	if err = pool.Ping(); err != nil {
		// Release the handle instead of leaving an unusable pool behind.
		pool.Close()
		return fmt.Errorf("database ping failed: %w", err)
	}

	db = pool
	log.Println("Successfully connected to database")
	return nil
}

// readQueryToolHandler serves the query tool: it runs the SQL text given in
// the "query" argument and returns the rows formatted as text.
//
// The argument must be a string matching selectStmt (i.e. starting with
// SELECT). Because a prefix check alone would let input such as
// "SELECT 1; DROP TABLE t" through, the query is additionally
//   - executed inside a read-only transaction, so statements that write are
//     rejected by the database, and
//   - prepared before execution; PostgreSQL does not accept several commands
//     in one prepared statement, so stacked statements fail at prepare time.
//
// Errors: "invalid query parameter" when the argument is missing or not a
// string, "only SELECT queries are allowed" when the prefix check fails,
// "query execution failed" / "result parsing failed" for database errors
// (details go to the log only).
func readQueryToolHandler(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
	query, ok := request.Params.Arguments["query"].(string)
	if !ok {
		return nil, errors.New("invalid query parameter")
	}

	if !selectStmt.MatchString(query) {
		return nil, errors.New("only SELECT queries are allowed")
	}

	tx, err := db.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
	if err != nil {
		log.Printf("Begin read-only transaction error: %v\n", err)
		return nil, fmt.Errorf("query execution failed")
	}
	// Nothing is ever committed; rolling back simply ends the transaction.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, query)
	if err != nil {
		log.Printf("Query prepare error: %v\n", err)
		return nil, fmt.Errorf("query execution failed")
	}
	defer stmt.Close()

	rows, err := stmt.QueryContext(ctx)
	if err != nil {
		log.Printf("Query error: %v\n", err)
		return nil, fmt.Errorf("query execution failed")
	}
	defer rows.Close()

	results, err := parseSQLRows(rows)
	if err != nil {
		log.Printf("Result parsing error: %v\n", err)
		return nil, fmt.Errorf("result parsing failed")
	}

	return mcp.NewToolResultText(fmt.Sprintf("Query results: %v", results)), nil
}

// listTableToolHandler serves the list-tables tool: it returns the names of
// all tables outside the pg_catalog and information_schema schemas.
//
// Two optional string arguments narrow the result:
//   - "schema": only tables whose schema name equals this value.
//   - "table_name": the argument the tool definition advertises. It is
//     applied as a case-insensitive substring match on the table name.
//     NOTE(review): the tool description only says "filter"; confirm that
//     substring matching (rather than exact equality) is what is wanted.
//
// Empty strings are treated as "not provided". Both values are sent as bind
// parameters, never spliced into the SQL text.
//
// Errors: "failed to list tables" when the query fails, "error scanning
// table name" when a row cannot be read or iteration ends with an error.
func listTableToolHandler(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
	query := `
		SELECT tablename
		FROM pg_catalog.pg_tables
		WHERE schemaname NOT IN ('pg_catalog', 'information_schema')`
	var args []interface{}

	if schema, ok := request.Params.Arguments["schema"].(string); ok && schema != "" {
		args = append(args, schema)
		query += fmt.Sprintf(" AND schemaname = $%d", len(args))
	}
	if tableName, ok := request.Params.Arguments["table_name"].(string); ok && tableName != "" {
		args = append(args, "%"+tableName+"%")
		query += fmt.Sprintf(" AND tablename ILIKE $%d", len(args))
	}

	rows, err := db.QueryContext(ctx, query, args...)
	if err != nil {
		log.Printf("List tables error: %v\n", err)
		return nil, fmt.Errorf("failed to list tables")
	}
	defer rows.Close()

	var tables []string
	for rows.Next() {
		var table string
		if err := rows.Scan(&table); err != nil {
			return nil, fmt.Errorf("error scanning table name")
		}
		tables = append(tables, table)
	}
	// rows.Next returning false can also mean iteration failed part-way.
	if err := rows.Err(); err != nil {
		log.Printf("List tables iteration error: %v\n", err)
		return nil, fmt.Errorf("error scanning table name")
	}

	return mcp.NewToolResultText(fmt.Sprintf("Tables: %v", tables)), nil
}

// describeTableToolHandler serves the describe-table tool: it returns the
// column names and data types that information_schema.columns lists for the
// table named by the required "table_name" string argument.
//
// The table name is passed as a bind parameter; it was previously spliced
// into the SQL text unescaped, which allowed SQL injection.
//
// Errors: "invalid table_name parameter" when the argument is missing or not
// a string, "failed to describe table" when the query fails, "error scanning
// table columns" when a row cannot be read or iteration ends with an error.
func describeTableToolHandler(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
	tableName, ok := request.Params.Arguments["table_name"].(string)
	if !ok {
		return nil, errors.New("invalid table_name parameter")
	}

	const query = `
		SELECT column_name, data_type
		FROM information_schema.columns
		WHERE table_name = $1
	`

	rows, err := db.QueryContext(ctx, query, tableName)
	if err != nil {
		log.Printf("Describe table error: %v\n", err)
		return nil, fmt.Errorf("failed to describe table")
	}
	defer rows.Close()

	var columns []string
	for rows.Next() {
		var column string
		var dataType string
		if err := rows.Scan(&column, &dataType); err != nil {
			return nil, fmt.Errorf("error scanning table columns")
		}
		columns = append(columns, fmt.Sprintf("%s (%s)", column, dataType))
	}
	// rows.Next returning false can also mean iteration failed part-way.
	if err := rows.Err(); err != nil {
		log.Printf("Describe table iteration error: %v\n", err)
		return nil, fmt.Errorf("error scanning table columns")
	}

	return mcp.NewToolResultText(fmt.Sprintf("Table columns: %v", columns)), nil
}

// parseSQLRows drains rows into a slice with one map per row, keyed by
// column name.
//
// Values are scanned into interface{} slots, so each keeps the Go type the
// driver supplied, with one exception: []byte values are converted to string.
// The caller formats the result with %v, which would otherwise print such a
// value as a list of byte numbers instead of readable text.
// NOTE(review): this also turns genuinely binary columns into strings;
// confirm that is acceptable for the tool's output.
//
// It returns the first error from rows.Columns, rows.Scan, or — after the
// loop — rows.Err, so a result set cut short by a failure is not reported as
// complete.
func parseSQLRows(rows *sql.Rows) ([]map[string]interface{}, error) {
	cols, err := rows.Columns()
	if err != nil {
		return nil, err
	}

	var results []map[string]interface{}
	for rows.Next() {
		// Scan needs pointers; each pointer targets the matching slot in values.
		values := make([]interface{}, len(cols))
		pointers := make([]interface{}, len(cols))
		for i := range values {
			pointers[i] = &values[i]
		}

		if err := rows.Scan(pointers...); err != nil {
			return nil, err
		}

		row := make(map[string]interface{}, len(cols))
		for i, col := range cols {
			if raw, isBytes := values[i].([]byte); isBytes {
				row[col] = string(raw)
			} else {
				row[col] = values[i]
			}
		}
		results = append(results, row)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return results, nil
}

// sanitizeInput returns input with every single quote doubled, the form in
// which a quote is written inside a single-quoted SQL string literal.
func sanitizeInput(input string) string {
	// Splitting on the quote and re-joining with two quotes doubles each one.
	pieces := strings.Split(input, "'")
	return strings.Join(pieces, "''")
}


进入目录 初始化项目
go mod init go-mcp-postgres
安装golang依赖
go mod tidy
编译项目:在 Windows 上你会得到一个 go-mcp-postgres.exe(在 Linux/macOS 上则是不带扩展名的 go-mcp-postgres)
go build

进行简单测试

npx @modelcontextprotocol/inspector go-mcp-postgres.exe --host 10.100.2.1 --port 5432 --name dbname --user loginuser --password loginpassword --sslmode disable



添加mcp服务到你的cursor 或者Claude desktop

{
    "mcpServers": {
        "postgres": {
            "command": "/ABSOLUTE/PATH/TO/PARENT/FOLDER/go-mcp-postgres.exe",
            "args": [
                "--host",
                "10.100.2.1",
                "--port",
                "5432",
                "--name",
                "dbname",
                "--user",
                "loginuser",
                "--password",
                "loginpassword",
                "--sslmode",
                "disable"
            ]
        }
    }
}

Tags:

最近发表
标签列表