📄 utils.ts • 2472 bytes
/**
* CmdCode 向量记忆系统 - 工具函数
*/
import crypto from 'crypto'
import { readFileSync } from 'fs'
/**
 * Serializes an embedding vector into a binary Buffer.
 *
 * Each element is stored as a 32-bit little-endian float, so the result is
 * `vec.length * 4` bytes long and values are rounded to float32 precision.
 *
 * @param vec - Embedding components, in order.
 * @returns A Buffer holding the components back to back.
 */
export function packEmbedding(vec: number[]): Buffer {
  // allocUnsafe is fine here: every byte is overwritten by the loop below.
  const out = Buffer.allocUnsafe(4 * vec.length)
  let offset = 0
  for (const component of vec) {
    // writeFloatLE returns the offset just past the bytes it wrote.
    offset = out.writeFloatLE(component, offset)
  }
  return out
}
/**
 * Deserializes a binary Buffer back into an embedding vector.
 *
 * The buffer is read as consecutive 32-bit little-endian floats, one every
 * 4 bytes starting at offset 0.
 *
 * @param blob - Buffer of packed float32 values.
 * @returns The decoded components, in order.
 * @throws RangeError (from `readFloatLE`) if the buffer ends with a partial
 *   float, i.e. its length is not a multiple of 4.
 */
export function unpackEmbedding(blob: Buffer): number[] {
  // Round up so a trailing partial float is still attempted (and rejected by
  // readFloatLE) instead of being silently ignored.
  const count = Math.ceil(blob.length / 4)
  return Array.from({ length: count }, (_unused, index) => blob.readFloatLE(index * 4))
}
/**
 * Computes the SHA-256 digest of a string.
 *
 * @param text - Input text to hash.
 * @returns The digest as a lowercase hexadecimal string.
 */
export function sha256(text: string): string {
  const hasher = crypto.createHash('sha256')
  hasher.update(text)
  return hasher.digest('hex')
}
/**
 * Builds an FTS5 MATCH expression from free-form user input, stripping
 * characters that could be interpreted as FTS5 query syntax.
 *
 * Processing steps:
 * 1. Every character other than `\w`, CJK ideographs (U+4E00–U+9FFF),
 *    whitespace, `@`, `.` and `-` is replaced by a space.
 * 2. The text is split on whitespace; only the first 10 tokens are kept and
 *    each is cut to 200 characters to bound the work done on hostile input.
 * 3. Tokens without CJK characters become double-quoted terms.
 * 4. Tokens containing CJK characters are split into CJK and non-CJK runs.
 *    CJK runs of 3+ characters are expanded into sliding 3-character terms;
 *    shorter CJK runs produce nothing. Non-CJK runs that contain at least one
 *    word character are emitted as quoted terms, so the ASCII part of a mixed
 *    token such as "API接口" is not lost.
 * 5. Terms are de-duplicated (first occurrence wins) and joined with ` OR `.
 *
 * @param input - Raw query text.
 * @returns The OR-joined term list, or `'*'` when no usable term remains.
 */
export function sanitizeFTS5Query(input: string): string {
  const cleaned = input.replace(/[^\w\u4e00-\u9fff\s@.-]/g, ' ').trim()
  if (!cleaned) return '*'

  const words = cleaned
    .split(/\s+/)
    .filter(w => w.length > 0)
    .map(w => w.slice(0, 200))
    .slice(0, 10)

  // Double quotes are already removed by the cleaning step above; doubling
  // them here is a second line of defence should that filter ever change.
  const quote = (s: string): string => `"${s.replace(/"/g, '""')}"`

  const terms: string[] = []
  for (const w of words) {
    if (!/[\u4e00-\u9fff]/.test(w)) {
      terms.push(quote(w))
      continue
    }
    // Alternating runs of CJK / non-CJK characters, in their original order.
    const segments = w.match(/[\u4e00-\u9fff]+|[^\u4e00-\u9fff]+/g) || []
    for (const segment of segments) {
      if (/[\u4e00-\u9fff]/.test(segment)) {
        // Sliding 3-gram window; a run shorter than 3 yields no iterations.
        for (let i = 0; i + 3 <= segment.length; i++) {
          terms.push(segment.substring(i, i + 3))
        }
      } else if (/\w/.test(segment)) {
        // Skip pure-punctuation glue such as the "-" in "向量-记忆".
        terms.push(quote(segment))
      }
    }
  }

  if (terms.length === 0) return '*'
  return [...new Set(terms)].join(' OR ')
}
/**
 * Limits a string to at most `maxLen` UTF-16 code units.
 *
 * Strings that already fit are returned unchanged. Longer strings are cut and
 * suffixed with `...` so that the total length does not exceed `maxLen`. When
 * `maxLen` is smaller than the ellipsis itself, the text is hard-cut without
 * an ellipsis (and a non-positive `maxLen` yields an empty string), so the
 * result never exceeds the requested limit.
 *
 * The cut never leaves a lone high surrogate at the end of the kept text, so
 * characters outside the BMP (e.g. emoji) are dropped whole rather than split.
 *
 * @param text - Text to shorten.
 * @param maxLen - Maximum allowed length of the result.
 * @returns `text` itself if it fits, otherwise the shortened string.
 */
export function truncate(text: string, maxLen: number): string {
  if (text.length <= maxLen) return text

  const ellipsis = '...'
  const tooShortForEllipsis = maxLen < ellipsis.length
  const keep = tooShortForEllipsis ? Math.max(0, maxLen) : maxLen - ellipsis.length

  let head = text.substring(0, keep)
  // A trailing high surrogate means the cut landed inside a surrogate pair.
  const lastUnit = head.charCodeAt(head.length - 1)
  if (lastUnit >= 0xd800 && lastUnit <= 0xdbff) head = head.slice(0, -1)

  return tooShortForEllipsis ? head : head + ellipsis
}
/**
 * Generates a short, lowercase base-36 identifier.
 *
 * The ID is the current `Date.now()` value in base 36 followed by exactly six
 * random base-36 digits. The random part is drawn as an integer and
 * left-padded, so its width is always six characters.
 *
 * The randomness comes from `Math.random()`; the result is not suitable as a
 * secret or security token.
 *
 * @param prefix - Optional prefix; when non-empty it is prepended followed by `_`.
 * @returns `<prefix>_<timestamp><random>` or `<timestamp><random>`.
 */
export function generateId(prefix: string = ''): string {
  const randomDigits = 6
  const timestamp = Date.now().toString(36)
  const random = Math.floor(Math.random() * 36 ** randomDigits)
    .toString(36)
    .padStart(randomDigits, '0')
  const body = `${timestamp}${random}`
  return prefix ? `${prefix}_${body}` : body
}