👤 User
Give me a complete, optimized plan. I'll send it straight to Hermes to execute.
🤖 DeepSeek
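Sure. Below is the complete optimized plan; you can send it to Hermes Agent as-is for execution.

CmdCode Vector Memory + FTS5 Hybrid Search and Recall System (Production-Grade Optimized Version)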

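1. Overall Goals

Build a conversation memory system for the CmdCode CLI that supports:
  • Keyword full-text search based on FTS5
  • Semantic vector search based on sqlite-vec + Volcano Engine Embedding
  • Fusion of both result lists via RRF (Reciprocal Rank Fusion)
  • Automatic real-time embedding of new messages, plus scheduled backfill of historical messages
  • In-memory caching of query vectors to reduce API cost
  • Request queuing and throttling, sanitized FTS5 queries, retries on failure, and graceful degradation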

2. Tech Stack and Dependencies

| Component | Description |
|-----------------------|---------------------------------------------|
| Runtime | Node.js 18+ |
| Language | TypeScript |
| Database | SQLite (better-sqlite3) |
| Vector extension | sqlite-vec (provides the vec0 virtual table) |
| Embedding model | doubao-embedding-vision (2048 dimensions) |
| API endpoint | https://ark.cn-beijing.volces.com/api/coding/v3/embeddings |
| Environment variables | ARK_API_KEY, CMDCODE_DB_PATH |
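npm dependencies (sqlite-vec 0.1.6 or later is needed for the "+" auxiliary columns used in message_embeddings):

    {
      "better-sqlite3": "^11.0.0",
      "sqlite-vec": "^0.1.6",
      "dotenv": "^16.4.0"
    }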
    

3. Project File Structure

    
    src/
    ├── memory/
    │   ├── database.ts          // DB connection and schema initialization
    │   ├── embedding.ts         // Embedding API client, request queue, query cache
    │   ├── sessionStore.ts      // Session/message CRUD, FTS5 search
    │   ├── vectorSearch.ts      // Vector similarity search (KNN)
    │   ├── backfill.ts          // Scheduled incremental backfill job
    │   ├── rrf.ts               // RRF fusion
    │   ├── utils.ts             // packEmbedding, sha256, sanitizeFTS5Query, etc.
    │   └── memoryManager.ts     // Unified public interface
    └── ...
    

4. Detailed Implementation

4.1 Database Initialization (database.ts)

    
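    import fs from 'fs';
    import path from 'path';
    import Database from 'better-sqlite3';
    import * as sqliteVec from 'sqlite-vec';
    import dotenv from 'dotenv';
    
    dotenv.config();
    
    const DB_PATH = process.env.CMDCODE_DB_PATH || 'data/cmdcode.db';
    // better-sqlite3 does not create missing parent directories
    fs.mkdirSync(path.dirname(DB_PATH), { recursive: true });
    
    export const db = new Database(DB_PATH);
    sqliteVec.load(db);                 // registers the vec0 module and vec_* SQL functions
    db.pragma('journal_mode = WAL');
    db.pragma('foreign_keys = ON');     // SQLite does not enforce REFERENCES unless this is on
    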
    export function initializeDatabase() {
      db.exec(`
        -- Sessions
        CREATE TABLE IF NOT EXISTS sessions (
          id TEXT PRIMARY KEY,
          title TEXT,
          created_at TEXT DEFAULT (datetime('now')),
          updated_at TEXT DEFAULT (datetime('now'))
        );
    
        -- Messages
        CREATE TABLE IF NOT EXISTS messages (
          id INTEGER PRIMARY KEY AUTOINCREMENT,
          session_id TEXT NOT NULL REFERENCES sessions(id),
          role TEXT NOT NULL CHECK(role IN ('user','assistant','system','tool')),
          content TEXT NOT NULL,
          created_at TEXT DEFAULT (datetime('now'))
        );
    
        -- FTS5 full-text index (external content: the text itself lives in messages).
        -- Note: the default unicode61 tokenizer does not split Chinese into words, so each
        -- run of Chinese characters between spaces or punctuation is indexed as one token.
        -- tokenize='trigram' (SQLite 3.34+) allows substring matches for terms of 3+ characters.
        CREATE VIRTUAL TABLE IF NOT EXISTS messages_fts USING fts5(
          content,
          content='messages',
          content_rowid='id'
        );
    
        -- Triggers keeping messages_fts in sync with messages
        CREATE TRIGGER IF NOT EXISTS messages_ai AFTER INSERT ON messages BEGIN
          INSERT INTO messages_fts(rowid, content) VALUES (new.id, new.content);
        END;
        CREATE TRIGGER IF NOT EXISTS messages_ad AFTER DELETE ON messages BEGIN
          INSERT INTO messages_fts(messages_fts, rowid, content) VALUES('delete', old.id, old.content);
        END;
        CREATE TRIGGER IF NOT EXISTS messages_au AFTER UPDATE ON messages BEGIN
          INSERT INTO messages_fts(messages_fts, rowid, content) VALUES('delete', old.id, old.content);
          INSERT INTO messages_fts(rowid, content) VALUES (new.id, new.content);
        END;
    
        -- Vector table (vec0). "+" marks auxiliary columns: stored with each vector and
        -- returned by queries, but not usable as filters inside a KNN query.
        CREATE VIRTUAL TABLE IF NOT EXISTS message_embeddings USING vec0(
          embedding float[2048] distance_metric=cosine,
          +msg_id integer,
          +text_hash text,
          +created_at text
        );
    
        -- Remove a message's vector when the message is deleted
        CREATE TRIGGER IF NOT EXISTS messages_ad_delete_embedding AFTER DELETE ON messages BEGIN
          DELETE FROM message_embeddings WHERE msg_id = old.id;
        END;
    
        -- Bump the session's updated_at whenever a message is added
        CREATE TRIGGER IF NOT EXISTS session_updated_after_insert AFTER INSERT ON messages BEGIN
          UPDATE sessions SET updated_at = datetime('now') WHERE id = new.session_id;
        END;
    
        -- Embedding failures (read by the backfill job to retry and eventually skip)
        CREATE TABLE IF NOT EXISTS embedding_failures (
          msg_id INTEGER PRIMARY KEY,
          fail_count INTEGER DEFAULT 1,
          last_error TEXT,
          updated_at TEXT DEFAULT (datetime('now'))
        );
      `);
    }
    

4.2 Utility Functions (utils.ts)

    
    import crypto from 'crypto';
    
    export function packEmbedding(vec: number[]): Buffer {
      const buf = Buffer.allocUnsafe(vec.length * 4);
      for (let i = 0; i < vec.length; i++) {
        buf.writeFloatLE(vec[i], i * 4);
      }
      return buf;
    }
    
    export function unpackEmbedding(blob: Buffer): number[] {
      const vec: number[] = [];
      for (let i = 0; i < blob.length; i += 4) {
        vec.push(blob.readFloatLE(i));
      }
      return vec;
    }
    
    export function sha256(text: string): string {
      return crypto.createHash('sha256').update(text).digest('hex');
    }
    
    // Turns free text into a safe FTS5 query: each word becomes a quoted phrase, OR-ed together.
    // Returns '' when nothing searchable is left (a lone '*' is an FTS5 syntax error).
    export function sanitizeFTS5Query(input: string): string {
      const cleaned = input.replace(/[^\w\u4e00-\u9fff\s@.-]/g, ' ').trim();
      const words = cleaned.split(/\s+/).filter(w => w.length > 0);
      return words.slice(0, 10).map(w => `"${w}"`).join(' OR ');
    }
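On a little-endian host (x86-64 and ARM64 both are), the per-element writeFloatLE loop in packEmbedding can be replaced by a Float32Array view, which produces the same bytes in one bulk copy. A minimal sketch under that assumption; packEmbeddingFast and unpackEmbeddingFast are hypothetical names, not part of the plan:

```typescript
// Hypothetical Float32Array-based variants of packEmbedding / unpackEmbedding.
// Assumption: the host is little-endian, so native float32 layout equals writeFloatLE output.
function packEmbeddingFast(vec: number[]): Buffer {
  const f32 = Float32Array.from(vec);
  // Wrap the typed array's memory as a Buffer instead of writing floats one by one.
  return Buffer.from(f32.buffer, f32.byteOffset, f32.byteLength);
}

function unpackEmbeddingFast(blob: Buffer): number[] {
  // Copy into a fresh, 4-byte-aligned ArrayBuffer before creating the Float32Array view.
  const copy = new Uint8Array(blob.byteLength);
  copy.set(blob);
  return Array.from(new Float32Array(copy.buffer));
}

// Values exactly representable in float32 survive the round trip unchanged.
const packed = packEmbeddingFast([0.5, -1, 2.25]);
console.log(packed.length);               // 12 (3 floats x 4 bytes)
console.log(unpackEmbeddingFast(packed)); // [ 0.5, -1, 2.25 ]
```

unpackEmbeddingFast copies first because a Buffer returned by a driver or taken from Node's buffer pool is not guaranteed to start on a 4-byte boundary, and a Float32Array view at an unaligned offset throws a RangeError.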
    

4.3 Embedding Service, Request Queue, and Query Cache (embedding.ts)

    
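    import 'dotenv/config'; // load .env even if this module is imported before database.ts
    import { sha256 } from './utils';
    
    const EMBEDDING_DIM = 2048; // must match float[2048] in message_embeddings
    
    class EmbeddingService {
      private baseURL = 'https://ark.cn-beijing.volces.com/api/coding/v3/embeddings';
    
      // Read lazily so a missing key fails clearly instead of sending "Bearer undefined"
      private get apiKey(): string {
        const key = process.env.ARK_API_KEY;
        if (!key) throw new Error('ARK_API_KEY is not set');
        return key;
      }
    
      // Up to 1 + maxRetries attempts, waiting 1 s and then 2 s between them
      async getBatchEmbeddings(texts: string[]): Promise<number[][]> {
        const maxRetries = 2;
        for (let attempt = 0; ; attempt++) {
          try {
            return await this.requestOnce(texts);
          } catch (e) {
            if (attempt >= maxRetries) throw e;
            await new Promise(r => setTimeout(r, 1000 * (attempt + 1)));
          }
        }
      }
    
      private async requestOnce(texts: string[]): Promise<number[][]> {
        const controller = new AbortController();
        const timeout = setTimeout(() => controller.abort(), 30_000); // covers headers and body
        try {
          const res = await fetch(this.baseURL, {
            method: 'POST',
            headers: {
              'Content-Type': 'application/json',
              Authorization: `Bearer ${this.apiKey}`,
            },
            body: JSON.stringify({ model: 'doubao-embedding-vision', input: texts }),
            signal: controller.signal,
          });
          if (!res.ok) {
            throw new Error(`Embedding API error ${res.status}: ${await res.text()}`);
          }
          const json: any = await res.json();
          // Assumes an OpenAI-style body: data[i].embedding is the vector for texts[i]
          const data: any[] = Array.isArray(json?.data) ? json.data : [];
          const vectors: number[][] = data.map(d => d.embedding);
          if (vectors.length !== texts.length || vectors.some(v => v?.length !== EMBEDDING_DIM)) {
            throw new Error(`Unexpected embedding response for ${texts.length} input(s)`);
          }
          return vectors;
        } finally {
          clearTimeout(timeout); // also runs when fetch throws (network error, abort)
        }
      }
    
      async getEmbedding(text: string): Promise<number[]> {
        const results = await this.getBatchEmbeddings([text]);
        return results[0];
      }
    }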
    
    export const embeddingService = new EmbeddingService();
    
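    // Request queue: pending texts are merged into batches of up to maxConcurrency texts,
    // and batches are sent one at a time, intervalMs apart, so one request is in flight at most.
    export class EmbeddingQueue {
      private queue: Array<{
        text: string;
        resolve: (v: number[]) => void;
        reject: (e: any) => void;
      }> = [];
      private processing = false;
      private maxConcurrency = 5; // texts per request (batch size), not parallel requests
      private intervalMs = 200;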
    
      enqueue(text: string): Promise<number[]> {
        return new Promise((resolve, reject) => {
          this.queue.push({ text, resolve, reject });
          this.processQueue();
        });
      }
    
      private async processQueue() {
        if (this.processing || this.queue.length === 0) return;
        this.processing = true;
        try {
          while (this.queue.length > 0) {
            const batch = this.queue.splice(0, this.maxConcurrency);
            const texts = batch.map(item => item.text);
            try {
              const embeddings = await embeddingService.getBatchEmbeddings(texts);
              batch.forEach((item, i) => item.resolve(embeddings[i]));
            } catch (e) {
              batch.forEach(item => item.reject(e));
            }
            if (this.queue.length > 0) {
              await new Promise(r => setTimeout(r, this.intervalMs));
            }
          }
        } finally {
          this.processing = false;
        }
      }
    }
    
    export const embeddingQueue = new EmbeddingQueue();
    
    // Query-vector cache: LRU eviction (a Map iterates in insertion order) plus a TTL
    export class QueryCache {
      private cache = new Map<string, { vector: number[]; ts: number }>();
      private maxSize = 2000;
      private ttl = 30 * 60 * 1000; // 30 minutes
    
      private key(text: string): string {
        return sha256(text).substring(0, 16);
      }
    
      get(text: string): number[] | undefined {
        const key = this.key(text);
        const entry = this.cache.get(key);
        if (!entry) return undefined;
        this.cache.delete(key);
        if (Date.now() - entry.ts >= this.ttl) return undefined; // expired
        this.cache.set(key, entry); // re-insert to mark it most recently used
        return entry.vector;
      }
    
      set(text: string, vector: number[]) {
        const key = this.key(text);
        this.cache.delete(key);
        if (this.cache.size >= this.maxSize) {
          const oldest = this.cache.keys().next().value; // least recently used
          if (oldest !== undefined) this.cache.delete(oldest);
        }
        this.cache.set(key, { vector, ts: Date.now() });
      }
    }
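The retry loop in getBatchEmbeddings retries every failure, including ones that cannot succeed on a second try, such as an HTTP 401 caused by a wrong ARK_API_KEY. A minimal sketch of a retry helper that only retries rate limiting (429), server errors (5xx) and failures without an HTTP status (network errors, timeouts), with exponential backoff. httpError, isRetryable and withRetry are hypothetical helpers, not part of any Volcano Engine SDK, and the backoff constants are illustrative:

```typescript
// Hypothetical convention: HTTP failures are thrown as Errors carrying a numeric status.
function httpError(status: number, body: string): Error & { status: number } {
  return Object.assign(new Error(`Embedding API error ${status}: ${body}`), { status });
}

// Retry only failures that can plausibly succeed later: rate limiting (429),
// server errors (5xx), and errors without an HTTP status (network errors, timeouts).
function isRetryable(e: unknown): boolean {
  const status = (e as { status?: unknown } | null | undefined)?.status;
  if (typeof status === 'number') return status === 429 || status >= 500;
  return true;
}

// Calls fn up to 1 + maxRetries times, sleeping baseDelayMs * 2^attempt between tries.
async function withRetry<T>(fn: () => Promise<T>, maxRetries = 2, baseDelayMs = 1000): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (e) {
      if (attempt >= maxRetries || !isRetryable(e)) throw e;
      await new Promise(resolve => setTimeout(resolve, baseDelayMs * 2 ** attempt));
    }
  }
}
```

To use it, the non-2xx branch of the request would throw httpError(res.status, body), and the single-attempt request (call it postOnce, a hypothetical name) would be wrapped as withRetry(() => postOnce(texts)).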
    

4.4 Session and Message Storage (sessionStore.ts)

    
    import crypto from 'crypto'; // Node's module; the global crypto object needs a flag on Node 18
    import { db } from './database';
    import { sanitizeFTS5Query } from './utils';
    
    export interface Message {
      id: number;
      session_id: string;
      role: string;
      content: string;
      created_at: string;
    }
    
    export interface SearchResult {
      msgId: number;
      sessionId: string;
      role: string;
      content: string;
      score: number;
      source?: 'keyword' | 'vector';
      fusedScore?: number;
    }
    
    export class SessionStore {
      createSession(title?: string): string {
        const id = crypto.randomUUID();
        db.prepare('INSERT INTO sessions (id, title) VALUES (?, ?)').run(id, title || null);
        return id;
      }
    
      appendMessage(sessionId: string, role: string, content: string): number {
        const info = db.prepare('INSERT INTO messages (session_id, role, content) VALUES (?, ?, ?)')
          .run(sessionId, role, content);
        return Number(info.lastInsertRowid);
      }
    
      // Newest first; id is monotonic, whereas created_at only has 1-second resolution
      getMessages(sessionId: string, limit = 50): Message[] {
        return db.prepare('SELECT * FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?')
          .all(sessionId, limit) as Message[];
      }
    
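      searchByKeyword(query: string, limit = 30): SearchResult[] {
        const safeQuery = sanitizeFTS5Query(query);
        if (!safeQuery) return [];
        let rows: any[];
        try {
          // FTS5 rank is bm25(): more negative means a better match, so negate it for score
          rows = db.prepare(`
            SELECT m.id, m.session_id, m.role, m.content, m.created_at, -rank AS score
            FROM messages_fts
            JOIN messages m ON m.id = messages_fts.rowid
            WHERE messages_fts MATCH ?
            ORDER BY rank
            LIMIT ?
          `).all(safeQuery, limit) as any[];
        } catch (e) {
          // Malformed MATCH expression: treat as "no keyword hits" rather than failing the search
          console.warn('FTS5 query failed:', e);
          return [];
        }
    
        return rows.map(r => ({
          msgId: r.id,
          sessionId: r.session_id,
          role: r.role,
          content: r.content,
          score: r.score,
          source: 'keyword' as const,
        }));
      }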
    }
    

4.5 Vector Search (vectorSearch.ts)

    
    import { db } from './database';
    import { packEmbedding } from './utils';
    import { SearchResult } from './sessionStore';
    
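    export function searchByVector(queryVec: number[], topK = 30, minSimilarity = 0.4): SearchResult[] {
      const queryBlob = packEmbedding(queryVec);
      // Run the KNN query on vec0 by itself and join afterwards: when vec0 is joined in the
      // same SELECT, LIMIT may not be passed down to it, so the explicit "k = ?" is used.
      const rows = db.prepare(`
        WITH knn AS (
          SELECT msg_id, distance
          FROM message_embeddings
          WHERE embedding MATCH ? AND k = ?
        )
        SELECT
          knn.msg_id,
          m.session_id,
          m.role,
          m.content,
          m.created_at,
          knn.distance AS _distance
        FROM knn
        JOIN messages m ON m.id = knn.msg_id
        ORDER BY knn.distance
      `).all(queryBlob, BigInt(topK)) as any[]; // BigInt binds k as an SQLite INTEGER
    
      // Cosine distance = 1 - cosine similarity. The threshold is applied after the top-K
      // cut, so fewer than topK results may come back.
      return rows
        .filter(r => (1 - r._distance) >= minSimilarity)
        .map(r => ({
          msgId: r.msg_id,
          sessionId: r.session_id,
          role: r.role,
          content: r.content,
          score: 1 - r._distance,
          source: 'vector' as const,
        }));
    }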
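With distance_metric=cosine, sqlite-vec reports cosine distance, i.e. 1 minus cosine similarity, which is why score = 1 - _distance and the minSimilarity = 0.4 cut-off work as written. A brute-force reference of the same search in plain TypeScript is handy for unit-testing searchByVector on a few hand-made vectors; cosineDistance and bruteForceKnn are hypothetical helpers, not sqlite-vec APIs:

```typescript
// Cosine distance as used by a cosine-metric vec0 column: 1 - cos(a, b).
function cosineDistance(a: number[], b: number[]): number {
  if (a.length !== b.length) throw new Error('dimension mismatch');
  let dot = 0, na = 0, nb = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    na += a[i] * a[i];
    nb += b[i] * b[i];
  }
  return 1 - dot / (Math.sqrt(na) * Math.sqrt(nb));
}

interface Candidate { msgId: number; vector: number[] }

// Exact top-K by ascending distance, then the same similarity threshold as searchByVector.
function bruteForceKnn(query: number[], rows: Candidate[], topK = 30, minSimilarity = 0.4) {
  return rows
    .map(r => ({ msgId: r.msgId, distance: cosineDistance(query, r.vector) }))
    .sort((x, y) => x.distance - y.distance)
    .slice(0, topK)
    .map(r => ({ msgId: r.msgId, score: 1 - r.distance }))
    .filter(r => r.score >= minSimilarity);
}

const hits = bruteForceKnn([1, 0], [
  { msgId: 1, vector: [1, 0] }, // same direction: similarity 1
  { msgId: 2, vector: [1, 1] }, // 45 degrees: similarity ~0.707
  { msgId: 3, vector: [0, 1] }, // orthogonal: similarity 0, filtered out
]);
console.log(hits.map(h => h.msgId)); // [ 1, 2 ]
```

As in the SQL version, the threshold is applied after the top-K cut, so a strict minSimilarity returns fewer than topK rows rather than reaching further down the ranking.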
    

4.6 RRF Fusion (rrf.ts)

    
    import { SearchResult } from './sessionStore';
    
    export function rrfFusion(keywordResults: SearchResult[], vectorResults: SearchResult[], k = 60): SearchResult[] {
      const scoreMap = new Map<number, { result: SearchResult; score: number }>();
    
      const addScore = (list: SearchResult[]) => {
        list.forEach((r, rank) => {
          const id = r.msgId;
          const contribution = 1 / (k + rank + 1);
          if (scoreMap.has(id)) {
            scoreMap.get(id)!.score += contribution;
          } else {
            scoreMap.set(id, { result: r, score: contribution });
          }
        });
      };
    
      addScore(keywordResults);
      addScore(vectorResults);
    
      return Array.from(scoreMap.values())
        .sort((a, b) => b.score - a.score)
        .map(item => ({ ...item.result, fusedScore: item.score }));
    }
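rrfFusion adds 1 / (k + rank + 1) per list with a 0-based rank, i.e. 1 / (k + r) for the 1-based rank r, with k = 60. A worked example on message IDs alone shows the effect: a message that appears in both lists outranks one that is first in only one list. rrfIds is a hypothetical ID-only version of rrfFusion for illustration:

```typescript
// ID-only Reciprocal Rank Fusion: an item at 1-based rank r in a list gains 1 / (k + r).
function rrfIds(lists: number[][], k = 60): Array<{ id: number; score: number }> {
  const scores = new Map<number, number>();
  for (const list of lists) {
    list.forEach((id, i) => scores.set(id, (scores.get(id) ?? 0) + 1 / (k + i + 1)));
  }
  return Array.from(scores, ([id, score]) => ({ id, score })).sort((a, b) => b.score - a.score);
}

// Keyword (FTS5) ranking: 101, 102, 103. Vector ranking: 102, 104.
const fused = rrfIds([[101, 102, 103], [102, 104]]);
// 102: 1/62 + 1/61 = 0.0325 (in both lists)
// 101: 1/61        = 0.0164
// 104: 1/62        = 0.0161
// 103: 1/63        = 0.0159
console.log(fused.map(f => f.id)); // [ 102, 101, 104, 103 ]
```

Because only ranks enter the formula, the raw scores of the two lists (negated bm25 versus cosine similarity) never have to be normalized against each other.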
    

4.7 Backfill Job (backfill.ts)

    
    import { db } from './database';
    import { embeddingQueue } from './embedding';
    import { SessionStore } from './sessionStore';
    import { packEmbedding, sha256 } from './utils';
    
    export class BackfillService {
      private running = false;
      private sessionStore: SessionStore;
    
      constructor(sessionStore: SessionStore) {
        this.sessionStore = sessionStore;
      }
    
      start(intervalMinutes = 5) {
        const tick = () => this.runBackfill().catch(e => console.error('Backfill run failed:', e));
        // unref(): the timer alone must not keep the CLI process alive after its work is done
        setInterval(tick, intervalMinutes * 60 * 1000).unref();
        tick(); // first run immediately
      }
    
      async runBackfill() {
        if (this.running) return;
        this.running = true;
        try {
          const unembedded = db.prepare(`
            SELECT m.id, m.content FROM messages m
            WHERE m.id NOT IN (SELECT msg_id FROM message_embeddings)
              AND m.role IN ('user','assistant')
              AND length(m.content) > 10
              AND m.id NOT IN (SELECT msg_id FROM embedding_failures WHERE fail_count > 3)
            LIMIT 1000
          `).all() as { id: number; content: string }[];
    
          // Prepare statements once, outside the loop
          const existsStmt = db.prepare('SELECT 1 FROM message_embeddings WHERE msg_id = ?');
          const insertStmt = db.prepare(`
            INSERT INTO message_embeddings (embedding, msg_id, text_hash, created_at)
            VALUES (?, ?, ?, datetime('now'))
          `);
          const clearFailureStmt = db.prepare('DELETE FROM embedding_failures WHERE msg_id = ?');
          const recordFailureStmt = db.prepare(`
            INSERT INTO embedding_failures (msg_id, fail_count, last_error)
            VALUES (?, 1, ?)
            ON CONFLICT(msg_id) DO UPDATE SET
              fail_count = fail_count + 1,
              last_error = excluded.last_error,
              updated_at = datetime('now')
          `);
    
          for (const msg of unembedded) {
            try {
              const embedding = await embeddingQueue.enqueue(msg.content);
              // appendMessage may have embedded this message in the meantime; vec0 has no
              // unique key on msg_id, so check before inserting to avoid duplicate rows.
              if (!existsStmt.get(msg.id)) {
                // BigInt binds msg_id as an SQLite INTEGER (plain JS numbers bind as REAL)
                insertStmt.run(packEmbedding(embedding), BigInt(msg.id), sha256(msg.content));
              }
              clearFailureStmt.run(msg.id); // success clears any earlier failure record
            } catch (e: any) {
              console.error(`Backfill of message ${msg.id} failed:`, e.message);
              recordFailureStmt.run(msg.id, String(e.message));
            }
          }
        } finally {
          this.running = false;
        }
      }
    }
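In runBackfill, each await embeddingQueue.enqueue(...) finishes before the next message is enqueued, so the queue never holds more than one text and every message costs its own API request. A minimal sketch of submitting a chunk of messages at once with Promise.allSettled, so that a batching queue can group them and one failure does not stop the rest of the chunk; embedChunked, its callbacks and the chunk size of 50 are hypothetical:

```typescript
interface PendingMessage { id: number; content: string }

// Hypothetical: embed pending messages chunk by chunk. All requests of a chunk are in
// flight together, so a batching queue (like EmbeddingQueue) can merge them.
async function embedChunked(
  messages: PendingMessage[],
  embed: (text: string) => Promise<number[]>,
  onSuccess: (msg: PendingMessage, vector: number[]) => void,
  onFailure: (msg: PendingMessage, error: unknown) => void,
  chunkSize = 50,
): Promise<void> {
  for (let i = 0; i < messages.length; i += chunkSize) {
    const chunk = messages.slice(i, i + chunkSize);
    // allSettled: a rejected embedding does not abort the others in the chunk.
    const results = await Promise.allSettled(chunk.map(m => embed(m.content)));
    results.forEach((r, j) => {
      if (r.status === 'fulfilled') onSuccess(chunk[j], r.value);
      else onFailure(chunk[j], r.reason);
    });
  }
}
```

In runBackfill, onSuccess would perform the existence check and insert, and onFailure would upsert into embedding_failures; since better-sqlite3 calls are synchronous, these writes never interleave with each other.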
    

4.8 Unified Entry Point: MemoryManager (memoryManager.ts)

    
    import { initializeDatabase, db } from './database';
    import { SessionStore, SearchResult } from './sessionStore';
    import { searchByVector } from './vectorSearch';
    import { rrfFusion } from './rrf';
    import { BackfillService } from './backfill';
    import { QueryCache, embeddingQueue, embeddingService } from './embedding';
    import { packEmbedding, sha256 } from './utils';
    
    export class MemoryManager {
      sessionStore: SessionStore;
      backfillService: BackfillService;
      queryCache: QueryCache;
    
      constructor() {
        initializeDatabase();
        this.sessionStore = new SessionStore();
        this.backfillService = new BackfillService(this.sessionStore);
        this.queryCache = new QueryCache();
        this.backfillService.start(5); // incremental backfill every 5 minutes
      }
    
      appendMessage(sessionId: string, role: string, content: string): number {
        const msgId = this.sessionStore.appendMessage(sessionId, role, content);
        // Embed user/assistant messages longer than 10 characters in the background;
        // anything that fails here is picked up by the next backfill run.
        if (['user', 'assistant'].includes(role) && content.length > 10) {
          setImmediate(async () => {
            try {
              const embedding = await embeddingQueue.enqueue(content);
              // Deduplicate by message id, not text hash: with the hash check, a repeated
              // message was skipped here and only embedded later by the backfill job.
              const exists = db.prepare('SELECT 1 FROM message_embeddings WHERE msg_id = ?').get(msgId);
              if (!exists) {
                db.prepare(`
                  INSERT INTO message_embeddings (embedding, msg_id, text_hash, created_at)
                  VALUES (?, ?, ?, datetime('now'))
                `).run(packEmbedding(embedding), BigInt(msgId), sha256(content));
              }
            } catch (err) {
              console.error(`Automatic embedding of message ${msgId} failed:`, err);
            }
          });
        }
        return msgId;
      }
    
      async search(query: string, limit = 20): Promise<SearchResult[]> {
        // 1. FTS5 keyword search
        const keywordResults = this.sessionStore.searchByKeyword(query, 30);
    
        // 2. Vector semantic search; any failure degrades to keyword-only results
        let vectorResults: SearchResult[] = [];
        try {
          let queryVec = this.queryCache.get(query);
          if (!queryVec) {
            queryVec = await embeddingService.getEmbedding(query);
            this.queryCache.set(query, queryVec);
          }
          vectorResults = searchByVector(queryVec, 30);
        } catch (e) {
          console.warn('Vector search failed, falling back to keyword-only search:', e);
        }
    
        // 3. RRF fusion
        const fused = rrfFusion(keywordResults, vectorResults);
        return fused.slice(0, limit);
      }
    }
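As written, search waits for embeddingService.getEmbedding, whose 30-second timeout, two retries and 1 s + 2 s backoff can hold a single CLI query for about a minute and a half before it degrades to keyword-only results. A minimal sketch of a tighter deadline for the interactive path; withDeadline, vectorIdsOrEmpty and the 3-second default are hypothetical, and a request that misses the deadline keeps running in the background with its result discarded:

```typescript
// Hypothetical helper: reject if `work` has not settled within `ms` milliseconds.
function withDeadline<T>(work: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`deadline of ${ms} ms exceeded`)), ms);
  });
  // Whichever settles first wins; always clear the timer so it cannot keep the process alive.
  return Promise.race([work, deadline]).finally(() => clearTimeout(timer));
}

// Sketch of the vector branch of search with a deadline: embed, then run KNN,
// or return an empty list so the caller falls back to keyword-only ranking.
async function vectorIdsOrEmpty(
  query: string,
  embed: (q: string) => Promise<number[]>,
  knn: (vec: number[]) => number[],
  deadlineMs = 3000,
): Promise<number[]> {
  try {
    return knn(await withDeadline(embed(query), deadlineMs));
  } catch (e) {
    console.warn('vector search skipped, keyword-only results:', (e as Error).message);
    return [];
  }
}
```

Background jobs such as backfill can keep the long 30-second timeout; only the user-facing query path needs the short one.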
    

5. Usage Example

    
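    import { MemoryManager } from './memory/memoryManager';
    
    async function main() {
      const memory = new MemoryManager();
    
      // Create a session and append messages
      const sessionId = memory.sessionStore.createSession('System tuning');
      memory.appendMessage(sessionId, 'user', 'How do I free up disk space?');
      memory.appendMessage(sessionId, 'assistant', 'Try du -sh to find the largest directories...');
    
      // Hybrid search. Embeddings are generated asynchronously, so messages appended
      // a moment ago may only appear through the keyword half of the results.
      const results = await memory.search('my hard drive is full');
      console.log(results);
    }
    
    main().catch(console.error);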
    

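6. Optimization Checklist (vs. the Original Plan)

| Optimization | Module |
|------------------------------------------------------|-------------------|
| 1. Vector search uses sqlite-vec native KNN | vectorSearch.ts |
| 2. Embedding request queue and throttling | embedding.ts |
| 3. Sanitization of special characters in FTS5 queries | utils.ts |
| 4. Batch embedding + retries + timeouts | embedding.ts |
| 5. Vector cleanup on message deletion; session timestamp updates | database.ts |
| 6. Backfill failure tracking and skipping | backfill.ts |
| 7. Environment variable management | database.ts |
| 8. Unified service entry point (MemoryManager) | memoryManager.ts |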
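This plan can be handed to Hermes Agent as-is. Each module has a single, clearly scoped responsibility, and the only external service involved is the Volcano Engine Embedding API; everything else runs locally and can be deployed directly in the CmdCode project.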