feat: initial import of all helper scripts from ~/scripts/
- Training data pipeline: convert, export, extract, load-to-db
- Infra tooling: infra-audit, infra-gitea-link
- RAG pipeline: rag-ingest, rag-query
- Fine-tuning: finetune-lora, overnight-qwen3, install-unsloth
- Transcripts: export-transcripts
- Updated README with script index and token reduction strategy
rag-ingest.py (Executable file, 343 lines)
@@ -0,0 +1,343 @@
#!/usr/bin/env python3
"""
RAG ingestion pipeline for Grace's homelab knowledge base.

Fetches documentation pages, chunks them, embeds via nomic-embed-text
(memory-engine Ollama), and upserts to Qdrant on DB VM.

Usage:
    python3 ~/scripts/rag-ingest.py            # process all pending/stale docs
    python3 ~/scripts/rag-ingest.py --hours 2  # stop after N hours (default 2)
    python3 ~/scripts/rag-ingest.py --url URL  # ingest a single URL immediately
    python3 ~/scripts/rag-ingest.py --force    # re-ingest all regardless of freshness

GRACE DIRECTIVE: When Maxwell sends a URL and says 'read this', 'remember this',
'learn this', 'add this to your docs', or 'add this to the RAG':
  1. Add it to ~/scripts/rag-docs.json under the appropriate category
  2. Run: python3 ~/scripts/rag-ingest.py --url <URL>
  3. Confirm it was ingested
"""

import argparse
import hashlib
import json
import os
import re
import sys
import time
import urllib.request
import urllib.error
from datetime import datetime, timezone, timedelta
from typing import Optional

# ── Config ───────────────────────────────────────────────────────────────────
DOCS_FILE = os.path.expanduser("~/scripts/rag-docs.json")
STATE_FILE = os.path.expanduser("~/self-improving/rag-state.json")
QDRANT_URL = "http://localhost:6333"
OLLAMA_URL = "http://192.168.20.142:11434"  # memory-engine
EMBED_MODEL = "nomic-embed-text"
COLLECTION = "homelab_docs"
EMBED_DIM = 768        # nomic-embed-text output size
CHUNK_SIZE = 600       # tokens approx (chars/4)
CHUNK_OVERLAP = 60     # ~10% overlap
MAX_CHUNK_CHARS = CHUNK_SIZE * 4
OVERLAP_CHARS = CHUNK_OVERLAP * 4
DEFAULT_HOURS = 2

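# Expected shape of rag-docs.json, inferred from how main() reads it below.
# The concrete source names and URLs live in the real file; the values here
# are only illustrative:
#
#   {
#     "sources": [
#       {
#         "name": "example-docs",               # label stored with each chunk
#         "refresh_days": 30,                    # optional; defaults to 30
#         "urls": ["https://example.com/docs"]
#       }
#     ]
#   }
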
# ── HTTP helpers ─────────────────────────────────────────────────────────────

def http_get(url: str, timeout: int = 15) -> Optional[str]:
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "Grace-RAG/1.0"})
        with urllib.request.urlopen(req, timeout=timeout) as r:
            return r.read().decode("utf-8", errors="replace")
    except Exception as e:
        print(f" Fetch error {url}: {e}")
        return None


def http_post_json(url: str, data: dict, timeout: int = 30) -> Optional[dict]:
    try:
        body = json.dumps(data).encode()
        req = urllib.request.Request(
            url, data=body,
            headers={"Content-Type": "application/json", "User-Agent": "Grace-RAG/1.0"},
            method="POST"
        )
        with urllib.request.urlopen(req, timeout=timeout) as r:
            return json.loads(r.read().decode())
    except Exception as e:
        print(f" POST error {url}: {e}")
        return None


def http_put_json(url: str, data: dict, timeout: int = 30) -> Optional[dict]:
    try:
        body = json.dumps(data).encode()
        req = urllib.request.Request(
            url, data=body,
            headers={"Content-Type": "application/json"},
            method="PUT"
        )
        with urllib.request.urlopen(req, timeout=timeout) as r:
            return json.loads(r.read().decode())
    except Exception as e:
        print(f" PUT error {url}: {e}")
        return None

# ── HTML → text ──────────────────────────────────────────────────────────────

def html_to_text(html: str) -> str:
    """Strip HTML tags and clean whitespace."""
    # Remove scripts and styles
    html = re.sub(r'<(script|style)[^>]*>.*?</\1>', '', html, flags=re.DOTALL | re.IGNORECASE)
    # Remove HTML comments
    html = re.sub(r'<!--.*?-->', '', html, flags=re.DOTALL)
    # Preserve heading structure
    html = re.sub(r'<h([1-6])[^>]*>(.*?)</h\1>', lambda m: f"\n{'#'*int(m.group(1))} {m.group(2)}\n", html, flags=re.DOTALL)
    # Preserve code blocks
    html = re.sub(r'<(pre|code)[^>]*>(.*?)</\1>', lambda m: f"\n```\n{m.group(2)}\n```\n", html, flags=re.DOTALL)
    # Convert links to text
    html = re.sub(r'<a[^>]*>(.*?)</a>', r'\1', html, flags=re.DOTALL)
    # Strip remaining tags
    html = re.sub(r'<[^>]+>', ' ', html)
    # Decode entities
    html = html.replace('&amp;', '&').replace('&lt;', '<').replace('&gt;', '>') \
               .replace('&nbsp;', ' ').replace('&#39;', "'").replace('&quot;', '"')
    # Clean whitespace
    html = re.sub(r'\n{3,}', '\n\n', html)
    html = re.sub(r'[ \t]{2,}', ' ', html)
    return html.strip()

# ── Chunking ─────────────────────────────────────────────────────────────────

def chunk_text(text: str, source_name: str, url: str) -> list[dict]:
    """
    Split text into overlapping chunks, preserving section headings as context.
    Returns list of chunk dicts with metadata.
    """
    chunks = []
    # Split on heading boundaries first
    sections = re.split(r'(?=\n#{1,4} )', text)

    current_heading = ""
    for section in sections:
        heading_match = re.match(r'\n(#{1,4} .+)\n', section)
        if heading_match:
            current_heading = heading_match.group(1).strip('# ').strip()

        # Further split long sections into overlapping chunks
        start = 0
        while start < len(section):
            end = start + MAX_CHUNK_CHARS
            chunk_text_val = section[start:end].strip()

            if len(chunk_text_val) < 50:
                start = end - OVERLAP_CHARS
                continue

            chunk_id = hashlib.md5(f"{url}:{start}".encode()).hexdigest()

            chunks.append({
                "id": chunk_id,
                "text": chunk_text_val,
                "source": source_name,
                "url": url,
                "section": current_heading,
                "char_offset": start,
            })
            start = end - OVERLAP_CHARS

    return chunks

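# Rough chunk math for the constants above: CHUNK_SIZE 600 tokens ≈ 2,400 chars
# per chunk (MAX_CHUNK_CHARS), with a 240-char overlap (OVERLAP_CHARS), so the
# window advances 2,160 chars per step. An illustrative 10,000-char section
# therefore yields 5 chunks (offsets 0, 2160, 4320, 6480, 8640), the last one
# shorter than the rest; fragments under 50 chars are dropped by the length
# guard in chunk_text().
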
# ── Embedding ────────────────────────────────────────────────────────────────

def embed(texts: list[str]) -> Optional[list[list[float]]]:
    """Embed a batch of texts via nomic-embed-text on memory-engine."""
    embeddings = []
    for text in texts:
        result = http_post_json(
            f"{OLLAMA_URL}/api/embeddings",
            {"model": EMBED_MODEL, "prompt": text}
        )
        if result and "embedding" in result:
            embeddings.append(result["embedding"])
        else:
            print(f" Embedding failed for text: {text[:50]}")
            embeddings.append(None)
    return embeddings

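# Note on the call above: Ollama's /api/embeddings endpoint returns a JSON body
# with an "embedding" array of floats; for nomic-embed-text that vector is
# 768-dimensional, which is why EMBED_DIM must match the collection size
# created in ensure_collection() below. If a request fails, the chunk gets a
# None placeholder and is skipped at upsert time.
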
# ── Qdrant ───────────────────────────────────────────────────────────────────

def ensure_collection():
    """Create homelab_docs collection if it doesn't exist."""
    existing = http_get(f"{QDRANT_URL}/collections/{COLLECTION}")
    if existing:
        try:
            data = json.loads(existing)
            if data.get("status") == "ok":
                return  # already exists
        except Exception:
            pass

    print(f"Creating Qdrant collection '{COLLECTION}'...")
    http_put_json(f"{QDRANT_URL}/collections/{COLLECTION}", {
        "vectors": {
            "size": EMBED_DIM,
            "distance": "Cosine"
        }
    })


def upsert_chunks(chunks: list[dict], embeddings: list[list[float]]):
    """Upsert chunks + embeddings to Qdrant."""
    points = []
    for chunk, embedding in zip(chunks, embeddings):
        if embedding is None:
            continue
        # Convert hex id to int for Qdrant
        point_id = int(chunk["id"][:8], 16)
        points.append({
            "id": point_id,
            "vector": embedding,
            "payload": {
                "text": chunk["text"],
                "source": chunk["source"],
                "url": chunk["url"],
                "section": chunk["section"],
            }
        })

    if not points:
        return 0

    result = http_put_json(
        f"{QDRANT_URL}/collections/{COLLECTION}/points?wait=true",
        {"points": points}
    )
    return len(points) if result and result.get("status") == "ok" else 0

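# Point IDs above are the first 8 hex digits of md5("url:char_offset") cast to
# an int, so re-ingesting an unchanged URL upserts over the same points rather
# than duplicating them. Truncating to 32 bits does leave a small chance of
# cross-document ID collisions.
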
# ── State ────────────────────────────────────────────────────────────────────

def load_state() -> dict:
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE) as f:
            return json.load(f)
    return {"ingested": {}, "total_chunks": 0}


def save_state(state: dict):
    with open(STATE_FILE, "w") as f:
        json.dump(state, f, indent=2)


def is_stale(state: dict, url: str, refresh_days: int) -> bool:
    entry = state["ingested"].get(url)
    if not entry:
        return True
    last = datetime.fromisoformat(entry["ingested_at"])
    return datetime.now(timezone.utc) - last > timedelta(days=refresh_days)

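# Shape of rag-state.json as written by save_state()/ingest_url(); values are
# illustrative only:
#
#   {
#     "ingested": {
#       "<url>": {"source": "...", "ingested_at": "<ISO-8601 UTC>", "chunks": 12}
#     },
#     "total_chunks": 1234
#   }
#
# total_chunks accumulates every upsert ever made, so it over-counts after
# --force re-ingests.
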
# ── Main ─────────────────────────────────────────────────────────────────────

def ingest_url(url: str, source_name: str, state: dict) -> int:
    """Fetch, chunk, embed, upsert one URL. Returns chunk count."""
    print(f" Fetching: {url}")
    html = http_get(url)
    if not html:
        return 0

    text = html_to_text(html)
    if len(text) < 200:
        print(f" Too short after cleaning ({len(text)} chars), skipping")
        return 0

    chunks = chunk_text(text, source_name, url)
    print(f" {len(chunks)} chunks")

    # Embed in batches of 10
    all_embeddings = []
    for i in range(0, len(chunks), 10):
        batch = chunks[i:i+10]
        embeddings = embed([c["text"] for c in batch])
        all_embeddings.extend(embeddings)
        time.sleep(0.5)  # be gentle with memory-engine

    count = upsert_chunks(chunks, all_embeddings)

    state["ingested"][url] = {
        "source": source_name,
        "ingested_at": datetime.now(timezone.utc).isoformat(),
        "chunks": count,
    }
    state["total_chunks"] = state.get("total_chunks", 0) + count
    return count


def main():
    parser = argparse.ArgumentParser(description="Grace RAG ingestion pipeline")
    parser.add_argument("--hours", type=float, default=DEFAULT_HOURS, help="Max hours to run")
    parser.add_argument("--url", type=str, help="Ingest a single URL immediately")
    parser.add_argument("--force", action="store_true", help="Re-ingest all regardless of freshness")
    parser.add_argument("--status", action="store_true", help="Show ingestion status and exit")
    args = parser.parse_args()

    state = load_state()

    if args.status:
        print(f"Total chunks in Qdrant: {state.get('total_chunks', 0)}")
        print(f"URLs ingested: {len(state['ingested'])}")
        for url, info in sorted(state["ingested"].items(), key=lambda x: x[1]["ingested_at"]):
            print(f" {info['ingested_at'][:10]} {info['chunks']:4d} chunks {url}")
        return

    ensure_collection()

    # Single URL mode
    if args.url:
        print(f"Ingesting: {args.url}")
        count = ingest_url(args.url, args.url, state)
        save_state(state)
        print(f"Done: {count} chunks ingested")
        return

    # Load doc list
    with open(DOCS_FILE) as f:
        docs = json.load(f)

    deadline = datetime.now(timezone.utc) + timedelta(hours=args.hours)
    total_new_chunks = 0
    total_urls = 0

    for source in docs["sources"]:
        name = source["name"]
        refresh_days = source.get("refresh_days", 30)

        for url in source["urls"]:
            if datetime.now(timezone.utc) >= deadline:
                print(f"\nTime limit ({args.hours}h) reached — stopping. Run again to continue.")
                break

            if not args.force and not is_stale(state, url, refresh_days):
                continue

            print(f"\n[{name}]")
            count = ingest_url(url, name, state)
            total_new_chunks += count
            total_urls += 1
            save_state(state)  # save after each URL so progress isn't lost
            time.sleep(1)  # polite crawling

    print(f"\n=== Done: {total_urls} URLs, {total_new_chunks} new chunks ===")
    print(f"Total chunks in Qdrant: {state.get('total_chunks', 0)}")


if __name__ == "__main__":
    main()