好的,以下是优化后的完整方案,直接发送给 Hermes Agent 即可执行。
CmdCode 向量记忆 + FTS5 双路搜索召回系统(生产级优化版)
一、总体目标
为 CmdCode CLI 构建对话记忆系统,支持:
基于 FTS5 的关键词全文搜索
基于 sqlite-vec + 火山引擎 Embedding 的语义向量搜索
双路结果通过 RRF 融合排序
新消息实时自动向量化,历史消息定时回填
查询向量内存缓存,降低 API 成本
高并发控制、安全查询、失败重试与降级策略
二、技术栈与依赖
| 组件 | 说明 |
|------------|-----------------------------------|
| 运行时 | Node.js 18+ |
| 语言 | TypeScript |
| 数据库 | SQLite (better-sqlite3) |
| 向量扩展 | sqlite-vec (加载 vec0 虚拟表) |
| 嵌入模型 | doubao-embedding-vision (2048维) |
| API 端点 | https://ark.cn-beijing.volces.com/api/coding/v3/embeddings |
| 环境变量 | ARK_API_KEY(必填)、CMDCODE_DB_PATH(可选,默认 data/cmdcode.db) |
npm 依赖(dependencies;TypeScript 编译另需 devDependencies: typescript、@types/node、@types/better-sqlite3):
{
"better-sqlite3": "^11.0.0",
"sqlite-vec": "^0.1.0",
"dotenv": "^16.4.0"
}
三、项目文件结构
src/
├── memory/
│ ├── database.ts // 数据库连接与表初始化
│ ├── embedding.ts // Embedding API 封装、队列、缓存
│ ├── sessionStore.ts // 会话与消息 CRUD、FTS5 搜索
│ ├── vectorSearch.ts // 向量相似度搜索(KNN)
│ ├── backfill.ts // 增量回填定时任务
│ ├── rrf.ts // RRF 融合算法
│ ├── utils.ts // packEmbedding, sha256, sanitizeFTS5Query 等
│ └── memoryManager.ts // 统一对外接口
└── ...
四、详细实现
4.1 数据库初始化(database.ts)
import Database from 'better-sqlite3';
import * as sqliteVec from 'sqlite-vec';
import dotenv from 'dotenv';
import { mkdirSync } from 'fs';
import { dirname } from 'path';

dotenv.config();

/** SQLite database file; override with the CMDCODE_DB_PATH environment variable. */
const DB_PATH = process.env.CMDCODE_DB_PATH || 'data/cmdcode.db';

// better-sqlite3 refuses to open a file whose parent directory does not exist,
// so create it up front (e.g. `data/` in a fresh checkout).
if (DB_PATH !== ':memory:') {
  mkdirSync(dirname(DB_PATH), { recursive: true });
}

/** Shared connection used by every memory module. */
export const db = new Database(DB_PATH);

// The sqlite-vec npm package exposes `load(db)` (there is no `loadSqliteVec` export);
// it loads the extension that provides the vec0 virtual-table module.
sqliteVec.load(db);
db.pragma('journal_mode = WAL');
/**
 * Creates every table, index, trigger and virtual table used by the memory system.
 *
 * Every statement uses IF NOT EXISTS, so this is safe to call on each start-up; note that this also
 * means definitions of objects that already exist in an older database are NOT updated by edits here.
 *
 * NOTE(review): messages_fts uses FTS5's default `unicode61` tokenizer, which only splits on whitespace
 * and punctuation, so an unbroken run of Chinese characters is indexed as a single token and keyword
 * search cannot match a word inside it. Consider `tokenize='trigram'` (SQLite >= 3.34) or pre-segmented
 * text; switching requires rebuilding messages_fts.
 */
