项目目标
我们要开发一个文件处理 MCP Server,让 AI 能够:
- 解析 PDF 文件,提取文本内容
- 读取 Excel 文件,获取表格数据
- 处理 CSV 文件,支持查询和统计
- 转换文件格式(CSV ↔ JSON)
这在日常工作中非常实用——你可以让 AI 直接分析报表、提取合同信息、处理数据文件。
项目初始化
mkdir mcp-file-server
cd mcp-file-server
npm init -y
npm install @modelcontextprotocol/sdk zod
# Pin pdf-parse to the 1.x line: the parser in this article calls its
# default-export function and passes the `max` option of that API.
npm install pdf-parse@1 xlsx csv-parse csv-stringify
# The code references the `pdfParse.Options` type, which is provided by the
# separate @types/pdf-parse package.
npm install -D typescript @types/node @types/pdf-parse
项目结构:
src/
├── index.ts # 入口
├── server.ts # MCP Server 定义
├── parsers/
│ ├── pdf-parser.ts # PDF 解析
│ ├── excel-parser.ts # Excel 解析
│ └── csv-parser.ts # CSV 解析
└── utils/
└── file-utils.ts # 文件工具函数
文件工具函数
// src/utils/file-utils.ts
import { access, readFile, stat } from "fs/promises";
import { extname, resolve, sep } from "path";
/**
 * Absolute directories the server is allowed to read from (security boundary).
 * While the list is empty, `validatePath` performs no restriction at all.
 */
let allowedDirs: string[] = [];

/**
 * Replaces the allow-list. Every entry is converted to an absolute path so
 * later comparisons are made between normalized paths.
 *
 * @param dirs Directories (absolute or relative to the cwd) to allow.
 */
export function setAllowedDirs(dirs: string[]): void {
  allowedDirs = dirs.map((d) => resolve(d));
}

/**
 * Resolves `filePath` to an absolute path and verifies it lies inside one of
 * the allowed directories.
 *
 * The containment check is done on whole path segments: a plain
 * `startsWith(dir)` would also accept siblings that merely share a prefix
 * (e.g. `/data/docs-private` when only `/data/docs` is allowed).
 *
 * NOTE(review): symbolic links are not resolved here, so a link placed inside
 * an allowed directory can still point outside of it.
 *
 * @param filePath Path supplied by the caller.
 * @returns The absolute, normalized path.
 * @throws Error when an allow-list is configured and the path is outside it.
 */
export function validatePath(filePath: string): string {
  const resolved = resolve(filePath);
  if (
    allowedDirs.length > 0 &&
    !allowedDirs.some((dir) => isInsideDir(resolved, dir))
  ) {
    throw new Error(
      `Access denied: ${filePath} is outside allowed directories`
    );
  }
  return resolved;
}

/** True when `target` is `dir` itself or located somewhere beneath it. */
function isInsideDir(target: string, dir: string): boolean {
  if (target === dir) return true;
  // A filesystem root already ends with the separator; avoid doubling it.
  const prefix = dir.endsWith(sep) ? dir : dir + sep;
  return target.startsWith(prefix);
}
/**
 * Reports whether `filePath` is accessible to the current process.
 *
 * Any failure of the underlying `access` call is reported as `false`; the
 * function itself never rejects. The path is used as given (it is not passed
 * through `validatePath`).
 */
export async function fileExists(filePath: string): Promise<boolean> {
  return access(filePath).then(
    () => true,
    () => false
  );
}
/**
 * Returns basic metadata for a file: its resolved path, size in bytes,
 * lower-cased extension, modification time and a human-readable size.
 *
 * The path is validated against the allow-list before the file is stat-ed;
 * the file contents are not read.
 */
export async function getFileInfo(filePath: string) {
  const path = validatePath(filePath);
  const { size, mtime } = await stat(path);
  return {
    path,
    size,
    extension: extname(path).toLowerCase(),
    modifiedAt: mtime,
    sizeFormatted: formatFileSize(size),
  };
}
/**
 * Reads a file as raw bytes after validating its path against the allow-list.
 */
