Files
grace-scripts/rag-ingest.py
Grace 014ec8bd5c feat: initial import of all helper scripts from ~/scripts/
- Training data pipeline: convert, export, extract, load-to-db
- Infra tooling: infra-audit, infra-gitea-link
- RAG pipeline: rag-ingest, rag-query
- Fine-tuning: finetune-lora, overnight-qwen3, install-unsloth
- Transcripts: export-transcripts
- Updated README with script index and token reduction strategy
2026-03-16 22:32:48 -07:00

344 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
"""
RAG ingestion pipeline for Grace's homelab knowledge base.
Fetches documentation pages, chunks them, embeds via nomic-embed-text
(memory-engine Ollama), and upserts to Qdrant on DB VM.
Usage:
python3 ~/scripts/rag-ingest.py # process all pending/stale docs
python3 ~/scripts/rag-ingest.py --hours 2 # stop after N hours (default 2)
python3 ~/scripts/rag-ingest.py --url URL # ingest a single URL immediately
python3 ~/scripts/rag-ingest.py --force # re-ingest all regardless of freshness
GRACE DIRECTIVE: When Maxwell sends a URL and says 'read this', 'remember this',
'learn this', 'add this to your docs', or 'add this to the RAG':
1. Add it to ~/scripts/rag-docs.json under the appropriate category
2. Run: python3 ~/scripts/rag-ingest.py --url <URL>
3. Confirm it was ingested
"""
import argparse
import hashlib
import json
import os
import re
import sys
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone, timedelta
from html import unescape
from typing import Optional
# ── Config ───────────────────────────────────────────────────────────────────
# JSON doc list: has a "sources" array; each source has name/urls/refresh_days.
DOCS_FILE = os.path.expanduser("~/scripts/rag-docs.json")
# Persistent ingestion state: per-URL timestamps/chunk counts + running total.
STATE_FILE = os.path.expanduser("~/self-improving/rag-state.json")
# Qdrant runs locally on this (DB) VM.
QDRANT_URL = "http://localhost:6333"
OLLAMA_URL = "http://192.168.20.142:11434" # memory-engine
EMBED_MODEL = "nomic-embed-text"
COLLECTION = "homelab_docs"
EMBED_DIM = 768 # nomic-embed-text output size
CHUNK_SIZE = 600 # tokens approx (chars/4)
CHUNK_OVERLAP = 60 # ~10% overlap
# Character-based equivalents used by the chunker (≈4 chars per token).
MAX_CHUNK_CHARS = CHUNK_SIZE * 4
OVERLAP_CHARS = CHUNK_OVERLAP * 4
# Default value for the --hours run-time budget.
DEFAULT_HOURS = 2
# ── HTTP helpers ─────────────────────────────────────────────────────────────
def http_get(url: str, timeout: int = 15) -> Optional[str]:
    """GET *url* and return the decoded body, or None on any failure."""
    try:
        request = urllib.request.Request(url, headers={"User-Agent": "Grace-RAG/1.0"})
        with urllib.request.urlopen(request, timeout=timeout) as resp:
            # Lossy decode: never crash on a badly-encoded page.
            return resp.read().decode("utf-8", errors="replace")
    except Exception as exc:
        print(f" Fetch error {url}: {exc}")
        return None
def http_post_json(url: str, data: dict, timeout: int = 30) -> Optional[dict]:
    """POST *data* as JSON to *url* and return the parsed JSON reply, or None."""
    try:
        payload = json.dumps(data).encode()
        request = urllib.request.Request(
            url,
            data=payload,
            headers={"Content-Type": "application/json", "User-Agent": "Grace-RAG/1.0"},
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=timeout) as resp:
            return json.loads(resp.read().decode())
    except Exception as exc:
        print(f" POST error {url}: {exc}")
        return None
def http_put_json(url: str, data: dict, timeout: int = 30) -> Optional[dict]:
    """PUT *data* as JSON to *url* and return the parsed JSON reply, or None.

    Mirrors http_post_json: on any error the exception is printed and None
    is returned so callers can treat failures as soft.
    """
    try:
        body = json.dumps(data).encode()
        req = urllib.request.Request(
            url, data=body,
            # User-Agent added for consistency with http_get/http_post_json,
            # which already identify themselves as Grace-RAG/1.0.
            headers={"Content-Type": "application/json", "User-Agent": "Grace-RAG/1.0"},
            method="PUT"
        )
        with urllib.request.urlopen(req, timeout=timeout) as r:
            return json.loads(r.read().decode())
    except Exception as e:
        print(f" PUT error {url}: {e}")
        return None