export function initializeDatabase(): void {
  db.exec(`
    -- 会话表
    CREATE TABLE IF NOT EXISTS sessions (
      id TEXT PRIMARY KEY,
      title TEXT,
      created_at TEXT DEFAULT (datetime('now')),
      updated_at TEXT DEFAULT (datetime('now'))
    );
    -- 消息表
    CREATE TABLE IF NOT EXISTS messages (
      id INTEGER PRIMARY KEY AUTOINCREMENT,
      session_id TEXT NOT NULL REFERENCES sessions(id),
      role TEXT NOT NULL CHECK(role IN ('user','assistant','system','tool')),
      content TEXT NOT NULL,
      created_at TEXT DEFAULT (datetime('now'))
    );
    -- 按会话查询消息的索引
    CREATE INDEX IF NOT EXISTS idx_messages_session_created ON messages(session_id, created_at);
    -- FTS5 全文索引
    CREATE VIRTUAL TABLE IF NOT EXISTS messages_fts USING fts5(
      content,
      content='messages',
      content_rowid='id'
    );
    -- 自动同步触发器
    CREATE TRIGGER IF NOT EXISTS messages_ai AFTER INSERT ON messages BEGIN
      INSERT INTO messages_fts(rowid, content) VALUES (new.id, new.content);
    END;
    CREATE TRIGGER IF NOT EXISTS messages_ad AFTER DELETE ON messages BEGIN
      INSERT INTO messages_fts(messages_fts, rowid, content) VALUES('delete', old.id, old.content);
    END;
    CREATE TRIGGER IF NOT EXISTS messages_au AFTER UPDATE ON messages BEGIN
      INSERT INTO messages_fts(messages_fts, rowid, content) VALUES('delete', old.id, old.content);
      INSERT INTO messages_fts(rowid, content) VALUES (new.id, new.content);
    END;
    -- 向量虚拟表
    CREATE VIRTUAL TABLE IF NOT EXISTS message_embeddings USING vec0(
      embedding float[2048] distance_metric=cosine,
      +msg_id integer,
      +text_hash text,
      +created_at text
    );
    -- 消息删除时清理向量
    CREATE TRIGGER IF NOT EXISTS messages_ad_delete_embedding AFTER DELETE ON messages BEGIN
      DELETE FROM message_embeddings WHERE msg_id = old.id;
    END;
    -- 更新会话时间
    CREATE TRIGGER IF NOT EXISTS session_updated_after_insert AFTER INSERT ON messages BEGIN
      UPDATE sessions SET updated_at = datetime('now') WHERE id = new.session_id;
    END;
    -- 失败记录表(回填用)
    CREATE TABLE IF NOT EXISTS embedding_failures (
      msg_id INTEGER PRIMARY KEY,
      fail_count INTEGER DEFAULT 1,
      last_error TEXT,
      updated_at TEXT DEFAULT (datetime('now'))
    );
    -- 消息内容修改后删除旧向量与失败记录,由回填任务按新内容重新生成
    CREATE TRIGGER IF NOT EXISTS messages_au_reset_embedding AFTER UPDATE OF content ON messages
    WHEN old.content IS NOT new.content BEGIN
      DELETE FROM message_embeddings WHERE msg_id = old.id;
      DELETE FROM embedding_failures WHERE msg_id = old.id;
    END;
    -- 消息删除时清理失败记录
    CREATE TRIGGER IF NOT EXISTS messages_ad_delete_failure AFTER DELETE ON messages BEGIN
      DELETE FROM embedding_failures WHERE msg_id = old.id;
    END;
  `);
}
4.2 工具函数(utils.ts)
import crypto from 'crypto';
/**
 * Serializes a vector into a blob of consecutive little-endian float32 values
 * (4 bytes per element), the layout written into the vec0 `embedding` column.
 */
export function packEmbedding(vec: number[]): Buffer {
  const out = Buffer.allocUnsafe(vec.length * 4);
  for (const [idx, value] of vec.entries()) {
    out.writeFloatLE(value, idx * 4);
  }
  return out;
}
/**
 * Inverse of packEmbedding: reads consecutive little-endian float32 values from a blob.
 * A blob whose length is not a multiple of 4 makes the final read throw a RangeError.
 */
export function unpackEmbedding(blob: Buffer): number[] {
  const slots = Math.ceil(blob.length / 4);
  return Array.from({ length: slots }, (_unused, slot) => blob.readFloatLE(slot * 4));
}
/** Hex-encoded SHA-256 digest of the UTF-8 bytes of `text`. */
export function sha256(text: string): string {
  const hasher = crypto.createHash('sha256');
  hasher.update(text, 'utf8');
  return hasher.digest('hex');
}
/**
 * Turns free-form user text into a safe FTS5 MATCH expression.
 *
 * Every surviving term is wrapped in double quotes, so it is matched as a literal phrase and can never be
 * parsed as an FTS5 operator; the terms are OR-ed together.
 *
 * @param input raw user query.
 * @param maxTerms maximum number of terms kept (default 10).
 * @returns the MATCH expression, or '' when nothing searchable remains. Callers should skip the FTS5 query
 *   in that case. The previous sentinel '*' is not valid FTS5 syntax.
 */
