项目目标
我们要开发一个数据库 MCP Server,让 AI 能够:
- 列出数据库中的所有表
- 查看表结构(列名、类型、约束)
- 执行 SQL 查询(只读)
- 获取表的数据统计信息
最终效果:在 Claude Code 中直接用自然语言查询数据库。
用户:帮我查一下最近 7 天注册的用户数量
AI:我来查询数据库...
[调用 query 工具]
最近 7 天共注册了 142 位用户。
项目初始化
创建项目
mkdir mcp-database-server
cd mcp-database-server
npm init -y
安装依赖
# MCP SDK
npm install @modelcontextprotocol/sdk
# 数据库驱动
npm install pg # PostgreSQL
npm install better-sqlite3 # SQLite(用于本地测试)
# 开发依赖
npm install -D typescript @types/node @types/pg
npm install -D @types/better-sqlite3
TypeScript 配置
// tsconfig.json
{
"compilerOptions": {
"target": "ES2022",
"module": "Node16",
"moduleResolution": "Node16",
"outDir": "./dist",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true,
"declaration": true
},
"include": ["src/**/*"]
}
package.json 配置
{
"name": "mcp-database-server",
"version": "1.0.0",
"type": "module",
"bin": {
"mcp-database-server": "./dist/index.js"
},
"scripts": {
"build": "tsc",
"dev": "tsc --watch",
"start": "node dist/index.js"
}
}
项目结构
src/
├── index.ts # 入口文件,启动 Server
├── server.ts # MCP Server 定义
├── database.ts # 数据库连接和查询
├── tools.ts # 工具定义
└── types.ts # 类型定义
实现数据库连接层
先实现数据库操作的封装:
// src/database.ts
import pg from "pg";
export interface TableInfo {
name: string;
schema: string;
rowCount: number;
}
export interface ColumnInfo {
name: string;
type: string;
nullable: boolean;
defaultValue: string | null;
isPrimaryKey: boolean;
}
export interface QueryResult {
columns: string[];
rows: Record<string, unknown>[];
rowCount: number;
}
export class DatabaseConnection {
private pool: pg.Pool;
constructor(connectionString: string) {
this.pool = new pg.Pool({
connectionString,
max: 5,
idleTimeoutMillis: 30000,
});
}
async listTables(): Promise<TableInfo[]> {
const result = await this.pool.query(`
SELECT
t.table_name as name,
t.table_schema as schema,
COALESCE(s.n_live_tup, 0) as row_count
FROM information_schema.tables t
LEFT JOIN pg_stat_user_tables s
ON t.table_name = s.relname
WHERE t.table_schema = 'public'
AND t.table_type = 'BASE TABLE'
ORDER BY t.table_name
`);
return result.rows.map((row) => ({
name: row.name,
schema: row.schema,
rowCount: Number(row.row_count),
}));
}
async describeTable(tableName: string): Promise<ColumnInfo[]> {
// 防止 SQL 注入:验证表名
if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(tableName)) {
throw new Error(`Invalid table name: ${tableName}`);
}
const result = await this.pool.query(
`
SELECT
c.column_name as name,
c.data_type as type,
c.is_nullable = 'YES' as nullable,
c.column_default as default_value,
COALESCE(
(SELECT true FROM information_schema.key_column_usage k
JOIN information_schema.table_constraints tc
ON k.constraint_name = tc.constraint_name
WHERE k.column_name = c.column_name
AND k.table_name = c.table_name
AND tc.constraint_type = 'PRIMARY KEY'
LIMIT 1),
false
) as is_primary_key
FROM information_schema.columns c
WHERE c.table_name = $1
AND c.table_schema = 'public'
ORDER BY c.ordinal_position
`,
[tableName]
);
return result.rows.map((row) => ({
name: row.name,
type: row.type,
nullable: row.nullable,
defaultValue: row.default_value,
isPrimaryKey: row.is_primary_key,
}));
}
async query(sql: string, limit: number = 100): Promise<QueryResult> {
// 安全检查:只允许 SELECT 语句
const trimmed = sql.trim().toUpperCase();
if (!trimmed.startsWith("SELECT") && !trimmed.startsWith("WITH")) {
throw new Error("Only SELECT and WITH (CTE) queries are allowed");
}
// 禁止危险操作
const forbidden = ["INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "TRUNCATE", "CREATE"];
for (const keyword of forbidden) {
if (trimmed.includes(keyword)) {
throw new Error(`Forbidden keyword detected: ${keyword}`);
}
}
// 添加 LIMIT 限制
const limitedSql = sql.replace(/;?\s*$/, "") + ` LIMIT ${limit}`;
const result = await this.pool.query(limitedSql);
return {
columns: result.fields.map((f) => f.name),
rows: result.rows,
rowCount: result.rowCount ?? 0,
};
}
async getTableStats(tableName: string): Promise<Record<string, unknown>> {
if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(tableName)) {
throw new Error(`Invalid table name: ${tableName}`);
}
const countResult = await this.pool.query(
`SELECT COUNT(*) as total FROM "${tableName}"`
);
const sizeResult = await this.pool.query(
`SELECT pg_size_pretty(pg_total_relation_size($1)) as size`,
[tableName]
);
return {
tableName,
totalRows: Number(countResult.rows[0].total),
totalSize: sizeResult.rows[0].size,
};
}
async close(): Promise<void> {
await this.pool.end();
}
}
实现 MCP Server
工具定义
// src/tools.ts
import { z } from "zod";
export const toolSchemas = {
listTables: {
name: "list_tables",
description: "列出数据库中所有的表,包含表名和大致行数",
schema: {},
},
describeTable: {
name: "describe_table",
description: "查看指定表的结构,包含列名、数据类型、是否可空、默认值、是否主键",
schema: {
table_name: z.string().describe("要查看的表名"),
},
},
query: {
name: "query",
description:
"执行只读 SQL 查询(仅支持 SELECT 语句),返回查询结果",
schema: {
sql: z.string().describe("要执行的 SQL 查询语句(仅支持 SELECT)"),
limit: z
.number()
.optional()
.default(100)
.describe("返回结果的最大行数,默认 100"),
},
},
tableStats: {
name: "table_stats",
description: "获取指定表的统计信息,包含总行数和占用空间",
schema: {
table_name: z.string().describe("要查看统计信息的表名"),
},
},
};
Server 主体
// src/server.ts
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { DatabaseConnection } from "./database.js";
import { toolSchemas } from "./tools.js";
export function createServer(db: DatabaseConnection): McpServer {
const server = new McpServer({
name: "mcp-database-server",
version: "1.0.0",
});
// 工具:列出所有表
server.tool(
toolSchemas.listTables.name,
toolSchemas.listTables.description,
toolSchemas.listTables.schema,
async () => {
try {
const tables = await db.listTables();
if (tables.length === 0) {
return {
content: [{ type: "text", text: "数据库中没有找到任何表。" }],
};
}
const tableList = tables
.map((t) => `- ${t.name} (约 ${t.rowCount} 行)`)
.join("\n");
return {
content: [
{
type: "text",
text: `数据库中共有 ${tables.length} 张表:\n\n${tableList}`,
},
],
};
} catch (error) {
return {
content: [
{ type: "text", text: `查询失败:${(error as Error).message}` },
],
isError: true,
};
}
}
);
// 工具:查看表结构
server.tool(
toolSchemas.describeTable.name,
toolSchemas.describeTable.description,
toolSchemas.describeTable.schema,
async ({ table_name }) => {
try {
const columns = await db.describeTable(table_name);
if (columns.length === 0) {
return {
content: [{ type: "text", text: `表 ${table_name} 不存在或没有列。` }],
};
}
const header = "| 列名 | 类型 | 可空 | 默认值 | 主键 |";
const separator = "|------|------|------|--------|------|";
const rows = columns
.map(
(c) =>
`| ${c.name} | ${c.type} | ${c.nullable ? "是" : "否"} | ${c.defaultValue ?? "-"} | ${c.isPrimaryKey ? "是" : "-"} |`
)
.join("\n");
return {
content: [
{
type: "text",
text: `表 ${table_name} 的结构:\n\n${header}\n${separator}\n${rows}`,
},
],
};
} catch (error) {
return {
content: [
{ type: "text", text: `查询失败:${(error as Error).message}` },
],
isError: true,
};
}
}
);
// 工具:执行 SQL 查询
server.tool(
toolSchemas.query.name,
toolSchemas.query.description,
toolSchemas.query.schema,
async ({ sql, limit }) => {
try {
const result = await db.query(sql, limit);
if (result.rows.length === 0) {
return {
content: [{ type: "text", text: "查询没有返回任何结果。" }],
};
}
// 格式化为 Markdown 表格
const header = `| ${result.columns.join(" | ")} |`;
const separator = `| ${result.columns.map(() => "---").join(" | ")} |`;
const rows = result.rows
.map(
(row) =>
`| ${result.columns.map((col) => String(row[col] ?? "NULL")).join(" | ")} |`
)
.join("\n");
return {
content: [
{
type: "text",
text: `查询返回 ${result.rowCount} 条结果:\n\n${header}\n${separator}\n${rows}`,
},
],
};
} catch (error) {
return {
content: [
{ type: "text", text: `查询失败:${(error as Error).message}` },
],
isError: true,
};
}
}
);
// 工具:表统计信息
server.tool(
toolSchemas.tableStats.name,
toolSchemas.tableStats.description,
toolSchemas.tableStats.schema,
async ({ table_name }) => {
try {
const stats = await db.getTableStats(table_name);
return {
content: [
{
type: "text",
text: `表 ${table_name} 的统计信息:\n- 总行数:${stats.totalRows}\n- 占用空间:${stats.totalSize}`,
},
],
};
} catch (error) {
return {
content: [
{ type: "text", text: `查询失败:${(error as Error).message}` },
],
isError: true,
};
}
}
);
return server;
}
入口文件
// src/index.ts
#!/usr/bin/env node
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { DatabaseConnection } from "./database.js";
import { createServer } from "./server.js";
async function main() {
// 从命令行参数或环境变量获取连接字符串
const connectionString =
process.argv[2] || process.env.DATABASE_URL;
if (!connectionString) {
console.error(
"Usage: mcp-database-server <connection-string>"
);
console.error(
" or set DATABASE_URL environment variable"
);
process.exit(1);
}
// 创建数据库连接
const db = new DatabaseConnection(connectionString);
// 创建 MCP Server
const server = createServer(db);
// 使用 stdio 传输
const transport = new StdioServerTransport();
await server.connect(transport);
// 优雅退出
process.on("SIGINT", async () => {
await db.close();
process.exit(0);
});
}
main().catch((error) => {
console.error("Server failed to start:", error);
process.exit(1);
});
SQL 安全处理
数据库 MCP Server 的安全性至关重要。以下是我们采取的安全措施:
1. 只读限制
// 只允许 SELECT 和 WITH 语句
const trimmed = sql.trim().toUpperCase();
if (!trimmed.startsWith("SELECT") && !trimmed.startsWith("WITH")) {
throw new Error("Only SELECT queries are allowed");
}
2. 关键词过滤
const forbidden = ["INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "TRUNCATE"];
for (const keyword of forbidden) {
if (trimmed.includes(keyword)) {
throw new Error(`Forbidden keyword: ${keyword}`);
}
}
3. 结果限制
// 强制添加 LIMIT,防止返回过多数据
const limitedSql = sql + ` LIMIT ${limit}`;
4. 表名验证
// 防止 SQL 注入
if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(tableName)) {
throw new Error(`Invalid table name: ${tableName}`);
}
5. 数据库用户权限
最重要的一层防护:使用只读数据库用户。
-- 创建只读用户
CREATE USER mcp_readonly WITH PASSWORD 'secure_password';
GRANT CONNECT ON DATABASE mydb TO mcp_readonly;
GRANT USAGE ON SCHEMA public TO mcp_readonly;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO mcp_readonly;
配置和使用
在 Claude Code 中配置
{
"mcpServers": {
"database": {
"command": "node",
"args": [
"/path/to/mcp-database-server/dist/index.js",
"postgresql://mcp_readonly:password@localhost:5432/mydb"
]
}
}
}
使用环境变量
{
"mcpServers": {
"database": {
"command": "node",
"args": ["/path/to/mcp-database-server/dist/index.js"],
"env": {
"DATABASE_URL": "postgresql://mcp_readonly:password@localhost:5432/mydb"
}
}
}
}
测试
使用 MCP Inspector 测试
# 构建项目
npm run build
# 使用 Inspector 测试
npx @modelcontextprotocol/inspector node dist/index.js \
"postgresql://user:pass@localhost:5432/testdb"
在 Inspector 界面中可以:
- 查看注册的 4 个工具
- 手动调用每个工具
- 验证返回结果格式
手动测试
# 直接运行 Server,通过 stdin 发送请求
echo '{"jsonrpc":"2.0","id":1,"method":"tools/list"}' | \
node dist/index.js "postgresql://user:pass@localhost:5432/testdb"
扩展思路
这个基础版本可以继续扩展:
- 查询缓存:对相同查询结果进行缓存,减少数据库压力
- 查询历史:记录执行过的查询,方便回溯
- 多数据库支持:同时连接多个数据库
- 数据可视化:返回图表友好的数据格式
- 慢查询告警:对执行时间过长的查询给出提示
总结
通过这个项目,我们完成了一个实用的数据库 MCP Server。核心要点:
- 使用
@modelcontextprotocol/sdk简化 Server 开发 - 工具定义要清晰,description 决定了 AI 何时使用这个工具
- 安全性是数据库 Server 的重中之重,多层防护不可少
- 返回格式化的结果(Markdown 表格)让 AI 更容易理解和展示
让 AI 理解你的数据,从一个好的 MCP Server 开始。安全第一,功能第二。
相关文章
评论
加载中...
评论
加载中...