Skip to content

RAG Pipeline 实战

RAG(Retrieval-Augmented Generation)的核心流程:

  1. 文档加载:读取原始文档
  2. 分块(Chunking):将长文档切分为小段
  3. 嵌入(Embedding):将文本转为向量
  4. 存储:将向量存入向量数据库
  5. 检索(Retrieval):根据用户查询找到最相关的文档块
  6. 生成(Generation):将检索结果作为上下文,让 LLM 生成回答

本文用 OpenAI Embedding + ChromaDB 实现一个完整可运行的 RAG Pipeline。

Terminal window
pip install openai chromadb tiktoken
import tiktoken
def load_documents(file_paths: list[str]) -> list[dict]:
"""加载文本文件"""
documents = []
for path in file_paths:
with open(path, "r", encoding="utf-8") as f:
content = f.read()
documents.append({"path": path, "content": content})
return documents
def chunk_text(
text: str,
chunk_size: int = 500,
chunk_overlap: int = 50,
source: str = "",
) -> list[dict]:
"""
按 Token 数分块
为什么按 Token 而不是字符分块?因为 LLM 的上下文窗口和
Embedding 模型的输入限制都以 Token 计。按字符分块可能
导致一个块的 Token 数超出模型限制。
Args:
text: 原始文本
chunk_size: 每块最大 Token 数
chunk_overlap: 相邻块的重叠 Token 数(防止语义被切断)
source: 来源文件名
"""
encoder = tiktoken.encoding_for_model("gpt-4o-mini")
tokens = encoder.encode(text)
chunks = []
start = 0
chunk_id = 0
while start < len(tokens):
end = min(start + chunk_size, len(tokens))
chunk_tokens = tokens[start:end]
chunk_text = encoder.decode(chunk_tokens)
chunks.append({
"id": f"{source}_chunk_{chunk_id}",
"text": chunk_text,
"source": source,
"token_count": len(chunk_tokens),
})
start += chunk_size - chunk_overlap
chunk_id += 1
return chunks
from openai import OpenAI
import chromadb
client = OpenAI()
def get_embeddings(texts: list[str], model: str = "text-embedding-3-small") -> list[list[float]]:
"""批量获取文本嵌入向量"""
response = client.embeddings.create(input=texts, model=model)
return [item.embedding for item in response.data]
def build_vector_store(
chunks: list[dict],
collection_name: str = "rag_collection",
) -> chromadb.Collection:
"""构建向量存储"""
chroma_client = chromadb.PersistentClient(path="./chroma_db")
# 删除已存在的同名集合(如果有)
try:
chroma_client.delete_collection(collection_name)
except Exception:
pass
collection = chroma_client.create_collection(
name=collection_name,
metadata={"hnsw:space": "cosine"},
)
# 批量处理(避免一次性发送过多数据)
batch_size = 100
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i + batch_size]
texts = [c["text"] for c in batch]
ids = [c["id"] for c in batch]
metadatas = [{"source": c["source"]} for c in batch]
embeddings = get_embeddings(texts)
collection.add(
ids=ids,
documents=texts,
embeddings=embeddings,
metadatas=metadatas,
)
print(f"已索引 {min(i + batch_size, len(chunks))}/{len(chunks)} 个文档块")
return collection
def retrieve(
collection: chromadb.Collection,
query: str,
top_k: int = 5,
) -> list[dict]:
"""检索最相关的文档块"""
query_embedding = get_embeddings([query])[0]
results = collection.query(
query_embeddings=[query_embedding],
n_results=top_k,
)
retrieved = []
for i in range(len(results["ids"][0])):
retrieved.append({
"id": results["ids"][0][i],
"text": results["documents"][0][i],
"distance": results["distances"][0][i],
"source": results["metadatas"][0][i]["source"],
})
return retrieved
def generate_answer(query: str, context_chunks: list[dict]) -> str:
"""基于检索结果生成回答"""
context = "\n\n---\n\n".join([
f"[来源: {c['source']}]\n{c['text']}"
for c in context_chunks
])
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": (
"你是一个知识助手。根据以下参考资料回答用户问题。\n"
"如果参考资料中没有相关信息,请如实说明。\n"
"回答时引用来源。"
),
},
{
"role": "user",
"content": f"参考资料:\n{context}\n\n问题: {query}",
},
],
temperature=0,
)
return response.choices[0].message.content
def rag_pipeline(file_paths: list[str], query: str) -> str:
"""完整 RAG 管线"""
# 1. 加载文档
print("1. 加载文档...")
docs = load_documents(file_paths)
# 2. 分块
print("2. 文档分块...")
all_chunks = []
for doc in docs:
chunks = chunk_text(doc["content"], source=doc["path"])
all_chunks.extend(chunks)
print(f" 共 {len(all_chunks)} 个块")
# 3-4. 嵌入 + 存储
print("3. 构建向量索引...")
collection = build_vector_store(all_chunks)
# 5. 检索
print("4. 检索相关文档...")
results = retrieve(collection, query)
print(f" 找到 {len(results)} 个相关块")
# 6. 生成
print("5. 生成回答...")
answer = generate_answer(query, results)
return answer
if __name__ == "__main__":
answer = rag_pipeline(
file_paths=["./docs/guide.txt", "./docs/faq.txt"],
query="如何配置系统?",
)
print(f"\n回答:\n{answer}")
决策点本文选择替代方案
分块策略按 Token 数固定分块按段落/语义分块
Embedding 模型text-embedding-3-smallCohere、BGE、Jina
向量数据库ChromaDB(本地)Pinecone、Weaviate、Qdrant
相似度度量CosineL2、Inner Product
自测题 1:chunk_overlap 的作用是什么?如果设为 0 会有什么问题?
chunk_overlap 让相邻块有一部分重叠内容,确保跨块边界的信息不会丢失。设为 0 时,恰好在块边界处的完整语句会被切断,检索时可能找不到完整的相关信息。
自测题 2:为什么要批量处理 Embedding 而不是逐条处理?
API 调用有网络延迟开销。批量处理(一次发送 100 条)可以大幅减少 API 调用次数,降低延迟和成本。
自测题 3:生成阶段的 System Prompt 中为什么强调"如果没有相关信息请如实说明"?
防止 LLM 在参考资料中找不到答案时编造信息(幻觉)。明确指示 LLM 在信息不足时承认而不是臆测。