export function sanitizeFTS5Query(input: string, maxTerms = 10): string {
  // Keep letters, digits and combining marks from every script (not only ASCII and the basic CJK block),
  // plus `_ @ . -`. Everything else, including the FTS5 syntax characters `" * ( ) : ^ +`, becomes a separator.
  // With `"` removed, the quoting below cannot be broken out of.
  const cleaned = input.replace(/[^\p{L}\p{N}\p{M}_\s@.-]/gu, ' ');
  const terms = cleaned
    .split(/\s+/)
    // A term made only of `_ @ . -` tokenizes to nothing, so drop it instead of emitting an empty phrase.
    .filter(term => /[\p{L}\p{N}]/u.test(term))
    .slice(0, maxTerms);
  if (terms.length === 0) return '';
  return terms.map(term => `"${term}"`).join(' OR ');
}
4.3 Embedding 服务、队列与缓存(embedding.ts)
import { sha256 } from './utils';
/** Error from one embedding request; `retryable` tells the retry loop whether another attempt can help. */
class EmbeddingRequestError extends Error {
  constructor(message: string, readonly retryable: boolean) {
    super(message);
    this.name = 'EmbeddingRequestError';
  }
}

/**
 * Client for the Volcengine Ark embeddings endpoint.
 *
 * NOTE(review): Volcengine documents `doubao-embedding-vision` as a multimodal model, which may fuse a
 * multi-item input into a single vector. Confirm that this URL/model pair returns one vector per input
 * text. The response-count check in parseEmbeddings fails loudly if it does not.
 */
class EmbeddingService {
  private baseURL = 'https://ark.cn-beijing.volces.com/api/coding/v3/embeddings';
  private model = 'doubao-embedding-vision';
  /** Extra attempts after the first, for transient failures only. */
  private maxRetries = 2;
  /** Per-attempt budget covering both the request and reading the response body. */
  private timeoutMs = 30_000;

  /**
   * Embeds `texts` in one request and returns one vector per input, in input order.
   *
   * Network errors, timeouts, HTTP 408/429 and 5xx responses are retried with linear backoff (1s, 2s).
   * Other 4xx responses and malformed payloads fail immediately.
   *
   * @throws EmbeddingRequestError when ARK_API_KEY is unset, on a non-retryable response, or when retries are exhausted.
   */
  async getBatchEmbeddings(texts: string[]): Promise<number[][]> {
    if (texts.length === 0) return [];
    // Read at call time, not construction time, so dotenv/env changes made after import still apply.
    const apiKey = process.env.ARK_API_KEY;
    if (!apiKey) throw new EmbeddingRequestError('ARK_API_KEY is not set', false);

    for (let attempt = 0; ; attempt++) {
      try {
        return await this.requestOnce(texts, apiKey);
      } catch (e: unknown) {
        const retryable = !(e instanceof EmbeddingRequestError) || e.retryable;
        if (!retryable || attempt >= this.maxRetries) throw e;
        await new Promise<void>(resolve => setTimeout(resolve, 1000 * (attempt + 1)));
      }
    }
  }

  /** Embeds a single text. */
  async getEmbedding(text: string): Promise<number[]> {
    const [vector] = await this.getBatchEmbeddings([text]);
    if (!vector) throw new EmbeddingRequestError('Embedding API returned no vector', false);
    return vector;
  }

  /** One HTTP attempt. The abort timer is always cleared, even when fetch itself throws. */
  private async requestOnce(texts: string[], apiKey: string): Promise<number[][]> {
    const controller = new AbortController();
    const timer = setTimeout(() => controller.abort(), this.timeoutMs);
    try {
      const res = await fetch(this.baseURL, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          Authorization: `Bearer ${apiKey}`,
        },
        body: JSON.stringify({
          model: this.model,
          input: texts,
        }),
        signal: controller.signal,
      });
      if (!res.ok) {
        const body = await res.text();
        const retryable = res.status === 408 || res.status === 429 || res.status >= 500;
        throw new EmbeddingRequestError(`Embedding API error ${res.status}: ${body}`, retryable);
      }
      return this.parseEmbeddings(await res.json(), texts.length);
    } finally {
      clearTimeout(timer);
    }
  }

  /** Validates an OpenAI-style `{ data: [{ index, embedding }] }` payload and restores input order. */
  private parseEmbeddings(json: unknown, expected: number): number[][] {
    const data = (json as { data?: unknown } | null)?.data;
    if (!Array.isArray(data) || data.length !== expected) {
      const got = Array.isArray(data) ? data.length : 'no';
      throw new EmbeddingRequestError(`Embedding API returned ${got} vectors for ${expected} inputs`, false);
    }
    const items = data as Array<{ index?: unknown; embedding?: unknown }>;
    // Honour `index` when every item carries one; otherwise trust array order.
    const ordered = items.every(item => typeof item.index === 'number')
      ? [...items].sort((a, b) => (a.index as number) - (b.index as number))
      : items;
    return ordered.map((item, i) => {
      if (!Array.isArray(item.embedding)) {
        throw new EmbeddingRequestError(`Embedding API item ${i} has no embedding array`, false);
      }
      return item.embedding as number[];
    });
  }
}
export const embeddingService = new EmbeddingService();
// Request queue: merges concurrent enqueue() calls into batched embedding requests.
/**
 * Serializes embedding work. Texts enqueued while a request is in flight are grouped into batches of at
 * most `batchSize`. Batches are sent one at a time, with `intervalMs` of spacing between consecutive
 * batches as a simple rate limit.
 */
