#!/usr/bin/env python3
"""
Load Grace transcript JSONL files into PostgreSQL on DB VM (192.168.20.87).

Tables:
  sessions     — one row per conversation session
  turns        — one row per turn (system/human/gpt)
  session_tags — optional labels

Run nightly after export-transcripts.py.

Connection settings default to the values in DB_CONFIG and can be overridden
with the GRACE_DB_HOST, GRACE_DB_PORT, GRACE_DB_NAME, GRACE_DB_USER and
GRACE_DB_PASSWORD environment variables.
"""

import glob
import json
import os
from datetime import datetime, timezone
from typing import Optional

TRANSCRIPTS_DIR = "/mnt/ai-storage/grace/training-data/transcripts"
STATE_FILE = os.path.expanduser("~/self-improving/db-load-state.json")

DB_CONFIG = {
    "host": os.environ.get("GRACE_DB_HOST", "192.168.20.87"),
    "port": int(os.environ.get("GRACE_DB_PORT", "5432")),
    "dbname": os.environ.get("GRACE_DB_NAME", "grace_training"),
    "user": os.environ.get("GRACE_DB_USER", "grace_ai"),
    # NOTE(review): the literal password is kept only as a backward-compatible
    # default; prefer supplying GRACE_DB_PASSWORD and removing it from source.
    "password": os.environ.get("GRACE_DB_PASSWORD", "grace_training_2026"),
}

INSERT_SESSION_SQL = """
    INSERT INTO sessions (session_id, source_file, turn_count, exported_at)
    VALUES (%s, %s, %s, %s)
    ON CONFLICT (session_id) DO NOTHING
    RETURNING id
"""

INSERT_TURN_SQL = """
    INSERT INTO turns (session_id, turn_index, role, content)
    VALUES (%s, %s, %s, %s)
"""


def _empty_state() -> dict:
    """Return a fresh state dict with every key main() relies on."""
    return {"loaded_files": [], "total_sessions": 0, "total_turns": 0}


def load_state(path: str = STATE_FILE) -> dict:
    """Read the loader state from *path*.

    Returns the default empty state when the file is missing, is not valid
    JSON, or does not contain a JSON object. Keys absent from the file are
    filled in with their defaults so callers can index them directly.
    """
    state = _empty_state()
    try:
        with open(path, encoding="utf-8") as f:
            loaded = json.load(f)
    except FileNotFoundError:
        return state
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        # Starting over is safe: session inserts use ON CONFLICT DO NOTHING,
        # so re-reading already-loaded files adds no duplicate rows.
        print(f"Warning: ignoring unreadable state file {path}: {e}")
        return state
    if not isinstance(loaded, dict):
        print(f"Warning: ignoring state file {path}: expected a JSON object")
        return state
    state.update(loaded)
    return state


def save_state(state: dict, path: str = STATE_FILE) -> None:
    """Write *state* to *path* as indented JSON.

    Creates the parent directory if needed, and writes to a temporary sibling
    file that is then renamed over *path*, so an interrupted run never leaves
    a truncated state file behind.
    """
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)
    os.replace(tmp_path, path)


def parse_example(line: str, default_source: str) -> Optional[tuple]:
    """Parse one JSONL line into the values needed for the DB inserts.

    Returns ``(session_id, source_file, turn_count, exported_at, turns)``
    where ``turns`` is a list of ``(turn_index, role, content)`` tuples, or
    ``None`` when the line is not a JSON object.

    ``default_source`` is used when the example has no usable ``"source"``
    string. Turns with empty content are dropped, but the remaining turns keep
    their original position in the conversation as ``turn_index``.
    """
    try:
        example = json.loads(line)
    except json.JSONDecodeError:
        return None
    if not isinstance(example, dict):
        return None

    source_file = example.get("source")
    if not isinstance(source_file, str) or not source_file:
        source_file = default_source
    session_id = source_file.replace(".jsonl", "")
    turn_count = example.get("turn_count", 0)
    exported_at = example.get("exported_at", datetime.now(timezone.utc).isoformat())

    conversations = example.get("conversations")
    if not isinstance(conversations, list):
        # Covers a missing key as well as an explicit null.
        conversations = []

    turns = []
    for i, turn in enumerate(conversations):
        if not isinstance(turn, dict):
            continue
        content = turn.get("value", "")
        if not content:
            continue
        turns.append((i, turn.get("from", ""), content))

    return session_id, source_file, turn_count, exported_at, turns


def load_file(conn, path: str) -> tuple[int, int]:
    """Load one transcript JSONL file. Returns (sessions_added, turns_added).

    Each non-blank line is one example. A session row is inserted per example;
    when the session_id already exists the example is skipped entirely. All
    inserts for the file are committed together at the end, so the caller can
    roll back the whole file if an exception escapes.
    """
    sessions_added = 0
    turns_added = 0
    skipped_lines = 0
    default_source = os.path.basename(path)

    # Explicit UTF-8 so the result does not depend on the locale of the
    # environment the nightly job runs in.
    with open(path, encoding="utf-8") as f, conn.cursor() as cur:
        for line in f:
            line = line.strip()
            if not line:
                continue

            parsed = parse_example(line, default_source)
            if parsed is None:
                skipped_lines += 1
                continue
            session_id, source_file, turn_count, exported_at, turns = parsed

            # Insert the session; RETURNING yields no row on conflict.
            cur.execute(
                INSERT_SESSION_SQL,
                (session_id, source_file, turn_count, exported_at),
            )
            if cur.fetchone() is None:
                # Already exists, skip
                continue
            sessions_added += 1

            if turns:
                cur.executemany(
                    INSERT_TURN_SQL,
                    [(session_id, i, role, content) for i, role, content in turns],
                )
                turns_added += len(turns)

    if skipped_lines:
        print(f"  Skipped {skipped_lines} malformed line(s) in {default_source}")

    conn.commit()
    return sessions_added, turns_added


def main():
    """Load every transcript file not yet recorded in the state file.

    A file that fails to load is rolled back and left out of the state, so it
    is retried on the next run. The state file is written even when the run
    ends with an unexpected error, so files already committed are not lost
    from the bookkeeping.
    """
    state = load_state()

    already_loaded = set(state["loaded_files"])
    transcript_files = sorted(glob.glob(os.path.join(TRANSCRIPTS_DIR, "*.jsonl")))
    new_files = [f for f in transcript_files if os.path.basename(f) not in already_loaded]

    if not new_files:
        print("No new transcript files to load.")
        return

    # Imported here so the module can be imported without the DB driver;
    # it is only needed once there is something to load.
    import psycopg2

    try:
        conn = psycopg2.connect(**DB_CONFIG)
    except Exception as e:
        print(f"DB connection failed: {e}")
        return

    total_sessions = 0
    total_turns = 0

    try:
        for path in new_files:
            name = os.path.basename(path)
            print(f"Loading: {name}")
            try:
                s, t = load_file(conn, path)
            except Exception as e:
                print(f"  Error: {e}")
                conn.rollback()
                continue
            print(f"  → {s} sessions, {t} turns")
            total_sessions += s
            total_turns += t
            state["loaded_files"].append(name)
    finally:
        state["total_sessions"] = state.get("total_sessions", 0) + total_sessions
        state["total_turns"] = state.get("total_turns", 0) + total_turns
        try:
            save_state(state)
        finally:
            conn.close()

    print(f"\nLoaded {total_sessions} sessions, {total_turns} turns")
    print(f"DB totals: {state['total_sessions']} sessions, {state['total_turns']} turns")


if __name__ == "__main__":
    main()