export async function readFileBuffer(filePath: string): Promise<Buffer> {
  const contents = await readFile(validatePath(filePath));
  return contents;
}
/**
 * Reads a file as UTF-8 text after validating its path against the allow-list.
 */
export async function readFileText(filePath: string): Promise<string> {
  const contents = await readFile(validatePath(filePath), "utf-8");
  return contents;
}
/**
 * Formats a byte count for display: whole bytes below 1024, otherwise KB or
 * MB with one decimal place. MB is the largest unit used.
 */
function formatFileSize(bytes: number): string {
  const KIB = 1024;
  const MIB = KIB * KIB;

  if (bytes < KIB) return `${bytes} B`;

  const inKilobytes = bytes < MIB;
  const scaled = bytes / (inKilobytes ? KIB : MIB);
  return `${scaled.toFixed(1)} ${inKilobytes ? "KB" : "MB"}`;
}
PDF 解析器
// src/parsers/pdf-parser.ts
import pdfParse from "pdf-parse";
import { readFileBuffer } from "../utils/file-utils.js";
/** Result of parsing a PDF file with `parsePdf`. */
export interface PdfResult {
  /** Extracted text with leading and trailing whitespace trimmed. */
  text: string;
  /** Page count as reported by the PDF parser for the whole document. */
  pageCount: number;
  /** Document metadata copied from the parser's info record; fields may be absent. */
  info: {
    title?: string;
    author?: string;
    creator?: string;
  };
}
/** Minimal shape of the page object pdf-parse hands to a `pagerender` callback. */
interface PdfPageLike {
  getTextContent(options: {
    normalizeWhitespace: boolean;
    disableCombineTextItems: boolean;
  }): Promise<{ items: Array<{ str: string; transform: number[] }> }>;
}

/**
 * Extracts the text of one page. This mirrors the default render callback
 * documented in the pdf-parse README: items are concatenated and a line break
 * is inserted whenever the vertical position changes.
 */
async function renderPageText(page: PdfPageLike): Promise<string> {
  const content = await page.getTextContent({
    normalizeWhitespace: false,
    disableCombineTextItems: false,
  });
  let lastY: number | undefined;
  let text = "";
  for (const item of content.items) {
    const y = item.transform[5];
    text += lastY === y || !lastY ? item.str : "\n" + item.str;
    lastY = y;
  }
  return text;
}

/**
 * Parses a PDF file and returns its text and metadata.
 *
 * When `pageRange` is given, only the selected pages are returned, joined by
 * a `---` divider. Page texts are collected through a custom `pagerender`
 * callback: pdf-parse's default output joins pages with blank lines and
 * contains no form-feed characters, so splitting the combined text on `\f`
 * cannot recover page boundaries.
 *
 * @param filePath Path to the PDF (validated by `readFileBuffer`).
 * @param options.maxPages Upper bound on the number of pages to render.
 * @param options.pageRange 1-based selection such as `"1-3"` or `"1,3,5"`;
 *   it is applied to the pages that were actually rendered.
 * @returns Extracted text, total page count and document info.
 */
