🤖 概述

将 Hermes Agent 的历史会话文件(46,000+ 文件,3.17GB)批量导入 L0 记忆数据库并生成向量嵌入的全流程。涵盖 Session 碎片化、L1 管道瓶颈、向量嵌入 BUG 等踩坑经验。


适用场景

  • 首次部署记忆系统后需要批量导入历史会话
  • 迁移记忆系统后需要重新导入
  • 补充遗漏的会话数据(断点续传)
  • 碎片会话合并为可记忆的连续对话

前置条件

  • TencentDB-Agent-Memory gateway 运行中(端口 8420)
  • vectors.db 已初始化(有 l0_conversations、l0_vec_rowids、l0_fts 等表)
  • Redis / 记忆系统已启用 L0 捕获
  • Node.js + node-llama-cpp(向量嵌入用)

核心原则

不走 HTTP API,直接 SQL 写入,实现 38x 加速。

  • HTTP /capture 端点单条发送,有 30s PHP-FPM 超时限制,每秒限流
  • 直接 INSERT INTO l0_conversations + l0_vec_rowids + l0_fts 批量提交
  • 保留原始 timestamp 而非捕获时间
  • 统一 session_key 避免碎片化(给同一批导入的历史数据分配相同 session_key)

架构概览

原始会话文件 (46,000+) │ ├─ session_*.json → 含 system_prompt + messages ├─ *.jsonl → JSON Lines,每行一轮对话 └─ request_dump_*.json → ❌ 不含对话,跳过 │ ▼ 提取 user/assistant 轮次 │ ▼ 去重(比对前60字符) │ ▼ SQL 批量写入 ──→ l0_conversations 表 │ │ │ ├─ l0_vec_rowids (向量嵌入) │ ├─ l0_fts (全文搜索) │ └─ pipeline_states (调度器状态) │ ▼ POST /capture 唤醒调度器 ──→ L1 提取 │ └─ l1_records 表

四阶段工作流

Phase 1: 源文件分析

三种格式识别

格式示例说明处理
session_*.jsonsession_20260423_174126_e808b10d.json含 system_prompt + messagesjson.load() 提取
*.jsonl20260427_081652_0db72b.jsonlJSON Lines,每行一个对话轮次逐行 json.loads()
request_dump_*.jsonrequest_dump_20260424_*.jsonAPI 请求转储,不含对话跳过
❌ 常见错误:.jsonl 文件用 json.load() 读取导致 JSONDecodeError。
✅ 正确做法:逐行读取,每行独立 json.loads()。

session_id = session_key 统一

将同一批导入的大量碎片会话赋予相同的 session_key(如 "history-qqbot-full"),便于 L1 管道按 session_key 分组提取记忆,避免每个碎片文件独立成 session 导致提炼不出有效记忆。

原始文件名保留为 session_id 字段用于追溯。

去重策略

通过比对消息文本前 60 字符判断是否已存在:

# 从数据库中加载已有消息的签名
existing = set()
for row in conn.execute(
    "SELECT substr(message_text,1,60) FROM l0_conversations WHERE session_key=? AND role='user'",
    (SESSION_KEY,)
):
    existing.add(row[0])

# 跳过已在库中的轮次
new_rounds = [r for r in all_rounds if r['u'][:60] not in existing]

Phase 2: SQL 批量写入 L0

DatabaseSync + BEGIN/COMMIT

使用 DatabaseSync 类(gateway 内置)以事务方式批量写入,避免逐条 insert 的性能开销:

conn = DatabaseSync('/root/.memory-tencentdb/memory-tdai/vectors.db')
conn.run('BEGIN')
try:
    for round in new_rounds:
        conn.run(
            "INSERT INTO l0_conversations (record_id, session_key, session_id, role, message_text, recorded_at, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?)",
            [record_id, session_key, session_id, role, text, timestamp, timestamp]
        )
    conn.run('COMMIT')
except:
    conn.run('ROLLBACK')
    raise

l0_conversations + l0_fts 同步写入

全文搜索表 l0_fts 使用 SQLite FTS5 虚拟表,需在插入主表后同步写入:

conn.run(
    "INSERT INTO l0_fts (rowid, message_text) VALUES (?, ?)",
    [last_rowid, message_text]
)

原始 timestamp 保留

❌ 常见错误:通过 /capture 端点导入时,DB 中 timestamp 全是导入时间而非对话发生时间。
✅ 正确做法:直接 SQL 写入,从源文件提取原始 timestamp 传入 recorded_at 和 timestamp 字段。
✅ 性能参考:SQL 批量写入 44,271 条消息耗时约 2 分钟,而 HTTP /capture 方式需要 7+ 小时(38x 加速)。

