#!/usr/bin/env python3
"""
RAG ingestion pipeline for Grace's homelab knowledge base.

Fetches documentation pages, chunks them, embeds via nomic-embed-text
(memory-engine Ollama), and upserts to Qdrant on DB VM.

Usage:
    python3 ~/scripts/rag-ingest.py              # process all pending/stale docs
    python3 ~/scripts/rag-ingest.py --hours 2    # stop after N hours (default 2)
    python3 ~/scripts/rag-ingest.py --url URL    # ingest a single URL immediately
    python3 ~/scripts/rag-ingest.py --force      # re-ingest all regardless of freshness

GRACE DIRECTIVE: When Maxwell sends a URL and says 'read this', 'remember this',
'learn this', 'add this to your docs', or 'add this to the RAG':
1. Add it to ~/scripts/rag-docs.json under the appropriate category
2. Run: python3 ~/scripts/rag-ingest.py --url <URL>
3. Confirm it was ingested
"""

import argparse
import hashlib
import json
import os
import re
import sys
import time
import urllib.request
import urllib.error
from datetime import datetime, timezone, timedelta
from typing import Optional

# ── Config ───────────────────────────────────────────────────────────────────

DOCS_FILE = os.path.expanduser("~/scripts/rag-docs.json")
STATE_FILE = os.path.expanduser("~/self-improving/rag-state.json")
QDRANT_URL = "http://localhost:6333"
OLLAMA_URL = "http://192.168.20.142:11434"  # memory-engine
EMBED_MODEL = "nomic-embed-text"
COLLECTION = "homelab_docs"
EMBED_DIM = 768          # nomic-embed-text output size
CHUNK_SIZE = 600         # tokens approx (chars/4)
CHUNK_OVERLAP = 60       # ~10% overlap
MAX_CHUNK_CHARS = CHUNK_SIZE * 4
OVERLAP_CHARS = CHUNK_OVERLAP * 4
DEFAULT_HOURS = 2

# ── HTTP helpers ─────────────────────────────────────────────────────────────


def http_get(url: str, timeout: int = 15) -> Optional[str]:
    """GET *url* and return the body decoded as UTF-8, or None on any error.

    Errors are printed, never raised — the pipeline is best-effort and a
    single bad URL must not abort a long ingestion run.
    """
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "Grace-RAG/1.0"})
        with urllib.request.urlopen(req, timeout=timeout) as r:
            return r.read().decode("utf-8", errors="replace")
    except Exception as e:
        print(f"  Fetch error {url}: {e}")
        return None