export async function parsePdf(
  filePath: string,
  options?: { maxPages?: number; pageRange?: string }
): Promise<PdfResult> {
  const buffer = await readFileBuffer(filePath);
  const parseOptions: pdfParse.Options = {};
  if (options?.maxPages) {
    parseOptions.max = options.maxPages;
  }

  // Filled in page order; pdf-parse renders pages one after another.
  const pageTexts: string[] = [];
  if (options?.pageRange) {
    const collectPage = async (page: PdfPageLike): Promise<string> => {
      let pageText = "";
      try {
        pageText = await renderPageText(page);
      } catch {
        // Keep a slot for the failed page so later page numbers stay aligned.
        pageText = "";
      }
      pageTexts.push(pageText);
      return pageText;
    };
    // NOTE(review): the published typings describe `pagerender` as returning a
    // string, while the library's own documented renderer returns a promise;
    // the cast bridges that mismatch. Confirm against the installed typings.
    parseOptions.pagerender =
      collectPage as unknown as pdfParse.Options["pagerender"];
  }

  const data = await pdfParse(buffer, parseOptions);
  let text = data.text;
  if (options?.pageRange) {
    const range = parsePageRange(options.pageRange, pageTexts.length);
    text = range.map((i) => pageTexts[i] ?? "").join("\n\n---\n\n");
  }

  return {
    text: text.trim(),
    pageCount: data.numpages,
    info: {
      title: data.info?.Title,
      author: data.info?.Author,
      creator: data.info?.Creator,
    },
  };
}
/**
 * Converts a 1-based page selection into 0-based page indexes.
 *
 * Accepted comma-separated parts:
 * - `N`    a single page
 * - `A-B`  an inclusive span
 * - `A-`   from page A to the last page
 * - `-B`   from the first page to page B
 *
 * Spans are clamped to `1..totalPages`; single pages outside that interval
 * and parts that are not whole numbers are ignored. The result keeps the
 * order of the input and never contains negative or fractional indexes.
 *
 * @param range Selection string, e.g. `"1-3"` or `"1,3,5"`.
 * @param totalPages Number of pages available.
 * @returns 0-based indexes of the selected pages.
 */
function parsePageRange(range: string, totalPages: number): number[] {
  const spanPattern = /^(\d*)\s*-\s*(\d*)$/;
  const singlePattern = /^\d+$/;
  const pages: number[] = [];

  for (const part of range.split(",")) {
    const token = part.trim();
    const span = spanPattern.exec(token);

    if (span) {
      const [, rawStart = "", rawEnd = ""] = span;
      // A lone "-" carries no information.
      if (rawStart === "" && rawEnd === "") continue;
      const first = rawStart === "" ? 1 : Math.max(1, Number(rawStart));
      const last =
        rawEnd === "" ? totalPages : Math.min(Number(rawEnd), totalPages);
      for (let page = first; page <= last; page++) {
        pages.push(page - 1);
      }
    } else if (singlePattern.test(token)) {
      const page = Number(token);
      if (page >= 1 && page <= totalPages) {
        pages.push(page - 1);
      }
    }
  }

  return pages;
}
Excel 解析器
// src/parsers/excel-parser.ts
import * as XLSX from "xlsx";
import { readFileBuffer } from "../utils/file-utils.js";
/** Summary of one worksheet, as produced by `listSheets`. */
export interface ExcelSheet {
  /** Worksheet name as listed in the workbook. */
  name: string;
  /** Number of rows spanned by the sheet's used range (the header row is included). */
  rowCount: number;
  /** Number of columns spanned by the sheet's used range. */
  columnCount: number;
  /** Values of the first row of the used range; empty cells appear as `ColumnN`. */
  headers: string[];
}
/** A window of worksheet rows, as produced by `readSheet`. */
export interface ExcelData {
  /** Column names for the returned rows. */
  headers: string[];
  /** The returned rows, each keyed by column name. */
  rows: Record<string, unknown>[];
  /** Number of data rows in the whole sheet, before the window was applied. */
  totalRows: number;
}
/**
 * Lists every worksheet of an Excel workbook with its dimensions and header
 * row. Dimensions are derived from the sheet's `!ref` range; a sheet without
 * one is treated as the single cell `A1`.
 *
 * @param filePath Path to the workbook (validated by `readFileBuffer`).
 */
