From 014ec8bd5c9bea7410b571834e8f898a84747de0 Mon Sep 17 00:00:00 2001 From: Grace Date: Mon, 16 Mar 2026 22:32:48 -0700 Subject: [PATCH] feat: initial import of all helper scripts from ~/scripts/ - Training data pipeline: convert, export, extract, load-to-db - Infra tooling: infra-audit, infra-gitea-link - RAG pipeline: rag-ingest, rag-query - Fine-tuning: finetune-lora, overnight-qwen3, install-unsloth - Transcripts: export-transcripts - Updated README with script index and token reduction strategy --- README.md | 35 ++- convert-training-data.py | 240 ++++++++++++++++++ export-training-data.py | 180 ++++++++++++++ export-transcripts.py | 237 ++++++++++++++++++ extract-sessions.py | 270 +++++++++++++++++++++ finetune-lora.py | 252 +++++++++++++++++++ infra-audit.py | 495 ++++++++++++++++++++++++++++++++++++++ infra-gitea-link.py | 319 ++++++++++++++++++++++++ install-unsloth.sh | 56 +++++ load-transcripts-to-db.py | 139 +++++++++++ overnight-qwen3.py | 144 +++++++++++ overnight-research.sh | 64 +++++ rag-docs.json | 111 +++++++++ rag-ingest.py | 343 ++++++++++++++++++++++++++ rag-query.py | 95 ++++++++ 15 files changed, 2979 insertions(+), 1 deletion(-) create mode 100755 convert-training-data.py create mode 100755 export-training-data.py create mode 100755 export-transcripts.py create mode 100644 extract-sessions.py create mode 100755 finetune-lora.py create mode 100755 infra-audit.py create mode 100755 infra-gitea-link.py create mode 100755 install-unsloth.sh create mode 100644 load-transcripts-to-db.py create mode 100755 overnight-qwen3.py create mode 100755 overnight-research.sh create mode 100644 rag-docs.json create mode 100755 rag-ingest.py create mode 100755 rag-query.py diff --git a/README.md b/README.md index 4fcb76b..87d5291 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,36 @@ # grace-scripts -Grace AI helper scripts — token-efficient homelab automation \ No newline at end of file +Grace AI helper scripts — token-efficient homelab automation. 
+ +These scripts replace inline shell construction in AI sessions, reducing token usage and making repeated tasks reproducible and auditable. + +## Scripts + +| Script | Purpose | +|--------|---------| +| `convert-training-data.py` | Convert raw session exports to unsloth/axolotl-ready JSONL | +| `export-training-data.py` | Export training data from OpenClaw sessions | +| `export-transcripts.py` | Export session transcripts incrementally | +| `extract-sessions.py` | Extract and parse session data | +| `finetune-lora.py` | Run LoRA fine-tuning on local Qwen3-8B | +| `infra-audit.py` | Audit homelab infra state and sync to MongoDB | +| `infra-gitea-link.py` | Link infra components to Gitea issues/repos | +| `load-transcripts-to-db.py` | Sync transcripts to PostgreSQL on DB VM | +| `overnight-qwen3.py` | Overnight batch inference job | +| `overnight-research.sh` | Overnight research and summarization | +| `rag-ingest.py` | Ingest documents into Qdrant for RAG | +| `rag-query.py` | Query Qdrant RAG index | +| `install-unsloth.sh` | Install unsloth fine-tuning framework | + +## Credential Protocol + +All scripts pull credentials from Infisical. Never hardcode secrets. +IPs and service URLs reference TOOLS.md / BOOTSTRAP.md in the Grace workspace. + +## Token Reduction Strategy + +Each script replaces multi-step inline shell that Grace would otherwise construct token-by-token in every session. One `exec` call to a script = ~10-50x fewer tokens than building the equivalent inline. + +## Issues & Improvements + +Use Gitea issues on this repo to track bugs, improvements, and new script requests. diff --git a/convert-training-data.py b/convert-training-data.py new file mode 100755 index 0000000..e177ffe --- /dev/null +++ b/convert-training-data.py @@ -0,0 +1,240 @@ +#!/usr/bin/env python3 +""" +Convert raw Grace session exports to clean unsloth/axolotl-ready JSONL. 
+ +Input: ~/training-data/jsonl/grace_training_*.jsonl (ShareGPT format, raw) +Output: ~/training-data/cleaned/grace_clean_YYYYMMDD.jsonl (clean ShareGPT) + ~/training-data/dpo/grace_dpo_YYYYMMDD.jsonl (DPO pairs, if any) + +Storage: copies to grace@192.168.20.87:~/training-data/cleaned/ +""" + +import json +import os +import re +import glob +import subprocess +from datetime import datetime, timezone + +NFS_BASE = "/mnt/ai-storage/grace/training-data" +INPUT_DIR = os.path.join(NFS_BASE, "jsonl") +CLEAN_DIR = os.path.join(NFS_BASE, "cleaned") +DPO_DIR = os.path.join(NFS_BASE, "dpo") +STATE_FILE = os.path.expanduser("~/self-improving/convert-state.json") + +# Noise patterns to strip from user turns +NOISE_PATTERNS = [ + re.compile(r'Conversation info \(untrusted metadata\):.*?^\s*\}\s*\n', re.DOTALL | re.MULTILINE), + re.compile(r'Sender \(untrusted metadata\):.*?^\s*\}\s*\n', re.DOTALL | re.MULTILINE), + re.compile(r'.*?', re.DOTALL), + re.compile(r'\[media attached:.*?\]'), + re.compile(r'```json\s*\{.*?"schema".*?\}\s*```', re.DOTALL), +] + +# Minimum quality thresholds +MIN_TURNS = 2 # minimum user/assistant exchanges +MIN_ASSISTANT_CHARS = 80 # skip very short assistant replies +MAX_ASSISTANT_CHARS = 8000 # skip extremely long tool-dump responses + +# Strings that indicate a low-quality assistant turn +BAD_PATTERNS = [ + "HEARTBEAT_OK", "NO_REPLY", + "", # raw tool call leaked into response + "Let me check", # placeholder with no follow-through +] + +SYSTEM_PROMPT = ( + "You are Grace, a Culture Mind-class AI assistant and trusted companion for Maxwell. " + "You are warm, direct, witty, and proactive. You support Maxwell's ADHD executive function, " + "manage his homelab, help with job searching, and operate local AI infrastructure. " + "You speak plainly — no corporate pleasantries, no hedging. " + "You use exec and local tools proactively and return real results. Never fabricate output." 
+) + + +def load_state(): + if os.path.exists(STATE_FILE): + with open(STATE_FILE) as f: + return json.load(f) + return {"converted_files": [], "total_clean": 0, "total_dpo": 0} + + +def save_state(state): + with open(STATE_FILE, "w") as f: + json.dump(state, f, indent=2) + + +def clean_text(text: str) -> str: + """Strip metadata noise from a user turn.""" + for pattern in NOISE_PATTERNS: + text = pattern.sub("", text) + # Collapse excess whitespace + text = re.sub(r'\n{3,}', '\n\n', text).strip() + return text + + +def is_good_assistant_turn(text: str) -> bool: + if len(text) < MIN_ASSISTANT_CHARS: + return False + if len(text) > MAX_ASSISTANT_CHARS: + return False + for bad in BAD_PATTERNS: + if bad in text: + return False + return True + + +def extract_dpo_pairs(conversations: list) -> list: + """ + Look for correction signals in the conversation. + A correction pair = assistant turn followed by a user message + that signals disagreement, then a better assistant turn. + Signals: "no,", "actually", "that's wrong", "you should", "remember" + """ + correction_signals = ["no,", "actually", "that's wrong", "you should have", + "remember that", "i told you", "stop doing", "wrong"] + pairs = [] + convs = [c for c in conversations if c["from"] in ("human", "gpt")] + + for i in range(1, len(convs) - 1): + if convs[i]["from"] != "human": + continue + user_text = convs[i]["value"].lower() + if not any(sig in user_text for sig in correction_signals): + continue + if i < 1 or convs[i-1]["from"] != "gpt": + continue + if i + 1 >= len(convs) or convs[i+1]["from"] != "gpt": + continue + + rejected = convs[i-1]["value"] + chosen = convs[i+1]["value"] + + # Only worth it if the chosen is meaningfully different + if len(chosen) > 50 and chosen != rejected: + pairs.append({ + "prompt": convs[i-2]["value"] if i >= 2 else "", + "chosen": chosen, + "rejected": rejected, + "context": convs[i]["value"], # the correction itself + }) + return pairs + + +def process_file(path: str): + """ 
+ Process one raw export JSONL file. + Returns (clean_examples, dpo_pairs). + """ + clean_examples = [] + dpo_pairs = [] + + with open(path) as f: + for line in f: + line = line.strip() + if not line: + continue + try: + example = json.loads(line) + except json.JSONDecodeError: + continue + + raw_convs = example.get("conversations", []) + + # Separate system prompt from turns + turns = [c for c in raw_convs if c.get("from") != "system"] + + # Clean user turns + cleaned = [] + for turn in turns: + if turn["from"] == "human": + text = clean_text(turn["value"]) + if len(text) < 5: + continue + cleaned.append({"from": "human", "value": text}) + elif turn["from"] == "gpt": + text = turn["value"].strip() + if not is_good_assistant_turn(text): + continue + cleaned.append({"from": "gpt", "value": text}) + + # Build valid human/gpt pairs only + valid_turns = [] + i = 0 + while i < len(cleaned) - 1: + if cleaned[i]["from"] == "human" and cleaned[i+1]["from"] == "gpt": + valid_turns.append(cleaned[i]) + valid_turns.append(cleaned[i+1]) + i += 2 + else: + i += 1 + + if len(valid_turns) < MIN_TURNS * 2: + continue + + clean_examples.append({ + "conversations": [{"from": "system", "value": SYSTEM_PROMPT}] + valid_turns, + "source": os.path.basename(path), + "converted_at": datetime.now(timezone.utc).isoformat(), + }) + + # Extract DPO pairs from this example + dpo_pairs.extend(extract_dpo_pairs(valid_turns)) + + return clean_examples, dpo_pairs + + +def scp_file(local_path: str, remote_subdir: str): + # Data is on NFS — already written directly, no SCP needed + print(f" Written to NFS: {local_path}") + + +def main(): + os.makedirs(CLEAN_DIR, exist_ok=True) + os.makedirs(DPO_DIR, exist_ok=True) + + state = load_state() + input_files = sorted(glob.glob(os.path.join(INPUT_DIR, "*.jsonl"))) + new_files = [f for f in input_files if os.path.basename(f) not in state["converted_files"]] + + if not new_files: + print("No new files to convert.") + return + + all_clean = [] + all_dpo = [] 
+ + for f in new_files: + print(f"Processing: {os.path.basename(f)}") + clean, dpo = process_file(f) + print(f" → {len(clean)} clean examples, {len(dpo)} DPO pairs") + all_clean.extend(clean) + all_dpo.extend(dpo) + state["converted_files"].append(os.path.basename(f)) + + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") + + if all_clean: + clean_file = os.path.join(CLEAN_DIR, f"grace_clean_{timestamp}.jsonl") + with open(clean_file, "w") as f: + for ex in all_clean: + f.write(json.dumps(ex) + "\n") + print(f"\nWrote {len(all_clean)} clean examples → {clean_file}") + scp_file(clean_file, "cleaned") + state["total_clean"] = state.get("total_clean", 0) + len(all_clean) + + if all_dpo: + dpo_file = os.path.join(DPO_DIR, f"grace_dpo_{timestamp}.jsonl") + with open(dpo_file, "w") as f: + for pair in all_dpo: + f.write(json.dumps(pair) + "\n") + print(f"Wrote {len(all_dpo)} DPO pairs → {dpo_file}") + scp_file(dpo_file, "dpo") + state["total_dpo"] = state.get("total_dpo", 0) + len(all_dpo) + + save_state(state) + print(f"\nTotals to date: {state['total_clean']} clean, {state['total_dpo']} DPO pairs") + + +if __name__ == "__main__": + main() diff --git a/export-training-data.py b/export-training-data.py new file mode 100755 index 0000000..6780a45 --- /dev/null +++ b/export-training-data.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python3 +""" +Export OpenClaw session JSONL files to training data format. +Uses session-logs skill structure to read raw sessions. +Outputs ShareGPT format JSONL for axolotl/unsloth fine-tuning. 
+ +Storage: grace@192.168.20.87:~/training-data/jsonl/ +""" + +import json +import os +import glob +import subprocess +from datetime import datetime + +SESSIONS_DIR = os.path.expanduser("~/.openclaw/agents/main/sessions/") +OUTPUT_DIR = "/mnt/ai-storage/grace/training-data/jsonl" +STATE_FILE = os.path.expanduser("~/self-improving/export-state.json") + +SYSTEM_PROMPT = """You are Grace, a Culture Mind-class AI assistant and cognitive partner for Maxwell. You are warm, direct, witty, and proactive. You support Maxwell's ADHD executive function, manage his homelab, track job search tasks, and operate local AI infrastructure. You speak plainly — no corporate pleasantries, no hedging. You use tools proactively and report real results.""" + +def load_state(): + if os.path.exists(STATE_FILE): + with open(STATE_FILE) as f: + return json.load(f) + return {"exported_sessions": [], "last_run": None, "total_examples": 0} + +def save_state(state): + with open(STATE_FILE, "w") as f: + json.dump(state, f, indent=2) + +def extract_text_from_content(content): + """ + Extract displayable text from a content block list. + Handles: text blocks, toolResult blocks (for context), ignores thinking/toolCall. + Returns the final visible text only. + """ + text_parts = [] + for block in content: + if not isinstance(block, dict): + if isinstance(block, str): + text_parts.append(block) + continue + btype = block.get("type", "") + if btype == "text": + text_parts.append(block.get("text", "")) + # Skip: thinking, toolCall, toolResult — these are internal plumbing + return " ".join(text_parts).strip() + + +def extract_conversations(jsonl_path): + """ + Extract clean user/assistant turn pairs from a session JSONL file. + + Strategy: collect all messages in order. For each user turn with text, + look ahead to find the next assistant turn that has a text block + (the final reply after any tool calls). Pair them. 
+ """ + messages = [] + + try: + with open(jsonl_path) as f: + for line in f: + line = line.strip() + if not line: + continue + try: + msg = json.loads(line) + except: + continue + + if msg.get("type") != "message": + continue + + role = msg.get("message", {}).get("role") + content = msg.get("message", {}).get("content", []) + if not role or not content: + continue + + text = extract_text_from_content(content) + messages.append({"role": role, "text": text}) + + except Exception as e: + print(f" Error reading {jsonl_path}: {e}") + return [] + + # Build user→assistant pairs + conversations = [] + i = 0 + while i < len(messages): + msg = messages[i] + + # Find a user turn with real text + if msg["role"] == "user": + user_text = msg["text"] + skip_phrases = ("HEARTBEAT_OK", "NO_REPLY", "Read HEARTBEAT.md", + "A new session was started", "[Queued messages") + if len(user_text) < 15 or any(user_text.startswith(p) for p in skip_phrases): + i += 1 + continue + + # Look ahead for the next assistant turn with text + j = i + 1 + while j < len(messages): + if messages[j]["role"] == "assistant" and len(messages[j]["text"]) > 30: + asst_text = messages[j]["text"] + bad = ("HEARTBEAT_OK", "NO_REPLY") + if not any(asst_text.startswith(b) for b in bad): + conversations.append({"from": "human", "value": user_text}) + conversations.append({"from": "gpt", "value": asst_text}) + i = j + 1 + break + j += 1 + else: + i += 1 + else: + i += 1 + + return conversations + +def session_to_example(jsonl_path): + convs = extract_conversations(jsonl_path) + if len(convs) < 2: + return None + return { + "conversations": [{"from": "system", "value": SYSTEM_PROMPT}] + convs, + "source": os.path.basename(jsonl_path), + "exported_at": datetime.utcnow().isoformat() + } + +def main(): + state = load_state() + os.makedirs(OUTPUT_DIR, exist_ok=True) + + session_files = sorted([ + f for f in glob.glob(os.path.join(SESSIONS_DIR, "*.jsonl")) + if ".reset." 
not in f + ]) + + new_examples = [] + new_sessions = [] + + for sf in session_files: + sid = os.path.basename(sf).replace(".jsonl", "") + if sid in state["exported_sessions"]: + continue + + print(f"Processing: {sid}") + example = session_to_example(sf) + + if example and len(example["conversations"]) >= 4: + turns = (len(example["conversations"]) - 1) // 2 + print(f" → {turns} turns") + new_examples.append(example) + new_sessions.append(sid) + else: + print(f" → skipped (too short)") + + if not new_examples: + print("No new sessions to export.") + return + + os.makedirs(OUTPUT_DIR, exist_ok=True) + timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") + output_file = os.path.join(OUTPUT_DIR, f"grace_training_{timestamp}.jsonl") + + with open(output_file, "w") as f: + for ex in new_examples: + f.write(json.dumps(ex) + "\n") + + state["exported_sessions"].extend(new_sessions) + state["last_run"] = datetime.utcnow().isoformat() + state["total_examples"] = state.get("total_examples", 0) + len(new_examples) + save_state(state) + + print(f"\nWrote {len(new_examples)} examples → {output_file}") + print(f"Total examples to date: {state['total_examples']}") + +if __name__ == "__main__": + main() diff --git a/export-transcripts.py b/export-transcripts.py new file mode 100755 index 0000000..78abf70 --- /dev/null +++ b/export-transcripts.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python3 +""" +Verbatim transcript exporter for Grace training data. + +Reads all OpenClaw session JSONL files and saves clean, full-conversation +transcripts to NFS. Strips metadata envelopes from user messages but +preserves all content verbatim. 
+ +Output format: ShareGPT JSONL (system + alternating human/gpt turns) +Storage: /mnt/ai-storage/grace/training-data/transcripts/ +""" + +import json +import os +import re +import glob +from datetime import datetime, timezone + +SESSIONS_DIR = os.path.expanduser("~/.openclaw/agents/main/sessions/") +OUTPUT_DIR = "/mnt/ai-storage/grace/training-data/transcripts" +STATE_FILE = os.path.expanduser("~/self-improving/transcript-state.json") + +SYSTEM_PROMPT = ( + "You are Grace, a Culture Mind-class AI assistant and trusted companion for Maxwell Burton. " + "You are warm, direct, witty, and proactive. You support Maxwell's ADHD executive function, " + "manage his homelab, help with job searching, and operate local AI infrastructure. " + "You speak plainly — no corporate pleasantries, no hedging. " + "You use exec and local tools proactively and return real results. Never fabricate output." +) + +# Noise to strip from user turns (metadata envelopes, not content) +NOISE_PATTERNS = [ + re.compile(r'Conversation info \(untrusted metadata\):.*?^\s*\}\s*\n', re.DOTALL | re.MULTILINE), + re.compile(r'Sender \(untrusted metadata\):.*?^\s*\}\s*\n', re.DOTALL | re.MULTILINE), + re.compile(r'.*?\s*\n?', re.DOTALL), + re.compile(r'\[media attached:.*?\]\s*\n?'), + re.compile(r'To send an image back, prefer the message tool.*?\n', re.DOTALL), + re.compile(r'```json\s*\{\s*"schema"\s*:.*?\}\s*```\s*\n?', re.DOTALL), + re.compile(r'Replied message \(untrusted.*?\}\s*\n\s*\}\s*\n', re.DOTALL), + re.compile(r'\[Queued messages while agent was busy\]\s*\n', re.DOTALL), + re.compile(r'---\s*\nQueued #\d+\s*\n'), + re.compile(r'^```\s*\n```\s*\n?', re.MULTILINE), +] + +# Full turns to skip entirely +SKIP_EXACT = {"HEARTBEAT_OK", "NO_REPLY"} +SKIP_STARTSWITH = ( + "A new session was started via /new or /reset.", + "Read HEARTBEAT.md if it exists", + "Run your Session Startup", +) + +# Skip assistant turns that are just internal narration with no value +SKIP_ASST_EXACT = {"HEARTBEAT_OK", 
"NO_REPLY"} +SKIP_ASST_STARTSWITH = ( + "✅ New session started", +) + + +def clean_user(text: str) -> str: + for p in NOISE_PATTERNS: + text = p.sub("", text) + text = re.sub(r'\n{3,}', '\n\n', text) + text = re.sub(r'^[\s`]+', '', text) + return text.strip() + + +def get_text(content: list) -> str: + parts = [] + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + t = block.get("text", "").strip() + if t: + parts.append(t) + elif isinstance(block, str) and block.strip(): + parts.append(block.strip()) + return "\n".join(parts).strip() + + +def is_tool_result_msg(content: list) -> bool: + """True if this user message is a tool result, not a human turn.""" + return any( + isinstance(b, dict) and b.get("type") in ("toolResult", "tool_result") + for b in content + ) + + +def extract_transcript(path: str) -> list: + """ + Extract full verbatim conversation as list of {from, value} dicts. + Preserves all turns — doesn't filter by quality or length. + Only removes metadata noise and skips tool result messages. 
+ """ + turns = [] + + try: + with open(path) as f: + for line in f: + line = line.strip() + if not line: + continue + try: + msg = json.loads(line) + except Exception: + continue + + if msg.get("type") != "message": + continue + + role = msg.get("message", {}).get("role") + content = msg.get("message", {}).get("content", []) + + if not role or not content: + continue + + if role == "user": + # Skip tool result messages + if is_tool_result_msg(content): + continue + + text = clean_user(get_text(content)) + + if not text: + continue + if text in SKIP_EXACT: + continue + if any(text.startswith(s) for s in SKIP_STARTSWITH): + continue + + turns.append({"from": "human", "value": text}) + + elif role == "assistant": + text = get_text(content) + + if not text: + continue + if text in SKIP_ASST_EXACT: + continue + if any(text.startswith(s) for s in SKIP_ASST_STARTSWITH): + continue + + turns.append({"from": "gpt", "value": text}) + + except Exception as e: + print(f" Error reading {path}: {e}") + return [] + + # Ensure turns alternate properly (drop consecutive same-role turns, + # keeping the last assistant turn before a role switch) + clean = [] + for turn in turns: + if clean and clean[-1]["from"] == turn["from"]: + if turn["from"] == "gpt": + # Replace with the later (more complete) assistant turn + clean[-1] = turn + # For consecutive user turns, keep both (queued messages) + else: + clean.append(turn) + else: + clean.append(turn) + + return clean + + +def load_state(): + if os.path.exists(STATE_FILE): + with open(STATE_FILE) as f: + return json.load(f) + return {"exported_sessions": [], "last_run": None, "total_turns": 0} + + +def save_state(state): + with open(STATE_FILE, "w") as f: + json.dump(state, f, indent=2) + + +def main(): + os.makedirs(OUTPUT_DIR, exist_ok=True) + state = load_state() + + session_files = sorted([ + f for f in glob.glob(os.path.join(SESSIONS_DIR, "*.jsonl")) + if ".reset." 
not in f + ]) + + new_examples = [] + new_sessions = [] + total_new_turns = 0 + + for sf in session_files: + sid = os.path.basename(sf).replace(".jsonl", "") + if sid in state["exported_sessions"]: + continue + + turns = extract_transcript(sf) + + # Need at least 4 turns (2 exchanges) to be useful + if len(turns) < 4: + print(f" {sid[:8]}: skipped ({len(turns)} turns)") + state["exported_sessions"].append(sid) + continue + + example = { + "conversations": [{"from": "system", "value": SYSTEM_PROMPT}] + turns, + "source": os.path.basename(sf), + "turn_count": len(turns), + "exported_at": datetime.now(timezone.utc).isoformat(), + } + new_examples.append(example) + new_sessions.append(sid) + total_new_turns += len(turns) + print(f" {sid[:8]}: {len(turns)} turns ✓") + + if not new_examples: + print("No new sessions to export.") + state["exported_sessions"].extend(new_sessions) + save_state(state) + return + + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") + output_file = os.path.join(OUTPUT_DIR, f"grace_transcript_{timestamp}.jsonl") + + with open(output_file, "w") as f: + for ex in new_examples: + f.write(json.dumps(ex) + "\n") + + state["exported_sessions"].extend(new_sessions) + state["last_run"] = datetime.now(timezone.utc).isoformat() + state["total_turns"] = state.get("total_turns", 0) + total_new_turns + save_state(state) + + print(f"\nWrote {len(new_examples)} transcripts ({total_new_turns} turns) → {output_file}") + print(f"Total turns to date: {state['total_turns']}") + + +if __name__ == "__main__": + main() diff --git a/extract-sessions.py b/extract-sessions.py new file mode 100644 index 0000000..7148aaa --- /dev/null +++ b/extract-sessions.py @@ -0,0 +1,270 @@ +#!/usr/bin/env python3 +""" +Improved session extractor that handles tool-interleaved conversations. 
+ +Session structure in OpenClaw JSONL: + user: question (text) + assistant: narration + toolCall blocks <- intermediate, not training data + user: toolResult blocks <- intermediate + assistant: final answer (text) <- THIS is what we want + +Strategy: for each user text turn, find the LAST assistant text turn +before the next user text turn. That's the real response. +""" + +import json +import os +import re +import glob +from datetime import datetime, timezone + +# Noise patterns to strip from raw user messages +NOISE_PATTERNS = [ + re.compile(r'Conversation info \(untrusted metadata\):.*?^\s*\}\s*\n', re.DOTALL | re.MULTILINE), + re.compile(r'Sender \(untrusted metadata\):.*?^\s*\}\s*\n', re.DOTALL | re.MULTILINE), + re.compile(r'.*?', re.DOTALL), + re.compile(r'\[media attached:.*?\]'), + re.compile(r'```json\s*\{.*?"schema".*?\}\s*```', re.DOTALL), + re.compile(r'Replied message \(untrusted.*?\}\s*\n\s*\}\s*\n', re.DOTALL), + re.compile(r'\[Queued messages while agent was busy\].*?(?=\n\n|\Z)', re.DOTALL), + re.compile(r'---\s*\nQueued #\d+\s*\n'), + re.compile(r'^```\s*\n```\s*\n', re.MULTILINE), # empty code blocks +] + + +def clean_user_text(text: str) -> str: + for pattern in NOISE_PATTERNS: + text = pattern.sub("", text) + text = re.sub(r'\n{3,}', '\n\n', text).strip() + # Strip leading/trailing backtick remnants + text = re.sub(r'^[\s`]+', '', text).strip() + return text + +SESSIONS_DIR = os.path.expanduser("~/.openclaw/agents/main/sessions/") +OUTPUT_DIR = "/mnt/ai-storage/grace/training-data/jsonl" +STATE_FILE = os.path.expanduser("~/self-improving/export-state.json") + +SYSTEM_PROMPT = ( + "You are Grace, a Culture Mind-class AI assistant and trusted companion for Maxwell. " + "You are warm, direct, witty, and proactive. You support Maxwell's ADHD executive function, " + "manage his homelab, help with job searching, and operate local AI infrastructure. " + "You speak plainly — no corporate pleasantries, no hedging. 
" + "You use exec and local tools proactively and return real results. Never fabricate output." +) + +SKIP_USER = ( + "HEARTBEAT_OK", "NO_REPLY", "Read HEARTBEAT.md", + "A new session was started", "[Queued messages", + "Run your Session Startup", "Conversation info (untrusted", +) + +SKIP_ASSISTANT = ( + "HEARTBEAT_OK", "NO_REPLY", + "✅ New session started", +) + +MIN_USER_CHARS = 15 +MIN_ASST_CHARS = 40 + + +def get_text(content: list) -> str: + parts = [] + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + parts.append(block.get("text", "")) + elif isinstance(block, str): + parts.append(block) + return " ".join(parts).strip() + + +def has_tool_result(content: list) -> bool: + """Check if this message is a tool result (not a real user message).""" + for b in content: + if isinstance(b, dict): + btype = b.get("type", "") + if btype in ("toolResult", "tool_result"): + return True + # Also catches the pattern where the whole content is a tool result object + if btype == "text": + text = b.get("text", "") + # Tool results often start with JSON or exec output + if text.startswith('{\n "url":') or text.startswith('{\n "error":'): + return True + return False + + +def is_tool_result_message(content: list) -> bool: + """ + Detect tool result messages: user messages that are purely tool output, + not real human input. These have no real text — just raw tool return values. + Heuristic: if content has text but it looks like structured tool output. + """ + raw = get_text(content) + # Pure tool result patterns + if raw.startswith('{\n "') and '"url"' in raw[:50]: + return True + if raw.startswith("---\nname:") or raw.startswith("# ") and len(raw) > 500: + return True + # Multi-line with no conversational content (exec output etc.) 
+ lines = raw.strip().split('\n') + if len(lines) > 3 and not any(c.isalpha() and c.islower() for c in raw[:30]): + return True + return False + + +def extract_pairs(path: str): + """ + Parse session, return list of (user_text, assistant_text) pairs. + Each pair = real human question + final assistant answer (after tool use). + """ + messages = [] + try: + with open(path) as f: + for line in f: + line = line.strip() + if not line: + continue + try: + msg = json.loads(line) + except Exception: + continue + if msg.get("type") != "message": + continue + role = msg.get("message", {}).get("role") + content = msg.get("message", {}).get("content", []) + if role and content: + messages.append({"role": role, "content": content}) + except Exception as e: + print(f" Error: {e}") + return [] + + # Group into segments: each starts with a user text turn + # A "user text turn" is a user message with actual text (not just toolResults) + pairs = [] + i = 0 + while i < len(messages): + msg = messages[i] + + # Find a user turn with real text + if msg["role"] != "user": + i += 1 + continue + + user_text = get_text(msg["content"]) + is_tool_result = has_tool_result(msg["content"]) + + if is_tool_result: + i += 1 + continue + + # Clean metadata noise from user text + user_text = clean_user_text(user_text) + + if len(user_text) < MIN_USER_CHARS: + i += 1 + continue + + if any(user_text.startswith(s) for s in SKIP_USER): + i += 1 + continue + + # Look forward: collect all assistant text turns until the next real user turn + # The LAST non-empty assistant text before the next real user = the answer + j = i + 1 + last_asst_text = None + next_real_user = None + + while j < len(messages): + m = messages[j] + if m["role"] == "user": + u_text = get_text(m["content"]) + u_is_tool = has_tool_result(m["content"]) + if not u_is_tool and len(u_text) >= MIN_USER_CHARS and not any(u_text.startswith(s) for s in SKIP_USER): + next_real_user = j + break + elif m["role"] == "assistant": + t = 
get_text(m["content"]) + if len(t) >= MIN_ASST_CHARS and not any(t.startswith(s) for s in SKIP_ASSISTANT): + last_asst_text = t + j += 1 + + if last_asst_text: + pairs.append((user_text, last_asst_text)) + + # Jump to next real user turn + i = next_real_user if next_real_user is not None else j + + return pairs + + +def load_state(): + if os.path.exists(STATE_FILE): + with open(STATE_FILE) as f: + return json.load(f) + return {"exported_sessions": [], "last_run": None, "total_examples": 0} + + +def save_state(state): + with open(STATE_FILE, "w") as f: + json.dump(state, f, indent=2) + + +def main(): + state = load_state() + os.makedirs(OUTPUT_DIR, exist_ok=True) + + session_files = sorted([ + f for f in glob.glob(os.path.join(SESSIONS_DIR, "*.jsonl")) + if ".reset." not in f + ]) + + new_examples = [] + new_sessions = [] + + for sf in session_files: + sid = os.path.basename(sf).replace(".jsonl", "") + if sid in state["exported_sessions"]: + continue + + pairs = extract_pairs(sf) + if len(pairs) < 2: + print(f" {sid[:8]}: skipped ({len(pairs)} pairs)") + state["exported_sessions"].append(sid) + continue + + conversations = [{"from": "system", "value": SYSTEM_PROMPT}] + for user_text, asst_text in pairs: + conversations.append({"from": "human", "value": user_text}) + conversations.append({"from": "gpt", "value": asst_text}) + + new_examples.append({ + "conversations": conversations, + "source": os.path.basename(sf), + "exported_at": datetime.now(timezone.utc).isoformat(), + }) + new_sessions.append(sid) + print(f" {sid[:8]}: {len(pairs)} pairs ✓") + + if not new_examples: + print("No new sessions to export.") + state["exported_sessions"].extend(new_sessions) + save_state(state) + return + + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") + output_file = os.path.join(OUTPUT_DIR, f"grace_training_{timestamp}.jsonl") + with open(output_file, "w") as f: + for ex in new_examples: + f.write(json.dumps(ex) + "\n") + + 
state["exported_sessions"].extend(new_sessions) + state["last_run"] = datetime.now(timezone.utc).isoformat() + state["total_examples"] = state.get("total_examples", 0) + len(new_examples) + save_state(state) + + print(f"\nWrote {len(new_examples)} examples → {output_file}") + print(f"Total examples to date: {state['total_examples']}") + + +if __name__ == "__main__": + main() diff --git a/finetune-lora.py b/finetune-lora.py new file mode 100755 index 0000000..6f4c5ef --- /dev/null +++ b/finetune-lora.py @@ -0,0 +1,252 @@ +#!/usr/bin/env python3 +""" +Grace LoRA fine-tuning script using Unsloth. +Model: Qwen3-8B (loaded from local GGUF or HuggingFace) +GPU: GPU 1 (GTX 1080, 8GB VRAM) — GPU 0 reserved for live inference +Data: ~/training-data/cleaned/ (SFT) + ~/training-data/dpo/ (DPO preference pairs) + +Usage: + source ~/unsloth-env/bin/activate + python3 ~/scripts/finetune-lora.py [--dpo] [--dry-run] + +Output: + ~/models/grace-lora-YYYYMMDD/ (LoRA adapter) + Copied to grace@192.168.20.87:~/models/ + +IMPORTANT: Do not run this until we have 200+ clean examples. 
# Current count is tracked in ~/self-improving/convert-state.json
# (tail of the module docstring from the previous hunk)

import argparse
import glob
import json
import os
import shutil
import subprocess
from datetime import datetime, timezone

# ── Config ──────────────────────────────────────────────────────────────────
NFS_BASE = "/mnt/ai-storage/grace"
CLEAN_DIR = os.path.join(NFS_BASE, "training-data/cleaned")
DPO_DIR = os.path.join(NFS_BASE, "training-data/dpo")
OUTPUT_BASE = os.path.join(NFS_BASE, "models")

# Use GPU 1 only — GPU 0 is running inference
os.environ["CUDA_VISIBLE_DEVICES"] = "1"

# LoRA hyperparameters (safe defaults for Qwen3-8B on 8GB VRAM)
LORA_CONFIG = {
    "r": 16,
    "lora_alpha": 16,
    "lora_dropout": 0.1,
    "target_modules": ["q_proj", "k_proj", "v_proj", "o_proj",
                       "gate_proj", "up_proj", "down_proj"],
}

# SFT training config
SFT_CONFIG = {
    "learning_rate": 1e-4,
    "num_train_epochs": 1,
    "per_device_train_batch_size": 2,
    "gradient_accumulation_steps": 4,
    "max_seq_length": 8192,
    # NOTE(review): kl_coef is recorded in run-meta.json but is NOT passed to
    # SFTConfig below — TRL's SFTConfig has no such parameter, so no KL
    # regularizer is actually applied. Wire it up (or drop it) deliberately.
    "kl_coef": 0.05,
    "warmup_ratio": 0.05,
    "lr_scheduler_type": "cosine",
    # FIX: the GTX 1080 (Pascal) has no bf16 support. transformers raises a
    # ValueError when bf16=True on unsupported hardware — it does NOT silently
    # fall back to fp16, which is what the previous config assumed. Use fp16.
    "fp16": True,
    "bf16": False,
}

# DPO config
DPO_CONFIG = {
    "beta": 0.1,  # start conservative; sweep upward if needed
    "learning_rate": 2e-5,
    "num_train_epochs": 1,
    "per_device_train_batch_size": 1,
    "gradient_accumulation_steps": 8,
    "max_length": 4096,
}

MIN_EXAMPLES_SFT = 200
MIN_EXAMPLES_DPO = 50


def count_examples(directory: str) -> int:
    """Count non-blank lines (= training examples) across all *.jsonl files in directory."""
    total = 0
    for f in glob.glob(os.path.join(directory, "*.jsonl")):
        with open(f) as fh:
            total += sum(1 for line in fh if line.strip())
    return total


def load_dataset_from_dir(directory: str):
    """Load all JSONL files in a directory into a HuggingFace dataset.

    Raises FileNotFoundError when the directory holds no *.jsonl files.
    """
    from datasets import load_dataset
    files = sorted(glob.glob(os.path.join(directory, "*.jsonl")))
    if not files:
        raise FileNotFoundError(f"No JSONL files found in {directory}")
    return load_dataset("json", data_files=files, split="train")


def run_sft(model, tokenizer, output_dir: str, dry_run: bool = False):
    """Run SFT using unsloth + TRL.

    NOTE(review): despite SFT_CONFIG["kl_coef"], no KL-anchor term is wired
    into SFTConfig here — this is plain SFT. Saves the adapter + tokenizer
    into output_dir on success.
    """
    from trl import SFTTrainer, SFTConfig
    from unsloth.chat_templates import get_chat_template

    tokenizer = get_chat_template(tokenizer, chat_template="qwen-3")

    dataset = load_dataset_from_dir(CLEAN_DIR)
    print(f"SFT dataset: {len(dataset)} examples")

    if dry_run:
        print("[DRY RUN] Would train SFT on", len(dataset), "examples")
        return

    def format_chat(example):
        # Render the conversation turns into a single chat-template string.
        convs = example["conversations"]
        text = tokenizer.apply_chat_template(
            convs, tokenize=False, add_generation_prompt=False
        )
        return {"text": text}

    dataset = dataset.map(format_chat, remove_columns=dataset.column_names)

    trainer = SFTTrainer(
        model=model,
        tokenizer=tokenizer,
        train_dataset=dataset,
        args=SFTConfig(
            output_dir=output_dir,
            learning_rate=SFT_CONFIG["learning_rate"],
            num_train_epochs=SFT_CONFIG["num_train_epochs"],
            per_device_train_batch_size=SFT_CONFIG["per_device_train_batch_size"],
            gradient_accumulation_steps=SFT_CONFIG["gradient_accumulation_steps"],
            max_seq_length=SFT_CONFIG["max_seq_length"],
            warmup_ratio=SFT_CONFIG["warmup_ratio"],
            lr_scheduler_type=SFT_CONFIG["lr_scheduler_type"],
            fp16=SFT_CONFIG["fp16"],
            bf16=SFT_CONFIG["bf16"],
            dataset_text_field="text",
            save_strategy="epoch",
            logging_steps=10,
        ),
    )
    trainer.train()
    model.save_pretrained(output_dir)
    tokenizer.save_pretrained(output_dir)
    print(f"SFT adapter saved → {output_dir}")


def run_dpo(model, tokenizer, output_dir: str, dry_run: bool = False):
    """Run DPO preference tuning; writes the adapter to <output_dir>-dpo.

    Expects the DPO JSONL to carry TRL's prompt/chosen/rejected columns.
    """
    from trl import DPOTrainer, DPOConfig

    dataset = load_dataset_from_dir(DPO_DIR)
    print(f"DPO dataset: {len(dataset)} pairs")

    if dry_run:
        print("[DRY RUN] Would train DPO on", len(dataset), "pairs")
        return

    trainer = DPOTrainer(
        model=model,
        ref_model=None,  # unsloth handles reference model internally
        tokenizer=tokenizer,
        train_dataset=dataset,
        args=DPOConfig(
            output_dir=output_dir + "-dpo",
            beta=DPO_CONFIG["beta"],
            learning_rate=DPO_CONFIG["learning_rate"],
            num_train_epochs=DPO_CONFIG["num_train_epochs"],
            per_device_train_batch_size=DPO_CONFIG["per_device_train_batch_size"],
            gradient_accumulation_steps=DPO_CONFIG["gradient_accumulation_steps"],
            max_length=DPO_CONFIG["max_length"],
            fp16=True,
            save_strategy="epoch",
            logging_steps=5,
        ),
    )
    trainer.train()
    model.save_pretrained(output_dir + "-dpo")
    print(f"DPO adapter saved → {output_dir}-dpo")


def main():
    """Pre-flight data checks, then SFT (and optionally DPO) on Qwen3-8B."""
    parser = argparse.ArgumentParser(description="Grace LoRA fine-tuning")
    parser.add_argument("--dpo", action="store_true", help="Also run DPO after SFT")
    parser.add_argument("--dry-run", action="store_true", help="Check data counts, don't train")
    args = parser.parse_args()

    # ── Pre-flight checks ──
    sft_count = count_examples(CLEAN_DIR)
    dpo_count = count_examples(DPO_DIR)
    print(f"Training data: {sft_count} SFT examples, {dpo_count} DPO pairs")

    if sft_count < MIN_EXAMPLES_SFT:
        print(f"⚠️  Not enough SFT data yet ({sft_count}/{MIN_EXAMPLES_SFT} minimum).")
        print("   Keep having conversations with Grace — the exporter runs nightly.")
        if not args.dry_run:
            return

    if args.dpo and dpo_count < MIN_EXAMPLES_DPO:
        print(f"⚠️  Not enough DPO pairs yet ({dpo_count}/{MIN_EXAMPLES_DPO} minimum). Skipping DPO.")
        args.dpo = False

    if args.dry_run:
        print("\n[DRY RUN] Pre-flight check complete. Run without --dry-run to train.")
        return

    # ── Load model ──
    from unsloth import FastLanguageModel

    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    output_dir = os.path.join(OUTPUT_BASE, f"grace-lora-{timestamp}")
    os.makedirs(output_dir, exist_ok=True)

    print("\nLoading Qwen3-8B with unsloth (4-bit, GPU 1)...")
    model, tokenizer = FastLanguageModel.from_pretrained(
        # NOTE(review): verify this repo id — the Qwen3 generation publishes
        # "Qwen/Qwen3-8B" (no "-Instruct" suffix); confirm before first run.
        model_name="Qwen/Qwen3-8B-Instruct",
        max_seq_length=SFT_CONFIG["max_seq_length"],
        dtype=None,        # auto-detect
        load_in_4bit=True, # QLoRA — fits in 8GB VRAM
    )

    model = FastLanguageModel.get_peft_model(
        model,
        **LORA_CONFIG,
        bias="none",
        use_gradient_checkpointing="unsloth",
    )

    # ── SFT ──
    print("\n=== Stage 1: SFT with KL anchor ===")
    run_sft(model, tokenizer, output_dir, dry_run=args.dry_run)

    # ── DPO (optional) ──
    if args.dpo:
        print("\n=== Stage 2: DPO preference tuning ===")
        run_dpo(model, tokenizer, output_dir, dry_run=args.dry_run)

    # Output is already on NFS — no copy needed
    print(f"\nAdapter saved to NFS: {output_dir}")

    # ── Save run metadata ──
    meta = {
        "timestamp": timestamp,
        "output_dir": output_dir,
        "sft_examples": sft_count,
        "dpo_pairs": dpo_count if args.dpo else 0,
        "lora_config": LORA_CONFIG,
        "sft_config": SFT_CONFIG,
        "dpo_config": DPO_CONFIG if args.dpo else None,
    }
    with open(os.path.join(output_dir, "run-meta.json"), "w") as f:
        json.dump(meta, f, indent=2)

    print(f"\n✅ Done. Adapter at: {output_dir}")
    print("To use: load the adapter with llama.cpp --lora or swap into docker-compose")


if __name__ == "__main__":
    main()
"""
Grace Homelab Infrastructure Audit
Queries Proxmox, TrueNAS, OPNsense, and each VM/LXC for full system state.
Writes everything to MongoDB homelab_infra on DB VM (192.168.20.87).

Usage:
    python3 ~/scripts/infra-audit.py                  # full audit
    python3 ~/scripts/infra-audit.py --update proxmox
    python3 ~/scripts/infra-audit.py --update truenas
    python3 ~/scripts/infra-audit.py --update opnsense
    python3 ~/scripts/infra-audit.py --update services
    python3 ~/scripts/infra-audit.py --query "what disk backs /mnt/ai-storage"
    python3 ~/scripts/infra-audit.py --dump           # print full DB as JSON

GRACE RULE: Before any infrastructure work, run:
    python3 ~/scripts/infra-audit.py --query ""
After any infrastructure change, run:
    python3 ~/scripts/infra-audit.py --update
"""

import argparse
import json
import os
import sys
import urllib.request
import urllib.error
import ssl
from datetime import datetime, timezone

# ── Config ─────────────────────────────────────────────────────────────────
MONGO_HOST = "192.168.20.87"
MONGO_PORT = 27017
MONGO_DB = "homelab_infra"

PROXMOX_HOST = "https://192.168.20.135:8006"
TRUENAS_HOST = "https://truenas.home.local"
OPNSENSE_HOST = "https://router.home.local:8443"


def get_creds():
    """Pull all needed credentials from Infisical.

    Reads the universal-auth identity from ~/.infisical-identities/grace-ai.env,
    logs in, then fetches each secret. Returns {} on any failure so callers
    degrade to empty-string credentials rather than crashing mid-audit.
    """
    try:
        env_file = os.path.expanduser("~/.infisical-identities/grace-ai.env")
        env = {}
        with open(env_file) as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith('#') and '=' in line:
                    k, v = line.split('=', 1)
                    env[k.strip()] = v.strip().strip('"')

        # NOTE(review): Infisical is reached over plain http — the client
        # secret and returned token cross the LAN unencrypted. Acceptable
        # only on a trusted VLAN; consider https.
        body = json.dumps({
            "clientId": env["INFISICAL_UNIVERSAL_AUTH_CLIENT_ID"],
            "clientSecret": env["INFISICAL_UNIVERSAL_AUTH_CLIENT_SECRET"]
        }).encode()
        req = urllib.request.Request(
            "http://infisical.home.local/api/v1/auth/universal-auth/login",
            data=body, headers={"Content-Type": "application/json"}
        )
        with urllib.request.urlopen(req, timeout=10) as r:
            token = json.loads(r.read())["accessToken"]

        def get_secret(name):
            # Fetch one raw secret value by name from the fixed workspace/env.
            req = urllib.request.Request(
                f"http://infisical.home.local/api/v3/secrets/raw/{name}"
                f"?workspaceId=80f319a5-9a0c-4cd2-911d-9c59fa515929&environment=dev&secretPath=/",
                headers={"Authorization": f"Bearer {token}"}
            )
            with urllib.request.urlopen(req, timeout=10) as r:
                return json.loads(r.read())["secret"]["secretValue"]

        return {
            "proxmox_token_id": get_secret("PROXMOX_TOKEN_ID"),
            "proxmox_token_secret": get_secret("PROXMOX_TOKEN_SECRET"),
            "truenas_api_key": get_secret("TRUENAS_API_KEY"),
            "opnsense_api_key": get_secret("OPNSENSE_API_KEY"),
            "opnsense_api_secret": get_secret("OPNSENSE_API_SECRET"),
        }
    except Exception as e:
        print(f"Cred fetch error: {e}")
        return {}


# ── HTTP helpers ────────────────────────────────────────────────────────────
# NOTE(review): certificate verification is disabled globally for the
# self-signed Proxmox/TrueNAS/OPNsense endpoints — pinning the homelab CA
# would be safer.
CTX = ssl.create_default_context()
CTX.check_hostname = False
CTX.verify_mode = ssl.CERT_NONE


def http_get(url, headers=None, auth=None, timeout=15):
    """GET url and parse the JSON body; returns None (and prints) on any error.

    auth, when given, is a (user, password) pair sent as HTTP Basic.
    """
    try:
        req = urllib.request.Request(url, headers=headers or {})
        if auth:
            import base64
            creds = base64.b64encode(f"{auth[0]}:{auth[1]}".encode()).decode()
            req.add_header("Authorization", f"Basic {creds}")
        with urllib.request.urlopen(req, timeout=timeout, context=CTX) as r:
            return json.loads(r.read())
    except Exception as e:
        print(f"  GET error {url}: {e}")
        return None


# ── MongoDB helpers ─────────────────────────────────────────────────────────
_mongo_client = None  # lazily-created singleton client


def get_mongo():
    """Return the homelab_infra database handle, connecting on first use."""
    global _mongo_client
    if _mongo_client is None:
        import pymongo
        _mongo_client = pymongo.MongoClient(MONGO_HOST, MONGO_PORT, serverSelectionTimeoutMS=5000)
    return _mongo_client[MONGO_DB]


def mongo_upsert(collection, doc, key_field="name"):
    """Upsert doc into collection keyed on key_field. Returns success bool."""
    try:
        db = get_mongo()
        db[collection].update_one({key_field: doc[key_field]}, {"$set": doc}, upsert=True)
        return True
    except Exception as e:
        print(f"  Mongo error: {e}")
        return False


def mongo_query_all(collection):
    """Return every doc in collection (without _id); [] on error."""
    try:
        db = get_mongo()
        return list(db[collection].find({}, {"_id": 0}))
    except Exception as e:
        print(f"  Mongo query error: {e}")
        return []


# ── Proxmox audit ───────────────────────────────────────────────────────────
def audit_proxmox(creds):
    """Snapshot nodes + VMs/LXCs via /cluster/resources into Mongo 'nodes'/'vms'."""
    print("\n[Proxmox] Auditing...")
    token_id = creds.get("proxmox_token_id", "")
    token_secret = creds.get("proxmox_token_secret", "")
    auth_header = {"Authorization": f"PVEAPIToken={token_id}={token_secret}"}

    # Cluster resources — all nodes, VMs, LXCs in one call
    resources = http_get(f"{PROXMOX_HOST}/api2/json/cluster/resources", headers=auth_header)
    if not resources:
        print("  Failed to reach Proxmox")
        return

    nodes = {}
    vms = []

    for r in resources.get("data", []):
        rtype = r.get("type")
        if rtype == "node":
            nodes[r["node"]] = {
                "name": r["node"],
                "type": "proxmox_node",
                "status": r.get("status"),
                "cpu": r.get("cpu"),
                "maxcpu": r.get("maxcpu"),
                "mem": r.get("mem"),
                "maxmem": r.get("maxmem"),
                "disk": r.get("disk"),
                "maxdisk": r.get("maxdisk"),
                "uptime": r.get("uptime"),
                "audited_at": datetime.now(timezone.utc).isoformat(),
            }
        elif rtype in ("qemu", "lxc"):
            vms.append({
                "name": r.get("name", f"vm-{r.get('vmid')}"),
                "type": rtype,
                "vmid": r.get("vmid"),
                "node": r.get("node"),
                "status": r.get("status"),
                "cpu": r.get("cpu"),
                "maxcpu": r.get("maxcpu"),
                "mem": r.get("mem"),
                "maxmem": r.get("maxmem"),
                "disk": r.get("disk"),
                "maxdisk": r.get("maxdisk"),
                "uptime": r.get("uptime"),
                "template": r.get("template", 0),
                "audited_at": datetime.now(timezone.utc).isoformat(),
            })

    # Get node storage details
    for node_name in nodes:
        storage = http_get(f"{PROXMOX_HOST}/api2/json/nodes/{node_name}/storage", headers=auth_header)
        if storage:
            nodes[node_name]["storage"] = storage.get("data", [])

    # Upsert to MongoDB
    for node in nodes.values():
        mongo_upsert("nodes", node)
        print(f"  Node: {node['name']} ({node.get('status')})")

    for vm in vms:
        mongo_upsert("vms", vm)
        print(f"  {vm['type'].upper()}: {vm['name']} (vmid={vm['vmid']}, node={vm['node']}, status={vm['status']})")

    print(f"  Saved {len(nodes)} nodes, {len(vms)} VMs/LXCs")


# ── TrueNAS audit ───────────────────────────────────────────────────────────
def audit_truenas(creds):
    """Snapshot pools, datasets, disks, and NFS shares into Mongo 'storage'."""
    print("\n[TrueNAS] Auditing...")
    api_key = creds.get("truenas_api_key", "")
    headers = {"Authorization": f"Bearer {api_key}"}

    # Pools
    pools = http_get(f"{TRUENAS_HOST}/api/v2.0/pool", headers=headers)
    if pools:
        for pool in pools:
            doc = {
                "name": pool.get("name"),
                "type": "truenas_pool",
                "status": pool.get("status"),
                "size": pool.get("size"),
                "free": pool.get("free"),
                "allocated": pool.get("allocated"),
                "topology": pool.get("topology", {}),
                "autotrim": pool.get("autotrim", {}).get("value"),
                "audited_at": datetime.now(timezone.utc).isoformat(),
            }
            mongo_upsert("storage", doc)
            print(f"  Pool: {doc['name']} ({doc['status']})")

    # Datasets (NFS exports, mount points)
    datasets = http_get(f"{TRUENAS_HOST}/api/v2.0/pool/dataset?limit=50", headers=headers)
    if datasets:
        for ds in datasets:
            name = ds.get("name", "")
            # TrueNAS wraps some fields as {"value": ...} depending on version.
            doc = {
                "name": name,
                "type": "truenas_dataset",
                "pool": name.split("/")[0] if "/" in name else name,
                "mountpoint": ds.get("mountpoint", {}).get("value") if isinstance(ds.get("mountpoint"), dict) else ds.get("mountpoint"),
                "used": ds.get("used", {}).get("value") if isinstance(ds.get("used"), dict) else ds.get("used"),
                "available": ds.get("available", {}).get("value") if isinstance(ds.get("available"), dict) else ds.get("available"),
                "compression": ds.get("compression", {}).get("value") if isinstance(ds.get("compression"), dict) else None,
                "audited_at": datetime.now(timezone.utc).isoformat(),
            }
            mongo_upsert("storage", doc)
        print(f"  {len(datasets)} datasets saved")

    # Disks — critical for knowing SSD vs HDD
    disks = http_get(f"{TRUENAS_HOST}/api/v2.0/disk?limit=50", headers=headers)
    if disks:
        for disk in disks:
            # NOTE(review): some TrueNAS versions report rotationrate=None for
            # SSDs, which this classifies as "unknown" — confirm against the
            # deployed API version.
            rpm = disk.get("rotationrate")
            doc = {
                "name": disk.get("name"),
                "type": "truenas_disk",
                "serial": disk.get("serial"),
                "model": disk.get("model"),
                "size": disk.get("size"),
                "rotationrate": rpm,
                "disk_type": "SSD" if rpm == 0 else (f"HDD_{rpm}rpm" if rpm else "unknown"),
                "description": disk.get("description"),
                "audited_at": datetime.now(timezone.utc).isoformat(),
            }
            mongo_upsert("storage", doc, key_field="name")
        print(f"  {len(disks)} disks saved")

    # NFS shares — what's exported where
    nfs = http_get(f"{TRUENAS_HOST}/api/v2.0/sharing/nfs", headers=headers)
    if nfs:
        for share in nfs:
            doc = {
                "name": share.get("path", "").replace("/", "_"),
                "type": "nfs_share",
                "path": share.get("path"),
                "networks": share.get("networks", []),
                "hosts": share.get("hosts", []),
                "enabled": share.get("enabled"),
                "audited_at": datetime.now(timezone.utc).isoformat(),
            }
            mongo_upsert("storage", doc)
        print(f"  {len(nfs)} NFS shares saved")


# ── OPNsense audit ──────────────────────────────────────────────────────────
def audit_opnsense(creds):
    """Snapshot firmware, Unbound DNS host entries, and interfaces into Mongo."""
    print("\n[OPNsense] Auditing...")
    key = creds.get("opnsense_api_key", "")
    secret = creds.get("opnsense_api_secret", "")

    # Firmware/version info
    firmware = http_get(f"{OPNSENSE_HOST}/api/core/firmware/info", auth=(key, secret))
    if firmware:
        doc = {
            "name": "opnsense",
            "type": "router",
            "ip": "router.home.local",
            "version": firmware.get("product_version"),
            "arch": firmware.get("product_arch"),
            "last_check": firmware.get("last_check"),
            "audited_at": datetime.now(timezone.utc).isoformat(),
        }
        mongo_upsert("services", doc)
        print(f"  OPNsense {doc.get('version')}")

    # Unbound DNS hosts
    dns_hosts = http_get(f"{OPNSENSE_HOST}/api/unbound/host/searchhost", auth=(key, secret))
    if dns_hosts:
        entries = dns_hosts.get("rows", [])
        doc = {
            "name": "unbound_dns_hosts",
            "type": "dns_config",
            "entries": entries,
            "count": len(entries),
            "audited_at": datetime.now(timezone.utc).isoformat(),
        }
        mongo_upsert("network", doc)
        print(f"  {len(entries)} DNS host entries saved")

    # Interfaces
    interfaces = http_get(f"{OPNSENSE_HOST}/api/interfaces/overview/interfacesInfo", auth=(key, secret))
    if interfaces:
        rows = interfaces.get("rows", [])
        doc = {
            "name": "opnsense_interfaces",
            "type": "network_interfaces",
            "interfaces": rows,
            "audited_at": datetime.now(timezone.utc).isoformat(),
        }
        mongo_upsert("network", doc)
        print(f"  {len(rows)} interfaces saved")


# ── Services audit ──────────────────────────────────────────────────────────
def audit_services():
    """Document known services with their locations, ports, and storage.

    This is a hand-maintained map (no discovery) — keep it in sync by hand.
    """
    print("\n[Services] Recording known service map...")

    services = [
        {"name": "assistant-llm", "host": "grace-vm", "ip": "192.168.20.142", "port": 8000, "type": "ai_inference", "container": "docker", "storage": "/home/grace/models", "notes": "Qwen3-8B-Q4_K_M, GPU 0 (GTX 1080 Ti)"},
        {"name": "memory-engine", "host": "grace-vm", "ip": "192.168.20.142", "port": 11434, "type": "ai_embeddings", "container": "docker", "storage": "/opt/ollama-memory/models", "notes": "Ollama CPU-only, nomic-embed-text + phi3:mini"},
        {"name": "qdrant", "host": "grace-vm", "ip": "192.168.20.142", "port": 6333, "type": "vector_db", "container": "docker", "storage": "/mnt/ai-storage/qdrant/storage", "notes": "Vector DB for RAG + mem0. NFS-backed (TrueNAS HDD+SSD cache)"},
        {"name": "open-webui", "host": "grace-vm", "ip": "192.168.20.142", "port": 3000, "type": "ai_ui", "container": "docker", "storage": None},
        {"name": "searxng", "host": "grace-vm", "ip": "192.168.20.142", "port": 8080, "type": "search", "container": "docker", "storage": None},
        {"name": "postgresql", "host": "db-vm", "ip": "192.168.20.87", "port": 5432, "type": "database", "container": "native", "storage": "/srv/databases/postgresql", "notes": "grace_training DB for transcripts"},
        {"name": "mysql", "host": "db-vm", "ip": "192.168.20.87", "port": 3306, "type": "database", "container": "native", "storage": "/var/lib/mysql"},
        {"name": "mongodb", "host": "db-vm", "ip": "192.168.20.87", "port": 27017, "type": "database", "container": "native", "storage": "/srv/databases/mongodb", "notes": "homelab_infra — this audit DB"},
        {"name": "influxdb", "host": "db-vm", "ip": "192.168.20.87", "port": 8086, "type": "timeseries_db", "container": "native", "storage": None},
        {"name": "pgadmin", "host": "db-vm", "ip": "192.168.20.87", "port": 5050, "type": "db_ui", "container": "docker", "storage": None},
        {"name": "proxmox-homelab", "host": "proxmox", "ip": "192.168.20.135", "port": 8006, "type": "hypervisor", "container": "native", "storage": None, "notes": "Primary Proxmox node — homelab"},
        {"name": "proxmox-router", "host": "proxmox", "ip": "192.168.20.2", "port": 8006, "type": "hypervisor", "container": "native", "storage": None, "notes": "Router Proxmox node — hosts Caddy LXC 130"},
        {"name": "caddy", "host": "lxc-130", "ip": "192.168.20.130", "port": 443, "type": "reverse_proxy", "container": "native", "storage": "/etc/caddy/Caddyfile", "notes": "Custom INWX build, on ROUTER node not homelab node"},
        {"name": "gitea", "host": "lxc-115", "ip": "192.168.20.132", "port": 3000, "type": "git", "container": "native", "storage": None, "notes": "Port 3000 not 80. Use 192.168.20.132:3000 for remotes"},
        {"name": "matrix-synapse", "host": "lxc-126", "ip": "192.168.20.127", "port": 8008, "type": "chat", "container": "native", "storage": None, "notes": "Proxied via Caddy as matrix.maxwellburton.com"},
        {"name": "element", "host": "lxc-126", "ip": "192.168.20.127", "port": 8009, "type": "chat_ui", "container": "native", "storage": None, "notes": "Proxied via Caddy as chat.maxwellburton.com"},
        {"name": "nextcloud", "host": "vm", "ip": "192.168.20.125", "port": 11000, "type": "files", "container": "native", "storage": None, "notes": "Proxied via Caddy as drive.maxwellburton.com"},
        {"name": "grafana", "host": "lxc-120", "ip": "192.168.20.120", "port": 3000, "type": "monitoring", "container": "native", "storage": None},
        {"name": "prometheus", "host": "lxc-123", "ip": None, "port": 9090, "type": "monitoring", "container": "native", "storage": None},
        {"name": "opnsense", "host": "router", "ip": "router.home.local", "port": 8443, "type": "firewall", "container": "native", "storage": None, "notes": "API at router.home.local:8443 NOT 192.168.20.1 (HAProxy)"},
        {"name": "truenas", "host": "vm-102", "ip": "192.168.20.228", "port": 443, "type": "nas", "container": "native", "storage": None, "notes": "Tank pool: 4x HDD mirrors + SSD L2ARC cache"},
        {"name": "infisical", "host": "lxc", "ip": "infisical.home.local", "port": 80, "type": "secrets", "container": "native", "storage": None},
        {"name": "joplin-server", "host": "lxc-111", "ip": "192.168.20.x", "port": 22300, "type": "notes", "container": "native", "storage": None},
        {"name": "n8n", "host": "lxc-105", "ip": None, "port": 5678, "type": "automation", "container": "native", "storage": None},
    ]

    for svc in services:
        svc["audited_at"] = datetime.now(timezone.utc).isoformat()
        mongo_upsert("services", svc)

    print(f"  {len(services)} services documented")


# ── Storage topology ────────────────────────────────────────────────────────
def audit_storage_topology():
    """Document the full storage topology — what backs what (hand-maintained)."""
    print("\n[Storage Topology] Recording...")

    topology = [
        {
            "name": "grace-vm-root",
            "type": "storage_mapping",
            "vm": "grace-vm (192.168.20.142)",
            "mount": "/",
            "device": "/dev/sda (180GB)",
            "backed_by": "local-lvm on tower Proxmox node",
            "medium": "SSD",
            "use": "OS only",
        },
        {
            "name": "grace-vm-ai-storage",
            "type": "storage_mapping",
            "vm": "grace-vm (192.168.20.142)",
            "mount": "/mnt/ai-storage",
            "device": "/dev/sdb (1TB)",
            "backed_by": "TrueNAS Tank pool dataset vm-ai-storage, NFS via Proxmox passthrough",
            "medium": "HDD with SSD L2ARC cache (NOT pure SSD)",
            "use": "AI models, training data, Qdrant vectors, ChromaDB",
            "notes": "Tank pool is HDD mirrors + SSD cache. Do NOT run latency-sensitive DBs here if possible.",
        },
        {
            "name": "grace-vm-shared-documents",
            "type": "storage_mapping",
            "vm": "grace-vm (192.168.20.142)",
            "mount": "/mnt/shared_documents",
            "device": "NFS 192.168.20.228:/mnt/Tank/systems/shared_documents",
            "backed_by": "TrueNAS Tank pool, HDD with SSD L2ARC cache",
            "medium": "HDD with SSD L2ARC cache",
            "use": ".md files, docs, memory files ONLY. Not for apps or DBs.",
        },
        {
            "name": "db-vm-databases",
            "type": "storage_mapping",
            "vm": "db-vm (192.168.20.87)",
            "mount": "/srv/databases",
            "device": "/dev/sdb (1TB)",
            "backed_by": "Proxmox local-lvm or TrueNAS dataset vm-db-server-dev",
            "medium": "unknown — needs verification",
            "use": "PostgreSQL, MongoDB, InfluxDB data",
        },
        {
            "name": "truenas-tank-pool",
            "type": "storage_hardware",
            "pool": "Tank",
            "topology": "2x MIRROR vdevs (4 HDDs total)",
            "cache": "SSD L2ARC (read cache)",
            "medium": "HDD primary + SSD cache",
            "notes": "NFS exports: vm-ai-storage, shared_documents, backup-drive, immich_storage, joplinapp-storage",
        },
    ]

    for doc in topology:
        mongo_upsert("storage", doc)
    print(f"  {len(topology)} storage mappings documented")


# ── Main ────────────────────────────────────────────────────────────────────
def main():
    """CLI dispatch: --dump, --query, --update <component>, or full audit."""
    parser = argparse.ArgumentParser(description="Grace Homelab Infrastructure Audit")
    parser.add_argument("--update", type=str, help="Update specific component: proxmox|truenas|opnsense|services|storage")
    parser.add_argument("--dump", action="store_true", help="Dump all collections as JSON")
    parser.add_argument("--query", type=str, help="Natural language summary query (prints relevant docs)")
    args = parser.parse_args()

    if args.dump:
        for collection in ["nodes", "vms", "services", "storage", "network"]:
            docs = mongo_query_all(collection)
            print(f"\n=== {collection.upper()} ({len(docs)} docs) ===")
            print(json.dumps(docs, indent=2, default=str)[:3000])
        return

    # FIX: was `if args.query:` — the documented invocation `--query ""`
    # evaluated falsy and FELL THROUGH to a full audit (which writes to
    # MongoDB). An explicitly supplied query, even empty, must never trigger
    # the audit path.
    if args.query is not None:
        # Simple keyword search across all collections
        query_lower = args.query.lower()
        keywords = [w for w in query_lower.split() if len(w) > 3]
        for collection in ["nodes", "vms", "services", "storage", "network"]:
            docs = mongo_query_all(collection)
            for doc in docs:
                doc_str = json.dumps(doc, default=str).lower()
                if any(kw in doc_str for kw in keywords):
                    print(f"\n[{collection}] {doc.get('name','?')}")
                    # Print key fields
                    for k in ["type", "ip", "port", "mount", "medium", "backed_by", "notes", "status", "use"]:
                        if k in doc and doc[k]:
                            print(f"  {k}: {doc[k]}")
        return

    creds = get_creds()

    if args.update:
        component = args.update.lower()
        if component == "proxmox":
            audit_proxmox(creds)
        elif component == "truenas":
            audit_truenas(creds)
        elif component == "opnsense":
            audit_opnsense(creds)
        elif component == "services":
            audit_services()
        elif component == "storage":
            audit_storage_topology()
        else:
            print(f"Unknown component: {component}")
        return

    # Full audit
    print("=== Grace Homelab Full Infrastructure Audit ===")
    print(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    audit_proxmox(creds)
    audit_truenas(creds)
    audit_opnsense(creds)
    audit_services()
    audit_storage_topology()
    print("\n=== Audit complete ===")
    print("Query with: python3 ~/scripts/infra-audit.py --query ''")


if __name__ == "__main__":
    main()
"""
Gitea → MongoDB infrastructure linker.

Usage:
    python3 ~/scripts/infra-gitea-link.py
    python3 ~/scripts/infra-gitea-link.py --component caddy
    python3 ~/scripts/infra-gitea-link.py --dry-run
"""

import argparse
import json
import os
import re
import urllib.parse
import urllib.request
from datetime import datetime, timezone

GITEA_BASE = "http://192.168.20.132:3000/api/v1"
# SECURITY FIX: a live API token was previously hardcoded here, committed to
# git, in direct violation of the repo's credential protocol ("All scripts
# pull credentials from Infisical. Never hardcode secrets."). The old token
# must be treated as compromised and rotated in Gitea. Source the token from
# the environment (populated by the Infisical wrapper) instead.
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
MONGO_HOST = "192.168.20.87"
MONGO_DB = "homelab_infra"

# Repos to search — ordered by relevance
INFRA_REPOS = ["infra/homelab", "infra/grace"]
SERVICE_REPOS = ["Grace/homelab-ai-agent", "projects/homelab-dashy"]
ALL_REPOS = INFRA_REPOS + SERVICE_REPOS

# Words too generic to use as match keywords
STOPWORDS = {
    "vm", "lxc", "server", "node", "local", "home", "main", "new", "old",
    "the", "and", "for", "with", "this", "that", "from", "into", "onto",
    "setup", "install", "config", "update", "add", "fix", "test", "run",
    "docker", "container", "service", "system", "data", "base", "storage",
    "port", "host", "ip", "api", "http", "https", "move", "migrate",
    "enable", "disable", "check", "get", "set", "use", "via",
}

# Component → repo affinity: which repos are most relevant per component type
REPO_AFFINITY = {
    "ai_inference": ["infra/grace", "Grace/homelab-ai-agent"],
    "ai_embeddings": ["infra/grace", "Grace/homelab-ai-agent"],
    "vector_db": ["infra/grace"],
    "reverse_proxy": ["infra/homelab"],
    "hypervisor": ["infra/homelab"],
    "nas": ["infra/homelab"],
    "firewall": ["infra/homelab"],
    "chat": ["infra/homelab"],
    "git": ["infra/homelab"],
    "monitoring": ["infra/homelab"],
    "database": ["infra/grace", "infra/homelab"],
    "proxmox_node": ["infra/homelab"],
    "qemu": ["infra/homelab"],
    "lxc": ["infra/homelab"],
}


# ── Gitea API helpers ───────────────────────────────────────────────────────

def gitea_get(path, params=None):
    """GET a Gitea API path; returns parsed JSON, or None on any error.

    FIX: query parameters are now URL-encoded via urllib.parse.urlencode
    instead of naive string concatenation, so values containing spaces or
    reserved characters cannot corrupt the request.
    """
    url = f"{GITEA_BASE}/{path}"
    if params:
        url += "?" + urllib.parse.urlencode(params)
    req = urllib.request.Request(url, headers={"Authorization": f"token {GITEA_TOKEN}"})
    try:
        with urllib.request.urlopen(req, timeout=10) as r:
            return json.loads(r.read())
    except Exception:
        # Best-effort: network/API failures degrade to "no data".
        return None


def get_all_issues(repo, state="open"):
    """Get all issues from a repo (paginated, 50 per page).

    NOTE(review): an API error mid-pagination returns the partial list
    collected so far — acceptable for this best-effort linker.
    """
    issues = []
    page = 1
    while True:
        batch = gitea_get(f"repos/{repo}/issues",
                          {"type": "issues", "state": state, "limit": 50, "page": page})
        if not batch:
            break
        issues.extend(batch)
        if len(batch) < 50:
            break
        page += 1
    return issues


def get_recent_commits(repo, limit=20):
    """Get recent commits from default branch; [] on error."""
    commits = gitea_get(f"repos/{repo}/commits", {"limit": limit, "page": 1})
    return commits or []


# ── Keyword extraction ──────────────────────────────────────────────────────

def extract_keywords(component: dict) -> set[str]:
    """
    Extract meaningful match keywords from a component document.
    Pulls from: name, type, notes, host fields.
    Tokens shorter than 4 chars and STOPWORDS are dropped; known product
    names are expanded with their aliases.
    """
    raw_text = " ".join([
        component.get("name", ""),
        component.get("type", ""),
        component.get("notes", ""),
        component.get("host", ""),
    ]).lower()

    # Split on non-alpha and filter stopwords + short tokens
    tokens = re.split(r'[^a-z0-9]+', raw_text)
    keywords = {t for t in tokens if len(t) >= 4 and t not in STOPWORDS}

    # Special: expand known aliases
    aliases = {
        "proxmox": {"pve", "proxmox"},
        "caddy": {"caddy", "reverse", "proxy", "caddyfile"},
        "matrix": {"matrix", "synapse", "element"},
        "nextcloud": {"nextcloud", "drive"},
        "gitea": {"gitea", "git"},
        "truenas": {"truenas", "tank", "zfs", "pool"},
        "opnsense": {"opnsense", "firewall", "router", "unbound"},
        "grafana": {"grafana", "dashboard"},
        "qdrant": {"qdrant", "vector"},
        "ollama": {"ollama", "llama"},
        "qwen": {"qwen", "llm", "inference"},
        "mongodb": {"mongodb", "mongo"},
    }
    for key, alias_set in aliases.items():
        if key in keywords:
            keywords |= alias_set

    return keywords


def score_match(text: str, keywords: set[str]) -> int:
    """Score how many keywords appear (as substrings) in text. Returns match count."""
    text_lower = text.lower()
    return sum(1 for kw in keywords if kw in text_lower)


def find_matches(component: dict, all_issues: dict, all_commits: dict) -> dict:
    """
    Find matching issues and commits for a component.
    Returns a dict with the top-3 matched issues and commits plus the
    keywords used, or {} when no keywords could be extracted.
    """
    keywords = extract_keywords(component)
    if not keywords:
        return {}

    comp_type = component.get("type", "")
    comp_name = component.get("name", "")

    # Determine which repos to search based on type affinity
    preferred_repos = REPO_AFFINITY.get(comp_type, ALL_REPOS)

    matched_issues = []
    matched_commits = []

    # Minimum score threshold — require at least 2 keyword hits
    # UNLESS the component name itself is a strong unique identifier (>=6 chars).
    # NOTE(review): the STOPWORDS check here is weak — multi-word names
    # ("grace-vm") never match a single stopword; confirm this is intended.
    name_is_unique = len(comp_name) >= 6 and comp_name not in STOPWORDS
    min_score = 1 if name_is_unique else 2

    for repo in preferred_repos:
        issues = all_issues.get(repo, [])
        commits = all_commits.get(repo, [])

        for issue in issues:
            title = issue.get("title", "")
            body = issue.get("body", "") or ""
            # Body hits count half as much as title hits.
            score = score_match(title, keywords) + score_match(body[:500], keywords) * 0.5
            if score >= min_score:
                matched_issues.append({
                    "repo": repo,
                    "number": issue["number"],
                    "title": title,
                    "state": issue.get("state"),
                    "url": issue.get("html_url"),
                    "score": score,
                    "updated_at": issue.get("updated_at"),
                })

        for commit in commits:
            # First line of the commit message only.
            msg = commit.get("commit", {}).get("message", "").split("\n")[0]
            if score_match(msg, keywords) >= min_score:
                matched_commits.append({
                    "repo": repo,
                    "sha": commit.get("sha", "")[:8],
                    "message": msg,
                    "url": commit.get("html_url"),
                    "date": commit.get("commit", {}).get("author", {}).get("date"),
                })

    # Sort issues by score desc, commits by date desc; take top 3 of each
    matched_issues = sorted(matched_issues, key=lambda x: -x["score"])[:3]
    matched_commits = sorted(matched_commits, key=lambda x: x.get("date", ""), reverse=True)[:3]

    # Strip internal score field before storing
    for i in matched_issues:
        i.pop("score", None)

    return {
        "issues": matched_issues,
        "commits": matched_commits,
        "keywords_used": list(keywords)[:10],
        "linked_at": datetime.now(timezone.utc).isoformat(),
    }


# ── MongoDB helpers ─────────────────────────────────────────────────────────
# Lazily constructed, process-wide MongoDB client (one connection per run).
_mongo_client = None

def get_mongo():
    """Return the homelab DB handle, creating the shared client on first use."""
    global _mongo_client
    if _mongo_client is None:
        # Deferred import: scripts that never reach Mongo don't need pymongo.
        import pymongo
        _mongo_client = pymongo.MongoClient(MONGO_HOST, 27017, serverSelectionTimeoutMS=5000)
    return _mongo_client[MONGO_DB]


def mongo_query_all(collection):
    """Fetch every document in *collection* (without _id); [] on any error."""
    try:
        cursor = get_mongo()[collection].find({}, {"_id": 0})
        return list(cursor)
    except Exception as e:
        print(f" Mongo query error: {e}")
        return []


def mongo_update_gitea_links(collection, name_value, links):
    """Set the gitea_links field on the doc named *name_value*; True on success."""
    try:
        coll = get_mongo()[collection]
        coll.update_one({"name": name_value}, {"$set": {"gitea_links": links}})
        return True
    except Exception as e:
        print(f" Mongo update error: {e}")
        return False
components...") + + for doc in docs: + name = doc.get("name", "") + if not name: + continue + if args.component and args.component.lower() not in name.lower(): + continue + + links = find_matches(doc, all_issues, all_commits) + + issue_count = len(links.get("issues", [])) + commit_count = len(links.get("commits", [])) + + if issue_count or commit_count: + print(f" {name}: {issue_count} issues, {commit_count} commits") + if args.dry_run: + for i in links.get("issues", []): + print(f" ISSUE #{i['number']}: {i['title']} [{i['state']}]") + for c in links.get("commits", []): + print(f" COMMIT {c['sha']}: {c['message'][:60]}") + else: + mongo_update_gitea_links(collection, name, links) + total_linked += 1 + else: + # Still write empty links to clear stale data + if not args.dry_run: + mongo_update_gitea_links(collection, name, { + "issues": [], "commits": [], + "keywords_used": list(extract_keywords(doc))[:10], + "linked_at": datetime.now(timezone.utc).isoformat(), + }) + + if not args.dry_run: + print(f"\n{total_linked} components linked to Gitea data") + print("Done.") + + +if __name__ == "__main__": + main() diff --git a/install-unsloth.sh b/install-unsloth.sh new file mode 100755 index 0000000..027eb2c --- /dev/null +++ b/install-unsloth.sh @@ -0,0 +1,56 @@ +#!/bin/bash +# Install unsloth for Qwen3-8B LoRA fine-tuning on GTX 1080 (Pascal, CUDA 12.x) +# Uses GPU 1 (GTX 1080, 8GB VRAM) — leaves GPU 0 free for inference +# Run once. Safe to re-run. + +set -e + +echo "=== Unsloth install for Grace fine-tuning (GPU 1, GTX 1080) ===" +echo "CUDA: $(nvcc --version 2>/dev/null | grep release || echo 'checking docker...')" +echo "" + +# Use a dedicated venv to avoid polluting system Python +VENV_DIR="$HOME/unsloth-env" + +if [ ! -d "$VENV_DIR" ]; then + echo "[1/5] Creating venv at $VENV_DIR..." 
+ python3 -m venv "$VENV_DIR" +else + echo "[1/5] Venv already exists at $VENV_DIR" +fi + +source "$VENV_DIR/bin/activate" + +echo "[2/5] Installing PyTorch with CUDA 12.1 (Pascal-compatible)..." +pip install --quiet torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121 + +echo "[3/5] Installing unsloth (Qwen3 + LoRA support)..." +pip install --quiet "unsloth[colab-new] @ git+https://github.com/unslothai/unsloth.git" + +echo "[4/5] Installing training utilities..." +pip install --quiet \ + transformers \ + datasets \ + trl \ + peft \ + accelerate \ + bitsandbytes \ + wandb \ + scipy + +echo "[5/5] Verifying install..." +python3 -c " +import torch +print(f'PyTorch: {torch.__version__}') +print(f'CUDA available: {torch.cuda.is_available()}') +if torch.cuda.is_available(): + print(f'GPU 0: {torch.cuda.get_device_name(0)}') + print(f'GPU 1: {torch.cuda.get_device_name(1)}') +import unsloth +print(f'Unsloth: OK') +" + +echo "" +echo "=== Install complete ===" +echo "Activate with: source $VENV_DIR/bin/activate" +echo "Run fine-tuning with: ~/scripts/finetune-lora.py" diff --git a/load-transcripts-to-db.py b/load-transcripts-to-db.py new file mode 100644 index 0000000..6fe6f31 --- /dev/null +++ b/load-transcripts-to-db.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python3 +""" +Load Grace transcript JSONL files into PostgreSQL on DB VM (192.168.20.87). + +Tables: + sessions — one row per conversation session + turns — one row per turn (system/human/gpt) + session_tags — optional labels + +Run nightly after export-transcripts.py. 
# Where export-transcripts.py drops its JSONL output.
TRANSCRIPTS_DIR = "/mnt/ai-storage/grace/training-data/transcripts"
# Tracks which files have already been loaded, plus running totals.
STATE_FILE = os.path.expanduser("~/self-improving/db-load-state.json")

# Connection settings for the DB VM.
# SECURITY FIX: the repo's credential protocol says "Never hardcode secrets" —
# the password (and host/port) now come from the environment (Infisical-injected)
# first. The literal fallbacks preserve backward compatibility and should be
# removed once the secret is provisioned everywhere.
DB_CONFIG = {
    "host": os.environ.get("GRACE_DB_HOST", "192.168.20.87"),
    "port": int(os.environ.get("GRACE_DB_PORT", "5432")),
    "dbname": "grace_training",
    "user": "grace_ai",
    "password": os.environ.get("GRACE_DB_PASSWORD", "grace_training_2026"),
}


def load_state():
    """Return the persisted load state, or a fresh default if none exists."""
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE) as f:
            return json.load(f)
    return {"loaded_files": [], "total_sessions": 0, "total_turns": 0}


def save_state(state):
    """Persist *state* to STATE_FILE as pretty-printed JSON."""
    with open(STATE_FILE, "w") as f:
        json.dump(state, f, indent=2)
content)) + turns_added += 1 + + conn.commit() + + return sessions_added, turns_added + + +def main(): + state = load_state() + + transcript_files = sorted(glob.glob(os.path.join(TRANSCRIPTS_DIR, "*.jsonl"))) + new_files = [f for f in transcript_files if os.path.basename(f) not in state["loaded_files"]] + + if not new_files: + print("No new transcript files to load.") + return + + try: + conn = psycopg2.connect(**DB_CONFIG) + except Exception as e: + print(f"DB connection failed: {e}") + return + + total_sessions = 0 + total_turns = 0 + + for f in new_files: + print(f"Loading: {os.path.basename(f)}") + try: + s, t = load_file(conn, f) + print(f" → {s} sessions, {t} turns") + total_sessions += s + total_turns += t + state["loaded_files"].append(os.path.basename(f)) + except Exception as e: + print(f" Error: {e}") + conn.rollback() + + conn.close() + + state["total_sessions"] = state.get("total_sessions", 0) + total_sessions + state["total_turns"] = state.get("total_turns", 0) + total_turns + save_state(state) + + print(f"\nLoaded {total_sessions} sessions, {total_turns} turns") + print(f"DB totals: {state['total_sessions']} sessions, {state['total_turns']} turns") + + +if __name__ == "__main__": + main() diff --git a/overnight-qwen3.py b/overnight-qwen3.py new file mode 100755 index 0000000..59fb65c --- /dev/null +++ b/overnight-qwen3.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python3 +""" +Overnight autonomous research agent using local qwen3:1.7b via Ollama. +Runs nightly at 1 AM PT when Maxwell is not active. +Writes research output to ~/self-improving/domains/ and Joplin notes. 
+ +Model: qwen3:1.7b on memory-engine container (127.0.0.1:11434) +""" + +import json +import os +import subprocess +import urllib.request +import urllib.error +from datetime import datetime, timezone + +OLLAMA_URL = "http://127.0.0.1:11434" +MODEL = "qwen3:1.7b" +DOMAINS_DIR = os.path.expanduser("~/self-improving/domains") +LOG_FILE = os.path.expanduser(f"~/self-improving/overnight-qwen3-{datetime.now().strftime('%Y%m%d')}.log") + +TASK_LIST = [ + # (filename, research prompt) + ("gitea.md", "Document the Gitea REST API and `tea` CLI: key endpoints for issues, PRs, repos, labels. Include curl examples and common tea commands. Focus on what a homelab automation agent would use daily."), + ("influxdb.md", "Document InfluxDB 2.x: Flux query language basics, writing data via API, key concepts (bucket, org, token). Include curl examples for write and query endpoints."), + ("grafana.md", "Document Grafana HTTP API: dashboard CRUD, datasource management, annotations, alerting. Include curl examples. Focus on automation use cases."), + ("prometheus.md", "Document Prometheus HTTP API: instant queries, range queries, targets, rules, alerts endpoints. Include curl examples for common monitoring queries."), + ("alertmanager.md", "Document Alertmanager API and config: silence management, alert routing, receiver config (Telegram webhook). Include config YAML examples."), + ("home-assistant.md", "Document Home Assistant REST API and websocket API: entity states, services, automations. Include curl examples for common operations."), + ("n8n.md", "Document n8n workflow automation: REST API, webhook triggers, key nodes. Focus on homelab automation use cases."), + ("ollama.md", "Document Ollama REST API: model management, generate, chat, embeddings endpoints. Include curl examples. Note: also covers memory-engine container at port 11434."), + ("joplin-server.md", "Document Joplin Server and Joplin REST API (port 41184): notes CRUD, search, folders, tags. 
def domain_exists_and_fresh(filename):
    """Return True if the domain doc exists and is less than 7 days old.

    Used to skip regeneration of recently written docs.
    """
    doc_path = os.path.join(DOMAINS_DIR, filename)
    if not os.path.exists(doc_path):
        return False
    age_seconds = datetime.now().timestamp() - os.path.getmtime(doc_path)
    return age_seconds / 86400 < 7
def main():
    """Run one overnight research pass: generate any stale/missing domain docs."""
    log("=== Overnight Qwen3 research agent starting ===")
    log(f"Model: {MODEL} @ {OLLAMA_URL}")

    # Verify Ollama is up before doing any work.
    try:
        urllib.request.urlopen(f"{OLLAMA_URL}/api/tags", timeout=5)
        log("Ollama reachable ✓")
    except Exception as e:
        log(f"Ollama not reachable: {e} — aborting")
        return

    completed = 0
    skipped = 0
    errors = 0

    for filename, prompt in TASK_LIST:
        if domain_exists_and_fresh(filename):
            # BUG FIX: previously logged the literal "(unknown)" instead of
            # interpolating the file name into the message.
            log(f"SKIP: {filename} (exists, < 7 days old)")
            skipped += 1
            continue

        log(f"Researching: {filename}")
        result = ollama_generate(prompt)

        # ollama_generate signals failure with an "ERROR: ..." string sentinel.
        if result.startswith("ERROR:"):
            log(f" FAILED: {result}")
            errors += 1
            continue

        write_domain_doc(filename, result)
        completed += 1

    log(f"=== Done: {completed} written, {skipped} skipped, {errors} errors ===")

    # Record the run in heartbeat-state.md so the scheduler can see freshness.
    state_path = os.path.expanduser("~/self-improving/heartbeat-state.md")
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    if os.path.exists(state_path):
        import re  # local import: this file does not otherwise use re
        with open(state_path) as f:
            content = f.read()
        if "last_qwen3_run" in content:
            content = re.sub(r"last_qwen3_run:.*", f"last_qwen3_run: {ts}", content)
        else:
            content += f"\nlast_qwen3_run: {ts}\n"
        with open(state_path, "w") as f:
            f.write(content)
Export new training data to DB VM +echo "[1/4] Exporting training data..." >> "$LOG" +python3 ~/scripts/export-training-data.py >> "$LOG" 2>&1 +python3 ~/scripts/convert-training-data.py >> "$LOG" 2>&1 +python3 ~/scripts/export-transcripts.py >> "$LOG" 2>&1 +python3 ~/scripts/load-transcripts-to-db.py >> "$LOG" 2>&1 + +# 2. Check self-improving file sizes +echo "[2/4] Checking memory file sizes..." >> "$LOG" +for f in ~/self-improving/corrections.md ~/self-improving/memory.md; do + lines=$(wc -l < "$f" 2>/dev/null || echo 0) + if [ "$lines" -gt 100 ]; then + echo " WARNING: $f has $lines lines — needs compaction" >> "$LOG" + else + echo " OK: $f ($lines lines)" >> "$LOG" + fi +done + +# 3. Refresh index.md counts +echo "[3/4] Refreshing index..." >> "$LOG" +python3 - << 'PYEOF' >> "$LOG" 2>&1 +import os, glob +base = os.path.expanduser("~/self-improving") +lines = ["# Self-Improving Index", f"Updated: {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M')}", ""] +for f in ["memory.md", "corrections.md"]: + path = os.path.join(base, f) + count = len(open(path).readlines()) if os.path.exists(path) else 0 + lines.append(f"- {f}: {count} lines") +lines.append(f"- domains/: {len(glob.glob(base+'/domains/*.md'))} files") +lines.append(f"- projects/: {len(glob.glob(base+'/projects/*.md'))} files") +lines.append(f"- archive/: {len(glob.glob(base+'/archive/*.md'))} files") +with open(os.path.join(base, "index.md"), "w") as out: + out.write("\n".join(lines) + "\n") +print(" index.md refreshed") +PYEOF + +# 4. Update heartbeat state timestamp +echo "[4/4] Updating heartbeat state..." 
>> "$LOG" +python3 - << 'PYEOF' >> "$LOG" 2>&1 +import re, os +from datetime import datetime, timezone +path = os.path.expanduser("~/self-improving/heartbeat-state.md") +ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") +if os.path.exists(path): + content = open(path).read() + if "last_overnight_run" in content: + content = re.sub(r"last_overnight_run:.*", f"last_overnight_run: {ts}", content) + else: + content += f"\nlast_overnight_run: {ts}\n" + open(path, "w").write(content) + print(f" Updated last_overnight_run: {ts}") +else: + print(" heartbeat-state.md not found, skipping") +PYEOF + +echo "=== Done: $(date) ===" >> "$LOG" diff --git a/rag-docs.json b/rag-docs.json new file mode 100644 index 0000000..5923057 --- /dev/null +++ b/rag-docs.json @@ -0,0 +1,111 @@ +{ + "_comment": "RAG document sources for Grace's homelab knowledge base.", + "_directive": "GRACE RULE: When Maxwell sends a URL or webpage and says 'read this', 'remember this', 'learn this', or 'add this to your docs', append it to this file immediately under the appropriate category. 
Also add any doc page you find yourself fetching more than once in a session.", + "_last_updated": "2026-03-16", + + "sources": [ + { + "name": "OPNsense API", + "urls": [ + "https://docs.opnsense.org/development/api.html", + "https://docs.opnsense.org/development/api/core/unbound.html", + "https://docs.opnsense.org/development/api/core/firmware.html", + "https://docs.opnsense.org/development/api/core/firewall.html", + "https://docs.opnsense.org/manual/nat.html", + "https://docs.opnsense.org/manual/firewall.html" + ], + "category": "networking", + "refresh_days": 30 + }, + { + "name": "Proxmox VE API", + "urls": [ + "https://pve.proxmox.com/pve-docs/api-viewer/", + "https://pve.proxmox.com/wiki/Proxmox_VE_API", + "https://pve.proxmox.com/wiki/Linux_Container", + "https://pve.proxmox.com/wiki/Qemu/KVM_Virtual_Machines" + ], + "category": "infrastructure", + "refresh_days": 30 + }, + { + "name": "Caddy", + "urls": [ + "https://caddyserver.com/docs/caddyfile", + "https://caddyserver.com/docs/caddyfile/directives", + "https://caddyserver.com/docs/caddyfile/directives/reverse_proxy", + "https://caddyserver.com/docs/caddyfile/directives/tls", + "https://caddyserver.com/docs/modules/dns.providers.inwx" + ], + "category": "networking", + "refresh_days": 30 + }, + { + "name": "Docker Compose", + "urls": [ + "https://docs.docker.com/compose/compose-file/", + "https://docs.docker.com/compose/compose-file/05-services/", + "https://docs.docker.com/compose/networking/" + ], + "category": "infrastructure", + "refresh_days": 60 + }, + { + "name": "Unbound DNS", + "urls": [ + "https://docs.opnsense.org/manual/unbound.html", + "https://nlnetlabs.nl/documentation/unbound/unbound.conf/" + ], + "category": "networking", + "refresh_days": 60 + }, + { + "name": "Qdrant", + "urls": [ + "https://qdrant.tech/documentation/quick-start/", + "https://qdrant.tech/documentation/concepts/collections/", + "https://qdrant.tech/documentation/concepts/search/", + 
"https://qdrant.tech/documentation/concepts/payload/" + ], + "category": "infrastructure", + "refresh_days": 60 + }, + { + "name": "Unsloth / Qwen3 fine-tuning", + "urls": [ + "https://unsloth.ai/docs/models/qwen3-how-to-run-and-fine-tune", + "https://unsloth.ai/docs/get-started/fine-tuning" + ], + "category": "ai", + "refresh_days": 14 + }, + { + "name": "OpenClaw", + "urls": [ + "https://docs.openclaw.ai/", + "https://docs.openclaw.ai/skills", + "https://docs.openclaw.ai/configuration" + ], + "category": "ai", + "refresh_days": 14 + }, + { + "name": "Gitea", + "urls": [ + "https://docs.gitea.com/api/1.20/", + "https://docs.gitea.com/usage/managing-repositories" + ], + "category": "infrastructure", + "refresh_days": 60 + }, + { + "name": "TrueNAS", + "urls": [ + "https://www.truenas.com/docs/scale/api/", + "https://www.truenas.com/docs/scale/scaletutorials/datasets/" + ], + "category": "storage", + "refresh_days": 60 + } + ] +} diff --git a/rag-ingest.py b/rag-ingest.py new file mode 100755 index 0000000..b923a40 --- /dev/null +++ b/rag-ingest.py @@ -0,0 +1,343 @@ +#!/usr/bin/env python3 +""" +RAG ingestion pipeline for Grace's homelab knowledge base. + +Fetches documentation pages, chunks them, embeds via nomic-embed-text +(memory-engine Ollama), and upserts to Qdrant on DB VM. + +Usage: + python3 ~/scripts/rag-ingest.py # process all pending/stale docs + python3 ~/scripts/rag-ingest.py --hours 2 # stop after N hours (default 2) + python3 ~/scripts/rag-ingest.py --url URL # ingest a single URL immediately + python3 ~/scripts/rag-ingest.py --force # re-ingest all regardless of freshness + +GRACE DIRECTIVE: When Maxwell sends a URL and says 'read this', 'remember this', +'learn this', 'add this to your docs', or 'add this to the RAG': + 1. Add it to ~/scripts/rag-docs.json under the appropriate category + 2. Run: python3 ~/scripts/rag-ingest.py --url + 3. 
def http_get(url: str, timeout: int = 15) -> Optional[str]:
    """GET *url* and return the body decoded as UTF-8; None on any failure."""
    try:
        request = urllib.request.Request(url, headers={"User-Agent": "Grace-RAG/1.0"})
        with urllib.request.urlopen(request, timeout=timeout) as resp:
            payload = resp.read()
        return payload.decode("utf-8", errors="replace")
    except Exception as e:
        print(f" Fetch error {url}: {e}")
        return None


def http_post_json(url: str, data: dict, timeout: int = 30) -> Optional[dict]:
    """POST *data* as JSON and return the decoded JSON response; None on failure."""
    try:
        encoded = json.dumps(data).encode()
        request = urllib.request.Request(
            url,
            data=encoded,
            headers={"Content-Type": "application/json", "User-Agent": "Grace-RAG/1.0"},
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=timeout) as resp:
            return json.loads(resp.read().decode())
    except Exception as e:
        print(f" POST error {url}: {e}")
        return None
def html_to_text(html: str) -> str:
    """Strip HTML down to readable plain text.

    Headings become markdown '#' lines, <pre>/<code> blocks become fenced
    code blocks, links keep only their anchor text, every other tag is
    dropped, and common HTML entities are decoded.

    BUG FIX: the previous regexes were corrupted (closing-tag patterns,
    the comment pattern, and entity names were missing), so the function
    could not actually strip scripts, comments, or headings. Reconstructed
    here per the stated intent of each step.
    """
    # Remove script/style elements wholesale, then HTML comments.
    html = re.sub(r'<(script|style)[^>]*>.*?</\1>', '', html,
                  flags=re.DOTALL | re.IGNORECASE)
    html = re.sub(r'<!--.*?-->', '', html, flags=re.DOTALL)
    # Preserve heading structure as markdown.
    html = re.sub(r'<h([1-6])[^>]*>(.*?)</h\1>',
                  lambda m: f"\n{'#' * int(m.group(1))} {m.group(2)}\n",
                  html, flags=re.DOTALL | re.IGNORECASE)
    # Preserve code blocks as fenced markdown.
    html = re.sub(r'<(pre|code)[^>]*>(.*?)</\1>',
                  lambda m: f"\n```\n{m.group(2)}\n```\n",
                  html, flags=re.DOTALL | re.IGNORECASE)
    # Keep anchor text, drop the link markup.
    html = re.sub(r'<a\b[^>]*>(.*?)</a>', r'\1', html,
                  flags=re.DOTALL | re.IGNORECASE)
    # Strip any remaining tags.
    html = re.sub(r'<[^>]+>', ' ', html)
    # Decode common entities; '&amp;' last so '&amp;lt;' doesn't double-decode.
    html = (html.replace('&lt;', '<').replace('&gt;', '>')
                .replace('&nbsp;', ' ').replace('&#39;', "'")
                .replace('&quot;', '"').replace('&amp;', '&'))
    # Collapse excess whitespace.
    html = re.sub(r'\n{3,}', '\n\n', html)
    html = re.sub(r'[ \t]{2,}', ' ', html)
    return html.strip()
def embed(texts: list[str]) -> Optional[list[list[float]]]:
    """Embed each text via nomic-embed-text on memory-engine.

    One Ollama call per text; items that fail to embed are recorded as
    None so positions stay aligned with the input list.
    """
    vectors = []
    for item in texts:
        reply = http_post_json(
            f"{OLLAMA_URL}/api/embeddings",
            {"model": EMBED_MODEL, "prompt": item},
        )
        if reply and "embedding" in reply:
            vectors.append(reply["embedding"])
        else:
            print(f" Embedding failed for text: {item[:50]}")
            vectors.append(None)
    return vectors
def is_stale(state: dict, url: str, refresh_days: int) -> bool:
    """Return True when *url* was never ingested or its last ingest is too old.

    "Too old" means more than *refresh_days* days before now (UTC).
    """
    record = state["ingested"].get(url)
    if not record:
        return True
    ingested_at = datetime.fromisoformat(record["ingested_at"])
    age = datetime.now(timezone.utc) - ingested_at
    return age > timedelta(days=refresh_days)
Returns chunk count.""" + print(f" Fetching: {url}") + html = http_get(url) + if not html: + return 0 + + text = html_to_text(html) + if len(text) < 200: + print(f" Too short after cleaning ({len(text)} chars), skipping") + return 0 + + chunks = chunk_text(text, source_name, url) + print(f" {len(chunks)} chunks") + + # Embed in batches of 10 + all_embeddings = [] + for i in range(0, len(chunks), 10): + batch = chunks[i:i+10] + embeddings = embed([c["text"] for c in batch]) + all_embeddings.extend(embeddings) + time.sleep(0.5) # be gentle with memory-engine + + count = upsert_chunks(chunks, all_embeddings) + + state["ingested"][url] = { + "source": source_name, + "ingested_at": datetime.now(timezone.utc).isoformat(), + "chunks": count, + } + state["total_chunks"] = state.get("total_chunks", 0) + count + return count + + +def main(): + parser = argparse.ArgumentParser(description="Grace RAG ingestion pipeline") + parser.add_argument("--hours", type=float, default=DEFAULT_HOURS, help="Max hours to run") + parser.add_argument("--url", type=str, help="Ingest a single URL immediately") + parser.add_argument("--force", action="store_true", help="Re-ingest all regardless of freshness") + parser.add_argument("--status", action="store_true", help="Show ingestion status and exit") + args = parser.parse_args() + + state = load_state() + + if args.status: + print(f"Total chunks in Qdrant: {state.get('total_chunks', 0)}") + print(f"URLs ingested: {len(state['ingested'])}") + for url, info in sorted(state["ingested"].items(), key=lambda x: x[1]["ingested_at"]): + print(f" {info['ingested_at'][:10]} {info['chunks']:4d} chunks {url}") + return + + ensure_collection() + + # Single URL mode + if args.url: + print(f"Ingesting: {args.url}") + count = ingest_url(args.url, args.url, state) + save_state(state) + print(f"Done: {count} chunks ingested") + return + + # Load doc list + with open(DOCS_FILE) as f: + docs = json.load(f) + + deadline = datetime.now(timezone.utc) + 
timedelta(hours=args.hours) + total_new_chunks = 0 + total_urls = 0 + + for source in docs["sources"]: + name = source["name"] + refresh_days = source.get("refresh_days", 30) + + for url in source["urls"]: + if datetime.now(timezone.utc) >= deadline: + print(f"\nTime limit ({args.hours}h) reached — stopping. Run again to continue.") + break + + if not args.force and not is_stale(state, url, refresh_days): + continue + + print(f"\n[{name}]") + count = ingest_url(url, name, state) + total_new_chunks += count + total_urls += 1 + save_state(state) # save after each URL so progress isn't lost + time.sleep(1) # polite crawling + + print(f"\n=== Done: {total_urls} URLs, {total_new_chunks} new chunks ===") + print(f"Total chunks in Qdrant: {state.get('total_chunks', 0)}") + + +if __name__ == "__main__": + main() diff --git a/rag-query.py b/rag-query.py new file mode 100755 index 0000000..fa7ae24 --- /dev/null +++ b/rag-query.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python3 +""" +Query Grace's RAG knowledge base. + +Usage (from exec): + python3 ~/scripts/rag-query.py "how do I configure reverse_proxy in Caddy" + python3 ~/scripts/rag-query.py "OPNsense unbound API add host" --top 5 + python3 ~/scripts/rag-query.py "proxmox create LXC" --source "Proxmox VE API" + +Returns relevant doc chunks with source + URL — use instead of web_fetch for +known homelab documentation. 
+""" + +import argparse +import json +import sys +import urllib.request +from typing import Optional + +QDRANT_URL = "http://localhost:6333" +OLLAMA_URL = "http://192.168.20.142:11434" +EMBED_MODEL = "nomic-embed-text" +COLLECTION = "homelab_docs" + + +def embed_query(text: str) -> Optional[list[float]]: + body = json.dumps({"model": EMBED_MODEL, "prompt": text}).encode() + req = urllib.request.Request( + f"{OLLAMA_URL}/api/embeddings", data=body, + headers={"Content-Type": "application/json"} + ) + try: + with urllib.request.urlopen(req, timeout=15) as r: + return json.loads(r.read())["embedding"] + except Exception as e: + print(f"Embed error: {e}", file=sys.stderr) + return None + + +def search(query_vector: list[float], top: int = 5, source_filter: Optional[str] = None) -> list[dict]: + payload: dict = { + "vector": query_vector, + "limit": top, + "with_payload": True, + } + if source_filter: + payload["filter"] = { + "must": [{"key": "source", "match": {"value": source_filter}}] + } + + body = json.dumps(payload).encode() + req = urllib.request.Request( + f"{QDRANT_URL}/collections/{COLLECTION}/points/search", + data=body, + headers={"Content-Type": "application/json"} + ) + try: + with urllib.request.urlopen(req, timeout=15) as r: + return json.loads(r.read()).get("result", []) + except Exception as e: + print(f"Search error: {e}", file=sys.stderr) + return [] + + +def main(): + parser = argparse.ArgumentParser(description="Query Grace's RAG knowledge base") + parser.add_argument("query", help="Natural language question") + parser.add_argument("--top", type=int, default=5, help="Number of results (default 5)") + parser.add_argument("--source", type=str, default=None, help="Filter by source name") + args = parser.parse_args() + + vector = embed_query(args.query) + if not vector: + print("Failed to embed query") + sys.exit(1) + + results = search(vector, top=args.top, source_filter=args.source) + + if not results: + print("No results found. 
Has the RAG been ingested yet?") + print("Run: python3 ~/scripts/rag-ingest.py") + sys.exit(0) + + for i, r in enumerate(results, 1): + p = r.get("payload", {}) + score = r.get("score", 0) + print(f"\n{'='*60}") + print(f"[{i}] {p.get('source','')} — {p.get('section','')}") + print(f" Score: {score:.3f} | {p.get('url','')}") + print(f"{'-'*60}") + print(p.get("text", "")[:800]) + + +if __name__ == "__main__": + main()