export class EmbeddingQueue {
  private queue: Array<{
    text: string;
    resolve: (v: number[]) => void;
    reject: (e: unknown) => void;
  }> = [];
  private processing = false;
  /** Maximum texts per API request. Despite the name, batches are sent sequentially, not in parallel. */
  private maxConcurrency: number;
  /** Pause between consecutive batches, in milliseconds. */
  private intervalMs: number;

  /**
   * @param options.batchSize texts per request (default 5; values below 1 are raised to 1 to avoid an endless loop).
   * @param options.intervalMs delay between batches in ms (default 200).
   */
  constructor(options: { batchSize?: number; intervalMs?: number } = {}) {
    this.maxConcurrency = Math.max(1, Math.floor(options.batchSize ?? 5));
    this.intervalMs = Math.max(0, options.intervalMs ?? 200);
  }

  /** Queues one text; resolves with its embedding, or rejects with the error that failed its batch. */
  enqueue(text: string): Promise<number[]> {
    return new Promise((resolve, reject) => {
      this.queue.push({ text, resolve, reject });
      // processQueue routes every failure to the waiting callers and never rejects itself.
      void this.processQueue();
    });
  }

  private async processQueue(): Promise<void> {
    if (this.processing || this.queue.length === 0) return;
    this.processing = true;
    try {
      while (this.queue.length > 0) {
        const batch = this.queue.splice(0, this.maxConcurrency);
        try {
          const embeddings = await embeddingService.getBatchEmbeddings(batch.map(item => item.text));
          // Vectors cannot be matched to inputs when the counts differ, so fail the batch rather than
          // resolving some callers with `undefined`.
          if (embeddings.length !== batch.length) {
            throw new Error(
              `Embedding batch size mismatch: sent ${batch.length} texts, received ${embeddings.length} vectors`,
            );
          }
          batch.forEach((item, i) => {
            const vector = embeddings[i];
            if (vector) item.resolve(vector);
            else item.reject(new Error('Embedding API returned an empty vector slot'));
          });
        } catch (e) {
          batch.forEach(item => item.reject(e));
        }
        if (this.queue.length > 0) {
          await new Promise<void>(resolve => setTimeout(resolve, this.intervalMs));
        }
      }
    } finally {
      this.processing = false;
    }
  }
}
export const embeddingQueue = new EmbeddingQueue();
// Query-vector cache
/**
 * In-memory LRU cache of query embeddings, keyed by a truncated SHA-256 of the query text.
 *
 * Entries expire `ttlMs` after they were stored. Reading an entry marks it most recently used; once
 * `maxSize` entries exist, storing a new one evicts the least recently used.
 */
export class QueryCache {
  private cache = new Map<string, { vector: number[]; ts: number }>();
  private maxSize: number;
  private ttl: number;

  /**
   * @param maxSize maximum number of cached vectors (default 2000, minimum 1).
   * @param ttlMs lifetime of an entry in milliseconds (default 30 minutes).
   */
  constructor(maxSize = 2000, ttlMs = 30 * 60 * 1000) {
    this.maxSize = Math.max(1, maxSize);
    this.ttl = ttlMs;
  }

  private keyOf(text: string): string {
    // 64 bits of the digest keeps keys small; collisions are negligible at this cache size.
    return sha256(text).substring(0, 16);
  }