export async function listSheets(filePath: string): Promise<ExcelSheet[]> {
  const workbook = XLSX.read(await readFileBuffer(filePath), {
    type: "buffer",
  });

  const summaries: ExcelSheet[] = [];
  for (const name of workbook.SheetNames) {
    const sheet = workbook.Sheets[name];
    const used = XLSX.utils.decode_range(sheet["!ref"] || "A1");
    summaries.push({
      name,
      rowCount: used.e.r - used.s.r + 1,
      columnCount: used.e.c - used.s.c + 1,
      headers: getHeaders(sheet, used),
    });
  }
  return summaries;
}
/**
 * Reads a window of rows from one worksheet.
 *
 * @param filePath Path to the workbook (validated by `readFileBuffer`).
 * @param sheetName Worksheet to read; the first sheet when omitted or empty.
 * @param options.startRow 0-based index of the first data row to return
 *   (default 0). Negative values are treated as 0.
 * @param options.maxRows Maximum number of rows to return (default 100).
 *   An explicit 0 yields no rows; negative values are treated as 0.
 * @returns The selected rows, their column names and the total row count.
 * @throws Error when the requested worksheet does not exist.
 */
export async function readSheet(
  filePath: string,
  sheetName?: string,
  options?: { startRow?: number; maxRows?: number }
): Promise<ExcelData> {
  const buffer = await readFileBuffer(filePath);
  const workbook = XLSX.read(buffer, { type: "buffer" });

  const targetSheet = sheetName || workbook.SheetNames[0];
  const sheet = workbook.Sheets[targetSheet];
  if (!sheet) {
    throw new Error(`Sheet "${targetSheet}" not found`);
  }

  const jsonData = XLSX.utils.sheet_to_json<Record<string, unknown>>(sheet);

  // `??` keeps an explicit 0; clamping stops a negative start from being
  // interpreted by `slice` as an offset from the end of the sheet.
  const startRow = Math.max(0, options?.startRow ?? 0);
  const maxRows = Math.max(0, options?.maxRows ?? 100);
  const sliced = jsonData.slice(startRow, startRow + maxRows);

  // Row objects omit blank cells, so the first row alone may lack columns
  // that later rows fill in. Collect the keys of every returned row, in
  // first-seen order.
  const headers = [...new Set(sliced.flatMap((row) => Object.keys(row)))];

  return {
    headers,
    rows: sliced,
    totalRows: jsonData.length,
  };
}
/**
 * Reads the first row of `range` as column headers. A missing cell is given
 * the placeholder `Column<N>`, where N is the 1-based column position within
 * the sheet.
 */
function getHeaders(
  sheet: XLSX.WorkSheet,
  range: XLSX.Range
): string[] {
  const headerRow = range.s.r;
  const width = range.e.c - range.s.c + 1;

  return Array.from({ length: width }, (_, offset) => {
    const c = range.s.c + offset;
    const cell = sheet[XLSX.utils.encode_cell({ r: headerRow, c })];
    return cell ? String(cell.v) : `Column${c + 1}`;
  });
}
CSV 解析器
// src/parsers/csv-parser.ts
import { parse } from "csv-parse/sync";
import { stringify } from "csv-stringify/sync";
import { readFileText } from "../utils/file-utils.js";
/** Parsed CSV content, as produced by `parseCsv`. */
export interface CsvData {
  /** Column names, taken from the keys of the first returned row (empty when no rows are returned). */
  headers: string[];
  /** The returned records, each keyed by column name. */
  rows: Record<string, string>[];
  /** Number of records in the whole file, before the row limit was applied. */
  totalRows: number;
}
/**
 * Parses a CSV file whose first line holds the column names.
 *
 * Empty lines are skipped and values are trimmed. A leading byte order mark
 * is stripped so it does not become part of the first column name (files
 * exported from spreadsheet tools commonly carry one).
 *
 * NOTE(review): `options.encoding` is accepted but not applied — the file is
 * read through `readFileText`, which always decodes UTF-8.
 *
 * @param filePath Path to the CSV file (validated by `readFileText`).
 * @param options.delimiter Field delimiter; defaults to a comma.
 * @param options.maxRows Maximum number of records to return (default 100).
 *   An explicit 0 yields no records; negative values are treated as 0.
 * @returns The leading records, their column names and the total record count.
 */