# ── HTML → text ──────────────────────────────────────────────────────────────
def html_to_text(html: str) -> str:
    """Convert an HTML page to cleaned plain text.

    Headings become Markdown '#' lines and <pre>/<code> contents are fenced,
    so the downstream chunker can split on section boundaries. All other
    tags are stripped and entities are decoded.
    """
    # Remove scripts and styles (including their contents)
    html = re.sub(r'<(script|style)[^>]*>.*?</\1>', '', html, flags=re.DOTALL | re.IGNORECASE)
    # Remove HTML comments
    html = re.sub(r'<!--.*?-->', '', html, flags=re.DOTALL)
    # Preserve heading structure as Markdown headings
    html = re.sub(r'<h([1-6])[^>]*>(.*?)</h\1>',
                  lambda m: f"\n{'#'*int(m.group(1))} {m.group(2)}\n",
                  html, flags=re.DOTALL)
    # Preserve code blocks as fenced blocks
    html = re.sub(r'<(pre|code)[^>]*>(.*?)</\1>', lambda m: f"\n```\n{m.group(2)}\n```\n", html, flags=re.DOTALL)
    # Convert links to their anchor text
    html = re.sub(r'<a[^>]*>(.*?)</a>', r'\1', html, flags=re.DOTALL)
    # Strip remaining tags
    html = re.sub(r'<[^>]+>', ' ', html)
    # Decode entities in a single pass with stdlib html.unescape. The old
    # manual replace chain decoded '&amp;' FIRST, so escaped entities like
    # '&amp;lt;' were double-decoded into '<'. unescape also covers numeric
    # and other named entities. Normalize non-breaking spaces to plain
    # spaces so the whitespace cleanup below can collapse them.
    html = unescape(html).replace('\xa0', ' ')
    # Clean whitespace
    html = re.sub(r'\n{3,}', '\n\n', html)
    html = re.sub(r'[ \t]{2,}', ' ', html)
    return html.strip()
# ── Chunking ─────────────────────────────────────────────────────────────────
def chunk_text(text: str, source_name: str, url: str,
               max_chars: Optional[int] = None,
               overlap_chars: Optional[int] = None) -> list[dict]:
    """
    Split text into overlapping chunks, preserving section headings as context.

    Sections are cut on Markdown heading boundaries (as emitted by
    html_to_text), then long sections are windowed into overlapping chunks.

    Args:
        text: cleaned plain text to split.
        source_name: human-readable source label stored on each chunk.
        url: source URL; part of each chunk's deterministic id.
        max_chars: window size in characters (defaults to MAX_CHUNK_CHARS).
        overlap_chars: overlap between windows (defaults to OVERLAP_CHARS).

    Returns:
        List of chunk dicts: id, text, source, url, section, char_offset
        (char_offset is relative to the start of the chunk's section).
    """
    if max_chars is None:
        max_chars = MAX_CHUNK_CHARS
    if overlap_chars is None:
        overlap_chars = OVERLAP_CHARS
    chunks: list[dict] = []
    # Split on heading boundaries first so chunks don't straddle sections
    sections = re.split(r'(?=\n#{1,4} )', text)
    current_heading = ""
    for section in sections:
        heading_match = re.match(r'\n(#{1,4} .+)\n', section)
        if heading_match:
            current_heading = heading_match.group(1).strip('# ').strip()
        # Further split long sections into overlapping chunks
        start = 0
        while start < len(section):
            end = start + max_chars
            piece = section[start:end].strip()
            if len(piece) < 50:
                # Too small to be a useful chunk — skip but keep advancing
                start = end - overlap_chars
                continue
            # Include the global chunk index in the id: 'start' is
            # section-relative, so two sections of the same page could
            # previously produce identical ids (md5(url:start)) and
            # silently overwrite each other once upserted to Qdrant.
            chunk_id = hashlib.md5(f"{url}:{len(chunks)}:{start}".encode()).hexdigest()
            chunks.append({
                "id": chunk_id,
                "text": piece,
                "source": source_name,
                "url": url,
                "section": current_heading,
                "char_offset": start,
            })
            start = end - overlap_chars
    return chunks