  /** Returns the cached vector for `text`, or undefined when absent or expired. */
  get(text: string): number[] | undefined {
    const key = this.keyOf(text);
    const entry = this.cache.get(key);
    if (!entry) return undefined;
    if (Date.now() - entry.ts >= this.ttl) {
      this.cache.delete(key);
      return undefined;
    }
    // Re-insert to move the entry to the most-recently-used end of the Map's insertion order.
    this.cache.delete(key);
    this.cache.set(key, entry);
    return entry.vector;
  }

  /** Stores `vector` for `text`, evicting least-recently-used entries if the cache is full. */
  set(text: string, vector: number[]): void {
    const key = this.keyOf(text);
    // Replacing an existing key must not evict an unrelated entry.
    this.cache.delete(key);
    while (this.cache.size >= this.maxSize) {
      const oldest = this.cache.keys().next();
      if (oldest.done) break;
      this.cache.delete(oldest.value);
    }
    this.cache.set(key, { vector, ts: Date.now() });
  }
}
4.4 会话与消息存储(sessionStore.ts)
import { randomUUID } from 'crypto';
import { db } from './database';
import { sanitizeFTS5Query } from './utils';

/** A row of the `messages` table. */
export interface Message {
  id: number;
  session_id: string;
  role: string;
  content: string;
  created_at: string;
}

/** One hit from keyword or vector search, optionally annotated with its fused RRF score. */
export interface SearchResult {
  msgId: number;
  sessionId: string;
  role: string;
  content: string;
  /** Source-specific relevance where higher is better: negated bm25 for keyword hits, 1 - cosine distance for vector hits. */
  score: number;
  source?: 'keyword' | 'vector';
  fusedScore?: number;
}

/** Session/message CRUD plus FTS5 keyword search. */
export class SessionStore {
  /**
   * Creates a session and returns its generated UUID.
   * @param title optional title; an empty string is stored as NULL.
   */
  createSession(title?: string): string {
    // Imported from 'crypto': the global `crypto` object is only exposed on Node 18
    // behind --experimental-global-webcrypto.
    const id = randomUUID();
    db.prepare('INSERT INTO sessions (id, title) VALUES (?, ?)').run(id, title || null);
    return id;
  }

  /** Appends a message and returns its rowid. */
  appendMessage(sessionId: string, role: string, content: string): number {
    const info = db
      .prepare('INSERT INTO messages (session_id, role, content) VALUES (?, ?, ?)')
      .run(sessionId, role, content);
    return Number(info.lastInsertRowid);
  }

  /**
   * Returns up to `limit` messages of a session, newest first.
   * created_at has one-second resolution, so `id` breaks ties between messages written in the same second.
   */
  getMessages(sessionId: string, limit = 50): Message[] {
    return db
      .prepare('SELECT * FROM messages WHERE session_id = ? ORDER BY created_at DESC, id DESC LIMIT ?')
      .all(sessionId, limit) as Message[];
  }

  /**
   * Full-text search over all messages, best match first.
   * @returns hits with `score` = -bm25 (higher is better), or [] when the query has no searchable terms.
   */
  searchByKeyword(query: string, limit = 30): SearchResult[] {
    const safeQuery = sanitizeFTS5Query(query);
    // A query without searchable terms sanitizes to '' or '*'; a bare '*' is an FTS5 syntax error.
    if (!safeQuery || safeQuery === '*') return [];
    const rows = db.prepare(`
      SELECT m.id, m.session_id, m.role, m.content, m.created_at, -rank AS score
      FROM messages_fts
      JOIN messages m ON m.id = messages_fts.rowid
      WHERE messages_fts MATCH ?
      ORDER BY rank
      LIMIT ?
    `).all(safeQuery, limit) as Array<{
      id: number;
      session_id: string;
      role: string;
      content: string;
      score: number;
    }>;
    return rows.map(r => ({
      msgId: r.id,
      sessionId: r.session_id,
      role: r.role,
      content: r.content,
      score: r.score,
      source: 'keyword' as const,
    }));
  }
}
4.5 向量搜索(vectorSearch.ts)
import { db } from './database';
import { packEmbedding } from './utils';
import { SearchResult } from './sessionStore';
/**
 * K-nearest-neighbour search over message_embeddings with sqlite-vec.
 *
 * @param queryVec query embedding; must have the same dimension as the vec0 `embedding` column.
 * @param topK neighbours requested from the index, before the similarity filter.
 * @param minSimilarity minimum cosine similarity (1 - cosine distance) a hit must reach.
 * @returns hits ordered by descending similarity, with at most one hit per message.
 */