export async function parseCsv(
  filePath: string,
  options?: {
    delimiter?: string;
    maxRows?: number;
    encoding?: BufferEncoding;
  }
): Promise<CsvData> {
  const content = await readFileText(filePath);
  const records = parse(content, {
    bom: true,
    columns: true,
    skip_empty_lines: true,
    delimiter: options?.delimiter || ",",
    trim: true,
  }) as Record<string, string>[];

  // `??` keeps an explicit 0 instead of silently replacing it with 100.
  const maxRows = Math.max(0, options?.maxRows ?? 100);
  const sliced = records.slice(0, maxRows);
  const headers = sliced.length > 0 ? Object.keys(sliced[0]) : [];

  return {
    headers,
    rows: sliced,
    totalRows: records.length,
  };
}
/**
 * Serializes parsed CSV records as pretty-printed JSON (two-space indent).
 */
export function csvToJson(
  rows: Record<string, string>[]
): string {
  const INDENT = 2;
  const json = JSON.stringify(rows, undefined, INDENT);
  return json;
}
/**
 * Serializes records as CSV text with a header line. The columns are the
 * keys of the first record; an empty input produces an empty string.
 */
export function jsonToCsv(
  data: Record<string, unknown>[]
): string {
  return data.length === 0
    ? ""
    : stringify(data, { header: true, columns: Object.keys(data[0]) });
}
/**
 * Computes summary statistics for one column of parsed CSV records.
 *
 * Always reported: the column name, the number of non-empty values, the
 * number of distinct values and the number of empty values. When at least
 * one value converts to a number, `min`, `max`, `mean` and `median` of those
 * numeric values are added.
 */
export function getCsvStats(
  rows: Record<string, string>[],
  column: string
): Record<string, unknown> {
  // Non-empty cell values of the column.
  const present: string[] = [];
  for (const row of rows) {
    const cell = row[column];
    if (cell) present.push(cell);
  }

  // The subset of values that convert to a number.
  const numbers: number[] = [];
  for (const cell of present) {
    const asNumber = Number(cell);
    if (!isNaN(asNumber)) numbers.push(asNumber);
  }

  const stats: Record<string, unknown> = {
    column,
    totalValues: present.length,
    uniqueValues: new Set(present).size,
    emptyValues: rows.length - present.length,
  };
  if (numbers.length === 0) return stats;

  numbers.sort((a, b) => a - b);
  const count = numbers.length;
  const middle = Math.floor(count / 2);

  let sum = 0;
  for (const value of numbers) sum += value;

  stats.min = numbers[0];
  stats.max = numbers[count - 1];
  stats.mean = sum / count;
  stats.median =
    count % 2 === 0
      ? (numbers[middle - 1] + numbers[middle]) / 2
      : numbers[middle];
  return stats;
}
MCP Server 实现
// src/server.ts
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
import { parsePdf } from "./parsers/pdf-parser.js";
import { listSheets, readSheet } from "./parsers/excel-parser.js";
import { parseCsv, csvToJson, getCsvStats } from "./parsers/csv-parser.js";
import { getFileInfo } from "./utils/file-utils.js";
/** Maximum number of characters of PDF output returned by `read_pdf`. */
const PDF_MAX_CHARS = 50000;

/** Maximum number of CSV records analysed by `csv_column_stats`. */
const CSV_STATS_MAX_ROWS = 10000;

/** Wraps plain text as a successful tool result. */
function textResult(text: string) {
  return { content: [{ type: "text" as const, text }] };
}

/**
 * Wraps a failure as a tool result flagged with `isError`. Thrown values
 * that are not `Error` instances are stringified instead of being cast.
 */
function errorResult(prefix: string, error: unknown) {
  const message = error instanceof Error ? error.message : String(error);
  return {
    content: [{ type: "text" as const, text: `${prefix}:${message}` }],
    isError: true,
  };
}