def http_post_json(url: str, data: dict, timeout: int = 30) -> Optional[dict]:
    """POST *data* as JSON to *url* and return the parsed JSON response.

    Returns None on any transport or decode error (best-effort, errors printed).
    """
    try:
        body = json.dumps(data).encode()
        req = urllib.request.Request(
            url, data=body,
            headers={"Content-Type": "application/json", "User-Agent": "Grace-RAG/1.0"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=timeout) as r:
            return json.loads(r.read().decode())
    except Exception as e:
        print(f"  POST error {url}: {e}")
        return None


def http_put_json(url: str, data: dict, timeout: int = 30) -> Optional[dict]:
    """PUT *data* as JSON to *url* and return the parsed JSON response.

    Returns None on any transport or decode error (best-effort, errors printed).
    """
    try:
        body = json.dumps(data).encode()
        req = urllib.request.Request(
            url, data=body,
            headers={"Content-Type": "application/json"},
            method="PUT",
        )
        with urllib.request.urlopen(req, timeout=timeout) as r:
            return json.loads(r.read().decode())
    except Exception as e:
        print(f"  PUT error {url}: {e}")
        return None


# ── HTML → text ──────────────────────────────────────────────────────────────


def html_to_text(html: str) -> str:
    """Strip HTML tags and clean whitespace.

    Headings become Markdown ``#`` lines (so chunking can track sections),
    <pre>/<code> content is fenced, links collapse to their anchor text,
    and the common HTML entities are decoded.
    """
    # Remove scripts and styles (content included)
    html = re.sub(r'<(script|style)[^>]*>.*?</\1>', '', html,
                  flags=re.DOTALL | re.IGNORECASE)
    # Remove HTML comments
    html = re.sub(r'<!--.*?-->', '', html, flags=re.DOTALL)
    # Preserve heading structure as Markdown headings
    html = re.sub(r'<h([1-6])[^>]*>(.*?)</h\1>',
                  lambda m: f"\n{'#' * int(m.group(1))} {m.group(2)}\n",
                  html, flags=re.DOTALL | re.IGNORECASE)
    # Preserve code blocks as fenced blocks
    html = re.sub(r'<(pre|code)[^>]*>(.*?)</\1>',
                  lambda m: f"\n```\n{m.group(2)}\n```\n",
                  html, flags=re.DOTALL | re.IGNORECASE)
    # Convert links to their anchor text
    html = re.sub(r'<a [^>]*>(.*?)</a>', r'\1', html,
                  flags=re.DOTALL | re.IGNORECASE)
    # Strip remaining tags
    html = re.sub(r'<[^>]+>', ' ', html)
    # Decode common entities; '&amp;' last so '&amp;lt;' is not double-decoded
    html = (html.replace('&nbsp;', ' ').replace('&#39;', "'")
                .replace('&quot;', '"').replace('&lt;', '<')
                .replace('&gt;', '>').replace('&amp;', '&'))
    # Clean whitespace
    html = re.sub(r'\n{3,}', '\n\n', html)
    html = re.sub(r'[ \t]{2,}', ' ', html)
    return html.strip()


# ── Chunking ─────────────────────────────────────────────────────────────────


def chunk_text(text: str, source_name: str, url: str) -> list[dict]:
    """Split text into overlapping chunks, preserving section headings as context.

    Returns a list of chunk dicts with metadata.  Chunk ids are md5 of
    ``url:offset`` so re-ingesting the same page overwrites in place.
    Fragments shorter than 50 chars are dropped as noise.
    """
    chunks = []
    # Split on heading boundaries first so a chunk rarely straddles sections
    sections = re.split(r'(?=\n#{1,4} )', text)
    current_heading = ""
    for section in sections:
        heading_match = re.match(r'\n(#{1,4} .+)\n', section)
        if heading_match:
            current_heading = heading_match.group(1).strip('# ').strip()
        # Further split long sections into overlapping windows.
        # start always advances because MAX_CHUNK_CHARS > OVERLAP_CHARS.
        start = 0
        while start < len(section):
            end = start + MAX_CHUNK_CHARS
            chunk_text_val = section[start:end].strip()
            if len(chunk_text_val) < 50:
                start = end - OVERLAP_CHARS
                continue
            chunk_id = hashlib.md5(f"{url}:{start}".encode()).hexdigest()
            chunks.append({
                "id": chunk_id,
                "text": chunk_text_val,
                "source": source_name,
                "url": url,
                "section": current_heading,
                "char_offset": start,
            })
            start = end - OVERLAP_CHARS
    return chunks


# ── Embedding ────────────────────────────────────────────────────────────────


def embed(texts: list[str]) -> Optional[list[list[float]]]:
    """Embed a batch of texts via nomic-embed-text on memory-engine.

    Returns one entry per input text; a failed embedding yields None in
    its slot (callers skip None entries rather than aborting the batch).
    """
    embeddings = []
    for text in texts:
        result = http_post_json(
            f"{OLLAMA_URL}/api/embeddings",
            {"model": EMBED_MODEL, "prompt": text},
        )
        if result and "embedding" in result:
            embeddings.append(result["embedding"])
        else:
            print(f"  Embedding failed for text: {text[:50]}")
            embeddings.append(None)
    return embeddings


# ── Qdrant ───────────────────────────────────────────────────────────────────


def ensure_collection():
    """Create the homelab_docs collection if it doesn't exist."""
    existing = http_get(f"{QDRANT_URL}/collections/{COLLECTION}")
    if existing:
        try:
            data = json.loads(existing)
            if data.get("status") == "ok":
                return  # already exists
        except Exception:
            pass
    print(f"Creating Qdrant collection '{COLLECTION}'...")
    http_put_json(f"{QDRANT_URL}/collections/{COLLECTION}", {
        "vectors": {"size": EMBED_DIM, "distance": "Cosine"}
    })


def upsert_chunks(chunks: list[dict], embeddings: list[list[float]]):
    """Upsert chunks + embeddings to Qdrant.  Returns the number of points written."""
    points = []
    for chunk, embedding in zip(chunks, embeddings):
        if embedding is None:
            continue  # embedding failed for this chunk — skip, don't abort
        # Convert hex id to int for Qdrant.
        # NOTE(review): only 8 hex digits (32 bits) are used, so distinct
        # chunks can collide; kept as-is because changing it would orphan
        # every point already stored under the old ids.
        point_id = int(chunk["id"][:8], 16)
        points.append({
            "id": point_id,
            "vector": embedding,
            "payload": {
                "text": chunk["text"],
                "source": chunk["source"],
                "url": chunk["url"],
                "section": chunk["section"],
            }
        })
    if not points:
        return 0
    result = http_put_json(
        f"{QDRANT_URL}/collections/{COLLECTION}/points?wait=true",
        {"points": points},
    )
    return len(points) if result and result.get("status") == "ok" else 0


# ── State ────────────────────────────────────────────────────────────────────


def load_state() -> dict:
    """Load ingestion state from disk, or a fresh empty state."""
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE) as f:
            return json.load(f)
    return {"ingested": {}, "total_chunks": 0}