# ── Embedding ────────────────────────────────────────────────────────────────
def embed(texts: list[str]) -> list[Optional[list[float]]]:
    """Embed a batch of texts via nomic-embed-text on memory-engine.

    Returns one entry per input text, in order. An entry is None when that
    text failed to embed (upsert_chunks filters these out). The original
    annotation (Optional[list[list[float]]]) was wrong: the function never
    returns None as a whole — only individual entries may be None.
    """
    embeddings: list[Optional[list[float]]] = []
    for text in texts:
        result = http_post_json(
            f"{OLLAMA_URL}/api/embeddings",
            {"model": EMBED_MODEL, "prompt": text}
        )
        if result and "embedding" in result:
            embeddings.append(result["embedding"])
        else:
            print(f" Embedding failed for text: {text[:50]}")
            embeddings.append(None)
    return embeddings
# ── Qdrant ───────────────────────────────────────────────────────────────────
def ensure_collection():
    """Create the homelab_docs collection in Qdrant if it doesn't exist."""
    response = http_get(f"{QDRANT_URL}/collections/{COLLECTION}")
    if response:
        try:
            # Qdrant answers {"status": "ok", ...} when the collection exists.
            if json.loads(response).get("status") == "ok":
                return
        except Exception:
            # Unparseable reply — fall through and attempt creation.
            pass
    print(f"Creating Qdrant collection '{COLLECTION}'...")
    http_put_json(f"{QDRANT_URL}/collections/{COLLECTION}", {
        "vectors": {
            "size": EMBED_DIM,
            "distance": "Cosine",
        },
    })
def upsert_chunks(chunks: list[dict], embeddings: list[Optional[list[float]]]) -> int:
    """Upsert chunks + embeddings to Qdrant.

    Chunks whose embedding is None (failed embeds) are skipped.
    Returns the number of points upserted, or 0 on failure.
    """
    points = []
    for chunk, embedding in zip(chunks, embeddings):
        if embedding is None:
            continue
        # Qdrant numeric point ids are unsigned 64-bit. Use 16 hex digits
        # (64 bits) of the md5 chunk id instead of 8 (32 bits): at tens of
        # thousands of chunks, 32-bit ids carry a real birthday-collision
        # risk that silently overwrites unrelated chunks.
        # NOTE(review): collections populated before this change still hold
        # points under the old 32-bit ids; recreate the collection and
        # re-ingest to avoid stale duplicates.
        point_id = int(chunk["id"][:16], 16)
        points.append({
            "id": point_id,
            "vector": embedding,
            "payload": {
                "text": chunk["text"],
                "source": chunk["source"],
                "url": chunk["url"],
                "section": chunk["section"],
            }
        })
    if not points:
        return 0
    result = http_put_json(
        f"{QDRANT_URL}/collections/{COLLECTION}/points?wait=true",
        {"points": points}
    )
    return len(points) if result and result.get("status") == "ok" else 0
# ── State ────────────────────────────────────────────────────────────────────
def load_state() -> dict:
    """Load ingestion state from disk, or return a fresh empty state."""
    if not os.path.exists(STATE_FILE):
        return {"ingested": {}, "total_chunks": 0}
    with open(STATE_FILE) as fh:
        return json.load(fh)
def save_state(state: dict) -> None:
    """Persist ingestion state atomically.

    Writes to a sibling temp file and os.replace()s it into place, so an
    interrupted run (Ctrl-C, crash) cannot leave a truncated/corrupt JSON
    state file — main() calls this after every URL.
    """
    tmp_path = STATE_FILE + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(state, f, indent=2)
    os.replace(tmp_path, STATE_FILE)
def is_stale(state: dict, url: str, refresh_days: int) -> bool:
    """Return True when *url* was never ingested or its last ingestion is
    older than *refresh_days* days."""
    record = state["ingested"].get(url)
    if not record:
        return True
    age = datetime.now(timezone.utc) - datetime.fromisoformat(record["ingested_at"])
    return age > timedelta(days=refresh_days)