/**
 * Renders a value for use inside a Markdown table cell: pipes are escaped
 * and line breaks are replaced by spaces so one record stays on one row.
 */
function escapeCell(value: unknown): string {
  return String(value ?? "")
    .replace(/\|/g, "\\|")
    .replace(/\r\n|\r|\n/g, " ");
}

/** Renders records as a Markdown table with the given column order. */
function toMarkdownTable(
  headers: string[],
  rows: Record<string, unknown>[]
): string {
  const line = (cells: string[]) => `| ${cells.join(" | ")} |`;
  return [
    line(headers.map(escapeCell)),
    line(headers.map(() => "---")),
    ...rows.map((row) => line(headers.map((h) => escapeCell(row[h])))),
  ].join("\n");
}

/**
 * Creates the MCP server and registers its tools: `file_info`, `read_pdf`,
 * `list_excel_sheets`, `read_excel`, `read_csv`, `csv_column_stats` and
 * `csv_to_json`.
 *
 * Every handler converts failures into a result flagged with `isError`
 * rather than letting the exception escape.
 *
 * @returns The configured server; the caller connects it to a transport.
 */
export function createServer(): McpServer {
  const server = new McpServer({
    name: "mcp-file-server",
    version: "1.0.0",
  });

  // Tool: basic file metadata.
  server.tool(
    "file_info",
    "获取文件的基本信息(大小、类型、修改时间)",
    {
      path: z.string().describe("文件的绝对路径"),
    },
    async ({ path }) => {
      try {
        const info = await getFileInfo(path);
        return textResult(
          `文件信息:\n` +
            `- 路径:${info.path}\n` +
            `- 大小:${info.sizeFormatted}\n` +
            `- 类型:${info.extension}\n` +
            `- 修改时间:${info.modifiedAt.toLocaleString()}`
        );
      } catch (error) {
        return errorResult("获取文件信息失败", error);
      }
    }
  );

  // Tool: PDF text extraction.
  server.tool(
    "read_pdf",
    "解析 PDF 文件,提取文本内容。支持指定页面范围",
    {
      path: z.string().describe("PDF 文件的绝对路径"),
      max_pages: z.number().optional().describe("最多解析的页数"),
      page_range: z
        .string()
        .optional()
        .describe("页面范围,如 '1-3' 或 '1,3,5'"),
    },
    async ({ path, max_pages, page_range }) => {
      try {
        const result = await parsePdf(path, {
          maxPages: max_pages,
          pageRange: page_range,
        });
        let text = `PDF 文件解析结果:\n`;
        text += `- 总页数:${result.pageCount}\n`;
        if (result.info.title) text += `- 标题:${result.info.title}\n`;
        if (result.info.author) text += `- 作者:${result.info.author}\n`;
        text += `\n---\n\n${result.text}`;
        // Truncate long output and tell the model how to request less.
        if (text.length > PDF_MAX_CHARS) {
          text =
            text.substring(0, PDF_MAX_CHARS) +
            "\n\n[内容已截断,请使用 page_range 参数指定页面范围]";
        }
        return textResult(text);
      } catch (error) {
        return errorResult("PDF 解析失败", error);
      }
    }
  );

  // Tool: list the worksheets of an Excel workbook.
  server.tool(
    "list_excel_sheets",
    "列出 Excel 文件中的所有工作表及其基本信息",
    {
      path: z.string().describe("Excel 文件的绝对路径(.xlsx 或 .xls)"),
    },
    async ({ path }) => {
      try {
        const sheets = await listSheets(path);
        const sheetList = sheets
          .map(
            (s) =>
              `### ${s.name}\n- 行数:${s.rowCount}\n- 列数:${s.columnCount}\n- 列名:${s.headers.join(", ")}`
          )
          .join("\n\n");
        return textResult(
          `Excel 文件包含 ${sheets.length} 个工作表:\n\n${sheetList}`
        );
      } catch (error) {
        return errorResult("Excel 解析失败", error);
      }
    }
  );

  // Tool: read worksheet rows as a Markdown table.
  server.tool(
    "read_excel",
    "读取 Excel 工作表的数据,返回表格格式",
    {
      path: z.string().describe("Excel 文件的绝对路径"),
      sheet_name: z.string().optional().describe("工作表名称,默认第一个"),
      start_row: z.number().optional().default(0).describe("起始行号(0 开始)"),
      max_rows: z.number().optional().default(50).describe("最大返回行数"),
    },
    async ({ path, sheet_name, start_row, max_rows }) => {
      try {
        const data = await readSheet(path, sheet_name, {
          startRow: start_row,
          maxRows: max_rows,
        });
        if (data.rows.length === 0) {
          return textResult("工作表中没有数据。");
        }
        return textResult(
          `数据(共 ${data.totalRows} 行,显示 ${data.rows.length} 行):\n\n` +
            toMarkdownTable(data.headers, data.rows)
        );
      } catch (error) {
        return errorResult("读取失败", error);
      }
    }
  );

  // Tool: read CSV records as a Markdown table.
  server.tool(
    "read_csv",
    "解析 CSV 文件,返回表格数据",
    {
      path: z.string().describe("CSV 文件的绝对路径"),
      delimiter: z.string().optional().default(",").describe("分隔符,默认逗号"),
      max_rows: z.number().optional().default(50).describe("最大返回行数"),
    },
    async ({ path, delimiter, max_rows }) => {
      try {
        const data = await parseCsv(path, { delimiter, maxRows: max_rows });
        if (data.rows.length === 0) {
          return textResult("CSV 文件中没有数据。");
        }
        return textResult(
          `CSV 数据(共 ${data.totalRows} 行,显示 ${data.rows.length} 行):\n\n` +
            toMarkdownTable(data.headers, data.rows)
        );
      } catch (error) {
        return errorResult("CSV 解析失败", error);
      }
    }
  );

  // Tool: statistics for one CSV column.
  server.tool(
    "csv_column_stats",
    "对 CSV 文件的指定列进行统计分析(计数、唯一值、数值统计等)",
    {
      path: z.string().describe("CSV 文件的绝对路径"),
      column: z.string().describe("要统计的列名"),
    },
    async ({ path, column }) => {
      try {
        const data = await parseCsv(path, { maxRows: CSV_STATS_MAX_ROWS });
        if (!data.headers.includes(column)) {
          // The request cannot be fulfilled, so flag the result as an error.
          return {
            ...textResult(
              `列 "${column}" 不存在。可用的列:${data.headers.join(", ")}`
            ),
            isError: true,
          };
        }
        const stats = getCsvStats(data.rows, column);
        let text = `列 "${column}" 的统计信息:\n`;
        text += `- 总值数:${stats.totalValues}\n`;
        text += `- 唯一值:${stats.uniqueValues}\n`;
        text += `- 空值数:${stats.emptyValues}\n`;
        if (stats.min !== undefined) {
          text += `\n数值统计:\n`;
          text += `- 最小值:${stats.min}\n`;
          text += `- 最大值:${stats.max}\n`;
          text += `- 平均值:${(stats.mean as number).toFixed(2)}\n`;
          text += `- 中位数:${stats.median}\n`;
        }
        // Make the row cap visible instead of presenting partial statistics
        // as if they covered the whole file.
        if (data.totalRows > data.rows.length) {
          text += `\n注意:文件共 ${data.totalRows} 行,仅统计了前 ${data.rows.length} 行。\n`;
        }
        return textResult(text);
      } catch (error) {
        return errorResult("统计失败", error);
      }
    }
  );

  // Tool: convert CSV records to JSON.
  server.tool(
    "csv_to_json",
    "将 CSV 文件转换为 JSON 格式",
    {
      path: z.string().describe("CSV 文件的绝对路径"),
      max_rows: z.number().optional().default(100).describe("最大转换行数"),
    },
    async ({ path, max_rows }) => {
      try {
        const data = await parseCsv(path, { maxRows: max_rows });
        const json = csvToJson(data.rows);
        return textResult(
          `转换结果(${data.rows.length} 条记录):\n\n\`\`\`json\n${json}\n\`\`\``
        );
      } catch (error) {
        return errorResult("转换失败", error);
      }
    }
  );

  return server;
}
入口文件
// src/index.ts
#!/usr/bin/env node
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { setAllowedDirs } from "./utils/file-utils.js";
import { createServer } from "./server.js";
/**
 * Entry point: takes the allowed directories from the command line, installs
 * them as the allow-list and serves MCP over stdio. Prints usage and exits
 * with status 1 when no directory is given.
 */