Phase 3: 批量向量嵌入

node-llama-cpp 加载 GGUF

使用本地 GGUF 模型(如 bge-m3)通过 node-llama-cpp 生成嵌入向量,不走外部 API:

import { getLlama, LlamaEmbedding } from 'node-llama-cpp'

const llama = await getLlama()
const model = await llama.loadModel({ modelPath: '/path/to/bge-m3.Q4_K_M.gguf' })
const context = await model.createEmbeddingContext()

async function getEmbeddingFor(text) {
    const embedding = await context.getEmbeddingFor(text)
    return embedding.vector  // Float32Array
}

数组传参 BUG 警示

❌ ⚠️ 关键 BUGcontext.getEmbeddingFor() 的数组重载版本存在隐含截断 BUG——传入数组 [text1, text2, ...] 时,只有最后一轮的 embedding 被返回,中间轮次的输出在 C++ 层被覆盖。

✅ 正确做法:逐条调用 getEmbeddingFor(text),每条传入单个字符串,不传数组。

超长文本修复

超过模型上下文窗口的文本需要截断或分段处理:

MAX_TOKENS = 512  # bge-m3 上下文限制
def truncate_to_tokens(text, max_tokens=MAX_TOKENS):
    # 按 token 数截断,保留前 max_tokens
    tokens = tokenizer.encode(text)
    if len(tokens) > max_tokens:
        return tokenizer.decode(tokens[:max_tokens])
    return text

向量写入

INSERT INTO l0_vec_rowids (rowid, vector_blob) VALUES (?, ?)
-- vector_blob 为 Float32Array 的 Buffer

Phase 4: 管道注入与验证

pipeline_states 注入

直接写入 SQL 导入的数据不会被调度器自动识别。需在 pipeline_states 表中插入状态记录:

INSERT OR REPLACE INTO pipeline_states
(session_key, state, updated_at)
VALUES (?, 'pending', ?)

POST /capture 唤醒调度器(关键!)

即使注入了 pipeline_states,调度器也需要被手动唤醒。发送一个空 POST 到 /capture 端点:

curl -X POST http://127.0.0.1:8420/capture \
  -H "Content-Type: application/json" \
  -d '{
    "user_content": ".",
    "assistant_content": ".",
    "session_key": "wakeup-trigger",
    "session_id": "wakeup"
  }'

这会触发调度器的 isTimeToExtract() 判断,进而执行 L1 提取队列。

重启 Gateway

导入完成后重启 gateway 以确保所有内部状态刷新:

systemctl restart memory-tdai-gateway

向量覆盖率检查

SELECT COUNT(*) as total_msg,
       (SELECT COUNT(*) FROM l0_vec_rowids) as total_vec,
       (SELECT COUNT(*) FROM l0_fts) as total_fts
FROM l0_conversations;

正常状态:total_msg ≈ total_vec ≈ total_fts。若 total_vec 明显少于 total_msg,说明部分消息未生成嵌入,需排查 Phase 3 的截断或数组 BUG。


Warmup 阈值陷阱(卡住修复)

症状:批量导入后 L1 提取数为 0,调度器未触发任何提取。

根因shouldExtractL1() 检查函数有 warmup 阈值条件——新 session 需要达到一定消息数量(默认 10 条)才触发提取。如果每次 POST /capture 只发送 1-2 轮对话,调度器认为"消息不够多"而跳过。

修复方法
1. 将同一批导入的所有消息使用同一 session_key,确保累计达到阈值
2. 或临时修改 maxMessagesPerExtraction 降低阈值(config.yaml)
3. 或直接 SQL 写入 pipeline_states 表绕过阈值检查

验证:查看 gateway 日志 journalctl -u memory-tdai-gateway --since "5 min ago" | grep -i 'extract\|warmup\|threshold'

性能参考

方法44,271 条消息备注
HTTP /capture(逐条)~7 小时30s PHP-FPM 超时 + 限流
SQL 批量写入~2 分钟事务提交 + 原始 timestamp 保留
向量嵌入(逐条)~11 分钟node-llama-cpp GGUF 本地推理
总耗时~13 分钟vs 7h HTTP 方式 → 38x 加速

故障排除速查