# ── Main ─────────────────────────────────────────────────────────────────────
def ingest_url(url: str, source_name: str, state: dict) -> int:
    """Fetch, chunk, embed, upsert one URL. Returns chunk count.

    Updates *state* in place: records the URL's ingestion timestamp and
    chunk count, and adjusts the running total_chunks.
    """
    print(f" Fetching: {url}")
    html = http_get(url)
    if not html:
        return 0
    text = html_to_text(html)
    if len(text) < 200:
        print(f" Too short after cleaning ({len(text)} chars), skipping")
        return 0
    chunks = chunk_text(text, source_name, url)
    print(f" {len(chunks)} chunks")
    # Embed in batches of 10
    all_embeddings = []
    for i in range(0, len(chunks), 10):
        batch = chunks[i:i+10]
        all_embeddings.extend(embed([c["text"] for c in batch]))
        time.sleep(0.5) # be gentle with memory-engine
    count = upsert_chunks(chunks, all_embeddings)
    # Replace — don't add to — any previous count for this URL, so that
    # re-ingesting a page (--force or staleness refresh) doesn't inflate
    # total_chunks: upserts overwrite the same Qdrant points rather than
    # adding new ones.
    previous = state["ingested"].get(url, {}).get("chunks", 0)
    state["ingested"][url] = {
        "source": source_name,
        "ingested_at": datetime.now(timezone.utc).isoformat(),
        "chunks": count,
    }
    state["total_chunks"] = state.get("total_chunks", 0) - previous + count
    return count
def main():
    """CLI entry point: status report, single-URL ingest, or batch run
    over DOCS_FILE until everything is fresh or the time budget expires."""
    parser = argparse.ArgumentParser(description="Grace RAG ingestion pipeline")
    parser.add_argument("--hours", type=float, default=DEFAULT_HOURS, help="Max hours to run")
    parser.add_argument("--url", type=str, help="Ingest a single URL immediately")
    parser.add_argument("--force", action="store_true", help="Re-ingest all regardless of freshness")
    parser.add_argument("--status", action="store_true", help="Show ingestion status and exit")
    args = parser.parse_args()
    state = load_state()
    if args.status:
        print(f"Total chunks in Qdrant: {state.get('total_chunks', 0)}")
        print(f"URLs ingested: {len(state['ingested'])}")
        for url, info in sorted(state["ingested"].items(), key=lambda x: x[1]["ingested_at"]):
            print(f" {info['ingested_at'][:10]} {info['chunks']:4d} chunks {url}")
        return
    ensure_collection()
    # Single URL mode
    if args.url:
        print(f"Ingesting: {args.url}")
        count = ingest_url(args.url, args.url, state)
        save_state(state)
        print(f"Done: {count} chunks ingested")
        return
    # Load doc list
    with open(DOCS_FILE) as f:
        docs = json.load(f)
    deadline = datetime.now(timezone.utc) + timedelta(hours=args.hours)
    total_new_chunks = 0
    total_urls = 0
    out_of_time = False  # set when the deadline hits, to exit BOTH loops
    for source in docs["sources"]:
        if out_of_time:
            break
        name = source["name"]
        refresh_days = source.get("refresh_days", 30)
        for url in source["urls"]:
            if datetime.now(timezone.utc) >= deadline:
                # Fix: a bare 'break' here only exited the inner loop, so
                # the outer loop kept iterating and this message printed
                # once per remaining source.
                print(f"\nTime limit ({args.hours}h) reached — stopping. Run again to continue.")
                out_of_time = True
                break
            if not args.force and not is_stale(state, url, refresh_days):
                continue
            print(f"\n[{name}]")
            count = ingest_url(url, name, state)
            total_new_chunks += count
            total_urls += 1
            save_state(state) # save after each URL so progress isn't lost
            time.sleep(1) # polite crawling
    print(f"\n=== Done: {total_urls} URLs, {total_new_chunks} new chunks ===")
    print(f"Total chunks in Qdrant: {state.get('total_chunks', 0)}")
if __name__ == "__main__":
    main()