export function searchByVector(queryVec: number[], topK = 30, minSimilarity = 0.4): SearchResult[] {
  const k = Math.floor(topK);
  if (queryVec.length === 0 || k <= 0) return [];
  const queryBlob = packEmbedding(queryVec);
  // vec0 only sees the KNN bound when it is written as `k = ?` on the virtual table itself. A LIMIT placed
  // after a JOIN is not pushed down to vec0, so the KNN runs alone in a CTE and is joined afterwards.
  const rows = db.prepare(`
    WITH knn AS (
      SELECT msg_id, distance
      FROM message_embeddings
      WHERE embedding MATCH ?
        AND k = ?
    )
    SELECT
      knn.msg_id,
      m.session_id,
      m.role,
      m.content,
      m.created_at,
      knn.distance AS _distance
    FROM knn
    JOIN messages m ON m.id = knn.msg_id
    ORDER BY knn.distance
  `).all(queryBlob, k) as Array<{
    msg_id: number;
    session_id: string;
    role: string;
    content: string;
    _distance: number;
  }>;

  const seen = new Set<number>();
  const results: SearchResult[] = [];
  for (const r of rows) {
    const similarity = 1 - r._distance;
    // Written as !(>=) so NaN distances are dropped.
    if (!(similarity >= minSimilarity)) continue;
    // Duplicate vectors for one message (e.g. written by both live embedding and backfill) must not
    // enter RRF twice.
    if (seen.has(r.msg_id)) continue;
    seen.add(r.msg_id);
    results.push({
      msgId: r.msg_id,
      sessionId: r.session_id,
      role: r.role,
      content: r.content,
      score: similarity,
      source: 'vector' as const,
    });
  }
  return results;
}
4.6 RRF 融合(rrf.ts)
import { SearchResult } from './sessionStore';
/**
 * Reciprocal Rank Fusion of two ranked hit lists.
 *
 * A hit at zero-based position p contributes 1 / (k + p + 1). Contributions for the same msgId are summed
 * across both lists. The returned hits are the first-seen object for each msgId (keyword list first),
 * copied with `fusedScore` set, and ordered by descending fused score; ties keep first-seen order.
 */
export function rrfFusion(keywordResults: SearchResult[], vectorResults: SearchResult[], k = 60): SearchResult[] {
  const totals = new Map<number, { result: SearchResult; score: number }>();

  for (const ranking of [keywordResults, vectorResults]) {
    ranking.forEach((hit, position) => {
      const gain = 1 / (k + position + 1);
      const entry = totals.get(hit.msgId);
      if (entry === undefined) {
        totals.set(hit.msgId, { result: hit, score: gain });
      } else {
        entry.score += gain;
      }
    });
  }

  const ranked = [...totals.values()];
  ranked.sort((left, right) => right.score - left.score);
  return ranked.map(({ result, score }) => ({ ...result, fusedScore: score }));
}
4.7 回填任务(backfill.ts)
import { db } from './database';
import { embeddingQueue } from './embedding';
import { SessionStore } from './sessionStore';
import { packEmbedding, sha256 } from './utils';
/**
 * Periodically embeds stored messages that have no vector yet, such as imported history or messages whose
 * live embedding failed.
 *
 * Candidates are user/assistant messages longer than 10 characters with no row in message_embeddings.
 * A message is skipped once it has failed more than 3 times. Failures are counted in embedding_failures
 * and cleared on success.
 */
export class BackfillService {
  /** Messages handed to the queue together so EmbeddingQueue can batch their API calls. */
  private static readonly CHUNK_SIZE = 20;

  private running = false;
  private timer: ReturnType<typeof setInterval> | undefined;
  private sessionStore: SessionStore;

  constructor(sessionStore: SessionStore) {
    // Kept for API compatibility; the backfill queries the database directly.
    this.sessionStore = sessionStore;
  }

  /**
   * Schedules a pass every `intervalMinutes` and runs one immediately. Repeated calls are no-ops until
   * stop(). The timer is unref'd so a pending backfill does not keep a CLI process alive.
   */
  start(intervalMinutes = 5): void {
    if (this.timer !== undefined) return;
    this.timer = setInterval(() => void this.runSafely(), intervalMinutes * 60 * 1000);
    this.timer.unref();
    void this.runSafely();
  }

  /** Cancels the periodic timer; a pass already in progress finishes normally. */
  stop(): void {
    if (this.timer !== undefined) {
      clearInterval(this.timer);
      this.timer = undefined;
    }
  }

  /** Runs one pass and logs errors instead of letting them escape as unhandled rejections. */
  private async runSafely(): Promise<void> {
    try {
      await this.runBackfill();
    } catch (e) {
      console.error('回填任务执行失败:', e);
    }
  }