症状根因排查
L0 消息数远少于源文件数质量门过滤了 cron 噪音会话检查 shouldExtractL1() 的过滤条件
向量数少于消息数尾巴消息的 embedding 被跳过检查 auto-capture.ts 的 maxNewMessages 上限
时间戳全是同一天用了 /capture 的自动时间戳直接 SQL 写入时传原始时间
L1 只有几十条maxMessagesPerExtraction=10 + MiniMax JSON 截断用 DeepSeek V4 批量重提 / 放宽阈值
getEmbeddingFor 数组只返回最后一条node-llama-cpp 数组重载 BUG逐条调用,不传数组
调度器触发后 L1 为 0warmup 阈值未达到统一 session_key 或直接写 pipeline_states
/capture 返回 502PHP-FPM 30s 超时 kill 了进程用后台 worker 模式分批发送
API 密钥 403config.yaml 中密钥为占位符echo $TDAI_LLM_API_KEY 取运行时注入值
JSON 被截断无输出MiniMax M2.7 的 <think> 标签消耗 token设 max_tokens=4096,优化 system prompt
去重过严过滤了太多bigram 相似度阈值 90% 太高降阈值或跳过重复检查
DB 文件过大未清理过期数据DELETE + VACUUM 或分表策略
Gateway 内存持续增长向量嵌入上下文未释放重启 gateway 或启用批次 GC

全链路审计清单

#检查项验证方法
1源文件分析完成统计文件数 = session_*.json + *.jsonl,request_dump 已排除
2去重计数正确new_rounds + existing == all_rounds
3SQL 写入事务完整COMMIT 后 SELECT COUNT(*) 验证行数
4timestamp 为原始值SELECT MIN(timestamp), MAX(timestamp) 验证时间范围
5角色平衡SELECT role, COUNT(*) GROUP BY role → user ≈ assistant
6FTS 完整l0_fts 表行数 = l0_conversations 表行数
7向量嵌入完整l0_vec_rowids 表行数 ≈ l0_conversations 表行数
8pipeline_states 已注入SELECT * FROM pipeline_states WHERE state='pending'
9/capture 唤醒成功curl POST 返回 200,gateway 日志出现 extract
10Gateway 重启后运行正常systemctl is-active memory-tdai-gateway
11向量覆盖率 ≥ 95%total_vec / total_msg * 100
12无孤儿消息session_id 分组,每组 ≥ 2 条
13L1 提取已触发l1_records 表有数据
14无重复导入相同前 60 字符的消息 count = 1

关键经验教训

#教训说明
1Session 碎片化 → 记忆提炼不足每批导入文件分配独立 session_id 导致 L1 管道每批只看 2-10 条消息,提炼不出有效记忆。统一 session_key 解决。
2HTTP API 方式极慢(7h vs 2min)/capture 端点 30s 超时 + 每秒限流,44K 消息需要 7 小时。直接 SQL 写入仅需 2 分钟。
3原始时间戳丢失/capture 用当前时间覆盖 recorded_at。直接 SQL 写入传入源文件中的原始 timestamp。
4getEmbeddingFor 数组传参 BUGnode-llama-cpp 的数组重载版本只返回最后一轮 embedding。必须逐条调用。
5Warmup 阈值未达到 → L1 为 0shouldExtractL1() 需要 session 累计 10+ 条消息才触发。直接写 pipeline_states 绕过。
6API 密钥不可见(403)config.yaml 中 key 显示为 sk-xxx-xxx 占位符,实际值在环境变量中。用 echo $TDAI_LLM_API_KEY 获取。
7MiniMax JSON 截断M2.7 模型的 <think> 标签消耗大量 token 导致 JSON 被截断。设 max_tokens=4096。
8Cron 噪音污染Hermes 会话中 83% 是 cron/自动化会话,需要过滤。用 shouldExtractL1() 的质量门条件。
9去重过程过严bigram 相似度 90% 阈值过滤了非重复但有语义差异的消息。适当降低或跳过。
10向量上下文未释放 → 内存泄漏LLM 嵌入上下文在批量处理后未释放,gateway 内存持续增长。定时重启或批次 GC。

已知限制

  1. 实时 vs 批量:pipeline 设计为实时增量提取,历史批量导入效率低。批量场景应走 SQL 直写 + 手动唤醒。
  2. MiniMax M2.7 JSON 问题:<think> 标签消耗 token 导致 JSON 被截断,需设 max_tokens=4096。
  3. API 密钥可移植性:密钥在 config.yaml 中存储为占位符,实际值在环境变量中。
  4. 时间戳恢复:直接 SQL 写入可保留原始 timestamp,/capture 方式会丢失。
  5. 向量模型大小:GGUF 模型约 500MB-2GB,需确保磁盘空间充足。