def save_state(state: dict):
    """Persist ingestion state to disk."""
    with open(STATE_FILE, "w") as f:
        json.dump(state, f, indent=2)


def is_stale(state: dict, url: str, refresh_days: int) -> bool:
    """True if *url* was never ingested or its last ingest is older than *refresh_days*."""
    entry = state["ingested"].get(url)
    if not entry:
        return True
    last = datetime.fromisoformat(entry["ingested_at"])
    return datetime.now(timezone.utc) - last > timedelta(days=refresh_days)


# ── Main ─────────────────────────────────────────────────────────────────────


def ingest_url(url: str, source_name: str, state: dict) -> int:
    """Fetch, chunk, embed, upsert one URL.  Returns chunk count."""
    print(f"  Fetching: {url}")
    html = http_get(url)
    if not html:
        return 0
    text = html_to_text(html)
    if len(text) < 200:
        print(f"  Too short after cleaning ({len(text)} chars), skipping")
        return 0
    chunks = chunk_text(text, source_name, url)
    print(f"  {len(chunks)} chunks")
    # Embed in batches of 10
    all_embeddings = []
    for i in range(0, len(chunks), 10):
        batch = chunks[i:i + 10]
        embeddings = embed([c["text"] for c in batch])
        all_embeddings.extend(embeddings)
        time.sleep(0.5)  # be gentle with memory-engine
    count = upsert_chunks(chunks, all_embeddings)
    # FIX: on re-ingest, replace (not add to) this URL's previous chunk
    # count so total_chunks doesn't double-count refreshed pages.
    previous = state["ingested"].get(url, {}).get("chunks", 0)
    state["ingested"][url] = {
        "source": source_name,
        "ingested_at": datetime.now(timezone.utc).isoformat(),
        "chunks": count,
    }
    state["total_chunks"] = state.get("total_chunks", 0) - previous + count
    return count


def main():
    parser = argparse.ArgumentParser(description="Grace RAG ingestion pipeline")
    parser.add_argument("--hours", type=float, default=DEFAULT_HOURS,
                        help="Max hours to run")
    parser.add_argument("--url", type=str,
                        help="Ingest a single URL immediately")
    parser.add_argument("--force", action="store_true",
                        help="Re-ingest all regardless of freshness")
    parser.add_argument("--status", action="store_true",
                        help="Show ingestion status and exit")
    args = parser.parse_args()

    state = load_state()

    if args.status:
        print(f"Total chunks in Qdrant: {state.get('total_chunks', 0)}")
        print(f"URLs ingested: {len(state['ingested'])}")
        for url, info in sorted(state["ingested"].items(),
                                key=lambda x: x[1]["ingested_at"]):
            print(f"  {info['ingested_at'][:10]} {info['chunks']:4d} chunks  {url}")
        return

    ensure_collection()

    # Single URL mode
    if args.url:
        print(f"Ingesting: {args.url}")
        count = ingest_url(args.url, args.url, state)
        save_state(state)
        print(f"Done: {count} chunks ingested")
        return

    # Load doc list
    with open(DOCS_FILE) as f:
        docs = json.load(f)

    deadline = datetime.now(timezone.utc) + timedelta(hours=args.hours)
    total_new_chunks = 0
    total_urls = 0
    out_of_time = False  # FIX: break only exits the inner loop; a flag is
    for source in docs["sources"]:  # needed to stop the outer source loop too
        if out_of_time:
            break
        name = source["name"]
        refresh_days = source.get("refresh_days", 30)
        for url in source["urls"]:
            if datetime.now(timezone.utc) >= deadline:
                print(f"\nTime limit ({args.hours}h) reached — stopping. Run again to continue.")
                out_of_time = True
                break
            if not args.force and not is_stale(state, url, refresh_days):
                continue
            print(f"\n[{name}]")
            count = ingest_url(url, name, state)
            total_new_chunks += count
            total_urls += 1
            save_state(state)  # save after each URL so progress isn't lost
            time.sleep(1)  # polite crawling

    print(f"\n=== Done: {total_urls} URLs, {total_new_chunks} new chunks ===")
    print(f"Total chunks in Qdrant: {state.get('total_chunks', 0)}")


if __name__ == "__main__":
    main()