  /**
   * Embeds up to 1000 pending messages, oldest first. Returns immediately if a pass is already running.
   *
   * Messages younger than two minutes are skipped. They are presumably still being embedded by the live
   * path that runs right after insert, and embedding them here too would create duplicate vectors.
   */
  async runBackfill(): Promise<void> {
    if (this.running) return;
    this.running = true;
    try {
      // `msg_id IS NOT NULL`: a single NULL in a NOT IN subquery would make the predicate NULL for every row.
      const pending = db.prepare(`
        SELECT m.id, m.content FROM messages m
        WHERE m.id NOT IN (SELECT msg_id FROM message_embeddings WHERE msg_id IS NOT NULL)
          AND m.role IN ('user','assistant')
          AND length(m.content) > 10
          AND m.id NOT IN (SELECT msg_id FROM embedding_failures WHERE fail_count > 3)
          AND (m.created_at IS NULL OR m.created_at <= datetime('now', '-2 minutes'))
        ORDER BY m.id
        LIMIT 1000
      `).all() as { id: number; content: string }[];
      if (pending.length === 0) return;

      const insertStmt = db.prepare(`
        INSERT OR IGNORE INTO message_embeddings (embedding, msg_id, text_hash, created_at)
        VALUES (?, ?, ?, datetime('now'))
      `);
      const messageExistsStmt = db.prepare('SELECT 1 FROM messages WHERE id = ?');
      const clearFailureStmt = db.prepare('DELETE FROM embedding_failures WHERE msg_id = ?');
      const recordFailureStmt = db.prepare(`
        INSERT INTO embedding_failures (msg_id, fail_count, last_error)
        VALUES (?, 1, ?)
        ON CONFLICT(msg_id) DO UPDATE SET
          fail_count = fail_count + 1,
          last_error = excluded.last_error,
          updated_at = datetime('now')
      `);

      const recordFailure = (msgId: number, err: unknown): void => {
        const message = err instanceof Error ? err.message : String(err);
        console.error(`回填消息 ${msgId} 失败:`, message);
        recordFailureStmt.run(msgId, message);
      };

      for (let start = 0; start < pending.length; start += BackfillService.CHUNK_SIZE) {
        const chunk = pending.slice(start, start + BackfillService.CHUNK_SIZE);
        // Enqueue the whole chunk at once. Awaiting one message at a time would send single-text requests.
        const outcomes = await Promise.allSettled(chunk.map(msg => embeddingQueue.enqueue(msg.content)));
        for (const [i, msg] of chunk.entries()) {
          const outcome = outcomes[i];
          if (outcome === undefined) continue;
          if (outcome.status === 'rejected') {
            recordFailure(msg.id, outcome.reason);
            continue;
          }
          try {
            // The message may have been deleted while its embedding was in flight. Its delete trigger has
            // already run, so inserting now would leave an orphan vector.
            if (!messageExistsStmt.get(msg.id)) continue;
            insertStmt.run(packEmbedding(outcome.value), msg.id, sha256(msg.content));
            clearFailureStmt.run(msg.id);
          } catch (e) {
            recordFailure(msg.id, e);
          }
        }
      }
    } finally {
      this.running = false;
    }
  }
}
4.8 统一入口 MemoryManager(memoryManager.ts)
import { db, initializeDatabase } from './database';
import { SessionStore, SearchResult } from './sessionStore';
import { searchByVector } from './vectorSearch';
import { rrfFusion } from './rrf';
import { BackfillService } from './backfill';
import { QueryCache, embeddingQueue, embeddingService } from './embedding';
import { packEmbedding, sha256 } from './utils';

/**
 * Facade over the memory subsystem: message storage, live and background embedding, and hybrid
 * (FTS5 + vector) search fused with RRF.
 */
export class MemoryManager {
  sessionStore: SessionStore;
  backfillService: BackfillService;
  queryCache: QueryCache;

  /** Ensures the schema exists and starts the 5-minute backfill timer; one backfill pass runs immediately. */
  constructor() {
    initializeDatabase();
    this.sessionStore = new SessionStore();
    this.backfillService = new BackfillService(this.sessionStore);
    this.queryCache = new QueryCache();
    this.backfillService.start(5);
  }