async function main() {
  // Everything after the interpreter and script path is an allowed directory.
  const [, , ...allowedDirs] = process.argv;

  if (allowedDirs.length === 0) {
    const usage = [
      "Usage: mcp-file-server <dir1> [dir2] ...",
      "Specify directories that the server is allowed to access",
    ];
    for (const line of usage) console.error(line);
    process.exit(1);
  }

  setAllowedDirs(allowedDirs);
  await createServer().connect(new StdioServerTransport());
}
/** Logs a startup failure to stderr and terminates with exit status 1. */
function exitOnStartupFailure(error: unknown): never {
  console.error("Server failed to start:", error);
  process.exit(1);
}

main().catch(exitOnStartupFailure);
安全考虑
文件处理 Server 的安全性尤为重要:
1. 目录白名单
只允许访问指定目录下的文件,防止越权访问:
{
"mcpServers": {
"files": {
"command": "node",
"args": [
"/path/to/mcp-file-server/dist/index.js",
"/Users/me/documents",
"/Users/me/downloads"
]
}
}
}
2. 文件大小限制
避免处理过大的文件导致内存溢出。注意:前面的解析器代码尚未包含这项检查,需要在读取文件内容之前自行加入,例如:
// Largest file size, in bytes, that will be accepted for parsing.
const MAX_FILE_SIZE = 50 * 1024 * 1024; // 50MB
// getFileInfo only stats the file, so the size is known before any content
// is loaded into memory.
// NOTE(review): assumes this runs inside an async handler where `path` is in scope.
const info = await getFileInfo(path);
if (info.size > MAX_FILE_SIZE) {
  throw new Error(`File too large: ${info.sizeFormatted} (max 50MB)`);
}
3. 只读操作
这个 Server 只提供读取和分析功能,不提供写入或删除操作。
4. 路径遍历防护
// validatePath first normalizes the input with resolve(), which collapses
// `..` segments into an absolute path. resolve() alone does not block
// anything: input such as ../../../etc/passwd is rejected by the
// allowed-directory check that validatePath applies to the resolved path.
const resolved = resolve(filePath);
使用场景
配置好 Server 后,可以这样使用:
用户:帮我分析 /Users/me/documents/sales-report.xlsx 的销售数据
AI:我来读取这个 Excel 文件...
[调用 list_excel_sheets]
文件包含 3 个工作表:Q1, Q2, Q3
[调用 read_excel,sheet_name="Q1"]
Q1 数据如下...
根据数据分析,Q1 的总销售额为 xxx,环比增长 xx%...
总结
文件处理 MCP Server 是一个非常实用的工具,它让 AI 能够直接处理我们日常工作中的各种文件。核心要点:
- 每种文件格式用独立的解析器处理,保持代码清晰
- 安全性是第一优先级:目录白名单、文件大小限制、只读操作
- 返回格式化的数据(Markdown 表格),让 AI 更容易理解
- 大文件要做截断处理,避免超出上下文限制
数据不应该被锁在文件里。一个好的文件处理工具,让 AI 成为你的数据分析助手。
相关文章
评论
加载中...
评论
加载中...