  /**
   * Stores a message and returns its id. User/assistant messages longer than 10 characters are embedded
   * asynchronously after this call returns, so they are not searchable by vector immediately.
   */
  appendMessage(sessionId: string, role: string, content: string): number {
    const msgId = this.sessionStore.appendMessage(sessionId, role, content);
    if (['user', 'assistant'].includes(role) && content.length > 10) {
      setImmediate(() => void this.embedMessage(msgId, content));
    }
    return msgId;
  }

  /** Best-effort live embedding of one message; failures are logged and left to the backfill task. */
  private async embedMessage(msgId: number, content: string): Promise<void> {
    try {
      const hash = sha256(content);
      const sameText = db
        .prepare('SELECT msg_id, embedding FROM message_embeddings WHERE text_hash = ?')
        .all(hash) as Array<{ msg_id: number | null; embedding: Buffer }>;
      if (sameText.some(row => row.msg_id === msgId)) return;
      // Identical text has an identical vector: reuse a stored one instead of paying for another API call.
      // Every message still gets its own row, so it can be found by vector search.
      const blob = sameText[0]?.embedding ?? packEmbedding(await embeddingQueue.enqueue(content));
      // Skip messages deleted while the embedding was in flight (their delete trigger has already run).
      if (!db.prepare('SELECT 1 FROM messages WHERE id = ?').get(msgId)) return;
      db.prepare(`
        INSERT INTO message_embeddings (embedding, msg_id, text_hash, created_at)
        VALUES (?, ?, ?, datetime('now'))
      `).run(blob, msgId, hash);
    } catch (err) {
      console.error(`自动向量化消息 ${msgId} 失败:`, err);
    }
  }

  /**
   * Hybrid search. Runs FTS5 keyword and vector KNN searches, fuses them with RRF and returns the top
   * `limit` hits. If either path fails it is logged and the other path's results are still returned.
   */
  async search(query: string, limit = 20): Promise<SearchResult[]> {
    if (!query.trim() || limit <= 0) return [];
    // Each candidate pool is at least `limit`, so large limits are not capped at 30 per source.
    const poolSize = Math.max(30, limit);

    // 1. FTS5 keyword search
    let keywordResults: SearchResult[] = [];
    try {
      keywordResults = this.sessionStore.searchByKeyword(query, poolSize);
    } catch (e) {
      console.warn('关键词搜索失败,降级为纯向量搜索:', e);
    }

    // 2. Vector semantic search. The query embedding bypasses the queue for latency and is cached per query text.
    let vectorResults: SearchResult[] = [];
    try {
      let queryVec = this.queryCache.get(query);
      if (!queryVec) {
        queryVec = await embeddingService.getEmbedding(query);
        this.queryCache.set(query, queryVec);
      }
      vectorResults = searchByVector(queryVec, poolSize);
    } catch (e) {
      console.warn('向量搜索失败,降级为纯关键词搜索:', e);
    }

    // 3. RRF fusion
    return rrfFusion(keywordResults, vectorResults).slice(0, limit);
  }
}
五、使用示例
import { MemoryManager } from './memory/memoryManager';

async function main(): Promise<void> {
  const memory = new MemoryManager();

  // Create a session and append messages.
  const sessionId = memory.sessionStore.createSession('系统调优');
  memory.appendMessage(sessionId, 'user', '如何清理磁盘空间?');
  memory.appendMessage(sessionId, 'assistant', '可以尝试使用 du -sh 查看大文件...');

  // Semantic search. appendMessage schedules embedding asynchronously (via setImmediate), so a search
  // issued immediately afterwards may not yet see vectors for the messages just added.
  const results = await memory.search('硬盘不够了');
  console.log(results);
}

// Top-level await only works in ES modules; wrapping the example in main() also works in CommonJS builds.
main().catch(err => {
  console.error(err);
  process.exitCode = 1;
});
六、优化清单(对照原方案)
| 优化项 | 涉及模块 |
|--------------------------------------|-------------------|
| 1. 向量搜索改用 sqlite-vec 原生 KNN | vectorSearch.ts |
| 2. 增加 Embedding 请求队列与限流 | embedding.ts |
| 3. FTS5 查询特殊字符转义 | utils.ts |
| 4. 批量 Embedding + 重试 + 超时 | embedding.ts |
| 5. 消息删除清理向量、会话时间更新 | database.ts |
| 6. 回填失败记录与跳过 | backfill.ts |
| 7. 环境变量管理 | database.ts |
| 8. 统一服务入口 MemoryManager | memoryManager.ts |
本方案可直接交付 Hermes Agent,所有代码模块清晰,除火山引擎 Embedding API 外无其他外部黑盒依赖,可直接部署在 CmdCode 项目中。